¡Bienvenido al nuevo conector kafka de Couchbase! Proporciona funcionalidad para dirigir el flujo de eventos desde Couchbase Server (3.0 o posterior) a Kafka. Todavía está en desarrollo, así que úsalo con cuidado y abre issues si te los encuentras. Su rastreador de problemas se encuentra en https://issues.couchbase.com/browse/KAFKAC. Muchas gracias a Shibi de PayPal que tuvo escribir otro conector basado en una interfaz más antiguainspiró ésta.
Conseguirlo
Puede encontrar el código fuente del proyecto en github aquí. Los avances para desarrolladores están disponibles a través de nuestro propio repositorio maven, los artefactos GA estarán disponibles en maven central. Aquí están las coordenadas:
- ID de grupo: com.couchbase.client
- Identificación del artefacto: kafka-connector
- Versión: 1.0.0-dp1
|
1 2 3 4 5 6 7 8 9 10 11 |
apply plugin: 'java' repositories { mavenCentral() maven { url { "https://files.couchbase.com/maven2" } } mavenLocal() } dependencies { compile(group: 'com.couchbase.client', name: 'kafka-connector', version: '1.0.0-dp1') } |
Utilización
El uso de la librería es bastante sencillo. Digamos que queremos recibir todas las modificaciones del servidor Couchbase y enviar a Kafka sólo el cuerpo del documento (por defecto el conector serializa el cuerpo del documento y los metadatos a JSON). Para conseguirlo necesitamos definir una clase filtro que permita sólo instancias de MutationMessage para pasar:
|
1 2 3 4 5 6 7 8 9 10 11 12 |
package example; import com.couchbase.client.core.message.dcp.MutationMessage; import com.couchbase.kafka.DCPEvent; import com.couchbase.kafka.filter.Filter; public class SampleFilter implements Filter { @Override public boolean pass(final DCPEvent dcpEvent) { return dcpEvent.message() instanceof MutationMessage; } } |
Y una clase codificadora, que toma el valor del documento y lo convierte en una matriz de bytes:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
package example; import com.couchbase.client.core.message.dcp.MutationMessage; import com.couchbase.client.deps.io.netty.util.CharsetUtil; import com.couchbase.kafka.DCPEvent; import com.couchbase.kafka.coder.AbstractEncoder; import kafka.utils.VerifiableProperties; public class SampleEncoder extends AbstractEncoder { public SampleEncoder(final VerifiableProperties properties) { super(properties); } @Override public byte[] toBytes(final DCPEvent dcpEvent) { MutationMessage message = (MutationMessage)dcpEvent.message(); return message.content().toString(CharsetUtil.UTF_8).getBytes(); } } |
Eso es esencialmente suficiente para configurar el puente Couchbase-Kafka:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
package example; import com.couchbase.kafka.CouchbaseKafkaConnector; import com.couchbase.kafka.CouchbaseKafkaEnvironment; import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment; public class Example { public static void main(String[] args) { DefaultCouchbaseKafkaEnvironment.Builder builder = (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment .builder() .kafkaFilterClass("example.SampleFilter") .kafkaValueSerializerClass("example.SampleEncoder") .dcpEnabled(true); CouchbaseKafkaEnvironment env = builder.build(); CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create( env, "couchbase1.vagrant", "default", "", "kafka1.vagrant", "default"); connector.run(); } } |
En couchbase1.vagrant y kafka1.vagrant son las ubicaciones correspondientes de Couchbase Server y Kafka, que pueden configurarse fácilmente utilizando scripts de aprovisionamiento de env/ directorio. Navegue hasta allí y ejecute vagabundo arriba.