Couchbase es genial como fuente para Apache Kafka usando el conector DCP.
Sin embargo, también es estupendo como punto final para digerir datos, ya que es rápido, primero memoria y almacenamiento fiable.
En esta entrada del blog te mostraré cómo construir una aplicación Java simple para un productor y un consumidor que guardan los mensajes publicados de Kafka en Couchbase.
Asumo aquí, que ya tienes un cluster Kafka (incluso si es un cluster de un solo nodo). Si no es así, intenta seguir esa guía de instalación.
Este entorno de blog tiene 4 partes:
- Productor de Kafka
- Cola de Apache Kafka
- Consumidor Kafka
- Servidor Couchbase
Productor
Necesitamos el productor para enviar mensajes a nuestra cola.
En la cola, esos mensajes están siendo digeridos y cada aplicación que se suscribió al tema - puede leer esos mensajes.
La fuente de nuestros mensajes será un archivo JSON ficticio que he creado usando Mockaroo, que dividiremos y enviaremos a la cola.
Nuestros datos JSON de muestra tienen un aspecto similar a:
1 2 3 4 5 6 7 8 9 |
{ "id":1, "género":"Mujer", "nombre":"Jane", "apellido":"Holmes", "email":"jholmes0@myspace.com", "dirección_ip":"230.49.112.20", "ciudad":"Houston" } |
El código del productor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
importar com.fasterxml.jackson.databind.JsonNode; importar com.fasterxml.jackson.databind.ObjectMapper; importar com.fasterxml.jackson.databind.nodo.ArrayNode; importar org.apache.kafka.clientes.productor.KafkaProducer; importar org.apache.kafka.clientes.productor.ProducerConfig; importar org.apache.kafka.clientes.productor.ProductorRegistro; importar org.apache.kafka.clientes.productor.RecordMetadata; importar java.io.Archivo; importar java.nio.conjunto de caracteres.Juego de caracteres; importar java.nio.archivo.Archivos; importar java.nio.archivo.Caminos; importar java.util.ArrayList; importar java.util.HashMap; importar java.util.Lista; importar java.util.Mapa; importar java.util.concurrente.Futuro; público clase KafkaSimpleProducer { público estático void principal(Cadena[] args) lanza Excepción { Mapa<Cadena, Objeto> config = nuevo HashMap<>(); config.poner(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.poner("valor.serializador", "org.apache.kafka.common.serialization.StringSerializer"); config.poner("clave.serializador", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Cadena, Cadena> productor = nuevo KafkaProducer<Cadena, Cadena>(config); Archivo entrada = nuevo Archivo("sampleJsonData.json"); byte[] codificado = Archivos.readAllBytes(Caminos.consiga(entrada.getPath() )); Cadena jsons = nuevo Cadena(codificado, Juego de caracteres.defaultCharset()); Sistema.fuera.println("Dividir archivo en jsons...."); Lista splittedJsons = dividir(jsons); Sistema.fuera.println("Convirtiendo a JsonDocuments...."); int docCount = splittedJsons.talla(); Sistema.fuera.println("El número de documentos es: " + docCount ); Sistema.fuera.println("Empezando a enviar msg a kafka...."); int cuente = 0; para ( Cadena doc : splittedJsons) { Sistema.fuera.println("enviando msg...." + cuente); ProductorRegistro<Cadena,Cadena> registro = nuevo ProductorRegistro<>( "couchbaseTopic", doc ); Futuro meta = productor.enviar(registro); Sistema.fuera.println("msg sent...." + cuente); cuente++; } Sistema.fuera.println("Total de " + cuente + " mensajes enviados"); productor.cerrar(); } público estático Lista dividir(Cadena jsonArray) lanza Excepción { Lista splittedJsonElements = nuevo ArrayList(); ObjectMapper jsonMapper = nuevo ObjectMapper(); JsonNode jsonNode = jsonMapper.readTree(jsonArray); si (jsonNode.isArray()) { ArrayNode arrayNodo = (ArrayNode) jsonNode; para (int i = 0; i < arrayNodo.talla(); i++) { JsonNode elementoindividual = arrayNodo.consiga(i); splittedJsonElements.añada(elementoindividual.toString()); } } devolver splittedJsonElements; } } |

Salida de la aplicación productora de Kafka
Consumidores
Esta es una simple, muy directa, solo obtén los mensajes de la cola, y usa el Couchbase Java SDK para insertar documentos en Couchbase. Por simplicidad, usaré el sync java SDK, pero usar el async es totalmente posible e incluso recomendable.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
importar com.couchbase.cliente.java.Cubo; importar com.couchbase.cliente.java.Grupo; importar com.couchbase.cliente.java.CouchbaseCluster; importar com.couchbase.cliente.java.documento.JsonDocument; importar com.couchbase.cliente.java.documento.json.JsonObject; importar kafka.consumidor.Consumidores; importar kafka.consumidor.ConsumerConfig; importar kafka.consumidor.KafkaStream; importar kafka.javaapi.consumidor.ConectorConsumidor; importar kafka.mensaje.Mensaje y metadatos; importar java.util.*; público clase KafkaSimpleConsumer { público estático void principal(Cadena[] args) { Propiedades config = nuevo Propiedades(); config.poner("zookeeper.connect", "localhost:2181"); config.poner("zookeeper.connectiontimeout.ms", "10000"); config.poner("grupo.id", "por defecto"); ConsumerConfig consumerConfig = nuevo kafka.consumidor.ConsumerConfig(config); ConectorConsumidor consumerConnector = Consumidores.createJavaConsumerConnector(consumerConfig); Mapa<Cadena, Entero> topicCountMap = nuevo HashMap<>(); topicCountMap.poner("couchbaseTopic", 1); Mapa<Cadena, Lista<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); Lista<KafkaStream<byte[], byte[]>> arroyos = consumerMap.consiga("couchbaseTopic"); Lista nodos = nuevo ArrayList<>(); nodos.añada("localhost"); Grupo grupo = CouchbaseCluster.crear(nodos); final Cubo cubo = grupo.openBucket("kafkaExample"); pruebe { para (final KafkaStream<byte[], byte[]> flujo : arroyos) { para (Mensaje y metadatos<byte[], byte[]> msgAndMetaData : flujo) { Cadena msg = convertPayloadToString(msgAndMetaData.mensaje()); Sistema.fuera.println(msgAndMetaData.tema() + ": " + msg); pruebe { JsonObject doc = JsonObject.fromJson(msg); Cadena id = UUID.randomUUID().toString(); cubo.upsert(JsonDocument.crear(id, doc)); } captura (Excepción ex) { Sistema.fuera.println("No es un objeto json: " + ex.getMessage()); } } } } captura (Excepción ex) { Sistema.fuera.println("EXCEPTION!!!!" + ex.getMessage()); grupo.desconectar(); } grupo.desconectar(); } privado estático Cadena convertPayloadToString(final byte[] mensaje) { Cadena cadena = nuevo Cadena(mensaje); devolver cadena; } } |

Salida de la consola de nuestro consumidor Kafka
Servidor Couchbase
Ahora podemos ver el resultado en el servidor Couchbase.
Mira el bucket kafkaExample - Lleno con 1000 documentos.

Cubos Couchbase
Cada documento tiene un aspecto similar:

Documento de muestra
Solución sencilla en 3 partes.
Tenga en cuenta que en un entorno de producción, Producer, Consumer, Kafka o Couchbase estarán en una o más máquinas cada uno.
Código completo (incluidas las dependencias de Maven) en GitHub.
Roi.