Estoy trabajando en una interfaz Twisted para el cliente Couchbase (https://github.com/couchbase/couchbase-python-client). El enlace apunta a la interfaz síncrona. La rama trenzada experimental se encuentra en https://github.com/couchbaselabs/couchbase-twisted-client
Para explicar cómo funciona el cliente Twisted, explicaré un poco cómo funciona internamente la extensión C.
SDK interno básico
libcouchbase: La biblioteca multiplataforma
El propio cliente síncrono es una extensión de C que utiliza libcouchbase (http://github.com/couchbase/libcouchbase). libcouchbase actúa como la capa de plataforma común para una serie de otros clientes Couchbase escritos en una variedad de otros lenguajes (como Ruby, PHP, node.js, Perl, y varios otros).
La extensión Python es una extensión C que utiliza la API C de Python (por qué no se utilizan ffi, Cython, etc. es otro tema de discusión, pero sólo diré que he estado muy contento con la API C de Python y con la estabilidad general y el mantenimiento de la extensión hasta ahora).
El funcionamiento de la extensión es bastante complejo. La API expuesta que se muestra a Python ofrece una variedad de métodos de acceso a datos (p. ej. configure, consiga, añada, añadir, borrarasí como variantes que pueden operar sobre múltiples valores, como set_multi, get_multi etc.). Estos métodos llaman directamente a la extensión Python, que realiza algunas validaciones de argumentos y luego programa estas operaciones en libcouchbase (utilizando, por ejemplo. lcb_set(), lcb_get()etc.). Una vez programadas estas operaciones, la extensión llama a lcb_wait() que se bloquea hasta que los resultados de las operaciones programadas se han completado (o han fallado con un error).
El código general es el siguiente:
lcb_get_cmd_t *cmdp = &comando;
mando.v.v0.clave = clave; /* donde 'key' se extrae de algún PyObject */
mando.v.v0.nkey = nkey; /* longitud de la clave */
mando.v.v0.exptime = exptime; /* opcional, extraído del parámetro */
errstatus = lcb_get(instancia, /* asa de biblioteca */,
galleta /* puntero opaco */,
&cmdp, /* comando "list" */
1 /* número de elementos de la lista */);
/** comprobar 'errstatus' */
errstatus = lcb_wait(instancia); /* bloque */
lcb_wait ejecuta un bucle de eventos interno implementado por libcouchbase, y eventualmente llama a los callbacks de resultado:
Los resultados reales no se entregan desde libcouchbase como un valor de retorno, sino que se pasan como una devolución de llamada. La idea detrás del uso de callbacks es que la estructura de resultados contiene punteros a buffers de red. Si los resultados fueran realmente "devueltos" por la librería, sería necesario que la librería copiara los buffers de red temporales a la memoria asignada y que el usuario los liberara, lo que afectaría al rendimiento. La llamada de retorno del resultado es algo parecido a esto:
{
char *clave = resp->v.v0.clave;
char *valor = resp->v.v0.bytes;
printf("Resultado obtenido para %.*s: %.*sn“, resp->v.v0.nkey, clave, resp->v.v0.nbytes, valor);
printf("Banderas: %lu, CAS %llun“, resp->v.v0.banderas, resp->v.v0.cas);
}
(En aras de la claridad, se han omitido la const-corrección y la fundición).
El usuario de la librería C libcouchbase debe instalar callbacks para ser invocados para operaciones específicas. Estos callbacks reciben la información de respuesta normal (es decir, respuesta, estado de error, etc.) así como un puntero opaco por operación (el 'galleta') que pasa el usuario (en este caso la extensión C). Este puntero está controlado y gestionado por el usuario, y generalmente contiene datos específicos del contexto/aplicación que el usuario puede utilizar para asociar una respuesta a una solicitud determinada.
Interacción con libcouchbase y CPython
Desde la perspectiva de la API pública expuesta por la extensión, cada operación da lugar a un 'Resultado(o una subclase adecuada de la misma) que se devuelve al usuario. La 'Resultadocontiene información sobre el resultado de la operación y los metadatos asociados. Al interactuar con la biblioteca C, el objeto Result es una clase de extensión que hereda de PyObject y se asigna antes de cada petición a la biblioteca C. A continuación, se pasa como 'galletaa la operación programada. Una vez que la biblioteca C invoca la llamada de retorno instalada, la llamada de retorno rellena el campo Resultado con la información pertinente. Por último, cuando el bucle de eventos sale del Resultado al usuario.
Una aplicación simplista sería la siguiente:
Conexión_get(pycbc_Conexión *auto, PyObject *args)
{
const char *clave;
si (!PyArg_ParseTuple("s", &clave, args) {
devolver NULL;
}
lcb_get_cmd_t comando, *comandop = &comando;
comando->v.v0.clave = clave;
comando->v.v0.nkey = strlen(clave);
pycbc_ValorResultado *vresult = PyObject_CallObject(&pycbc_ValueResultType, NULL);
lcb_get(auto->instancia, vresult, &comandop, 1);
lcb_wait(auto->instancia);
devolver vresult;
}
lcb_wait() se bloqueará hasta que llegue el resultado y se invoque su callback. La llamada de retorno sería algo como esto:
{
pycbc_ValorResultado *resultado = (pycbc_ValorResultado *)galleta;
resultado->valor = PyString_FromStringAndSize(resp->v.v0.bytes, resp->v.v0.nbytes);
resultado->rc = err;
resultado->banderas = resp->v.v0.banderas;
resultado->cas = resp->v.v0.cas;
}
[Tenga en cuenta que debido a que el SDK soporta cosas como transcodificadores y diferentes formatos de valores, el código real de conversión hacia y desde claves y valores es significativamente más complejo que esto, pero no es relevante para la arquitectura real que se está discutiendo aquí].
Para ser más específicos, el Resultado sólo se asigna desde dentro de la llamada de retorno, y lo que realmente se asigna antes de la secuencia de espera es un objeto MultiResultado que es un objeto dic subclase. Este MultiResultado extiende dict añadiendo algunos parámetros internos específicos; para cada llamada de retorno invocada durante la secuencia de espera, se crea un nuevo objeto Resultado y se inserta en este MultiResultado siendo la clave la del elemento de datos al que se accede. Por lo tanto, el código en realidad se parece a esto:
pycbc_MultiResultado *mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
lcb_get(auto->instancia, mres, &comandop, 1);
/* … */
y en la devolución de llamada
pycbc_MultiResultado *mres = (pycbc_MultiResultado *)galleta;
pycbc_ValorResultado *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);
/* .. asignar miembros de vres */
PyObject *keyobj = PyString_FromStringAndSize(resp->v.v0.clave, resp->v.v0.nkey);
vres->clave = keyobj;
PyDict_SetItem(&mres->dic, clave, vres);
Py_DECREF(vres);
/* … */
Este diseño interno, aunque complejo, permite una reutilización muy eficiente del código (y por tanto de las pruebas y la depuración) y funciona bastante bien. Además, esta abstracción se implementa íntegramente en C puro, por lo que la sobrecarga es mínima.
Implementación de E/S asíncronas
Integración del bucle de eventos
La biblioteca C antes mencionada ofrece una interfaz extensible de complementos de E/S. Anteriormente mencioné que la librería C usa su propio bucle de eventos interno, pero eso no es del todo cierto; más bien lo que hace la librería es exponer una API de complemento de E/S que permite implementar sus propias funciones para programar primitivas asíncronas comunes como vigilantes de socket y eventos de tiempo de espera. Debido a que la biblioteca en sí es totalmente asíncrona, permite la integración con el flujo de programas no bloqueantes.
Esta API puede consultarse aquí: https://github.com/couchbase/libcouchbase/blob/62300167f6f7d0f84ee6ac2162591805dfbf163d/include/libcouchbase/types.h#L196-221
Por lo tanto, con el fin de integrarse con el bucle de eventos del reactor de Twisted, tuve que escribir un 'plugin IO' que implementaría las primitivas asíncronas comunes utilizando los métodos del reactor de Twisted; el resultado se puede ver aquí, y por sí mismo es, creo, bastante sencillo: https://github.com/couchbaselabs/couchbase-twisted-client/blob/twisted/txcouchbase/iops.py
[Tenga en cuenta que elis_syncestá ahí para probar la funcionalidad básica en un patrón síncrono utilizando el reactor de Twisted como backend del bucle de eventos. Este no es el valor por defecto].
Con el fin de exponer esta interfaz IO Plugin a Python, hice clases envoltorio de Python para estas primitivas; así que tenemos IOEvent, TimerEventetc. La idea básica es que estos objetos contienen punteros internos a los datos de llamada de retorno de C. Además, existe el objeto 'IOPSque gestiona uno o varios de estos objetos. La idea básica es que la biblioteca C llama (a través de la extensión) a la clase 'IOPSpasándole uno de los objetos Evento; solicitando algún tipo de modificación de la programación (es decir, vigilar, des vigilar, destruir, etc.). La página IOPS llama a la implementación del bucle de eventos (en este caso, el reactor) para programar el evento deseado, pasándole el objeto Event correspondiente. Cuando el evento está listo, uno de los eventos 'listo_*', que llama a C. Comprensiblemente, todo esto causa un cierto impacto en el rendimiento - pero con el beneficio de que el código es ahora capaz de interactuar de forma asíncrona en cualquier bucle de eventos de Python.
Poniéndolo todo junto, parece algo así:
Cuando libcouchbase crea un socket por primera vez, lo asocia con algún tipo de objeto IO Event, que se expone como un puntero:
Esto llama a nuestro código Python que se parece a esto:
crear_evento(lcb_io_opt_t *iobase)
{
pycbc_iops_t = (pycbc_iops_t *)iobase;
PyObject *fábrica_de_eventos = PyObject_GetAttrString(io->py_impl, "crear_evento");
devolver PyObject_CallObject(fábrica_de_eventos, NULL);
}
Esta 'event_factory' debe devolver una subclase de IOEvent, con algunos campos añadidos; puede tener un aspecto similar a
así:
clase MyIOEvent(IOEvent):
def doRead(auto):
auto.ready_r()
def doWrite(auto):
auto.listo_w()
clase IOPS(objeto):
def crear_evento(auto):
devolver MyIOEvent()
Ahora, cuando la biblioteca quiere recibir notificaciones cuando un determinado socket está disponible, hace algo como esto:
Que luego llama a esto:
actualizar_evento(lcb_io_opt_t iobase, void *evento, int sockfd, corto banderas, void (*devolución de llamada)(int, corto, void *), void *datos)
{
pycbc_io_opt_t *io = (pycbc_io_opt_t *)iobase;
pycbc_IOEvent *ev = (pycbc_IOEvent *)evento;
evento->fd = sockfd; /* almacenamiento para '.fileno()' */
evento->callback_info.devolución de llamada = devolución de llamada;
evento->callback_info.datos = datos;
PyObject *args = Py_BuildValue("(O,I)", ev, banderas);
PyObject *metanfetamina = PyObject_GetAttrString(io->py_impl, "actualizar_evento");
PyObject_CallObject(metanfetamina, args);
}
Que en Python podría parecerse a:
si banderas & READ_EVENT:
auto.reactor.addReader(evento)
El parámetro 'event' es un objeto devuelto por nuestro método anterior 'create_event', que devuelve una instancia de MyIOEvent, que contiene la implementación necesaria de doRead.
En algún momento en el futuro, el reactor detecta que el fileno() subyacente está disponible para lectura, y llama al método 'doRead()' - mostrado arriba. En nuestra implementación, doRead llama a 'ready_r()', que es un método implementado en C:
Evento_listo_r(pycbc_IOEvent *evento)
{
evento->callback_info.devolución de llamada(evento->fd, READ_EVENT, evento->callback_info.datos);
}
API de conexión futura/diferida
Para que la API de acceso a datos sea asíncrona, he añadido algunos parámetros privados en el objeto Connection que lo marcan como 'asíncrono' - básicamente establece un campo dentro del objeto Connection y asigna un campo 'IOPS'. Una vez programada cada operación, la extensión comprobará si el objeto Connection 'F_ASYNCestá activada. Si lo está, no llamará a lcb_wait(), pero devuelve un AsyncResult (que es una subclase de MultiResultado), en lugar de esperar el resultado. Este 'AsyncResultcontiene un objeto 'errbackydevolución de llamadaque se invocan cuando el resultado está listo.
Del mismo modo, en el código de devolución de llamada, si la conexión 'F_ASYNCcada vez que se reciba un resultado, se invocará a la función AsyncResult.callback o AsyncResult.errback correspondiente (en función del éxito o el fracaso).
Lo bueno de la estructura interna es que se han necesitado muy pocas modificaciones para permitir el funcionamiento asíncrono; y, por tanto, toda la estabilidad de la API de sincronización se añade a la nueva interfaz asíncrona.
Nuestra sencilla implementación de 'get' tiene ahora este aspecto en C:
consiga(pycbc_Conexión *auto, PyObject *args)
{
/* Valida los argumentos normalmente */
…
pycbc_MultiResultado *mres;
si (auto->banderas & F_ASYNC) {
mres = PyObject_CallObject(&pycbc_AsyncResultType, NULL);
} si no {
mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
}
/** Validar argumentos, etc. */
…
err = lcb_get(auto->instancia, mres, &comandop, 1);
si (auto->banderas & F_ASYNC) {
lcb_wait(auto->instancia);
}
devolver mres;
}
Y lo mismo en la devolución de llamada;
get_callback(lcb_t instancia, const void *galleta, lcb_error_t err, const lcb_get_resp_t *resp)
{
pycbc_MultiResultado *mres = galleta;
pycbc_ValorResultado *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);
/** Configurar el valor resultante, e insertar en el diccionario ... */
…
si (mres->padre->banderas & F_ASYNC) {
/* es un AsyncResult, que es una subclase */
PyObject_CallObject( ((pycbc_AsyncResultado *)mres)->devolución de llamada, NULL);
Py_DECREF(mres); /* no lo devolvemos */
}
}
Un nuevo 'txcouchbaseque contiene su propio paquete Conexión clase. Este Conexión durante la construcción establece estas banderas internas. Además, para cada operación, la clase AsyncResult se envuelve dentro de un callback con el equivalente a la siguiente construcción:
async_res = super(Conexión, auto).consiga(*args, **kwargs)
d = Aplazado()
async_res.devolución de llamada = d.devolución de llamada
async_res.errback = d.errback
devolver d
Una vez que el resultado está listo, se invoca la llamada de retorno con un comando MultiResultado o Resultado (dependiendo de si se ha realizado una variante única o múltiple de la operación en la API).
Esto es impresionante Mark. Mi equipo utiliza couchbase y twisted (con autobahn) ampliamente, vamos a probarlo en las próximas semanas y proporcionar retroalimentación.
¡Buen trabajo!