Tenho o prazer de anunciar a primeira prévia para desenvolvedores da próxima grande iteração de nossa integração com o Kafka. Essa versão é baseada em uma nova biblioteca para DCP e oferece suporte à estrutura do Kafka Connect. Nesta postagem, mostrarei como ela pode ser integrada para retransmitir dados do Couchbase para o HDFS.
Aqui, mostrarei as etapas para as distribuições Linux CentOS/Fedora. As etapas em outros sistemas operacionais serão semelhantes. Primeiro, instale o Confluent Platform (http://docs.confluent.io/3.0.0/installation.html#rpm-packages-via-yum) e baixe o arquivo zip do Couchbase com a integração do conector http://packages.couchbase.com/clients/kafka/3.0.0-DP1/kafka-connect-couchbase-3.0.0-DP1.zip
Para registrar o conector, basta extrair o conteúdo para o caminho de classe padrão, por exemplo, no CentOS (Fedora) é /usr/share/java
:
1 2 |
descompactar kafka-conectar-couchbase-3.0.0-DP1.zíper sudo cp -a kafka-conectar-couchbase-3.0.0-DP1/compartilhar /usr/ |
Agora, execute o Confluent Control Center e todos os serviços dependentes. Leia mais sobre o que esses comandos fazem em Confluent's guia de início rápido
1 2 3 4 5 6 |
sudo tratador de zoológico-servidor-iniciar /etc/kafka/tratador de zoológico.propriedades sudo kafka-servidor-iniciar /etc/kafka/servidor.propriedades sudo esquema-registro-iniciar /etc/esquema-registro/esquema-registro.propriedades sudo conectar-distribuído /etc/kafka/conectar-distribuído.propriedades sudo controle-centro-iniciar /etc/confluente-controle-centro/controle-centro.propriedades |
Neste ponto, tudo está pronto para configurar o link para transferir documentos do Couchbase para o HDFS usando o Kafka Connect. Presumimos que você esteja executando o Couchbase Server em http://127.0.0.1:8091/
e o Confluent Control Center em http://127.0.0.1:9021/
. Para este exemplo, certifique-se de que você tenha o amostra de viagem
carregado no Couchbase. Se você não o definiu ao configurar o cluster, poderá adicioná-lo por meio da parte de configurações da interface do usuário da Web.
Depois que todos esses pré-requisitos forem atendidos, navegue até a seção "Kafka Connect" no Confluent Control Center. Selecione "New source" e, em seguida, selecione "CouchbaseSourceConnector" como uma classe de conector e preencha as configurações para que o JSON final seja semelhante a:
1 2 3 4 5 6 7 |
{ "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector", "name" (nome): "fonte de viagem", "connection.bucket": "amostra de viagem", "connection.cluster_address": "127.0.0.1", "topic.name": "tópico de viagem" } |
Depois que você salvar a conexão Source, o daemon Connect começará a receber mutações e armazená-las no tópico Kafka especificado. Para demonstrar um pipeline completo, vamos configurar uma conexão Sink para obter dados do Kafka. Para isso, vá para a guia "Sinks" e clique no botão "New sink". Ele deverá solicitar um tópico onde os dados interessantes estão armazenados; digite "travel-topic". Em seguida, selecione "HdfsSinkConnector" e preencha as configurações para que a configuração JSON tenha a seguinte aparência (supondo que o nó de nome do HDFS esteja escutando em hdfs://127.0.0.1:8020/
):
1 2 3 4 5 6 7 8 9 10 |
{ "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "name" (nome): "hdfs-travel-sink", "flush.size": "10", "partitioner.class": "io.confluent.connect.hdfs.partitioner.FieldPartitioner", "partition.field.name": "partição", "hdfs.url": "hdfs://127.0.0.1:8020", "tópicos": "tópico de viagem" } |
Depois que a conexão do Sink estiver configurada, você verá os dados aparecendo no HDFS em /topics/travel-topic/
com o diretório de tópicos padrão. Vamos inspecionar um deles:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ hdfs dfs -fs hdfs://localhost:8020 -cat /topics/travel-topic/partition=89/travel-topic+0+0000000101+0000000101.avro | avropipe / [] /0 {} /0/partição 89 /0/chave "route_28879" /0/expiração 0 /0/bandeiras 33554438 /0/cas 1471633063247347712 /0/lockTime 0 /0/porSeqno 1 /0/revSeqno 1 /0/conteúdo "{"id":28879,"tipo":"rota","companhia aérea":"G4","companhia aérea":"companhia aérea_35","aeroporto de origem":"AZA","aeroporto de destino":"FWA","paradas":0,"equipamentos":"319","cronograma":[{"dia":0,"utc":"01:59:00","voo":"G4097"},{"dia":1,"utc":"09:30:00","voo":"G4697"},{"dia":1,"utc":"09:50:00","voo":"G4879"},{"dia":1,"utc":"07:44:00","voo":"G4310"},{"dia":1,"utc":"01:23:00","voo":"G4226"},{"dia":2,"utc":"19:58:00","voo":"G4921"},{"dia":2,"utc":"09:49:00","voo":"G4376"},{"dia":2,"utc":"17:57:00","voo":"G4446"},{"dia":2,"utc":"21:06:00","voo":"G4032"},{"dia":3,"utc":"17:05:00","voo":"G4198"},{"dia":3,"utc":"12:21:00","voo":"G4098"},{"dia":3,"utc":"19:31:00","voo":"G4571"},{"dia":4,"utc":"05:27:00","voo":"G4001"},{"dia":4,"utc":"07:03:00","voo":"G4023"},{"dia":4,"utc":"16:50:00","voo":"G4631"},{"dia":5,"utc":"18:13:00","voo":"G4757"},{"dia":6,"utc":"20:35:00","voo":"G4157"},{"dia":6,"utc":"21:52:00","voo":"G4582"},{"dia":6,"utc":"00:55:00","voo":"G4348"},{"dia":6,"utc":"06:01:00","voo":"G4731"}],"distância":2483.859992489083}" |
Esse é o meu exemplo de execução rápida! O cliente DCP ainda está em desenvolvimento ativo e tem alguns recursos adicionais sendo adicionados para lidar com várias mudanças de topologia e cenários de falha. As próximas atualizações do nosso conector Kafka incluirão essas atualizações. Também devo observar brevemente que A interface do cliente DCP do Couchbase&apos deve ser considerada volátil por enquanto. Nós o usamos em vários projetos, mas você só deve usá-lo diretamente por sua própria conta e risco.
O código-fonte do conector está em https://github.com/couchbaselabs/kafka-connect-couchbase. O rastreador de problemas está em https://issues.couchbase.com/projects/KAFKACe fique à vontade para fazer qualquer pergunta em https://www.couchbase.com/forums/.