Couchbase es genial como fuente para Apache Kafka usando el conector DCP.
Sin embargo, también es estupendo como punto final para digerir datos, ya que es rápido, primero memoria y almacenamiento fiable.
En esta entrada del blog te mostraré cómo construir una aplicación Java simple para un productor y un consumidor que guardan los mensajes publicados de Kafka en Couchbase.
Asumo aquí, que ya tienes un cluster Kafka (incluso si es un cluster de un solo nodo). Si no es así, intenta seguir esa guía de instalación.
Este entorno de blog tiene 4 partes:
- Productor de Kafka
- Cola de Apache Kafka
- Consumidor Kafka
- Servidor Couchbase
Productor
Necesitamos el productor para enviar mensajes a nuestra cola.
En la cola, esos mensajes están siendo digeridos y cada aplicación que se suscribió al tema - puede leer esos mensajes.
La fuente de nuestros mensajes será un archivo JSON ficticio que he creado usando Mockaroo, que dividiremos y enviaremos a la cola.
Nuestros datos JSON de muestra tienen un aspecto similar a:
|
1 2 3 4 5 6 7 8 9 |
{ "id":1, "gender":"Female", "first_name":"Jane", "last_name":"Holmes", "email":"jholmes0@myspace.com", "ip_address":"230.49.112.20", "city":"Houston" } |
El código del productor:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.io.File; import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Future; public class KafkaSimpleProducer { public static void main(String[] args) throws Exception { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config); File input = new File("sampleJsonData.json"); byte[] encoded = Files.readAllBytes(Paths.get(input.getPath() )); String jsons = new String(encoded, Charset.defaultCharset()); System.out.println("Splitting file to jsons...."); List splittedJsons = split(jsons); System.out.println("Converting to JsonDocuments...."); int docCount = splittedJsons.size(); System.out.println("Number of documents is: " + docCount ); System.out.println("Starting sending msg to kafka...."); int count = 0; for ( String doc : splittedJsons) { System.out.println("sending msg...." + count); ProducerRecord<String,String> record = new ProducerRecord<>( "couchbaseTopic", doc ); Future meta = producer.send(record); System.out.println("msg sent...." + count); count++; } System.out.println("Total of " + count + " messages sent"); producer.close(); } public static List split(String jsonArray) throws Exception { List splittedJsonElements = new ArrayList(); ObjectMapper jsonMapper = new ObjectMapper(); JsonNode jsonNode = jsonMapper.readTree(jsonArray); if (jsonNode.isArray()) { ArrayNode arrayNode = (ArrayNode) jsonNode; for (int i = 0; i < arrayNode.size(); i++) { JsonNode individualElement = arrayNode.get(i); splittedJsonElements.add(individualElement.toString()); } } return splittedJsonElements; } } |

Salida de la aplicación productora de Kafka
Consumidores
Esta es una simple, muy directa, solo obtén los mensajes de la cola, y usa el Couchbase Java SDK para insertar documentos en Couchbase. Por simplicidad, usaré el sync java SDK, pero usar el async es totalmente posible e incluso recomendable.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
import com.couchbase.client.java.Bucket; import com.couchbase.client.java.Cluster; import com.couchbase.client.java.CouchbaseCluster; import com.couchbase.client.java.document.JsonDocument; import com.couchbase.client.java.document.json.JsonObject; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata; import java.util.*; public class KafkaSimpleConsumer { public static void main(String[] args) { Properties config = new Properties(); config.put("zookeeper.connect", "localhost:2181"); config.put("zookeeper.connectiontimeout.ms", "10000"); config.put("group.id", "default"); ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(config); ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); Map<String, Integer> topicCountMap = new HashMap<>(); topicCountMap.put("couchbaseTopic", 1); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get("couchbaseTopic"); List nodes = new ArrayList<>(); nodes.add("localhost"); Cluster cluster = CouchbaseCluster.create(nodes); final Bucket bucket = cluster.openBucket("kafkaExample"); try { for (final KafkaStream<byte[], byte[]> stream : streams) { for (MessageAndMetadata<byte[], byte[]> msgAndMetaData : stream) { String msg = convertPayloadToString(msgAndMetaData.message()); System.out.println(msgAndMetaData.topic() + ": " + msg); try { JsonObject doc = JsonObject.fromJson(msg); String id = UUID.randomUUID().toString(); bucket.upsert(JsonDocument.create(id, doc)); } catch (Exception ex) { System.out.println("Not a json object: " + ex.getMessage()); } } } } catch (Exception ex) { System.out.println("EXCEPTION!!!!" + ex.getMessage()); cluster.disconnect(); } cluster.disconnect(); } private static String convertPayloadToString(final byte[] message) { String string = new String(message); return string; } } |

Salida de la consola de nuestro consumidor Kafka
Servidor Couchbase
Ahora podemos ver el resultado en el servidor Couchbase.
Mira el bucket kafkaExample - Lleno con 1000 documentos.

Cubos Couchbase
Cada documento tiene un aspecto similar:

Documento de muestra
Solución sencilla en 3 partes.
Tenga en cuenta que en un entorno de producción, Producer, Consumer, Kafka o Couchbase estarán en una o más máquinas cada uno.
Código completo (incluidas las dependencias de Maven) en GitHub.
Roi.