Couchbase Kafka Connector Timeout

Hello,
I've started working with couchbase-kafka-connector 4.0.6 and Confluent Platform 6.X.X.
I'm getting timeout issues on the sink connector when I push the connector config and publish some messages to Kafka.

The error is:

ERROR WorkerSinkTask{id=test.couchbase.connect.sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:571)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.couchbase.client.core.error.AmbiguousTimeoutException: UpsertRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0xbe38ac4b00000006","idempotent":false,"reason":"TIMEOUT","requestId":1288,"requestType":"UpsertRequest","retried":14,"retryReasons":["BUCKET_OPEN_IN_PROGRESS"],"service":{"bucket":"kafka-connect","collection":"_default","documentId":"4132b868-72cd-4152-9510-e7840eb5746e","opaque":"0x506","scope":"_default","type":"kv"},"timeoutMs":2500,"timings":{"encodingMicros":7,"totalMicros":2503124}}
at com.couchbase.client.core.msg.BaseRequest.cancel(BaseRequest.java:170)
at com.couchbase.client.core.Timer.lambda$register$2(Timer.java:157)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747)
at com.couchbase.client.core.deps.io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472)
at com.couchbase.client.core.deps.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
… 1 more
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2519)
at com.couchbase.connect.kafka.CouchbaseSinkTask.put(CouchbaseSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:549)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
Can anyone help me resolve this issue?

My config is:
{
  "name": "test.couchbase.connect.sink",
  "connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
  "tasks.max": "2",
  "topics": "test.topic1",
  "couchbase.seed.nodes": "127.0.0.1",
  "couchbase.bucket": "kafka-connect",
  "couchbase.username": "kafka-connect",
  "couchbase.password": "XXXXX",
  "couchbase.persist.to": "NONE",
  "couchbase.replicate.to": "NONE",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "couchbase.enable.tls": true,
  "couchbase.trust.store.path": "path/to/jks",
  "couchbase.trust.store.password": "XXXXX",
  "couchbase.log.redaction": "FULL"
}

Thanks in advance :grinning:

Hi @mfouze,

Looks like it's taking longer than 2.5 seconds (the KV timeout) for the connector to establish a connection to Couchbase Server.

I filed KAFKAC-254, an enhancement request to give the connector more time to connect during startup.

In the meantime, as a workaround you can try increasing the KV timeout to, say, 10 seconds by adding this property to your connector config:

couchbase.env.timeout.kvTimeout=10s

or for JSON:

"couchbase.env.timeout.kvTimeout":"10s"

References:

Thanks,
David

Hi @david.nault ,
I've tried your solution but I got the same issue.
After the 10 seconds, it times out again. I've also tried increasing the timeout further, and that didn't resolve the problem.
I don't know if there is another way to fix it.
Thanks,
Mafouze

Interesting. I guess we need to look for other possible causes.

First, I would recommend double-checking the Couchbase hostname and credentials in the connector config.

If that fails, here are some questions that might help us find the problem:

  • What is in the log before the ERROR message?
  • Are the connector and Couchbase Server on the same network?
  • Are you using Couchbase Cloud? Kubernetes?

It might also help to enable debug logging.
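Assuming the Connect worker uses a standard log4j setup (as in a Confluent Platform install), debug logging for the Couchbase SDK and connector might be enabled with something like the following; the file path varies by deployment:

```properties
# Hypothetical additions to the Connect worker's log4j config
# (e.g. etc/kafka/connect-log4j.properties) -- adjust the path
# and logger names to match your installation.
log4j.logger.com.couchbase=DEBUG
log4j.logger.com.couchbase.connect.kafka=DEBUG
```

Restart the Connect worker after changing the logging config so the new levels take effect.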

Thanks,
David

Hello @david.nault ,
I checked all the questions you asked.
First, the log before the error is:

org.apache.kafka.connect.json.JsonConverterConfig)
[2021-06-03 11:10:01,204] INFO WorkerSinkTask{id=test.couchbase.connect.sink-1} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask)
[2021-06-03 11:10:01,254] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Cluster ID: pRx4x71eRhWL-19wD_WoOw (org.apache.kafka.clients.Metadata)
[2021-06-03 11:10:01,254] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Discovered group coordinator broker-dns:9093 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-06-03 11:10:01,256] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-06-03 11:10:01,299] INFO [com.couchbase.core][CoreCreatedEvent] {"clientVersion":"3.1.3","clientGitHash":"51119ad8","coreVersion":"2.1.3","coreGitHash":"51119ad8","userAgent":"couchbase-java/3.1.3 (Linux 4.15.0-88-generic amd64; OpenJDK 64-Bit Server VM 1.8.0_222-b10)","maxNumRequestsInRetry":32768,"ioEnvironment":{"nativeIoEnabled":true,"eventLoopThreadCount":2,"eventLoopGroups":["EpollEventLoopGroup"]},"ioConfig":{"captureTraffic":,"mutationTokensEnabled":true,"networkResolution":"auto","dnsSrvEnabled":true,"tcpKeepAlivesEnabled":true,"tcpKeepAliveTimeMs":60000,"configPollIntervalMs":2500,"kvCircuitBreakerConfig":"disabled","queryCircuitBreakerConfig":"disabled","viewCircuitBreakerConfig":"disabled","searchCircuitBreakerConfig":"disabled","analyticsCircuitBreakerConfig":"disabled","managerCircuitBreakerConfig":"disabled","numKvConnections":1,"maxHttpConnections":12,"idleHttpConnectionTimeoutMs":4500,"configIdleRedialTimeoutMs":300000},"compressionConfig":{"enabled":true,"minRatio":0.83,"minSize":32},"securityConfig":{"tlsEnabled":true,"nativeTlsEnabled":true,"hostnameVerificationEnabled":true,"hasTrustCertificates":false,"trustManagerFactory":"TrustManagerFactory"},"timeoutConfig":{"kvMs":2500,"kvDurableMs":10000,"managementMs":75000,"queryMs":75000,"viewMs":75000,"searchMs":75000,"analyticsMs":75000,"connectMs":30000,"disconnectMs":10000},"loggerConfig":{"customLogger":null,"fallbackToConsole":false,"disableSlf4j":false,"loggerName":"CouchbaseLogger","diagnosticContextEnabled":false},"orphanReporterConfig":{"emitIntervalMs":10000,"sampleSize":10,"queueLength":1024,"enabled":true},"thresholdRequestTracerConfig":{"emitIntervalMs":10000,"sampleSize":10,"queueLength":1024,"kvThresholdMs":500,"queryThresholdMs":1000,"searchThresholdMs":1000,"analyticsThresholdMs":1000,"viewThresholdMs":1000},"aggregatingMeterConfig":{"enabled":false,"emitIntervalMs":600000},"retryStrategy":"BestEffortRetryStrategy","requestTracer":"ThresholdRequestTracer","meter":"NoopMeter"}
{"coreId":"0xb556e65500000010","seedNodes":[{"address":"10.56.60.144"}]} (com.couchbase.core)
[2021-06-03 11:10:01,299] INFO [com.couchbase.node][NodeConnectedEvent] Node connected {"coreId":"0xb556e65500000010","managerPort":"8091","remote":"10.56.60.144"} (com.couchbase.node)
[2021-06-03 11:10:01,302] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-06-03 11:10:01,302] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-06-03 11:10:04,306] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Finished assignment for group at generation 3: {connector-consumer-test.couchbase.connect.sink-1-6d105924-b037-458e-8157-3712ebe47cf3=Assignment(partitions=[test.topic1-3, test.topic1-4, test.topic1-5]), connector-consumer-test.couchbase.connect.sink-0-d8e6418d-17ce-453c-8233-ca1b591e5d6c=Assignment(partitions=[test.topic1-0, test.topic1-1, test.topic1-2])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-06-03 11:10:04,312] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Successfully joined group with generation 3 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-06-03 11:10:04,312] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Adding newly assigned partitions: test.topic1-5, test.topic1-4, test.topic1-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-06-03 11:10:04,313] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Found no committed offset for partition test.topic1-5 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-06-03 11:10:04,313] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Found no committed offset for partition test.topic1-4 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-06-03 11:10:04,313] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Found no committed offset for partition test.topic1-3 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2021-06-03 11:10:04,372] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Resetting offset for partition test.topic1-3 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-06-03 11:10:04,374] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Resetting offset for partition test.topic1-5 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-06-03 11:10:04,374] INFO [Consumer clientId=connector-consumer-test.couchbase.connect.sink-1, groupId=test.connect] Resetting offset for partition test.topic1-4 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2021-06-03 11:10:06,941] ERROR WorkerSinkTask{id=test.couchbase.connect.sink-1} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: UpsertRequest, Reason: TIMEOUT (org.apache.kafka.connect.runtime.WorkerSinkTask)

My Kafka Connect server is on Kubernetes, and I checked the network policies and the communication between Couchbase and my Kafka Connect cluster. The conclusion was that everything is fine.
This problem is very weird.
I hope I'll find a solution soon.

Thanks for the info, @mfouze . I noticed something about the timeout config…

"timeoutConfig": {
  "kvMs": 2500,        <-- still using default timeout (2.5 seconds)
  "kvDurableMs": 10000,
  "managementMs": 75000,
  "queryMs": 75000,
  "viewMs": 75000,
  "searchMs": 75000,
  "analyticsMs": 75000,
  "connectMs": 30000,
  "disconnectMs": 10000
}

Can you please share the new connector config JSON to show how you’re setting the KV timeout property?

Thanks,
David

Hello @david.nault ,
Here is the config file I'm using:

{
  "name": "test.couchbase.connect.sink",
  "connector.class": "com.couchbase.connect.kafka.CouchbaseSinkConnector",
  "tasks.max": "2",
  "topics": "test.topic1",
  "couchbase.seed.nodes": "remote-address-ip",
  "couchbase.bucket": "kafka-connect",
  "couchbase.username": "kafka-connect",
  "couchbase.password": "XXXXX",
  "couchbase.persist.to": "NONE",
  "couchbase.replicate.to": "NONE",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false",
  "couchbase.enable.tls": true,
  "couchbase.trust.store.path": "path/to/jks",
  "couchbase.trust.store.password": "XXXXX",
  "couchbase.log.redaction": "FULL",
  "couchbase.env.timeout.kvTimeout": "10s"
}

That's strange… it seems like the couchbase.env.timeout.kvTimeout config property is not being honored in this case; I would expect the value there to be reflected in the timeoutConfig logged at connector startup.

Sorry, but the error log above was captured with the right timeout logged.
I've also tested the flow between clusters, and all ports (8091 and 18091) are open.
Do you know if the Couchbase Kafka connector uses any other ports? Or, to be specific, do you know if I need to check anything else?

The connector also talks to Couchbase on the K/V port: 11210 for non-TLS, 11207 for TLS.

It’s possible the connector’s network detection heuristic is failing, in which case you could override the heuristic by setting the couchbase.network config property to external (if on a different network) or default (if on same network). If Couchbase is on a different k8s network, it may also be necessary to configure Couchbase to advertise its external hostnames/ports using the setting-alternate-address command. (Couchbase Cloud instances come with their external addresses preconfigured.)
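For example, if the connector and Couchbase are on different networks, the override might look like this in the connector config (a sketch; the right value depends on your topology):

```json
"couchbase.network": "external"
```

With this set, the SDK skips its automatic network detection and always uses the addresses Couchbase advertises for that network name.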

Thanks,
David

Hello @david.nault ,
I have finally resolved it by adding the K/V ports (11210 for non-TLS, 11207 for TLS) to my Kubernetes network policy config.
Thanks for your support.
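For anyone hitting the same problem, an egress rule allowing the Connect pods to reach both the management and K/V ports might look roughly like this (a sketch only; the policy name, pod labels, and namespace are placeholders to adapt to your cluster):

```yaml
# Hypothetical NetworkPolicy letting Kafka Connect pods reach Couchbase.
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-connect-to-couchbase
spec:
  podSelector:
    matchLabels:
      app: kafka-connect   # placeholder label for the Connect worker pods
  policyTypes:
    - Egress
  egress:
    - ports:
        - protocol: TCP
          port: 8091    # management (non-TLS)
        - protocol: TCP
          port: 18091   # management (TLS)
        - protocol: TCP
          port: 11210   # key-value (non-TLS)
        - protocol: TCP
          port: 11207   # key-value (TLS)
```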
Best regards,
Mafouze
