Hi there,
I'm very new to Couchbase. My goal is to extend the existing Kafka Couchbase connector in Java for our requirements: connect to a Couchbase bucket and publish its contents, document by document, to a Kafka topic. For this I'm using GitHub - couchbase/couchbase-kafka-connector: Legacy Couchbase to Kafka connector, superseded by Kafka Connect based..
My local machine runs Couchbase 6.0 and Kafka 2.12.
My question: why is SampleEncoder getting properties from somewhere and overriding mine? Because of this, the connector tries to connect to Kafka at some other IP address on port 9092 (9.102.20.225:9092 in the log below) and cannot publish to the Kafka topic. Please help me understand this problem. How do I publish to the Kafka topic?
My example code:
import com.couchbase.kafka.CouchbaseKafkaConnector;
import com.couchbase.kafka.DefaultCouchbaseKafkaEnvironment;

public class Example {
    public static void main(String[] args) {
        // Configure both the Kafka side and the Couchbase side of the connector.
        DefaultCouchbaseKafkaEnvironment.Builder builder =
            (DefaultCouchbaseKafkaEnvironment.Builder) DefaultCouchbaseKafkaEnvironment.builder()
                .kafkaFilterClass("example.SampleFilter")
                .kafkaTopic("test")
                .kafkaZookeeperAddress("127.0.0.1:2181")
                .couchbaseNodes("localhost")
                .couchbaseBucket("Devdb")
                .couchbasePassword("password-working")
                .kafkaValueSerializerClass("example.SampleEncoder")
                .couchbaseStateSerializerClass("example.NullStateSerializer")
                .dcpEnabled(true);
        CouchbaseKafkaConnector connector = CouchbaseKafkaConnector.create(builder.build());
        // Start streaming documents from the bucket to the Kafka topic.
        connector.run();
    }
}
Error:
WARN - Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:0,host:9.102.20.225,port:9092] failed
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
at kafka.utils.Utils$.swallow(Utils.scala:172)
at kafka.utils.Logging$class.swallowError(Logging.scala:106)
at kafka.utils.Utils$.swallowError(Utils.scala:45)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at com.couchbase.kafka.KafkaWriter.onEvent(KafkaWriter.java:77)
at com.couchbase.kafka.KafkaWriter.onEvent(KafkaWriter.java:37)
at com.couchbase.client.deps.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:129)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:748)
INFO - Disconnecting from 9.102.20.225:9092
ERROR - fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:9.102.20.225,port:9092)] failed
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:0,host:9.102.20.225,port:9092)] failed
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
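As a first diagnostic, here is a minimal sketch (not part of the connector) that checks whether the broker address shown in the log can be reached over TCP from the machine running the example. The class name BrokerReachability, the method canConnect, and the 3-second timeout are my own choices for illustration. It only tests the network path and says nothing about where the connector got that address from.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class BrokerReachability {

    /** Returns true if a TCP connection to host:port opens within timeoutMillis. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: all treated as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the broker host and port copied from the WARN line in the log.
        String host = args.length > 0 ? args[0] : "9.102.20.225";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

Running it once with no arguments and once with `127.0.0.1 9092` would show whether the broker answers on the address from the log, on localhost, or on neither.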
I should probably use GitHub - couchbase/kafka-connect-couchbase: Kafka Connect connector for Couchbase Server instead and build on it for my requirement, but I can't find a good example of how to start getting data from a bucket with that version of the code.