Nesta segunda parte de "Por dentro do Java SDK", daremos uma olhada detalhada em como o SDK gerencia e agrupa soquetes para os vários nós e serviços. Embora não seja necessário seguir o exemplo, recomendo que você dê uma olhada na primeira postagem sobre bootstrapping também.
Observe que esta postagem foi escrita tendo em mente as versões 2.5.9 / 2.6.0 do Java SDK. As coisas podem mudar com o tempo, mas a abordagem geral deve permanecer praticamente a mesma.
No espírito dos modelos OSI e TCP, proponho um modelo de três camadas que representa a pilha de conexões dos SDKs:
1 2 3 4 5 6 7 |
+-----------------+ | Serviço Camada | +-----------------+ | Ponto final Camada | +-----------------+ | Canal Camada | +-----------------+ |
Os níveis mais altos são construídos sobre os níveis mais baixos, portanto, começaremos com a camada Channel (Canal) e subiremos na pilha.
A camada de canal
A camada de canal é o nível mais baixo em que o SDK lida com a rede e é construída sobre a excelente biblioteca de E/S totalmente assíncrona chamada Netty Somos usuários extensivos do Netty há anos e também contribuímos com patches, bem como com o codec de memcache de volta ao projeto.
Cada Netty Canal corresponde a um soquete e é multiplexado em cima de loops de eventos. Abordaremos o modelo de threading em uma postagem posterior do blog, mas, por enquanto, é importante saber que, em vez do modelo "um thread por soquete" da E/S de bloqueio tradicional, o Netty pega todos os soquetes abertos e os distribui em um punhado de loops de eventos. Ele faz isso de forma muito eficiente, portanto, não é de se admirar que o Netty seja usado em todo o setor para componentes de rede de alto desempenho e baixa latência.
Como um canal se preocupa apenas com a entrada e saída de bytes, precisamos de uma maneira de codificar e decodificar solicitações em nível de aplicativo (como uma consulta N1QL ou uma solicitação de obtenção de chave/valor) em sua representação binária adequada. No Netty, isso é feito adicionando manipuladores para o canal de distribuição. Todas as operações de gravação na rede descem pelo pipeline e as respostas do servidor voltam pelo pipeline (também chamadas de entrada e saída na terminologia Netty).
Alguns manipuladores são adicionados independentemente do serviço usado (como registro ou criptografia) e outros dependem do tipo de serviço (por exemplo, para uma resposta N1QL, temos analisadores de fluxo JSON personalizados para a estrutura da resposta).
Se você já se perguntou como obter uma saída de registro em nível de pacote durante o desenvolvimento ou a depuração (para produção, use tcpdump, wireshark ou similar), tudo o que precisa fazer é ativar o nível de registro TRACE em sua biblioteca de registro favorita e verá uma saída como esta:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
[cb-io-1-1] 2018-06-28 14:03:34 RASTREAR Gerenciador de registro:94 - [id: 0x41407638, L:/127.0.0.1:60923 - R:localhost/127.0.0.1:11210] ESCREVER: 243B +-------------------------------------------------+ | 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 80 1f 00 db 00 00 00 00 00 00 00 e5 00 00 00 00 |................| |00000010| 00 00 00 00 00 00 00 00 7b 22 61 22 3a 22 63 6f |........{"a":"co| |00000020| 75 63 68 62 61 73 65 2d 6a 61 76 61 2d 63 6c 69 |uchbase-java-cli| |00000030| 65 6e 74 2f 32 2e 36 2e 30 2d 53 4e 41 50 53 48 |ent/2.6.0-SNAPSH| |00000040| 4f 54 20 28 67 69 74 3a 20 32 2e 36 2e 30 2d 62 |OT (git: 2.6.0-b| |00000050| 65 74 61 2d 31 36 2d 67 35 63 65 30 38 62 30 2c |eta-16-g5ce08b0,| |00000060| 20 63 6f 72 65 3a 20 31 2e 36 2e 30 2d 62 65 74 | core: 1.6.0-bet| |00000070| 61 2d 33 33 2d 67 31 62 33 65 36 66 62 29 20 28 |a-33-g1b3e6fb) (|00000070| 61 2d 33 33 2d 67 31 62 33 65 36 66 62 29 20 28 |a-33-g1b3e6fb) (|00000070| 61 2d 33 33 33 2d 67 31 62 33 65 36 66 62 29 20 28 |a-33-g1b3e6fb) |00000080| 4d 61 63 20 4f 53 20 58 2f 31 30 2e 31 33 2e 34 |Mac OS X/10.13.4| |00000090| 20 78 38 36 5f 36 34 3b 20 4a 61 76 61 20 48 6f | x86_64; Java Ho| |000000a0| 74 53 70 6f 74 28 54 4d 29 20 36 34 2d 42 69 74 |tSpot(TM) 64-Bit| |000000b0| 20 53 65 72 76 65 72 20 56 4d 20 31 2e 38 2e 30 | Server VM 1.8.0| |000000c0| 5f 31 30 31 2d 62 31 33 29 22 2c 22 69 22 3a 22 |_101-b13)","i":"| |000000d0| 30 43 34 37 35 41 43 41 35 46 33 38 30 41 32 31 |0C475ACA5F380A21| |000000e0| 2f 30 30 30 30 30 30 30 30 30 30 34 31 34 30 37 36 33 |/000000004140763| |000000f0| 38 22 7d |8"} | +--------+-------------------------------------------------+----------------+ |
Observe o pequeno Gerenciador de registro lá em cima? Isso ocorre porque só adicionamos o manipulador de registro se o rastreamento estiver ativado no pipeline, para que você não pague a sobrecarga se não o estiver usando (o que ocorre na maioria das vezes):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
bootstrap = novo BootstrapAdapter(novo Bootstrap() // *snip* .opção(ChannelOption.ALOCADOR, alocador) .opção(ChannelOption.TCP_NODELAY, tcpNodelay) .opção(ChannelOption.CONNECT_TIMEOUT_MILLIS, env.socketConnectTimeout()) .manipulador(novo Inicializador de canal<Channel>() { @Override protegida vazio initChannel(Canal canal) lançamentos Exceção { CanalPipeline tubulação = canal.tubulação(); se (env.sslEnabled()) { tubulação.addLast(novo SslHandler(sslEngineFactory.obter())); } se (LOGGER.isTraceEnabled()) { tubulação.addLast(INSTÂNCIA DO MANIPULADOR DE REGISTRO); } customEndpointHandlers(tubulação); } })); |
Você também pode ver que, dependendo da configuração do ambiente, fazemos outros ajustes, como adicionar um manipulador de SSL/TLS ao pipeline ou configurar o nodelay do TCP e os tempos limite do soquete.
O customEndpointHandlers é substituído para cada serviço, aqui está o pipeline para a camada KV (ligeiramente simplificado):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
se (ambiente().keepAliveInterval() > 0) { tubulação.addLast(novo IdleStateHandler(ambiente().keepAliveInterval(), 0, 0, TimeUnit.MILISSEGUNDOS)); } tubulação .addLast(novo BinaryMemcacheClientCodec()) .addLast(novo BinaryMemcacheObjectAggregator(Inteiro.MAX_VALUE)); tubulação .addLast(novo Manipulador KeyValueFeatureHandler(contexto())) .addLast(novo Manipulador de KeyValueErrorMap()); se (!ambiente().certAuthEnabled()) { tubulação.addLast(novo KeyValueAuthHandler(nome de usuário(), senha(), ambiente().forceSaslPlain())); } tubulação .addLast(novo KeyValueSelectBucketHandler(balde())) .addLast(novo Manipulador de valor-chave(este, responseBuffer(), falso, verdadeiro)); |
Há muita coisa acontecendo aqui! Vamos analisar um por um:
- O IdleStateHandler é usado para acionar keepalives no nível do aplicativo.
- Os próximos dois manipuladores BinaryMemcacheClientCodec e BinaryMemcacheObjectAggregator lidam com a codificação de objetos de solicitação e resposta do memcache em suas representações de bytes e vice-versa.
- Manipulador KeyValueFeatureHandler , Manipulador de KeyValueErrorMap , KeyValueAuthHandler e KeyValueSelectBucketHandler todos realizam handshaking, autenticação, seleção de bucket e assim por diante durante a fase de conexão e se retiram do pipeline após a conclusão.
- Por fim, o Manipulador de valor-chave faz a maior parte do trabalho e "conhece" todos os diferentes tipos de solicitação que entram e saem do sistema.
Se quiser dar uma olhada em um diferente, aqui é o pipeline N1QL, por exemplo.
Antes de subirmos uma camada, há uma parte importante. O observável RxJava conclusão também acontece nessa camada. Depois que uma resposta é decodificada, ela é concluída diretamente no loop de eventos ou em um pool de threads (configurado por padrão).
É importante saber que, quando um canal é desativado (porque o soquete subjacente é fechado), todo o estado nesse nível desaparece. Em uma tentativa de reconexão, um novo canal é criado. Então, quem gerencia um canal? Vamos subir uma camada.
A camada de endpoint
O Ponto final é responsável por gerenciar o ciclo de vida de um canal, incluindo a inicialização, a reconexão e a desconexão. Você pode encontrar o código aqui.
Há sempre uma relação de 1:1 entre o endpoint e o canal que ele gerencia, mas se um canal for removido e um soquete precisar ser reconectado, o endpoint permanecerá o mesmo e receberá um novo internamente. O endpoint também é o local em que a solicitação é entregue aos loops de eventos (simplificado):
1 2 3 4 5 6 7 8 |
@Override público vazio enviar(final CouchbaseRequest solicitação) { se (canal.isActive() && canal.isWritable()) { canal.escrever(solicitação, canal.voidPromise()); } mais { responseBuffer.publishEvent(Gerenciador de respostas.TRADUTOR_DE_RESPOSTA, solicitação, solicitação.observável()); } } |
Se o nosso canal estiver ativo e for gravável, escreveremos a solicitação no pipeline; caso contrário, ela será enviada de volta e enfileirada novamente para outra tentativa.
Aqui está um aspecto muito importante do ponto de extremidade que deve ser lembrado: se um canal for fechado, o ponto de extremidade tentará se reconectar (com o backoff configurado), desde que seja explicitamente instruído a parar. Ele para quando o gerente do canal Ponto final chamadas desconectar nele, o que ocorrerá quando o respectivo serviço/nó não fizer mais parte da configuração. Portanto, ao final de um rebalanceamento ou durante um failover, o cliente receberá uma nova configuração de cluster a partir da qual ele infere que esse ponto de extremidade pode ser encerrado e o faz de acordo. Se, por qualquer motivo, houver um atraso entre a desconexão de um soquete e a propagação dessas informações, você poderá ver algumas tentativas de reconexão que acabarão parando.
Um endpoint é muito bom, mas mais é sempre melhor, certo? Portanto, vamos subir mais uma camada para descobrir como os endpoints são agrupados para criar pools de conexão sofisticados por nó e serviço.
A camada de serviço
O Serviço gerencia um ou mais endpoints por nó. Cada serviço é responsável apenas por um nó, portanto, por exemplo, se você tiver um cluster do Couchbase de 5 nós com apenas o serviço KV ativado em cada um deles, se inspecionar um heap dump, encontrará 5 instâncias do serviço Serviço KeyValue .
Em versões anteriores do cliente, você só podia configurar um número fixo de endpoints por serviço por meio de métodos como kvEndpoints , pontos de consulta e assim por diante. Devido a requisitos mais complexos, descontinuamos essa abordagem "fixa" com uma implementação avançada de pool de conexões. É por isso que, em vez de i.e. pontos de consulta agora você deve usar queryServiceConfig e equivalentes.
Aqui estão os pools padrão atuais por serviço nas versões 2.5.9 e 2.6.0:
- Serviço KeyValue : 1 ponto de extremidade por nó, fixo.
- Serviço de consulta : de 0 a 12 endpoints por nó, dinâmico.
- ViewService : de 0 a 12 endpoints por nó, dinâmico.
- AnalyticsService : de 0 a 12 endpoints por nó, dinâmico.
- Serviço de busca : de 0 a 12 endpoints por nó, dinâmico.
O motivo pelo qual o KV não é agrupado por padrão é que o handshaking da conexão é muito mais caro (lembre-se de todos os manipuladores no pipeline) e o padrão de tráfego geralmente é muito diferente dos serviços mais pesados baseados em consultas. A experiência de campo mostrou que o aumento do número de endpoints KV só faz sentido em cenários de "carga em massa" e em tráfego muito irregular, em que o "tubo" de um soquete é muito pequeno. Se isso não for avaliado adequadamente, também pode ser que a adição de mais soquetes à camada de KV possa degradar seu desempenho em vez de melhorá-lo - acho que mais nem sempre é melhor.
A lógica de agrupamento pode ser encontrada aqui se você estiver curioso, mas vale a pena examinar certas semânticas ali.
Durante a fase de conexão do serviço, ele garante que o número mínimo de pontos de extremidade seja estabelecido antecipadamente. Se o mínimo for igual ao máximo, o pooling dinâmico será efetivamente desativado e o código escolherá um dos endpoints para cada solicitação:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
sincronizado (epMutex) { int numToConnect = minEndpoints - pontos finais.tamanho(); se (numToConnect == 0) { LOGGER.depurar("Não são necessários pontos de extremidade para se conectar, ignorando."); retorno Observável.apenas(estado()); } para (int i = 0; i < numToConnect; i++) { Ponto final ponto final = endpointFactory.criar(nome do host, balde, nome de usuário, senha, porto, ctx); pontos finais.adicionar(ponto final); endpointStates.registro(ponto final, ponto final); } LOGGER.depurar(logIdent(nome do host, PooledService.este) + "O novo número de pontos de extremidade é {}", pontos finais.tamanho()); } |
Isso pode ser observado nos registros imediatamente durante o bootstrap:
1 2 |
[cb-cálculos-5] 2018-06-28 14:03:34 DEBUG Serviço:257 - [localhost][Serviço KeyValue]: Novo número de pontos finais é 1 [cb-cálculos-8] 2018-06-28 14:03:35 DEBUG Serviço:248 - [localhost][Serviço de consulta]: Não pontos finais necessário para conectar, pular. |
Quando uma solicitação chega, ela é despachada ou, se outro endpoint precisar ser criado (ainda há espaço no pool), isso também é tratado (de forma ligeiramente simplificada):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@Override público vazio enviar(final CouchbaseRequest solicitação) { Ponto final ponto final = pontos finais.tamanho() > 0 ? selectionStrategy.selecionar(solicitação, pontos finais) : nulo; se (ponto final == nulo) { se (fixedEndpoints || (pontos finais.tamanho() >= maxEndpoints)) { Ajudante de repetição.retryOrCancel(env, solicitação, responseBuffer); } mais { maybeOpenAndSend(solicitação); } } mais { ponto final.enviar(solicitação); } } |
Observe que, se não conseguirmos encontrar um endpoint adequado e o pool for fixo ou se tivermos atingido nosso limite máximo, a operação será agendada para nova tentativa, de forma muito semelhante à lógica do endpoint quando ele não estiver ativo ou gravável.
Nos serviços baseados em HTTP em pool, não queremos manter esses soquetes para sempre, portanto, você pode configurar um tempo ocioso (que é de 300s por padrão). Cada pool executa um cronômetro de inatividade que examina regularmente os pontos de extremidade para ver se eles ficaram ociosos por mais tempo do que o intervalo configurado e, se for o caso, desconecta-os. Observe que a lógica sempre garante que não fiquemos abaixo do número mínimo.
Erros comuns relacionados à conexão
Agora que você tem uma boa ideia de como o SDK lida com soquetes e os agrupa, vamos falar sobre alguns cenários de erro que podem ocorrer.
Solicitação de cancelamento
Vamos falar sobre o RequestCancelledException Primeiro.
Se você estiver executando uma operação e ela falhar com um RequestCancelledException geralmente há duas causas diferentes:
- A operação circulou dentro do cliente (sem ser enviada pela rede) por mais tempo do que o configurado. maxRequestLifetime .
- Uma solicitação foi gravada na rede, mas antes de recebermos uma resposta, o canal subjacente foi fechado.
Há outros motivos menos comuns (por exemplo, problemas durante a codificação de uma solicitação), mas, para fins deste blog, vamos nos concentrar na segunda causa.
Então, por que temos que cancelar a solicitação e não tentar novamente em outro soquete que ainda esteja ativo? O motivo é que não sabemos se a operação já causou um efeito colateral no servidor (por exemplo, uma mutação aplicada). Se tentássemos novamente operações não idempotentes, haveria efeitos estranhos que seriam difíceis de diagnosticar na prática. Em vez disso, informamos ao chamador que a solicitação falhou e, em seguida, cabe à lógica do aplicativo descobrir o que fazer em seguida. Se foi uma simples solicitação de obtenção e você ainda estiver no tempo limite, poderá tentar novamente por conta própria. Se for uma mutação, você precisará implementar mais alguma lógica para ler o documento e descobrir se ele foi aplicado ou se ele pode ser enviado novamente imediatamente. E sempre há a opção de propagar o erro de volta para o chamador de sua API. De qualquer forma, isso é previsível do lado do SDK e não causará mais nenhum dano em segundo plano.
Problemas com o Bootstrap
A outra fonte de erros que vale a pena conhecer são os problemas durante a fase de conexão do soquete. Normalmente, você encontrará erros descritivos nos logs que informam o que está acontecendo (por exemplo, credenciais erradas), mas há dois que podem ser um pouco mais difíceis de decifrar: O tempo limite de proteção da conexão e os erros de seleção de bucket durante o rebalanceamento.
Como você viu anteriormente, o pipeline do KV contém muitos manipuladores que trabalham com o servidor durante a inicialização para descobrir todos os tipos de configurações e negociar os recursos suportados. No momento em que escrevo, cada operação individual não tem um tempo limite individual, mas, em vez disso, o tempo limite de proteção da conexão é acionado se demorar mais do que o permitido para a fase de conexão em termos de orçamento total.
Portanto, se você vir o ConnectTimeoutException nos registros com a mensagem Conectar retorno de chamada fez não retorno, atingido proteção tempo limite. O que isso significa é que uma operação ou a soma de todas elas demorou mais do que o orçamento previsto e outra tentativa de reconexão será executada. Em geral, isso não é prejudicial, pois vamos nos reconectar, mas é uma boa indicação de que pode haver alguma lentidão na rede ou em algum outro ponto da pilha que deve ser analisado com mais cuidado. Uma boa próxima etapa seria iniciar wireshark / tcpdump e registre as fases de bootstrap para descobrir onde o tempo é gasto e, em seguida, passe para o lado do cliente ou do servidor, dependendo dos tempos registrados. Por padrão, o tempo limite de proteção é configurado como o valor socketConnectTimeout mais o connectCallbackGracePeriod que está definido para 2 segundos e pode ser ajustado por meio do com.couchbase.connectCallbackGracePeriod propriedade do sistema.
Uma das etapas durante a inicialização, desde que adicionamos suporte ao RBAC (controle de acesso baseado em função), é chamada de "select bucket" por meio do KeyValueSelectBucketHandler . Como há uma desconexão entre a autenticação e o acesso a um bucket, é possível que o cliente se conecte a um serviço de KV, mas o mecanismo de KV em si ainda não esteja pronto para atendê-lo. O cliente lidará com a situação e tentará novamente - e nenhum impacto em uma carga de trabalho real será observado - mas, como a higiene do registro também é uma preocupação, estamos atualmente aprimorando o algoritmo do SDK aqui. Se quiser, você pode acompanhar o progresso em JVMCBC-553.
Considerações finais
A esta altura, você já deve ter uma sólida compreensão de como o SDK gerencia seus soquetes subjacentes e os agrupa na camada de serviço. Se você quiser se aprofundar na base de código, comece com aqui e, em seguida, examine os respectivos namespaces para serviço e ponto final . Todos os manipuladores de canal do Netty estão abaixo do ponto final também.
Se você tiver outras dúvidas, comente abaixo! A próxima postagem discutirá o modelo geral de threading do SDK.