O Couchbase é excelente como fonte para o Apache Kafka usando o conector DCP.
No entanto, ele também é excelente como um ponto de extremidade para digerir dados, pois é rápido, com memória em primeiro lugar e armazenamento confiável.
Nesta postagem do blog, mostrarei como criar um aplicativo Java simples para um produtor e um consumidor que salvam as mensagens publicadas do Kafka no Couchbase.
Presumo que você já tenha um cluster do Kafka (mesmo que seja um cluster de nó único). Caso contrário, tente seguir esse guia de instalação.
Este ambiente de blog tem 4 partes:
- Produtor da Kafka
- Fila do Apache Kafka
- Consumidor Kafka
- Servidor Couchbase
Produtor
Precisamos do produtor para enviar mensagens à nossa fila.
Na fila, essas mensagens estão sendo digeridas e todos os aplicativos que se inscreveram no tópico podem ler essas mensagens.
A fonte de nossas mensagens será um arquivo JSON fictício que criei usando o Mockaroo, que será dividido e enviado para a fila.
Nossos dados JSON de amostra são semelhantes a:
1 2 3 4 5 6 7 8 9 |
{ "id":1, "gênero":"Feminino", "first_name":"Jane", "last_name":"Holmes", "email":"jholmes0@myspace.com", "ip_address":"230.49.112.20", "cidade":"Houston" } |
O código do produtor:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
importação com.xml mais rápido.jackson.vinculação de dados.JsonNode; importação com.xml mais rápido.jackson.vinculação de dados.Mapeador de objetos; importação com.xml mais rápido.jackson.vinculação de dados.nó.ArrayNode; importação org.apache.kafka.clientes.produtor.KafkaProducer; importação org.apache.kafka.clientes.produtor.ProducerConfig; importação org.apache.kafka.clientes.produtor.ProdutorRegistro; importação org.apache.kafka.clientes.produtor.RecordMetadata; importação java.io.Arquivo; importação java.nio.conjunto de caracteres.Conjunto de caracteres; importação java.nio.arquivo.Arquivos; importação java.nio.arquivo.Caminhos; importação java.util.ArrayList; importação java.util.HashMap; importação java.util.Lista; importação java.util.Mapa; importação java.util.concomitante.Futuro; público classe KafkaSimpleProducer { público estático vazio principal(Cordas[] argumentos) lançamentos Exceção { Mapa<Cordas, Objeto> configuração = novo HashMap<>(); configuração.colocar(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configuração.colocar("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); configuração.colocar("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Cordas, Cordas> produtor = novo KafkaProducer<Cordas, Cordas>(configuração); Arquivo entrada = novo Arquivo("sampleJsonData.json"); byte[] codificado = Arquivos.readAllBytes(Caminhos.obter(entrada.getPath() )); Cordas jsons = novo Cordas(codificado, Conjunto de caracteres.defaultCharset()); Sistema.fora.println("Splitting file to jsons...."); Lista splittedJsons = dividir(jsons); Sistema.fora.println("Converting to JsonDocuments...."); int docCount = splittedJsons.tamanho(); Sistema.fora.println("O número de documentos é: " + docCount ); Sistema.fora.println("Começando a enviar mensagens para kafka...."); int contagem = 0; para ( Cordas doc : splittedJsons) { Sistema.fora.println("sending msg...." + contagem); ProdutorRegistro<Cordas,Cordas> registro = novo ProdutorRegistro<>( "couchbaseTopic", doc ); Futuro meta = produtor.enviar(registro); Sistema.fora.println("msg sent...." + contagem); contagem++; } Sistema.fora.println("Total de " + contagem + " mensagens enviadas"); produtor.próximo(); } público estático Lista dividir(Cordas jsonArray) lançamentos Exceção { Lista splittedJsonElements = novo ArrayList(); Mapeador de objetos jsonMapper = novo Mapeador de objetos(); JsonNode jsonNode = jsonMapper.readTree(jsonArray); se (jsonNode.isArray()) { ArrayNode arrayNode = (ArrayNode) jsonNode; para (int i = 0; i < arrayNode.tamanho(); i++) { JsonNode individualElement = arrayNode.obter(i); splittedJsonElements.adicionar(individualElement.toString()); } } retorno splittedJsonElements; } } |

Saída do aplicativo produtor Kafka
Consumidor
Essa é uma solução simples, muito direta, basta obter as mensagens da fila e usar o Couchbase Java SDK para inserir documentos no Couchbase. Para simplificar, usarei o sync java SDK, mas o uso do async é totalmente possível e até recomendado.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
importação com.couchbase.cliente.java.Balde; importação com.couchbase.cliente.java.Aglomerado; importação com.couchbase.cliente.java.CouchbaseCluster; importação com.couchbase.cliente.java.documento.JsonDocument; importação com.couchbase.cliente.java.documento.json.JsonObject; importação kafka.consumidor.Consumidor; importação kafka.consumidor.ConsumerConfig; importação kafka.consumidor.KafkaStream; importação kafka.javaapi.consumidor.ConsumerConnector; importação kafka.mensagem.MessageAndMetadata; importação java.util.*; público classe KafkaSimpleConsumer { público estático vazio principal(Cordas[] argumentos) { Propriedades configuração = novo Propriedades(); configuração.colocar("zookeeper.connect", "localhost:2181"); configuração.colocar("zookeeper.connectiontimeout.ms", "10000"); configuração.colocar("group.id", "default"); ConsumerConfig consumerConfig = novo kafka.consumidor.ConsumerConfig(configuração); ConsumerConnector consumerConnector = Consumidor.createJavaConsumerConnector(consumerConfig); Mapa<Cordas, Inteiro> topicCountMap = novo HashMap<>(); topicCountMap.colocar("couchbaseTopic", 1); Mapa<Cordas, Lista<KafkaStream<byte[], byte[]>>> consumerMap = consumerConnector.createMessageStreams(topicCountMap); Lista<KafkaStream<byte[], byte[]>> fluxos = consumerMap.obter("couchbaseTopic"); Lista nós = novo ArrayList<>(); nós.adicionar("localhost"); Aglomerado agrupamento = CouchbaseCluster.criar(nós); final Balde balde = agrupamento.openBucket("kafkaExample"); tentar { para (final KafkaStream<byte[], byte[]> fluxo : fluxos) { para (MessageAndMetadata<byte[], byte[]> msgAndMetaData : fluxo) { Cordas mensagem = convertPayloadToString(msgAndMetaData.mensagem()); Sistema.fora.println(msgAndMetaData.tópico() + ": " + mensagem); tentar { JsonObject doc = JsonObject.fromJson(mensagem); Cordas id = UUID.UUUID aleatório().toString(); balde.upsert(JsonDocument.criar(id, doc)); } captura (Exceção ex) { Sistema.fora.println("Não é um objeto json: " + ex.getMessage()); } } } } captura (Exceção ex) { Sistema.fora.println("EXCEPTION!!!!" + ex.getMessage()); agrupamento.desconectar(); } agrupamento.desconectar(); } privado estático Cordas convertPayloadToString(final byte[] mensagem) { Cordas string = novo Cordas(mensagem); retorno string; } } |

Saída do console do nosso consumidor Kafka
Servidor Couchbase
Agora podemos ver o resultado no servidor Couchbase.
Veja o bucket kafkaExample - preenchido com 1.000 documentos.

Baldes do Couchbase
Cada documento tem uma aparência semelhante a essa:

Documento de amostra
Solução simples de três partes.
Observe que, em um ambiente de produção, o Producer, o Consumer, o Kafka ou o Couchbase estarão em uma ou mais máquinas cada.
Código completo (incluindo dependências Maven) em GitHub.
Roi.