En este momento tenemos un motor capaz de ejecutar get y set load, pero está haciendo IO de sistema de archivos synchrounus. No podemos servir a nuestro cliente más rápido de lo que podemos leer el elemento desde el disco, pero podríamos servir a otros mientras leemos el elemento del disco.
En esta entrada vamos a arreglar nuestro get y store para que no bloqueen la API del motor. Como he dicho antes, la intención de este tutorial es centrarse en la API del motor. Eso significa que no voy a tratar de hacer un diseño efectivo, porque eso podría distraer la atención de lo que estoy tratando de explicar. Si la gente está interesada en cómo podríamos optimizar esto, podría añadir una segunda parte del tutorial en el futuro... Sólo házmelo saber.
Para implementar operaciones asíncronas en nuestro motor, necesitamos hacer uso de la API que el servidor pone a nuestra disposición en crear_instancia. Vamos a ampliar la estructura de nuestro motor para realizar un seguimiento de la API del servidor:
motor ENGINE_HANDLE_V1;
SERVER_HANDLE_V1 sapi;
};
…
MEMCACHED_PUBLIC_API
ENGINE_ERROR_CODE crear_instancia(uint64_t interfaz,
GET_SERVER_API get_server_api,
ENGINE_HANDLE **mango)
{
…
h->engine.item_set_cas = fs_item_set_cas;
h->engine.get_item_info = fs_get_item_info;
h->sapi = *get_server_api();
…
Para implementar una función asíncrona en la interfaz del motor, éste debe enviar la solicitud a otro subproceso antes de devolverla. ENGINE_EWOULDBLOCK de la función del motor. Cuando el backend termina de procesar el resultado, notifica al núcleo de memcached mediante la función notify_io_complete en la interfaz del servidor. Si se ha producido un error al procesar la petición, el núcleo de memcached devolverá el mensaje de error al cliente. Si tu motor llamó a notify_io_complete con ENGINE_SUCCESSel núcleo de memcached llamará a la función de interfaz del motor una vez más con el mismo argumento que la función
primera vez.
Si te fijas en la api del servidor, verás que tiene una interfaz para almacenar un puntero específico del motor. Esto nos hará la vida más fácil cuando queramos implementar async io. Así que vamos a seguir adelante y actualizar nuestra fs_get método:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)motor;
void *res = engine->sapi.cookie->get_engine_specific(cookie);
if (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
return ÉXITO_INGENIO;
}
…
Lo siguiente que tenemos que hacer es crear una función que se ejecute de forma asíncrona y almacene el resultado en la configuración engine_specific de la cookie. Ya que vamos a utilizar tareas asíncronas para todos los métodos del motor, vamos a seguir adelante y crear una función para ejecutar tareas en otro hilo:
{
pthread_attr_t attr;
pthread_t tid;
if (pthread_attr_init(&attr) != 0 ||| pthread_attr_init(&attr) != 0 || 0
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||| Pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0
pthread_create(&tid, &attr, task->callback, task) != 0) {
return ENGINE_FAILED;
}
return ENGINE_EWOULDBLOCK;
}
Como puedes ver en el código, voy a crear un nuevo hilo para ejecutar cada operación. Esto no es muy eficiente, porque crear un nuevo hilo tiene una sobrecarga sustancial. En tu diseño probablemente querrías un pool de hilos para ejecutar tus tareas.
El hilo recién creado ejecutaría la llamada de retorno especializada con un puntero a la tarea como único argumento. ¿Qué aspecto tiene esta estructura de tareas?
struct fs_engine *engine; /* Puntero al motor */
const void *cookie; /* La cookie que solicita la operación */
void *(*callback)(void *arg);
unión {
struct {
char clave[PATH_MAX];
size_t nkey;
} get; /* Datos utilizados por la operación get */
struct {
item *item;
Operación ENGINE_STORE_OPERATION;
} store; /* Datos utilizados por la operación store */
} datos;
};
Así que vamos a terminar fs_get:
const void* cookie,
item** item,
const void* key,
const int nkey,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)handle;
/* Comprueba si este es el callback de un ewouldblock anterior */
void *res = engine->sapi.cookie->get_engine_specific(cookie);
if (res != NULL) {
*item = res;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
return ÉXITO_INGENIO;
}
/* No admitimos claves superiores a PATH_MAX */
if (nkey >= PATH_MAX) {
return ENGINE_FAILED;
}
/* Configurar la estructura de devolución de llamada */
struct tarea *tarea = calloc(1, sizeof(*tarea));
if (task == NULL) {
return ENGINE_ENOMEM;
}
task->engine = (struct fs_engine *)handle;
tarea->cookie = cookie;
task->callback = fs_get_callback;
memcpy(tarea->datos.obtener.clave, clave, nclave);
tarea->datos.get.nkey = nkey;
ENGINE_ERROR_CODE ret = ejecutar(tarea);
if (ret != ENGINE_EWOULDBLOCK) {
liberar(tarea);
}
devolver ret;
}
Si miras el código anterior, verás que especificamos gs_get_callback como la función a ejecutar. Así que vamos a seguir adelante e implementar la devolución de llamada:
{
struct task *task = arg;
char *fname = tarea->datos.get.key;
tarea->datos.get.key[tarea->datos.get.nkey] = '
struct stat st;
if (stat(fname, &st) == -1) {
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
ENGINE_KEY_ENOENT);
liberar(tarea);
devuelve NULL;
}
struct fs_item* it = NULL;
ENGINE_ERROR_CODE ret = fs_allocate((ENGINE_HANDLE*)tarea->motor,
tarea->cookie, (void**)&it,
tarea->datos.get.key,
tarea->datos.get.nkey,
st.st_tamaño, 0, 0);
if (ret != ENGINE_SUCCESS) {
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_ENOMEM);
liberar(tarea);
devuelve NULL;
}
FILE *fp = fopen(fname, "r");
if (fp == NULL) {
fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_FALLIDO);
liberar(tarea);
devuelve NULL;
}
size_t nr = fread(it->data, 1, it->ndata, fp);
fclose(fp);
if (nr != it->ndata) {
fs_release((ENGINE_HANDLE*)task->engine, task->cookie, it);
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_FALLIDO);
liberar(tarea);
devuelve NULL;
}
task->motor->sapi.cookie->store_engine_specific(task->cookie, it);
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_ÉXITO);
devuelve NULL;
}
Como ves es bastante fácil añadir soporte asíncrono para las funciones del motor. Sigamos adelante y hagamos lo mismo para fs_store:
const void *cookie,
item* item,
uint64_t *cas,
Operación ENGINE_STORE_OPERATION,
uint16_t vbucket)
{
struct fs_engine *engine = (struct fs_engine *)handle;
/* Comprueba si este es el callback de un ewouldblock anterior */
void *res = engine->sapi.cookie->get_engine_specific(cookie);
if (res != NULL) {
*cas = 0;
engine->sapi.cookie->store_engine_specific(cookie, NULL);
return ÉXITO_INGENIO;
}
/* Configurar la estructura de devolución de llamada */
struct tarea *tarea = calloc(1, sizeof(*tarea));
if (task == NULL) {
return ENGINE_ENOMEM;
}
task->engine = (struct fs_engine *)handle;
tarea->cookie = cookie;
task->callback = fs_store_callback;
task->data.store.item = item;
task->data.store.operation = operación;
ENGINE_ERROR_CODE ret = ejecutar(tarea);
if (ret != ENGINE_EWOULDBLOCK) {
liberar(tarea);
}
devolver ret;
}
Y fs_store_callback se parece a lo siguiente:
{
struct task *task = arg;
struct fs_item* it = tarea->datos.store.item;
char fname[it->nkey + 1];
memcpy(fname, it->clave, it->clave);
fname[it->nkey] = '
FILE *fp = fopen(fname, "w");
if (fp == NULL) {
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_NO_ALMACENADO);
liberar(tarea);
devuelve NULL;
}
size_t nw = fwrite(it->data, 1, it->ndata, fp);
fclose(fp);
if (nw != it->ndata) {
remove(fname);
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_NO_ALMACENADO);
liberar(tarea);
devuelve NULL;
}
task->motor->sapi.cookie->store_engine_specific(task->cookie, it);
tarea->motor->sapi.cookie->notify_io_complete(tarea->cookie,
MOTOR_ÉXITO);
devuelve NULL;
}
Si observas detenidamente el código anterior verás que seguimos sin diferenciar entre add/set/reemplazarpero lo arreglaremos en la próxima sesión.