No momento, temos um mecanismo capaz de executar get e set load, mas ele está fazendo IO sincronizada do sistema de arquivos. Não podemos atender nosso cliente mais rápido do que podemos ler o item do disco, mas podemos atender outros enquanto estivermos lendo o item do disco.

Nesta entrada, vamos corrigir nosso get e store para que eles não bloqueiem a API do mecanismo. Como eu disse anteriormente, a intenção deste tutorial é focar no método API do mecanismo. Isso significa que não tentarei criar um design eficaz, pois isso poderia desviar o foco do que estou tentando explicar. Se as pessoas estiverem interessadas em saber como podemos otimizar isso, eu poderia adicionar uma segunda parte do tutorial no futuro... É só me avisar.

Para implementar operações assíncronas em nosso mecanismo, precisamos usar a API que o servidor disponibiliza para nós em create_instance. Vamos estender a estrutura do nosso mecanismo para manter o controle da API do servidor:

 

struct fs_engine {
   Motor ENGINE_HANDLE_V1;
   SERVER_HANDLE_V1 sapi;
};

MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
                                  GET_SERVER_API get_server_api,
                                  ENGINE_HANDLE **handle)
{

   h->engine.item_set_cas = fs_item_set_cas;
   h->engine.get_item_info = fs_get_item_info;
   h->sapi = *get_server_api();

     

Para implementar uma função assíncrona na interface do mecanismo, o mecanismo precisa enviar a solicitação para outro thread antes de retornar ENGINE_EWOULDBLOCK da função do mecanismo. Quando o backend termina de processar o resultado, ele notifica o núcleo do memcached usando a função notify_io_complete na interface do servidor. Se ocorrer um erro durante o processamento da solicitação, o núcleo do memcached retornará a mensagem de erro ao cliente. Se seu mecanismo chamou notify_io_complete com ENGINE_SUCCESSo núcleo do memcached chamará a função de interface do mecanismo mais uma vez com o mesmo argumento que o argumento
primeira vez.

Se você observar a API do servidor, verá que ela tem uma interface para armazenar um ponteiro específico do mecanismo. Isso facilitará nossa vida quando quisermos implementar o io assíncrono. Portanto, vamos em frente e atualizar nosso fs_get método:

 

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)engine;
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   Se (res != NULL) {
      *item = res;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      retornar ENGINE_SUCCESS;

   }


     

A próxima coisa que precisamos fazer é criar uma função que seja executada de forma assíncrona e armazene o resultado na configuração específica do mecanismo para o cookie. Como usaremos tarefas assíncronas para todos os métodos do mecanismo, vamos criar uma função para executar tarefas em outro thread:

 

static ENGINE_ERROR_CODE execute(struct task *task)
{
   pthread_attr_t attr;
   pthread_t tid;

   Se (pthread_attr_init(&attr) != 0 ||
       pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
       pthread_create(&tid, &attr, task->callback, task) != 0) {
      retornar ENGINE_FAILED;
   }

   retornar ENGINE_EWOULDBLOCK;
}
     

Como você pode ver no código, vou criar um novo thread para executar cada operação. Isso não é muito eficiente, pois a criação de um novo thread gera uma sobrecarga substancial. Em seu projeto, você provavelmente desejaria um pool de threads para executar suas tarefas.

O thread recém-criado executaria o retorno de chamada especializado com um ponteiro para a tarefa como seu único argumento. Então, qual é a aparência dessa estrutura de tarefa?

 

struct task {
   struct fs_engine *engine; /* Ponteiro para o mecanismo */
   const void *cookie; /* O cookie que está solicitando a operação */
   void *(*callback)(void *arg);
   união {
      struct {
         char key[PATH_MAX];
         size_t nkey;
      } get; /* Dados usados pela operação get */
      struct {
         item *item;
         Operação ENGINE_STORE_OPERATION;
      } store; /* Dados usados pela operação de armazenamento */
   } dados;
};
     

Então, vamos terminar fs_get:

 

static ENGINE_ERROR_CODE fs_get(ENGINE_HANDLE* handle,
                                const void* cookie,
                                item** item,
                                const void* key,
                                const int nkey,
                                uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   /* Verificar se esta é a chamada de retorno de um ewouldblock anterior */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   Se (res != NULL) {
      *item = res;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      retornar ENGINE_SUCCESS;
   }

   /* Não oferecemos suporte a chaves maiores que PATH_MAX */
   se (nkey >= PATH_MAX) {
      retornar ENGINE_FAILED;
   }

   /* Configurar a estrutura de retorno de chamada */
   struct task *task = calloc(1, sizeof(*task));
   Se (tarefa == NULL) {
      return ENGINE_ENOMEM;
   }

   task->engine = (struct fs_engine *)handle;
   task->cookie = cookie;
   task->callback = fs_get_callback;
   memcpy(task->data.get.key, key, nkey);
   task->data.get.nkey = nkey;

   ENGINE_ERROR_CODE ret = execute(task);
   Se (ret != ENGINE_EWOULDBLOCK) {
      free(task);
   }
   return ret;
}
     

Se você observar o código acima, verá que especificamos gs_get_callback como a função a ser executada. Então, vamos em frente e implementar o retorno de chamada:

 

void estático *fs_get_callback(void *arg)
{
   struct task *task = arg;
   char *fname = task->data.get.key;
   task->data.get.key[task->data.get.nkey] = '

   struct stat st;
   se (stat(fname, &st) == -1) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_KEY_ENOENT);
      free(task);
      retornar NULL;
   }

   struct fs_item* it = NULL;
   ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)task->engine,
                                       task->cookie, (void**)&it,
                                       task->data.get.key,
                                       tarefa->dados.get.nkey,
                                       st.st_size, 0, 0);
   Se (ret != ENGINE_SUCCESS) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_ENOMEM);
      free(task);
      retornar NULL;
   }

   FILE *fp = fopen(fname, "r");
   Se (fp == NULL) {
      fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      retornar NULL;
   }

   size_t nr = fread(it->data, 1, it->ndata, fp);
   fclose(fp);
   se (nr != it->ndata) {
      fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_FAILED);
      free(task);
      retornar NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   retornar NULL;
}
     

Como você pode ver, é muito fácil adicionar suporte assíncrono às funções do mecanismo. Vamos seguir em frente e fazer o mesmo para fs_store:

 

static ENGINE_ERROR_CODE fs_store(ENGINE_HANDLE* handle,
                                  const void *cookie,
                                  item* item,
                                  uint64_t *cas,
                                  Operação ENGINE_STORE_OPERATION,
                                  uint16_t vbucket)
{
   struct fs_engine *engine = (struct fs_engine *)handle;
   /* Verificar se esta é a chamada de retorno de um ewouldblock anterior */
   void *res = engine->sapi.cookie->get_engine_specific(cookie);
   Se (res != NULL) {
      *cas = 0;
      engine->sapi.cookie->store_engine_specific(cookie, NULL);
      retornar ENGINE_SUCCESS;
   }

   /* Configurar a estrutura de retorno de chamada */
   struct task *task = calloc(1, sizeof(*task));
   Se (tarefa == NULL) {
      return ENGINE_ENOMEM;
   }

   task->engine = (struct fs_engine *)handle;
   task->cookie = cookie;
   task->callback = fs_store_callback;
   task->data.store.item = item;
   task->data.store.operation = operation;

   ENGINE_ERROR_CODE ret = execute(task);
   Se (ret != ENGINE_EWOULDBLOCK) {
      free(task);
   }
   return ret;
}

E fs_store_callback se parece com o seguinte:

 

void estático *fs_store_callback(void *arg)
{
   struct task *task = arg;
   struct fs_item* it = task->data.store.item;
   char fname[it->nkey + 1];
   memcpy(fname, it->key, it->nkey);
   fname[it->nkey] = '

   FILE *fp = fopen(fname, "w");
   Se (fp == NULL) {
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      retornar NULL;
   }

   size_t nw = fwrite(it->data, 1, it->ndata, fp);
   fclose(fp);
   se (nw != it->ndata) {
      remove(fname);
      task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                    ENGINE_NOT_STORED);
      free(task);
      retornar NULL;
   }

   task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
   task->engine->sapi.cookie->notify_io_complete(task->cookie,
                                                 ENGINE_SUCCESS);
   retornar NULL;
}

Se você observar atentamente o código acima, verá que ainda não diferenciamos entre add/set/substituirmas vamos corrigir isso na próxima sessão.

Autor

Postado por Trond Norbye, desenvolvedor sênior, Couchbase

Trond Norbye é arquiteto de software na Couchbase. Principal colaborador dos projetos Couchbase e Memcached. Criou as bibliotecas de clientes C/C++ e node.js do Couchbase.

Deixar uma resposta