Me complace anunciar la primera versión preliminar para desarrolladores de la próxima gran iteración de nuestra integración con Kafka. Esta versión está basada en una nueva librería para DCP, y soporta el framework Kafka Connect. En este post mostraré cómo podría integrarse para retransmitir datos desde Couchbase a HDFS.
Aquí mostraré los pasos para las distribuciones CentOS/Fedora de Linux. Los pasos en otros sistemas operativos serán similares. Primero, instala Confluent Platform (http://docs.confluent.io/3.0.0/installation.html#rpm-packages-via-yum) y descarga el archivo zip de Couchbase con la integración del conector http://packages.couchbase.com/clients/kafka/3.0.0-DP1/kafka-connect-couchbase-3.0.0-DP1.zip
Para registrar el conector, simplemente extraiga el contenido a la ruta de clase por defecto, por ejemplo en CentOS (Fedora) es /usr/share/java
:
1 2 |
descomprimir kafka-conecte-couchbase-3.0.0-DP1.zip sudo cp -a kafka-conecte-couchbase-3.0.0-DP1/compartir /usr/ |
Ahora ejecute el Centro de Control de Confluent y todos los servicios dependientes. Más información sobre lo que hacen estos comandos en Confluent's guía de inicio rápido
1 2 3 4 5 6 |
sudo cuidador del zoo-servidor-iniciar /etc/kafka/cuidador del zoo.propiedades sudo kafka-servidor-iniciar /etc/kafka/servidor.propiedades sudo esquema-registro-iniciar /etc/esquema-registro/esquema-registro.propiedades sudo conecte-distribuido /etc/kafka/conecte-distribuido.propiedades sudo control-centro-iniciar /etc/confluente-control-centro/control-centro.propiedades |
En este punto todo está listo para configurar el enlace para transferir documentos de Couchbase a HDFS usando Kafka Connect. Asumimos que estás ejecutando Couchbase Server en http://127.0.0.1:8091/
y Confluent Control Center en http://127.0.0.1:9021/
. Para este ejemplo, asegúrese de que tiene el viaje-muestra
cargado en Couchbase. Si no lo configuraste al configurar el clúster, puedes añadirlo a través de la parte de configuración de la interfaz web.
Una vez que tengas todos estos prerrequisitos fuera del camino, navega a la sección "Kafka Connect" en tu Confluent Control Center. Selecciona "New source", luego selecciona "CouchbaseSourceConnector" como clase de conector y rellena los ajustes para que el JSON final sea similar a:
1 2 3 4 5 6 7 |
{ "conector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector", "nombre": "fuente de viajes", "conexión.cubo": "viaje-muestra", "conexión.dirección_clúster": "127.0.0.1", "topic.name": "tema-viaje" } |
Una vez guardada la conexión de origen, el demonio Connect empezará a recibir mutaciones y a almacenarlas en el tema Kafka especificado. Para demostrar un pipeline completo, vamos a configurar una conexión Sink para obtener datos de Kafka. Para ello, vaya a la pestaña "Sinks" y haga clic en el botón "New sink". Nos preguntará por un tema donde se almacenan los datos interesantes, introducimos "travel-topic". A continuación, seleccione "HdfsSinkConnector" y rellenar los ajustes de modo que, la configuración JSON se verá así (suponiendo que el nombre HDFS nodo está escuchando en hdfs://127.0.0.1:8020/
):
1 2 3 4 5 6 7 8 9 10 |
{ "conector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "nombre": "hdfs-travel-sink", "flush.size": "10", "partitioner.class": "io.confluent.connect.hdfs.partitioner.FieldPartitioner", "nombre.campo.partición": "partición", "hdfs.url": "hdfs://127.0.0.1:8020", "temas": "tema-viaje" } |
Una vez configurada la conexión Sink, verá aparecer los datos en HDFS en /temas/viajes-temas/
con el directorio de temas por defecto. Vamos a inspeccionar uno de ellos:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ hdfs dfs -fs hdfs://localhost:8020 -cat /topics/travel-topic/partition=89/travel-topic+0+0000000101+0000000101.avro | avropipe / [] /0 {} /0/partición 89 /0/clave "route_28879" /0/caducidad 0 /0/banderas 33554438 /0/cas 1471633063247347712 /0/lockTime 0 /0/bySeqno 1 /0/revSeqno 1 /0/contenido "{"id":28879,"tipo":"ruta","aerolínea":"G4","airlineid":"aerolínea_35","fuenteaeropuerto":"AZA","destinoaeropuerto":"FWA","para":0,"equipo":"319","horario":[{"día":0,"utc":"01:59:00","vuelo":"G4097"},{"día":1,"utc":"09:30:00","vuelo":"G4697"},{"día":1,"utc":"09:50:00","vuelo":"G4879"},{"día":1,"utc":"07:44:00","vuelo":"G4310"},{"día":1,"utc":"01:23:00","vuelo":"G4226"},{"día":2,"utc":"19:58:00","vuelo":"G4921"},{"día":2,"utc":"09:49:00","vuelo":"G4376"},{"día":2,"utc":"17:57:00","vuelo":"G4446"},{"día":2,"utc":"21:06:00","vuelo":"G4032"},{"día":3,"utc":"17:05:00","vuelo":"G4198"},{"día":3,"utc":"12:21:00","vuelo":"G4098"},{"día":3,"utc":"19:31:00","vuelo":"G4571"},{"día":4,"utc":"05:27:00","vuelo":"G4001"},{"día":4,"utc":"07:03:00","vuelo":"G4023"},{"día":4,"utc":"16:50:00","vuelo":"G4631"},{"día":5,"utc":"18:13:00","vuelo":"G4757"},{"día":6,"utc":"20:35:00","vuelo":"G4157"},{"día":6,"utc":"21:52:00","vuelo":"G4582"},{"día":6,"utc":"00:55:00","vuelo":"G4348"},{"día":6,"utc":"06:01:00","vuelo":"G4731"}],"distancia":2483.859992489083}" |
Este es mi ejemplo rápido. El cliente DCP está todavía en desarrollo activo y tiene algunas características adicionales que se están añadiendo para manejar varios cambios de topología, escenarios de fallo. Las próximas actualizaciones de nuestro conector Kafka recogerán esas actualizaciones. También debo señalar brevemente que La interfaz cliente DCP de Couchbase debe considerarse volátil por el momento. Nosotros lo utilizamos en varios proyectos, pero usted sólo debe utilizarlo directamente por su cuenta y riesgo.
El código fuente del conector se encuentra en https://github.com/couchbaselabs/kafka-connect-couchbase. El rastreador de problemas está en https://issues.couchbase.com/projects/KAFKACy no dude en hacer cualquier pregunta en https://www.couchbase.com/forums/.