Sem categoria

Python SDK e Twisted

Estou trabalhando em uma interface Twisted para o cliente Couchbase (https://github.com/couchbase/couchbase-python-client). O link aponta para a interface síncrona. O ramo trançado experimental está em https://github.com/couchbaselabs/couchbase-twisted-client

Para explicar como o cliente Twisted funciona, explicarei um pouco sobre como a extensão C funciona internamente.

Fundamentos básicos do SDK

libcouchbase: A biblioteca multiplataforma

O cliente síncrono em si é uma extensão C que utiliza libcouchbase (http://github.com/couchbase/libcouchbase). A libcouchbase atua como a camada de plataforma comum para vários outros clientes do Couchbase escritos em várias outras linguagens (como Ruby, PHP, node.js, Perl e várias outras).

 

A extensão Python é uma extensão C que usa a API C do Python (o motivo pelo qual ffi, Cython etc. não são usados é outro assunto a ser discutido, mas direi apenas que estou muito satisfeito com a API C do Python e com a estabilidade geral e a manutenção da extensão até o momento).

 

A forma como a extensão funciona é bastante complexa. A API exposta mostrada ao Python oferece uma variedade de métodos de acesso a dados (por exemplo definir, obter, adicionar, anexar, excluirbem como variantes que podem operar em vários valores, como set_multi, get_multi etc.). Esses métodos chamam diretamente a extensão Python, que faz a validação de alguns argumentos e, em seguida, agenda essas operações na libcouchbase (usando, por exemplo lcb_set(), lcb_get()etc.). Depois que essas operações são programadas, o ramal chama lcb_wait() que bloqueia até que os resultados das operações programadas tenham sido concluídos (ou tenham falhado com um erro).

O código geral tem a seguinte aparência:

lcb_get_cmd_t command;
lcb_get_cmd_t *cmdp = &comando;
comando.v.v0.chave = chave; /* onde 'key' é extraído de algum PyObject */
comando.v.v0.nkey = nkey; /* comprimento da chave */
comando.v.v0.tempo de expiração = tempo de expiração; /* opcional, extraído do parâmetro */

errstatus = lcb_get(instância, /* identificador de biblioteca */,
                    biscoito /* ponteiro opaco */,
                    &cmdp, /* comando "list" */
                    1 /* número de itens na lista */);

/** verificar 'errstatus' */
errstatus = lcb_wait(instância); /* bloco */

 

lcb_wait executa um loop de evento interno implementado pela libcouchbase e, por fim, chama as callbacks de resultado:

Os resultados reais não são entregues pela libcouchbase como um valor de retorno, mas sim passados como uma chamada de retorno. A ideia por trás do uso de retornos de chamada é que a estrutura de resultados contém ponteiros para buffers de rede. Se os resultados fossem realmente "retornados" pela biblioteca, isso exigiria que a biblioteca copiasse os buffers de rede temporários para a memória alocada e que o usuário os liberasse, o que afetaria o desempenho. A chamada de retorno do resultado tem a seguinte aparência:

vazio get_callback(instância lcb_t, const vazio *biscoito, lcb_error_t status, const lcb_get_resp_t *resp)

{
    char *chave = resp->v.v0.chave;
    char *valor = resp->v.v0.bytes;
    printf("Obtive resultado para %.*s: %.*sn, resp->v.v0.nkey, chave, resp->v.v0.nbytes, valor);
    printf("Sinalizadores: %lu, CAS %llun, resp->v.v0.bandeiras, resp->v.v0.cas);
}

(Correção de Const e elenco omitidos para fins de clareza).

O usuário da biblioteca C libcouchbase deve instalar retornos de chamada a serem invocados para operações específicas. Essas callbacks recebem as informações normais de resposta (ou seja, resposta, status de erro, etc.), bem como um ponteiro opaco por operação (o 'biscoito') que é passado pelo usuário (neste caso, a extensão C). Esse ponteiro é controlado e gerenciado pelo usuário e geralmente contém dados específicos do contexto/aplicativo que o usuário pode usar para associar uma resposta a uma determinada solicitação.

Interagindo com a libcouchbase e o CPython

Da perspectiva da API pública exposta pela extensão, cada operação resulta em um 'Resultado' (ou uma subclasse adequada da mesma) sendo retornada ao usuário. A instância 'Resultado' contém informações sobre o resultado da operação e todos os metadados associados. Ao interagir com a biblioteca C, o objeto Result é uma classe de extensão que herda de PyObject e é alocado antes de cada solicitação à biblioteca C. Em seguida, ele é passado como o parâmetro 'biscoito' para a operação programada. Quando a biblioteca C invoca o retorno de chamada instalado, o retorno de chamada preenche a variável Resultado com as informações relevantes. Finalmente, quando o loop de eventos sai, o objeto Resultado é então para o usuário.

Uma implementação simplista seria semelhante a esta:

estático PyObject *
Connection_get(pycbc_Connection *autônomo, PyObject *argumentos)
{
    const char *chave;
    se (!PyArg_ParseTuple("s", &chave, argumentos) {
         retorno NULL;
    }

    lcb_get_cmd_t command, *comandop = &comando;
    comando->v.v0.chave = chave;
    comando->v.v0.nkey = strlen(chave);
    pycbc_ValueResult *vresultado = PyObject_CallObject(&pycbc_ValueResultType, NULL);
    lcb_get(autônomo->instância, vresultado, &comandop, 1);
    lcb_wait(autônomo->instância);
    retorno vresultado;
}

 

O lcb_wait() será bloqueado até que o resultado chegue e sua chamada de retorno seja invocada. O retorno de chamada seria algo parecido com isto:

vazio get_callback(instância lcb_t, const vazio *biscoito, lcb_error_t err, const lcb_get_resp_t *resp)
{

    pycbc_ValueResult *resultado = (pycbc_ValueResult *)biscoito;
    resultado->valor = PyString_FromStringAndSize(resp->v.v0.bytes, resp->v.v0.nbytes);
    resultado->rc = erro;
    resultado->bandeiras = resp->v.v0.bandeiras;
    resultado->cas = resp->v.v0.cas;
}

[Observe que, como o SDK oferece suporte a coisas como transcodificadores e diferentes formatos de valores, o código real de conversão de e para chaves e valores é significativamente mais complexo do que isso, mas não é relevante para a arquitetura real que está sendo discutida aqui.]

Para ser mais específico, o Resultado só é alocado a partir da chamada de retorno, e o que é realmente alocado antes da sequência de espera é um objeto MultiResultado que é um objeto ditado subclasse. Essa MultiResultado estende o dict adicionando alguns parâmetros internos específicos; para cada retorno de chamada invocado durante a sequência de espera, um novo objeto Resultado é alocado e inserido nesse objeto MultiResultado com a chave sendo a chave do item de dados que está sendo acessado. Assim, o código realmente se parece com o seguinte:

/* …. */
pycbc_MultiResult *mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
lcb_get(autônomo->instância, mres, &comandop, 1);
/* … */

e no retorno de chamada

/* … */
pycbc_MultiResult *mres = (pycbc_MultiResult *)biscoito;
pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);

/* ... atribuir membros de vres */

PyObject *chaveiro = PyString_FromStringAndSize(resp->v.v0.chave, resp->v.v0.nkey);
vres->chave = chaveiro;
PyDict_SetItem(&mres->ditado, chave, vres);
Py_DECREF(vres);
/* … */

Esse design interno, embora complexo, permite uma reutilização de código muito eficiente (e, portanto, testes e depuração) e tem um desempenho muito bom. Essa abstração também é implementada inteiramente em C puro, o que torna a sobrecarga mínima.

Implementação de E/S assíncrona

Integração do loop de eventos

A biblioteca C mencionada anteriormente oferece uma interface de plug-in de E/S extensível. Anteriormente, mencionei que a biblioteca C usa seu próprio loop de eventos interno, mas isso não é totalmente verdade; em vez disso, o que a biblioteca faz é expor uma API de plug-in de E/S que permite implementar suas próprias funções para agendar primitivos assíncronos comuns, como socket watchers e eventos de tempo limite. Como a biblioteca em si é totalmente assíncrona, ela permite a integração com o fluxo de programa sem bloqueio.

Essa API pode ser vista aqui: https://github.com/couchbase/libcouchbase/blob/62300167f6f7d0f84ee6ac2162591805dfbf163d/include/libcouchbase/types.h#L196-221

Assim, para me integrar ao loop de eventos do reator do Twisted, tive que escrever um "plug-in de IO" que implementaria as primitivas assíncronas comuns usando os métodos do reator do Twisted; o resultado pode ser visto aqui e, por si só, acho que é bastante simples: https://github.com/couchbaselabs/couchbase-twisted-client/blob/twisted/txcouchbase/iops.py

[Observe que ois_sync' está lá apenas para testar a funcionalidade básica em um padrão síncrono usando o reactor do Twisted como backend do loop de eventos. Esse não é o padrão].

Para expor essa interface do plug-in de E/S ao Python, criei classes de wrapper do Python para essas primitivas; portanto, temos IOEvent, TimerEventetc. A ideia básica é que esses objetos contêm ponteiros internos para dados de retorno de chamada C. Além disso, há o objeto 'IOPS' que gerencia um ou mais desses objetos. A ideia básica é que a biblioteca C chama (por meio da extensão) a classe 'IOPS', passando a ele um dos objetos Event; solicitando algum tipo de modificação de agendamento (ou seja, assistir, cancelar a observação, destruir etc.). O objeto IOPS chama a implementação real do loop de eventos (nesse caso, o reactor) para agendar o evento desejado, passando a ele o objeto Event relevante. Quando o evento estiver pronto, um dos objetos de evento 'pronto_*Compreensivelmente, tudo isso causa um certo impacto no desempenho, mas com a vantagem de que o código agora é capaz de interagir de forma assíncrona em qualquer loop de eventos do Python.

Juntando tudo isso, fica mais ou menos assim:

Quando a libcouchbase cria um soquete pela primeira vez, ela o associa a algum tipo de objeto IO Event, que é exposto como um ponteiro:

vazio *evento = instância->iops.create_event(instância->iops);

 

Isso chama o nosso código Python, que tem a seguinte aparência:

estático vazio *
create_event(lcb_io_opt_t *iobase)

{
    pycbc_iops_t = (pycbc_iops_t *)iobase;
    PyObject *event_factory = PyObject_GetAttrString(io->py_impl, "create_event" (criar evento));
    retorno PyObject_CallObject(event_factory, NULL);
}

Esse 'event_factory' deve retornar uma subclasse de IOEvent, com alguns campos adicionais; ele pode ter a seguinte aparência

assim:

de couchbase._libcouchbase importação IOEvent

classe MyIOEvent(IOEvent):
    def doRead(autônomo):
       autônomo.pronto_r()

    def doWrite(autônomo):
       autônomo.pronto_w()

classe IOPS(objeto):
    def create_event(autônomo):
       retorno MyIOEvent()

Agora, quando a biblioteca quiser receber notificações quando um determinado soquete estiver disponível, ela fará algo parecido com isto:

instância->iops->update_event(instância->iops, sockfd, LCB_READ_EVENT, algum_retorno, algum_ponteiro);

O que então chama isso:

estático vazio
update_event(lcb_io_opt_t iobase, vazio *evento, int sockfd, curto bandeiras, vazio (*retorno de chamada)(int, curto, vazio *), vazio *dados)
{

    pycbc_io_opt_t *io = (pycbc_io_opt_t *)iobase;
    pycbc_IOEvent *ev = (pycbc_IOEvent *)evento;

    evento->fd = sockfd; /* armazenamento para '.fileno()' */
    evento->callback_info.retorno de chamada = retorno de chamada;
    evento->callback_info.dados = dados;

    PyObject *argumentos = Py_BuildValue("(O,I)", ev, bandeiras);
    PyObject *metanfetamina = PyObject_GetAttrString(io->py_impl, "update_event");
    PyObject_CallObject(metanfetamina, argumentos);
}

O que, em Python, pode ser parecido com:

def update_event(autônomo, evento, bandeiras):
    se sinalizadores & READ_EVENT:
       autônomo.reator.addReader(evento)

O parâmetro "event" é um objeto retornado pelo nosso método "create_event" anterior, que retorna uma instância de MyIOEvent, que contém a implementação necessária de doRead.

Em algum momento no futuro, o reator detecta que o arquivo subjacente fileno() está disponível para leitura e chama o método "doRead()", mostrado acima. Em nossa implementação, doRead chama 'ready_r()', que é um método implementado em C:

estático vazio *
Event_ready_r(pycbc_IOEvent *evento)
{
    evento->callback_info.retorno de chamada(evento->fd, READ_EVENT, evento->callback_info.dados);
}

API de conexão futura/diferida

Para tornar a API de acesso a dados real assíncrona, adicionei alguns parâmetros privados no objeto Connection que o marca como "assíncrono" - basicamente, ele define um campo dentro do objeto Connection e aloca um objeto 'IOPS' instância. Depois que cada operação tiver sido programada, a extensão verificará se o objeto de conexão 'F_ASYNC' está definido. Se estiver, ele não chamará lcb_wait(), mas retorna um Resultado assíncrono (que é uma subclasse de MultiResultado), em vez de aguardar o resultado. Esse 'Resultado assíncrono' contém um objeto 'errback' e 'retorno de chamada' que são invocadas quando o resultado está pronto.

Da mesma forma, no código de retorno de chamada, se a conexão 'F_ASYNC' for definido, toda vez que um resultado for recebido, ele invocará a função AsyncResult.callback ou AsyncResult.errback relevante (dependendo do sucesso ou da falha).

O melhor da estrutura interna é que poucas modificações foram necessárias para permitir a operação assíncrona e, portanto, toda a estabilidade da API de sincronização foi adicionada à nova interface assíncrona.

Nossa implementação simples de 'get' agora tem a seguinte aparência em C:

estático PyObject *

obter(pycbc_Connection *autônomo, PyObject *argumentos)

{

  /* Faça a validação do argumento normalmente */
  …

  pycbc_MultiResult *mres;

  se (autônomo->bandeiras & F_ASYNC) {
    mres = PyObject_CallObject(&pycbc_AsyncResultType, NULL);
  } mais {
    mres = PyObject_CallObject(&pycbc_MultiResultType, NULL);
  }

  /** Validar argumentos, etc. */
  …

  erro = lcb_get(autônomo->instância, mres, &comandop, 1);

  se (autônomo->bandeiras & F_ASYNC) {
     lcb_wait(autônomo->instância);
  }

  retorno mres;
}

E o mesmo acontece no retorno de chamada;

estático vazio
get_callback(instância lcb_t, const vazio *biscoito, lcb_error_t err, const lcb_get_resp_t *resp)

{
  pycbc_MultiResult *mres = biscoito;
  pycbc_ValueResult *vres = PyObject_CallObject(&pycbc_ValueResultType, NULL);

  /** Configurar o resultado do valor e inserir no dicionário ... */
  …

  se (mres->pai->bandeiras & F_ASYNC) {
    /* é um AsyncResult, que é uma subclasse */
    PyObject_CallObject( ((pycbc_AsyncResult *)mres)->retorno de chamada, NULL);
    Py_DECREF(mres); /* não o estamos devolvendo */
  }
}

Um novo 'txcouchbase' foi adicionado, o qual contém seu próprio pacote Conexão classe. Essa Conexão durante a construção define esses sinalizadores internos. Além disso, para cada operação, o sinalizador Resultado assíncrono é agrupado em um retorno de chamada com o equivalente à seguinte construção:

def obter(autônomo, *args, **kwargs):
    async_res = super(Conexão, autônomo).obter(*args, **kwargs)
    d = Diferido()
    async_res.retorno de chamada = d.retorno de chamada
    async_res.errback = d.errback
    retorno d

Quando o resultado estiver pronto, o retorno de chamada é chamado com um MultiResultado ou Resultado (dependendo do fato de uma variante única ou múltipla da operação ter sido executada na API).

Compartilhe este artigo
Receba atualizações do blog do Couchbase em sua caixa de entrada
Esse campo é obrigatório.

Autor

Postado por Mark Nunberg, engenheiro de software, Couchbase

Mark Nunberg é um engenheiro de software que trabalha na Couchbase. Ele mantém a biblioteca do cliente em C (libcouchbase), bem como o cliente em Python. Ele também desenvolveu o cliente Perl (para uso em sua empresa anterior), o que o levou inicialmente a trabalhar no Couchbase. Antes de ingressar no Couchbase, ele trabalhou em sistemas de roteamento distribuídos e de alto desempenho em uma empresa de análise de comércio eletrônico. Mark estudou Linguística na Universidade Hebraica de Jerusalém.

2 Comentários

  1. Isso é fantástico, Mark. Minha equipe usa extensivamente o couchbase e o twisted (com autobahn), vamos experimentar nas próximas semanas e fornecer feedback.

  2. Excelente trabalho!

Deixe um comentário

Pronto para começar a usar o Couchbase Capella?

Iniciar a construção

Confira nosso portal do desenvolvedor para explorar o NoSQL, procurar recursos e começar a usar os tutoriais.

Use o Capella gratuitamente

Comece a trabalhar com o Couchbase em apenas alguns cliques. O Capella DBaaS é a maneira mais fácil e rápida de começar.

Entre em contato

Deseja saber mais sobre as ofertas do Couchbase? Deixe-nos ajudar.