Hello,
i’ve started dealing with couchbase-kafka-connector 4.0.6 and confluent platform 6.X.X
I’ve got some timeout issues on the sink connector wheni push the kafka connector config and publish some messages in kafka.
The error is :
ERROR WorkerSinkTask{id=test.couchbase.connect.sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:571)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.couchbase.client.core.error.AmbiguousTimeoutException: UpsertRequest, Reason: TIMEOUT {“cancelled”:true,“completed”:true,“coreId”:“0xbe38ac4b00000006”,“idempotent”:false,“reason”:“TIMEOUT”,“requestId”:1288,“requestType”:“UpsertRequest”,“retried”:14,“retryReasons”:[“BUCKET_OPEN_IN_PROGRESS”],“service”:{“bucket”:“kafka-connect”,“collection”:"_default",“documentId”:“4132b868-72cd-4152-9510-e7840eb5746e”,“opaque”:“0x506”,“scope”:"_default",“type”:“kv”},“timeoutMs”:2500,“timings”:{“encodingMicros”:7,“totalMicros”:2503124}}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:170)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
… 1 more
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2519)
at com.couchbase.connect.kafka.CouchbaseSinkTask.put(CouchbaseSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:549)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Can anyone help me to resolve that issue ?
My config is like :
{
“name”: “test.couchbase.connect.sink”,
“connector.class”: “com.couchbase.connect.kafka.CouchbaseSinkConnector”,
“tasks.max”: “2”,
“topics”: “test.topic1”,
“couchbase.seed.nodes”: “127.0.0.1”,
“couchbase.bucket”: “kafka-connect”,
“couchbase.username”: “kafka-connect”,
“couchbase.password”: “XXXXX”,
“couchbase.persist.to”: “NONE”,
“couchbase.replicate.to”: “NONE”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: “false”,
“couchbase.enable.tls”: true,
“couchbase.trust.store.path”: “path/to/jks”,
“couchbase.trust.store.password” : “XXXXX”,
“couchbase.log.redaction” : “FULL”
}
Thanks in advance