Estou trabalhando em uma interface Twisted para o cliente Couchbase (https://github.com/couchbase/couchbase-python-client). O link aponta para a interface síncrona. O ramo trançado experimental está em https://github.com/couchbaselabs/couchbase-twisted-client
Para explicar como o cliente Twisted funciona, explicarei um pouco sobre como a extensão C funciona internamente.
Fundamentos básicos do SDK
libcouchbase: A biblioteca multiplataforma
O cliente síncrono em si é uma extensão C que utiliza libcouchbase (http://github.com/couchbase/libcouchbase). A libcouchbase atua como a camada de plataforma comum para vários outros clientes do Couchbase escritos em várias outras linguagens (como Ruby, PHP, node.js, Perl e várias outras).
A extensão Python é uma extensão C que usa a API C do Python (o motivo pelo qual ffi, Cython etc. não são usados é outro assunto a ser discutido, mas direi apenas que estou muito satisfeito com a API C do Python e com a estabilidade geral e a manutenção da extensão até o momento).
A forma como a extensão funciona é bastante complexa. A API exposta mostrada ao Python oferece uma variedade de métodos de acesso a dados (por exemplo definir, obter, adicionar, anexar, excluirbem como variantes que podem operar em vários valores, como set_multi, get_multi etc.). Esses métodos chamam diretamente a extensão Python, que faz a validação de alguns argumentos e, em seguida, agenda essas operações na libcouchbase (usando, por exemplo lcb_set(), lcb_get()etc.). Depois que essas operações são programadas, o ramal chama lcb_wait() que bloqueia até que os resultados das operações programadas tenham sido concluídos (ou tenham falhado com um erro).
O código geral tem a seguinte aparência:
lcb_get_cmd_t *cmdp = &comando;
comando.v.v0.chave = chave; /* onde 'key' é extraído de algum PyObject */
comando.v.v0.nkey = nkey; /* comprimento da chave */
comando.v.v0.tempo de expiração = tempo de expiração; /* opcional, extraído do parâmetro */
errstatus = lcb_get(instância, /* identificador de biblioteca */,
biscoito /* ponteiro opaco */,
&cmdp, /* comando "list" */
1 /* número de itens na lista */);
/** verificar 'errstatus' */
errstatus = lcb_wait(instância); /* bloco */
lcb_wait executa um loop de evento interno implementado pela libcouchbase e, por fim, chama as callbacks de resultado:
Os resultados reais não são entregues pela libcouchbase como um valor de retorno, mas sim passados como uma chamada de retorno. A ideia por trás do uso de retornos de chamada é que a estrutura de resultados contém ponteiros para buffers de rede. Se os resultados fossem realmente "retornados" pela biblioteca, isso exigiria que a biblioteca copiasse os buffers de rede temporários para a memória alocada e que o usuário os liberasse, o que afetaria o desempenho. A chamada de retorno do resultado tem a seguinte aparência:
{
char *chave = resp->v.v0.chave;
char *valor = resp->v.v0.bytes;
printf("Obtive resultado para %.*s: %.*sn“, resp->v.v0.nkey, chave, resp->v.v0.nbytes, valor);
printf("Sinalizadores: %lu, CAS %llun“, resp->v.v0.bandeiras, resp->v.v0.cas);
}
(Correção de Const e elenco omitidos para fins de clareza).
O usuário da biblioteca C libcouchbase deve instalar retornos de chamada a serem invocados para operações específicas. Essas callbacks recebem as informações normais de resposta (ou seja, resposta, status de erro, etc.), bem como um ponteiro opaco por operação (o 'biscoito') que é passado pelo usuário (neste caso, a extensão C). Esse ponteiro é controlado e gerenciado pelo usuário e geralmente contém dados específicos do contexto/aplicativo que o usuário pode usar para associar uma resposta a uma determinada solicitação.
Interagindo com a libcouchbase e o CPython
Da perspectiva da API pública exposta pela extensão, cada operação resulta em um 'Resultado' (ou uma subclasse adequada da mesma) sendo retornada ao usuário. A instância 'Resultado' contém informações sobre o resultado da operação e todos os metadados associados. Ao interagir com a biblioteca C, o objeto Result é uma classe de extensão que herda de PyObject e é alocado antes de cada solicitação à biblioteca C. Em seguida, ele é passado como o parâmetro 'biscoito' para a operação programada. Quando a biblioteca C invoca o retorno de chamada instalado, o retorno de chamada preenche a variável Resultado com as informações relevantes. Finalmente, quando o loop de eventos sai, o objeto Resultado é então para o usuário.
Uma implementação simplista seria semelhante a esta:
Connection_get(pycbc_Connection *autônomo, PyObject *argumentos)
{
const char *chave;
se (!PyArg_ParseTuple("s", &chave, argumentos) {
retorno NULL;
}
lcb_get_cmd_t command, *comandop = &comando;
comando->v.v0.chave = chave;
comando->v.v0.nkey = strlen(chave);
pycbc_ValueResult *vresultado = PyObject_CallObject(&pycbc_ValueResultType, NULL);
lcb_get(autônomo->instância, vresultado, &comandop, 1);
lcb_wait(autônomo->instância);
retorno vresultado;
}
O lcb_wait() será bloqueado até que o resultado chegue e sua chamada de retorno seja invocada. O retorno de chamada seria algo parecido com isto:
{
pycbc_ValueResult *resultado = (pycbc_ValueResult *)biscoito;
resultado->valor = PyString_FromStringAndSize(resp->v.v0.bytes, resp->v.v0.nbytes);
resultado->rc = erro;
resultado->bandeiras = resp->v.v0.bandeiras;
resultado->cas = resp->v.v0.cas;
}
[Observe que, como o SDK oferece suporte a coisas como transcodificadores e diferentes formatos de valores, o código real de conversão de e para chaves e valores é significativamente mais complexo do que isso, mas não é relevante para a arquitetura real que está sendo discutida aqui.]
Para ser mais específico, o Resultado só é alocado a partir da chamada de retorno, e o que é realmente alocado antes da sequência de espera é um objeto MultiResultado que é um objeto ditado subclasse. Essa MultiResultado estende o dict adicionando alguns parâmetros internos específicos; para cada retorno de chamada invocado durante a sequência de espera, um novo objeto Resultado é alocado e inserido nesse objeto MultiResultado com a chave sendo a chave do item de dados que está sendo acessado. Assim, o código realmente se parece com o seguinte:
pycbc_MultiResult *mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
lcb_get(autônomo->instância, mres, &comandop, 1);
/* … */
e no retorno de chamada
pycbc_MultiResult *mres = (pycbc_MultiResult *)biscoito;
pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);
/* ... atribuir membros de vres */
PyObject *chaveiro = PyString_FromStringAndSize(resp->v.v0.chave, resp->v.v0.nkey);
vres->chave = chaveiro;
PyDict_SetItem(&mres->ditado, chave, vres);
Py_DECREF(vres);
/* … */
Esse design interno, embora complexo, permite uma reutilização de código muito eficiente (e, portanto, testes e depuração) e tem um desempenho muito bom. Essa abstração também é implementada inteiramente em C puro, o que torna a sobrecarga mínima.
Implementação de E/S assíncrona
Integração do loop de eventos
A biblioteca C mencionada anteriormente oferece uma interface de plug-in de E/S extensível. Anteriormente, mencionei que a biblioteca C usa seu próprio loop de eventos interno, mas isso não é totalmente verdade; em vez disso, o que a biblioteca faz é expor uma API de plug-in de E/S que permite implementar suas próprias funções para agendar primitivos assíncronos comuns, como socket watchers e eventos de tempo limite. Como a biblioteca em si é totalmente assíncrona, ela permite a integração com o fluxo de programa sem bloqueio.
Essa API pode ser vista aqui: https://github.com/couchbase/libcouchbase/blob/62300167f6f7d0f84ee6ac2162591805dfbf163d/include/libcouchbase/types.h#L196-221
Assim, para me integrar ao loop de eventos do reator do Twisted, tive que escrever um "plug-in de IO" que implementaria as primitivas assíncronas comuns usando os métodos do reator do Twisted; o resultado pode ser visto aqui e, por si só, acho que é bastante simples: https://github.com/couchbaselabs/couchbase-twisted-client/blob/twisted/txcouchbase/iops.py
[Observe que ois_sync' está lá apenas para testar a funcionalidade básica em um padrão síncrono usando o reactor do Twisted como backend do loop de eventos. Esse não é o padrão].
Para expor essa interface do plug-in de E/S ao Python, criei classes de wrapper do Python para essas primitivas; portanto, temos IOEvent, TimerEventetc. A ideia básica é que esses objetos contêm ponteiros internos para dados de retorno de chamada C. Além disso, há o objeto 'IOPS' que gerencia um ou mais desses objetos. A ideia básica é que a biblioteca C chama (por meio da extensão) a classe 'IOPS', passando a ele um dos objetos Event; solicitando algum tipo de modificação de agendamento (ou seja, assistir, cancelar a observação, destruir etc.). O objeto IOPS chama a implementação real do loop de eventos (nesse caso, o reactor) para agendar o evento desejado, passando a ele o objeto Event relevante. Quando o evento estiver pronto, um dos objetos de evento 'pronto_*Compreensivelmente, tudo isso causa um certo impacto no desempenho, mas com a vantagem de que o código agora é capaz de interagir de forma assíncrona em qualquer loop de eventos do Python.
Juntando tudo isso, fica mais ou menos assim:
Quando a libcouchbase cria um soquete pela primeira vez, ela o associa a algum tipo de objeto IO Event, que é exposto como um ponteiro:
Isso chama o nosso código Python, que tem a seguinte aparência:
create_event(lcb_io_opt_t *iobase)
{
pycbc_iops_t = (pycbc_iops_t *)iobase;
PyObject *event_factory = PyObject_GetAttrString(io->py_impl, "create_event" (criar evento));
retorno PyObject_CallObject(event_factory, NULL);
}
Esse 'event_factory' deve retornar uma subclasse de IOEvent, com alguns campos adicionais; ele pode ter a seguinte aparência
assim:
classe MyIOEvent(IOEvent):
def doRead(autônomo):
autônomo.pronto_r()
def doWrite(autônomo):
autônomo.pronto_w()
classe IOPS(objeto):
def create_event(autônomo):
retorno MyIOEvent()
Agora, quando a biblioteca quiser receber notificações quando um determinado soquete estiver disponível, ela fará algo parecido com isto:
O que então chama isso:
update_event(lcb_io_opt_t iobase, vazio *evento, int sockfd, curto bandeiras, vazio (*retorno de chamada)(int, curto, vazio *), vazio *dados)
{
pycbc_io_opt_t *io = (pycbc_io_opt_t *)iobase;
pycbc_IOEvent *ev = (pycbc_IOEvent *)evento;
evento->fd = sockfd; /* armazenamento para '.fileno()' */
evento->callback_info.retorno de chamada = retorno de chamada;
evento->callback_info.dados = dados;
PyObject *argumentos = Py_BuildValue("(O,I)", ev, bandeiras);
PyObject *metanfetamina = PyObject_GetAttrString(io->py_impl, "update_event");
PyObject_CallObject(metanfetamina, argumentos);
}
O que, em Python, pode ser parecido com:
se sinalizadores & READ_EVENT:
autônomo.reator.addReader(evento)
O parâmetro "event" é um objeto retornado pelo nosso método "create_event" anterior, que retorna uma instância de MyIOEvent, que contém a implementação necessária de doRead.
Em algum momento no futuro, o reator detecta que o arquivo subjacente fileno() está disponível para leitura e chama o método "doRead()", mostrado acima. Em nossa implementação, doRead chama 'ready_r()', que é um método implementado em C:
Event_ready_r(pycbc_IOEvent *evento)
{
evento->callback_info.retorno de chamada(evento->fd, READ_EVENT, evento->callback_info.dados);
}
API de conexão futura/diferida
Para tornar a API de acesso a dados real assíncrona, adicionei alguns parâmetros privados no objeto Connection que o marca como "assíncrono" - basicamente, ele define um campo dentro do objeto Connection e aloca um objeto 'IOPS' instância. Depois que cada operação tiver sido programada, a extensão verificará se o objeto de conexão 'F_ASYNC' está definido. Se estiver, ele não chamará lcb_wait(), mas retorna um Resultado assíncrono (que é uma subclasse de MultiResultado), em vez de aguardar o resultado. Esse 'Resultado assíncrono' contém um objeto 'errback' e 'retorno de chamada' que são invocadas quando o resultado está pronto.
Da mesma forma, no código de retorno de chamada, se a conexão 'F_ASYNC' for definido, toda vez que um resultado for recebido, ele invocará a função AsyncResult.callback ou AsyncResult.errback relevante (dependendo do sucesso ou da falha).
O melhor da estrutura interna é que poucas modificações foram necessárias para permitir a operação assíncrona e, portanto, toda a estabilidade da API de sincronização foi adicionada à nova interface assíncrona.
Nossa implementação simples de 'get' agora tem a seguinte aparência em C:
obter(pycbc_Connection *autônomo, PyObject *argumentos)
{
/* Faça a validação do argumento normalmente */
…
pycbc_MultiResult *mres;
se (autônomo->bandeiras & F_ASYNC) {
mres = PyObject_CallObject(&pycbc_AsyncResultType, NULL);
} mais {
mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
}
/** Validar argumentos, etc. */
…
erro = lcb_get(autônomo->instância, mres, &comandop, 1);
se (autônomo->bandeiras & F_ASYNC) {
lcb_wait(autônomo->instância);
}
retorno mres;
}
E o mesmo acontece no retorno de chamada;
get_callback(instância lcb_t, const vazio *biscoito, lcb_error_t err, const lcb_get_resp_t *resp)
{
pycbc_MultiResult *mres = biscoito;
pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);
/** Configurar o resultado do valor e inserir no dicionário ... */
…
se (mres->pai->bandeiras & F_ASYNC) {
/* é um AsyncResult, que é uma subclasse */
PyObject_CallObject( ((pycbc_AsyncResult *)mres)->retorno de chamada, NULL);
Py_DECREF(mres); /* não o estamos devolvendo */
}
}
Um novo 'txcouchbase' foi adicionado, o qual contém seu próprio pacote Conexão classe. Essa Conexão durante a construção define esses sinalizadores internos. Além disso, para cada operação, o sinalizador Resultado assíncrono é agrupado em um retorno de chamada com o equivalente à seguinte construção:
async_res = super(Conexão, autônomo).obter(*args, **kwargs)
d = Diferido()
async_res.retorno de chamada = d.retorno de chamada
async_res.errback = d.errback
retorno d
Quando o resultado estiver pronto, o retorno de chamada é chamado com um MultiResultado ou Resultado (dependendo do fato de uma variante única ou múltipla da operação ter sido executada na API).
Isso é fantástico, Mark. Minha equipe usa extensivamente o couchbase e o twisted (com autobahn), vamos experimentar nas próximas semanas e fornecer feedback.
Excelente trabalho!