Introducción
Couchbase Kafka Connector 1.2.0 acaba de ser enviado. Junto con las diversas correcciones de errores, hay un nuevo código de ejemplo para un consumidor Kafka, además del productor Kafka que estaba disponible anteriormente. Para revisar rápidamente las condiciones:
- Un productor de Kafka escribe datos en Kafka, por lo que es una fuente de mensajes desde la perspectiva de Kafka.
- Un consumidor en la terminología de Kafka es un proceso que se suscribe a temas y luego hace algo con la alimentación de mensajes publicados que se emiten desde un clúster Kafka. Es básicamente un sumidero.
En este blog, pondrás en marcha un ejemplo de consumidor Kafka al estilo "¡Hola Mundo!" que escribe en Couchbase. En el camino, también obtendrás un entorno sandbox con un broker Kafka y un único nodo Couchbase Server para que puedas ejecutar y modificar el consumidor y productor de ejemplo.
Instalación Requisitos previos
En muestras forman parte del Conector Kafka de Couchbase árbol fuente. Para obtenerlos, basta con clonar todo el repositorio:
|
1 |
$ git clone git://github.com/couchbase/couchbase-kafka-connector.git /tmp/kafka-connector |
Ahora, vamos a configurar tu entorno de pruebas usando imágenes pre-configuradas de Kafka y Couchbase Server. Tienes que instalar Vagrant, VirtualBox y Ansible para configurarlos localmente. Si tienes estos servicios instalados en otro lugar, asegúrate de ajustar las direcciones de host a lo largo de esta guía apropiadamente.
|
1 |
$ cd /tmp/kafka-connector/env |
Compruebe las versiones de las dependencias:
|
1 2 3 |
$ ansible --version $ vboxmanage --version $ vagrant -v |
Puede asignar nombres legibles por humanos a las cajas utilizando el plugin para Vagrant. Si aún no lo tienes instalado, utiliza el siguiente comando:
|
1 |
$ vagrant plugin install vagrant-hostsupdater |
Ahora ya puedes aprovisionar los servidores y ponerte en marcha:
|
1 |
$ vagrant up |
Nota: Si un servidor no se instala debido a los tiempos de espera, vuelva a intentar "vagrant up" después de unos minutos y puede que funcione.
Compruebe que los hosts responden:
|
1 2 |
$ ping couchbase1.vagrant $ ping kafka1.vagrant |
Si navegas a deberías poder ver tu Couchbase Server de nodo único configurado con credenciales Administrador/contraseña.
Construcción de las muestras
Para evitar cualquier problema de classpath, utilice maven para crear un archivo JAR autónomo para cada aplicación de ejemplo.
La aplicación generadora es una aplicación CLI mínima. Utiliza el SDK Java de Couchbase para envolver líneas de entrada desde STDIN en documentos JSON y los envía al bucket "por defecto" en Couchbase Server:
|
1 2 |
$ cd /tmp/kafka-connector/samples/generator $ mvn assembly:assembly |
El productor se conecta a Couchbase Server y transmite todas las mutaciones a Kafka. Esta aplicación utiliza el proyecto couchbase-kafka-connector entre bastidores.
|
1 2 |
$ cd /tmp/kafka-connector/samples/producer $ mvn assembly:assembly |
El consumidor es un consumidor típico de Kafka, que por defecto envía cualquier mensaje entrante en el tema "default" a STDOUT.
|
1 2 3 |
$ cd /tmp/kafka-connector/samples/consumer $ mvn assembly:assembly |
Ejecución de las muestras
Ahora que lo tienes todo preparado, es el momento de ejecutar todas las muestras. Necesitarás tres sesiones shell diferentes porque cada una de ellas ejecuta un proceso hasta que se detiene. Asumiremos que estás en la sesión /tmp/conector-kafka/muestras directorio.
En primer lugar, pon en marcha el generador:
|
1 2 |
$ java -jar generator/target/kafka-samples-generator-1.0-SNAPSHOT-jar-with-dependencies.jar |
Debería mostrar la configuración de la conexión y, a continuación, pasar al símbolo del sistema >. Puede escribir cualquier cosa allí y verificar que se está creando correctamente mirando en la interfaz de usuario de administración del servidor Couchbase:

Documentos del generador en el cubo
|
1 2 3 4 5 |
... INFO: Opened bucket default > hello, kafka demo! >> key=key-5, value={"line":"hello, kafka demo!"} |
En este momento puedes ejecutar el ejemplo de conector
|
1 |
$ java -jar producer/target/kafka-samples-producer-1.0-SNAPSHOT-jar-with-dependencies.jar |
Por cada línea que escriba en el generador, verá una línea del productor como ésta:
|
1 |
RECEIVED: com.couchbase.kafka.DCPEvent@4e44cb88 |
La muestra lo escribe justo antes de enviar la carga útil a Kafka, en la implementación de la clase filtro. Comprobemos cómo recibe Kafka estos mensajes.
|
1 2 3 |
$ java -jar consumer/target/kafka-samples-consumer-1.0-SNAPSHOT-jar-with-dependencies.jar 1: {"line":"hello, kafka demo!"} 2: {"line":"hello, this is a test"} |
Puedes seguir jugando con él mientras los tres servicios estén en marcha.
Desarrollando con Couchbase Kafka Connector
Sigamos adelante y echemos un vistazo al código. Las tres aplicaciones son bastante amigables para los experimentos, por ejemplo, el generador cabe en unas pocas líneas:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
public class Example { public static void main(String args[]) throws IOException { Random random = new Random(); Cluster cluster = CouchbaseCluster.create("couchbase1.vagrant"); Bucket bucket = cluster.openBucket(); BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); String line; do { System.out.print("> "); line = input.readLine(); if (line == null) { break; } String key = "key-" + random.nextInt(10); JsonObject value = JsonObject.create().put("line", line); bucket.upsert(JsonDocument.create(key, value)); System.out.printf(">> key=%s, value=%sn", key, value); } while (true); } } |
Básicamente, el generador abre una conexión al bucket "default" en tu instancia "couchbase1.vagrant" y escribe tus mensajes en claves aleatorias. Puedes extenderlo para enviar otros tipos de eventos. Otra cosa que puedes intentar es eliminar claves.
Por defecto, Couchbase Connector para Kafka se ejecuta en modo servidor, donde toma prestado el hilo activo y escucha activamente a Couchbase Server en busca de nuevos eventos. Hay varios puntos donde puedes aplicar tus ideas o cambios. El más obvio es el constructor de configuración, donde no sólo especificas las credenciales y direcciones de los servicios a los que te conectas, sino que también puedes especificar varias clases de serializadores y filtros.
La aplicación de ejemplo implementa varias de ellas. La clase Filter es la más sencilla:
|
1 2 3 4 5 6 7 8 9 |
public class SampleFilter implements Filter { @Override public boolean pass(DCPEvent dcpEvent) { System.out.println("RECEIVED: " + dcpEvent); return true; } } |
Aquí puedes poner las comprobaciones personalizadas que quieras, y si pasar() devuelve falsoel conector descarta el mensaje y no lo envía a Kafka.
Default Encoder, que viene con la distribución del conector, intenta representar cada mensaje como JSON, pero eso probablemente no es lo que necesitas, así que puedes aplicar y conversión a Evento DCPEvent y devuelve una matriz de bytes que se almacenará en Kafka. En este ejemplo, solo convertimos los eventos a su representación de cadena.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class SampleEncoder extends AbstractEncoder { public SampleEncoder(final VerifiableProperties properties) { super(properties); } @Override public byte[] toBytes(final DCPEvent dcpEvent) { if (dcpEvent.message() instanceof MutationMessage) { MutationMessage message = (MutationMessage) dcpEvent.message(); return message.content().toString(CharsetUtil.UTF_8).getBytes(); } else { return dcpEvent.message().toString().getBytes(); } } } |
Un ajuste más avanzado es SerializadorEstado de Couchbase. Implementándolo, puedes controlar cómo la librería rastreará los cursores de flujo (es decir, los números de secuencia para cada partición dentro de Couchbase Server), y si se reanudará después de reiniciar el conector. Hay una implementación Zookeeper del serializador de estado en la distribución. Aquí en el ejemplo, hemos implementado NullStateSerializer que no persiste nada, pero muestra una implementación mínima.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
public class NullStateSerializer implements StateSerializer { public NullStateSerializer(final CouchbaseKafkaEnvironment environment) { } @Override public void dump(BucketStreamAggregatorState aggregatorState) { } @Override public void dump(BucketStreamAggregatorState aggregatorState, short partition) { } @Override public BucketStreamAggregatorState load(BucketStreamAggregatorState aggregatorState) { return new BucketStreamAggregatorState(aggregatorState.name()); } @Override public BucketStreamState load(BucketStreamAggregatorState aggregatorState, short partition) { return new BucketStreamState(partition, 0, 0, 0xffffffff, 0, 0xffffffff); } } |
El último componente de su clúster de demostración es el consumidor Kafka ConsumidorAbstractoque es un ejemplo bastante típico de consumidor. Consta de dos partes: , que implementa el arranque y el posicionamiento en el tema Kafka, y ImprimirConsumidorque lleva su "lógica de negocio", o simplemente emite cada mensaje que se le pasa por ConsumidorAbstracto:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class PrintConsumer extends AbstractConsumer { public PrintConsumer(String[] seedBrokers, int port) { super(seedBrokers, port); } public PrintConsumer(String seedBroker, int port) { super(seedBroker, port); } @Override public void handleMessage(long offset, byte[] bytes) { System.out.println(String.valueOf(offset) + ": " + new String(bytes)); } } |
Como en los otros ejemplos aquí, puedes jugar modificando el consumidor de ejemplo. Incluso puedes cerrar el circuito enviando todo de vuelta a Couchbase Server. Kafka es software distribuido, al igual que Couchbase Server, así que tenlo en cuenta cuando ejecutes en tu propio cluster y ajusta la función main() en consecuencia. En nuestro ejemplo, sólo tenemos una única partición, partition (0) en Kafka, por lo que nuestro principal tiene este aspecto:
|
1 2 3 4 5 6 |
public class Example { public static void main(String args[]) { PrintConsumer example = new PrintConsumer("kafka1.vagrant", 9092); example.run("default", 0); } } |
Por supuesto, en un clúster de producción se ejecutará más de una partición.
Conclusión
Espero que esto te ayude a empezar con buen pie con Couchbase y Kafka. Gracias.