Bem-vindo ao novo conector kafka do Couchbase! Ele fornece funcionalidade para direcionar o fluxo de eventos do Couchbase Server (3.0 ou posterior) para o Kafka. Ele ainda está em desenvolvimento, portanto, use-o com cuidado e abra problemas caso os encontre. Seu rastreador de problemas está localizado em https://issues.couchbase.com/browse/KAFKAC. Muito obrigado a Shibi do PayPal que teve escreveu outro conector com base em uma interface mais antigaEla inspirou esta.
Obtendo
Você pode encontrar o código-fonte do projeto no github aqui. As prévias para desenvolvedores estão disponíveis em nosso próprio repositório maven, e os artefatos GA estarão disponíveis no maven central. Aqui estão as coordenadas:
- ID do grupo: com.couchbase.client
- ID do artefato: conector kafka
- Versão: 1.0.0-dp1
1 2 3 4 5 6 7 8 9 10 11 |
aplicar plug-in: 'java' repositórios { mavenCentral() mentor { url { "http://files.couchbase.com/maven2" } } mavenLocal() } dependências { compilar(grupo: 'com.couchbase.client', name: 'kafka-connector', version: '1.0.0-dp1') } |
Uso
O uso da biblioteca é bastante simples. Digamos que gostaríamos de receber todas as modificações do servidor Couchbase e enviar ao Kafka apenas o corpo do documento (por padrão, o conector serializa o corpo do documento e os metadados em JSON). Para isso, você precisa definir uma classe de filtro que permita apenas instâncias de MutationMessage para passar por ele:
1 2 3 4 5 6 7 8 9 10 11 12 |
pacote exemplo; importação com.couchbase.cliente.núcleo.mensagem.dcp.MutationMessage; importação com.couchbase.kafka.Evento DCPE; importação com.couchbase.kafka.filtro.Filtro; público classe Filtro de amostra implementa Filtro { @Substituir público booleano passe(final Evento DCPE dcpEvent) { retorno dcpEvent.mensagem() instância de MutationMessage; } } |
E uma classe codificadora, que recebe o valor do documento e o converte em uma matriz de bytes:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
pacote exemplo; importação com.couchbase.cliente.núcleo.mensagem.dcp.MutationMessage; importação com.couchbase.cliente.deps.io.líquido.util.CharsetUtil; importação com.couchbase.kafka.Evento DCPE; importação com.couchbase.kafka.codificador.AbstractEncoder; importação kafka.utilitários.VerifiableProperties; público classe SampleEncoder se estende AbstractEncoder { público SampleEncoder(final VerifiableProperties propriedades) { super(propriedades); } @Substituir público byte[] toBytes(final Evento DCPE dcpEvent) { MutationMessage mensagem = (MutationMessage)dcpEvent.mensagem(); retorno mensagem.conteúdo().toString(CharsetUtil.UTF_8).getBytes(); } } |
Isso é basicamente suficiente para configurar a ponte Couchbase-Kafka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
pacote exemplo; importação com.couchbase.kafka.CouchbaseKafkaConnector; importação com.couchbase.kafka.CouchbaseKafkaEnvironment; importação com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment; público classe Exemplo { público estático vazio principal(Cordas[] argumentos) { DefaultCouchbaseKafkaEnvironment.Construtor construtor = (DefaultCouchbaseKafkaEnvironment.Construtor) DefaultCouchbaseKafkaEnvironment .construtor() .kafkaFilterClass("example.SampleFilter") .kafkaValueSerializerClass("example.SampleEncoder") .dcpEnabled(verdadeiro); CouchbaseKafkaEnvironment env = construtor.construir(); CouchbaseKafkaConnector conector = CouchbaseKafkaConnector.criar( env, "couchbase1.vagrant", "default", "", "kafka1.vagrant", "default"); conector.executar(); } } |
O couchbase1.vagrant e kafka1.vagrant Os endereços acima são os locais do Couchbase Server e do Kafka correspondentes, que podem ser facilmente configurados usando scripts de provisionamento do env/ diretório. Basta navegar até lá e executar vagabundo para cima.