Bem-vindo ao novo conector kafka do Couchbase! Ele fornece funcionalidade para direcionar o fluxo de eventos do Couchbase Server (3.0 ou posterior) para o Kafka. Ele ainda está em desenvolvimento, portanto, use-o com cuidado e abra problemas caso os encontre. Seu rastreador de problemas está localizado em https://issues.couchbase.com/browse/KAFKAC. Muito obrigado a Shibi do PayPal que teve escreveu outro conector com base em uma interface mais antigaEla inspirou esta.
Obtendo
Você pode encontrar o código-fonte do projeto no github aqui. As prévias para desenvolvedores estão disponíveis em nosso próprio repositório maven, e os artefatos GA estarão disponíveis no maven central. Aqui estão as coordenadas:
- ID do grupo: com.couchbase.client
- ID do artefato: conector kafka
- Versão: 1.0.0-dp1
|
1 2 3 4 5 6 7 8 9 10 11 |
apply plugin: 'java' repositories { mavenCentral() maven { url { "https://files.couchbase.com/maven2" } } mavenLocal() } dependencies { compile(group: 'com.couchbase.client', name: 'kafka-connector', version: '1.0.0-dp1') } |
Uso
O uso da biblioteca é bastante simples. Digamos que gostaríamos de receber todas as modificações do servidor Couchbase e enviar ao Kafka apenas o corpo do documento (por padrão, o conector serializa o corpo do documento e os metadados em JSON). Para isso, você precisa definir uma classe de filtro que permita apenas instâncias de MutationMessage para passar por ele:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
package example; import com.couchbase.client.core.message.dcp.MutationMessage; import com.couchbase.kafka.DCPEvent; import com.couchbase.kafka.filter.Filter; public class SampleFilter implements Filter { @Override public boolean pass(final DCPEvent dcpEvent) { return dcpEvent.message() instanceof MutationMessage; } } |
E uma classe codificadora, que recebe o valor do documento e o converte em uma matriz de bytes:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
package example; import com.couchbase.client.core.message.dcp.MutationMessage; import com.couchbase.client.deps.io.netty.util.CharsetUtil; import com.couchbase.kafka.DCPEvent; import com.couchbase.kafka.coder.AbstractEncoder; import kafka.utils.VerifiableProperties; public class SampleEncoder extends AbstractEncoder { public SampleEncoder(final VerifiableProperties properties) { super(properties); } @Override public byte[] toBytes(final DCPEvent dcpEvent) { MutationMessage message = (MutationMessage)dcpEvent.message(); return message.content().toString(CharsetUtil.UTF_8).getBytes(); } } |
Isso é basicamente suficiente para configurar a ponte Couchbase-Kafka:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
package example; import com.couchbase.kafka.CouchbaseKafkaConnector; import com.couchbase.kafka.CouchbaseKafkaEnvironment; import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment; public class Example { public static void main(String[] args) { DefaultCouchbaseKafkaEnvironment.Builder builder = (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment .builder() .kafkaFilterClass("example.SampleFilter") .kafkaValueSerializerClass("example.SampleEncoder") .dcpEnabled(true); CouchbaseKafkaEnvironment env = builder.build(); CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create( env, "couchbase1.vagrant", "default", "", "kafka1.vagrant", "default"); connector.run(); } } |
O couchbase1.vagrant e kafka1.vagrant Os endereços acima são os locais do Couchbase Server e do Kafka correspondentes, que podem ser facilmente configurados usando scripts de provisionamento do env/ diretório. Basta navegar até lá e executar vagabundo para cima.