Selective event generation by Kafka Connector

Hi,

I am testing the Kafka source connector in standalone mode and observe very peculiar behavior:
Out of 6 updated documents, only 4 generate CRUD events upon modification. The other two can be successfully updated in Couchbase, but the Kafka connector does not generate any events for them.

Thanks,
Igor Sandler

I switched the Kafka connector logging to debug level and received the following log lines when I touch the four successful documents:

[2018-08-13 15:38:14,027] INFO Poll returns 1 result(s) (com.couchbase.connect.kafka.CouchbaseSourceTask:174)
[2018-08-13 15:38:14,027] DEBUG WorkerSourceTask{id=test-couchbase-source-1} About to send 1 records to Kafka (org.apache.kafka.connect.runtime.WorkerSourceTask:186)
[2018-08-13 15:38:16,611] DEBUG Scavenging sessions at 1534189096611 (org.eclipse.jetty.server.session:347)

Updates of the two ‘unlucky’ documents are reflected in the Kafka Connector logs.

Thanks,
Igor Sandler

Hi Igor,

Are the updates for 6 different documents (unique document IDs), or are some documents updated multiple times?

The reason I ask is that the DCP implementation used by the Kafka connector may merge redundant operations, and only report the most recent state of the document. For example, if a document is modified twice in rapid succession, only one event (with the latest version of the document) may be propagated to Kafka. This is also true of the initial backfill; if a document is modified several times before the Kafka connector runs, it’s possible that only the latest version of the document will be propagated to Kafka.
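
For example (a hypothetical sketch using the Couchbase Java SDK; the host, bucket name, key, and document contents are placeholders, not your actual data):

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.RawJsonDocument;

// Two upserts to the same key in rapid succession. DCP may deduplicate
// them, so the connector could emit a single event containing only the
// second version of the document.
Bucket bucket = CouchbaseCluster.create("localhost").openBucket("test");
bucket.upsert(RawJsonDocument.create("someDoc", "{\"version\":1}"));
bucket.upsert(RawJsonDocument.create("someDoc", "{\"version\":2}"));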

Thanks,
David

Hi David,

Thanks for the fast response. All the document IDs are unique:

urn:cms:resource:Actor0
urn:cms:resource:Actor1
urn:cms:resource:Actor2

Thanks,
Igor Sandler

P.S.

Btw, I create the documents using the following Java SDK code:

import com.couchbase.client.java.document.RawJsonDocument;

// Wrap the raw JSON string in a document and upsert it into the bucket.
RawJsonDocument jDoc = RawJsonDocument.create(docId, cbResource2);
cbDataBucket.upsert(jDoc);

Hi Igor,

In that case, I agree the behavior you see is very peculiar.

Updates of the two ‘unlucky’ documents are reflected in the Kafka Connector logs.

I’m afraid I do not understand – did you mean they are not reflected in the logs? If that’s not what you meant, can you post the Connector log entries that show the ‘unlucky’ documents?

May I ask how you are verifying which documents are published to Kafka?
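
If it helps, a simple way to inspect what actually reaches the topic is the console consumer that ships with Kafka (the broker address and topic name below are placeholders for your setup):

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-default --from-beginning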

Thanks,
David

Hi David,

For the ‘unlucky’ documents there are no lines in the Kafka Connector logs at all.
I modify the documents using the web UI and can see all my changes applied correctly in Couchbase.

Regards,
Igor Sandler

Hi David,

Could it be a problem with the connector’s saved offsets? I changed couchbase.stream_from to BEGINNING and received all the events on the Kafka topic. Should I somehow reset the saved offsets after flushing the bucket?

Thanks,
Igor Sandler

Hi Igor,

Could it be a problem with the connector’s saved offsets? I changed couchbase.stream_from to BEGINNING and received all the events on the Kafka topic. Should I somehow reset the saved offsets after flushing the bucket?

Yes, I think you have identified the problem. When the bucket is flushed, the saved offsets become obsolete. As you observed, you can ignore the saved offsets by setting couchbase.stream_from to BEGINNING.
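
For reference, in the connector’s properties file that setting would look something like this (all other settings omitted):

# Ignore any saved offsets and stream all changes from the beginning of history.
couchbase.stream_from=BEGINNING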

For development and testing purposes, if you’re running the connector in standalone mode you can also reset the offsets by deleting the offset storage file managed by Kafka Connect.
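
In standalone mode the offset file location is set by the offset.storage.file.filename property in the worker config; the path below is just the usual example value. Deleting that file while the worker is stopped clears the saved offsets:

# connect-standalone.properties (worker config)
offset.storage.file.filename=/tmp/connect.offsets

# With the worker stopped:
rm /tmp/connect.offsets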

I’ve filed an issue in Jira to address the root cause: KAFKAC-123 (Include vbucket uuid in saved offset).

Thanks,
David

Hi David,

Thanks, now everything works as expected.

Regards,
Igor Sandler
