Sem categoria

Kafka e Couchbase: Comece a usar o consumidor Kafka de amostra em 10 minutos

Introdução

O Couchbase Kafka Connector 1.2.0 acaba de ser lançado. Juntamente com as várias correções de bugs, há um novo código de amostra para um consumidor Kafka, além do produtor Kafka que estava disponível anteriormente. Para revisar rapidamente os termos:

  • Um produtor do Kafka grava dados no Kafka, portanto, é uma fonte de mensagens do ponto de vista do Kafka.
  • Na terminologia do Kafka, um consumidor é um processo que se inscreve em tópicos e, em seguida, faz algo com o feed de mensagens publicadas que são emitidas por um cluster do Kafka. É basicamente um sink.

Neste blog, você começará a trabalhar com um consumidor Kafka de amostra no estilo "Hello World!" que grava no Couchbase. Ao longo do caminho, você também obterá um ambiente sandbox com um broker Kafka e um servidor Couchbase de nó único para que possa executar e modificar o consumidor e o produtor de amostra.

Instalação de pré-requisitos

O amostras fazem parte do Conector Kafka do Couchbase árvore de origem. Para obtê-los, basta clonar o repositório inteiro:

Agora, vamos configurar seu ambiente de teste usando imagens pré-configuradas do Kafka e do Couchbase Server. Você precisa instalar o Vagrant, o VirtualBox e o Ansible para configurá-los localmente. Se você tiver esses serviços instalados em outro lugar, certifique-se de ajustar adequadamente os endereços de host ao longo deste guia.

Verificar as versões das dependências:

Você pode atribuir nomes legíveis por humanos às caixas usando o plug-in do Vagrant. Se você ainda não o tiver instalado, use o seguinte comando:

Agora você está pronto para provisionar os servidores e começar a trabalhar:

Observação: se um servidor falhar na instalação devido a tempos limite, tente novamente "vagrant up" após alguns minutos e talvez funcione.

Verifique se os hosts estão respondendo:

Se você navegar até o Couchbase Server de nó único configurado com credenciais Administrador/senha.

Criando as amostras

Para evitar problemas de classpath, use o maven para criar um arquivo JAR autônomo para cada aplicativo de amostra.

O aplicativo gerador é um aplicativo CLI mínimo. Ele usa o SDK Java do Couchbase para agrupar as linhas de entrada do STDIN em documentos JSON e os envia para o bucket "padrão" no Couchbase Server:

O produtor se conecta ao Couchbase Server e transmite todas as mutações para o Kafka. Esse aplicativo usa o projeto couchbase-kafkaconnector nos bastidores.

O consumidor é um consumidor típico do Kafka, que, por padrão, apenas envia qualquer mensagem recebida no tópico "default" para STDOUT.

Executando as amostras

Agora que você tem tudo preparado, é hora de executar todas as amostras. Você precisará de três sessões de shell diferentes, pois cada uma delas executa um processo até ser interrompida. Vamos supor que você esteja na sessão  /tmp/kafka-connector/samples diretório.

Primeiro, ligue o gerador:

Ele deve exibir as configurações de conexão e, em seguida, cair em um prompt de comando >. Você pode digitar qualquer coisa lá e verificar se ele está sendo criado corretamente, olhando na interface de administração do Couchbase Server:

Documentos do gerador no balde

Neste momento, você pode executar o exemplo do conector

Para cada linha digitada no gerador, você verá uma linha do produtor como esta:

O exemplo o grava logo antes de enviar a carga útil para o Kafka, na implementação da classe de filtro. Vamos verificar como o Kafka recebe essas mensagens.

Você pode continuar brincando com ele enquanto todos os três serviços estiverem em execução.

Desenvolvimento com o conector Kafka do Couchbase

Vamos seguir em frente e dar uma olhada no código. Todos os três aplicativos são bastante amigáveis para experimentos, por exemplo, o gerador cabe em apenas algumas linhas:

Basicamente, o gerador abre uma conexão com o bucket "default" em sua instância "couchbase1.vagrant" e grava suas mensagens em chaves aleatórias. Você pode estendê-lo para enviar outros tipos de eventos. Outra coisa que você pode tentar fazer é remover chaves.

Por padrão, o Conector do Couchbase para Kafka é executado no modo de servidor, no qual toma emprestado o thread ativo e escuta ativamente o Couchbase Server em busca de novos eventos. Há vários pontos em que você pode aplicar suas ideias ou alterações. O mais óbvio é o construtor de configuração, onde você não apenas especifica as credenciais e os endereços dos serviços aos quais está se conectando, mas também pode especificar várias classes de serializador e filtro.

O aplicativo de amostra implementa várias delas. A classe Filter é a mais simples:

Aqui você pode inserir as verificações personalizadas que desejar e, se passar() retornos  falsoo conector descarta a mensagem e não a envia para o Kafka.

O codificador padrão, que vem com a distribuição do conector, tenta representar cada mensagem como JSON, mas isso provavelmente não é o que você precisa, portanto, você pode aplicar e converter para Evento DCPE e retorna uma matriz de bytes, que será armazenada no Kafka. Neste exemplo, apenas convertemos os eventos em sua representação de cadeia de caracteres.

Uma configuração mais avançada é StateSerializer interface. Ao implementá-la, você pode controlar como a biblioteca rastreará os cursores de fluxo (ou seja, os números de sequência de cada partição dentro do Couchbase Server) e se ela será retomada após a reinicialização do conector. Há uma implementação do serializador de estado do Zookeeper na distribuição. Aqui na amostra, implementamos NullStateSerializer que não persiste em nada, mas mostra uma implementação mínima.

O último componente de seu cluster de demonstração é o consumidor Kafka AbstractConsumerque é uma instância bastante típica de um consumidor. Ele consiste em duas partes: A primeira, que implementa o bootstrap e o posicionamento no tópico do Kafka, e PrintConsumerque carrega sua "lógica de negócios", ou simplesmente emite todas as mensagens pelas quais passa AbstractConsumer:

Como nos outros exemplos aqui, você pode brincar com a modificação do consumidor de amostra. Você pode até mesmo fechar o circuito enviando tudo de volta ao Couchbase Server. O Kafka é um software distribuído, assim como o Couchbase Server, portanto, tenha isso em mente ao executar em seu próprio cluster e ajuste a função main() de acordo. Em nossa amostra, temos apenas uma única partição, a partição (0) no Kafka, de modo que o nosso principal tem a seguinte aparência:

É claro que, em um cluster de produção, você executará mais de uma partição.

Conclusão

Espero que isso o ajude a ter um bom começo com o Couchbase e o Kafka. Parabéns!

Compartilhe este artigo
Receba atualizações do blog do Couchbase em sua caixa de entrada
Esse campo é obrigatório.

Autor

Postado por Sergey Avseyev, engenheiro de SDK, Couchbase

Sergey Avseyev é engenheiro de SDK na Couchbase. Sergey Avseyev é responsável pelo desenvolvimento do conector Kafka e da biblioteca subjacente, que implementa o DCP, o protocolo de replicação do Couchbase. Também mantém o PHP SDK para o Couchbase.

Deixe um comentário

Pronto para começar a usar o Couchbase Capella?

Iniciar a construção

Confira nosso portal do desenvolvedor para explorar o NoSQL, procurar recursos e começar a usar os tutoriais.

Use o Capella gratuitamente

Comece a trabalhar com o Couchbase em apenas alguns cliques. O Capella DBaaS é a maneira mais fácil e rápida de começar.

Entre em contato

Deseja saber mais sobre as ofertas do Couchbase? Deixe-nos ajudar.