No momento, temos um mecanismo capaz de executar get e set load, mas ele está fazendo IO sincronizada do sistema de arquivos. Não podemos atender nosso cliente mais rápido do que podemos ler o item do disco, mas podemos atender outros enquanto estivermos lendo o item do disco.
Nesta entrada, vamos corrigir nosso get
e store
para que eles não bloqueiem a API do mecanismo. Como eu disse anteriormente, a intenção deste tutorial é focar no método API do mecanismo. Isso significa que não tentarei criar um design eficaz, pois isso poderia desviar o foco do que estou tentando explicar. Se as pessoas estiverem interessadas em saber como podemos otimizar isso, eu poderia adicionar uma segunda parte do tutorial no futuro... É só me avisar.
Para implementar operações assíncronas em nosso mecanismo, precisamos usar a API que o servidor disponibiliza para nós em create_instance
. Vamos estender a estrutura do nosso mecanismo para manter o controle da API do servidor:
Motor ENGINE_HANDLE_V1;
SERVER_HANDLE_V1 sapi;
};
…
MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE create_instance(uint64_t interface,
GET_SERVER_API get_server_api,
ENGINE_HANDLE **handle)
{
…
h->engine.item_set_cas = fs_item_set_cas;
h->engine.get_item_info = fs_get_item_info;
h->sapi = *get_server_api();
…
Para implementar uma função assíncrona na interface do mecanismo, o mecanismo precisa enviar a solicitação para outro thread antes de retornar ENGINE_EWOULDBLOCK
da função do mecanismo. Quando o backend termina de processar o resultado, ele notifica o núcleo do memcached usando a função notify_io_complete
na interface do servidor. Se ocorrer um erro durante o processamento da solicitação, o núcleo do memcached retornará a mensagem de erro ao cliente. Se seu mecanismo chamou notify_io_complete
com ENGINE_SUCCESS
o núcleo do memcached chamará a função de interface do mecanismo mais uma vez com o mesmo argumento que o argumento
primeira vez.
Se você observar a API do servidor, verá que ela tem uma interface para armazenar um ponteiro específico do mecanismo. Isso facilitará nossa vida quando quisermos implementar o io assíncrono. Portanto, vamos em frente e atualizar nosso fs_get
método:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)engine;
void *res = engine->sapi.cookie->get_engine_specific(cookie);
Se (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
retornar ENGINE_SUCCESS;
}
…
A próxima coisa que precisamos fazer é criar uma função que seja executada de forma assíncrona e armazene o resultado na configuração específica do mecanismo para o cookie. Como usaremos tarefas assíncronas para todos os métodos do mecanismo, vamos criar uma função para executar tarefas em outro thread:
{
pthread_attr_t attr;
pthread_t tid;
Se (pthread_attr_init(&attr) != 0 ||
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
pthread_create(&tid, &attr, task->callback, task) != 0) {
retornar ENGINE_FAILED;
}
retornar ENGINE_EWOULDBLOCK;
}
Como você pode ver no código, vou criar um novo thread para executar cada operação. Isso não é muito eficiente, pois a criação de um novo thread gera uma sobrecarga substancial. Em seu projeto, você provavelmente desejaria um pool de threads para executar suas tarefas.
O thread recém-criado executaria o retorno de chamada especializado com um ponteiro para a tarefa como seu único argumento. Então, qual é a aparência dessa estrutura de tarefa?
struct fs_engine *engine; /* Ponteiro para o mecanismo */
const void *cookie; /* O cookie que está solicitando a operação */
void *(*callback)(void *arg);
união {
struct {
char key[PATH_MAX];
size_t nkey;
} get; /* Dados usados pela operação get */
struct {
item *item;
Operação ENGINE_STORE_OPERATION;
} store; /* Dados usados pela operação de armazenamento */
} dados;
};
Então, vamos terminar fs_get
:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)handle;
/* Verificar se esta é a chamada de retorno de um ewouldblock anterior */
void *res = engine->sapi.cookie->get_engine_specific(cookie);
Se (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
retornar ENGINE_SUCCESS;
}
/* Não oferecemos suporte a chaves maiores que PATH_MAX */
se (nkey >= PATH_MAX) {
retornar ENGINE_FAILED;
}
/* Configurar a estrutura de retorno de chamada */
struct task *task = calloc(1, sizeof(*task));
Se (tarefa == NULL) {
return ENGINE_ENOMEM;
}
task->engine = (struct fs_engine *)handle;
task->cookie = cookie;
task->callback = fs_get_callback;
memcpy(task->data.get.key, key, nkey);
task->data.get.nkey = nkey;
ENGINE_ERROR_CODE ret = execute(task);
Se (ret != ENGINE_EWOULDBLOCK) {
free(task);
}
return ret;
}
Se você observar o código acima, verá que especificamos gs_get_callback
como a função a ser executada. Então, vamos em frente e implementar o retorno de chamada:
{
struct task *task = arg;
char *fname = task->data.get.key;
task->data.get.key[task->data.get.nkey] = '
struct stat st;
se (stat(fname, &st) == -1) {
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_KEY_ENOENT);
free(task);
retornar NULL;
}
struct fs_item* it = NULL;
ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)task->engine,
task->cookie, (void**)&it,
task->data.get.key,
tarefa->dados.get.nkey,
st.st_size, 0, 0);
Se (ret != ENGINE_SUCCESS) {
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_ENOMEM);
free(task);
retornar NULL;
}
FILE *fp = fopen(fname, "r");
Se (fp == NULL) {
fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_FAILED);
free(task);
retornar NULL;
}
size_t nr = fread(it->data, 1, it->ndata, fp);
fclose(fp);
se (nr != it->ndata) {
fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_FAILED);
free(task);
retornar NULL;
}
task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_SUCCESS);
retornar NULL;
}
Como você pode ver, é muito fácil adicionar suporte assíncrono às funções do mecanismo. Vamos seguir em frente e fazer o mesmo para fs_store
:
const void *cookie,
item* item,
uint64_t *cas,
Operação ENGINE_STORE_OPERATION,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)handle;
/* Verificar se esta é a chamada de retorno de um ewouldblock anterior */
void *res = engine->sapi.cookie->get_engine_specific(cookie);
Se (res != NULL) {
*cas = 0;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
retornar ENGINE_SUCCESS;
}
/* Configurar a estrutura de retorno de chamada */
struct task *task = calloc(1, sizeof(*task));
Se (tarefa == NULL) {
return ENGINE_ENOMEM;
}
task->engine = (struct fs_engine *)handle;
task->cookie = cookie;
task->callback = fs_store_callback;
task->data.store.item = item;
task->data.store.operation = operation;
ENGINE_ERROR_CODE ret = execute(task);
Se (ret != ENGINE_EWOULDBLOCK) {
free(task);
}
return ret;
}
E fs_store_callback
se parece com o seguinte:
{
struct task *task = arg;
struct fs_item* it = task->data.store.item;
char fname[it->nkey + 1];
memcpy(fname, it->key, it->nkey);
fname[it->nkey] = '
FILE *fp = fopen(fname, "w");
Se (fp == NULL) {
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_NOT_STORED);
free(task);
retornar NULL;
}
size_t nw = fwrite(it->data, 1, it->ndata, fp);
fclose(fp);
se (nw != it->ndata) {
remove(fname);
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_NOT_STORED);
free(task);
retornar NULL;
}
task->engine->sapi.cookie->store_engine_specific(task->cookie, it);
task->engine->sapi.cookie->notify_io_complete(task->cookie,
ENGINE_SUCCESS);
retornar NULL;
}
Se você observar atentamente o código acima, verá que ainda não diferenciamos entre add
/set
/substituir
mas vamos corrigir isso na próxima sessão.