Old events in Kafka Source Connector


We are using the following setup for our Couchbase environment:
Couchbase Server version v6.6.3
Kafka source connector v3.4

Connector config

    {
        "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
        "connection.password": "xxx",
        "tasks.max": "6",
        "couchbase.compression": "ENABLED",
        "connection.timeout.ms": "2000",
        "couchbase.stream_from": "SAVED_OFFSET_OR_NOW",
        "connection.username": "xxxx",
        "transforms": "deserializeJson",
        "couchbase.flow_control_buffer": "64m",
        "dcp.message.converter.class": "com.couchbase.connect.kafka.handler.source.RawJsonWithMetadataSourceHandler",
        "connection.bucket": "xxxx",
        "event.filter.class": "com.couchbase.connect.kafka.filter.AllPassFilter",
        "name": "xxxx",
        "topic.name": "xxx",
        "couchbase.persistence_polling_interval": "100ms",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
        "connection.cluster_address": "xxxx",
        "use_snapshots": "false"
    }

Recently we started seeing old messages in our Kafka topic. These messages have additional fields compared to the original document in Couchbase:

    "event_timestamp": 1647856077931000,
    "idd": "testId",
    "isDeleted": false

The connector for this bucket has not logged any errors.

  1. Are these fields added by the connector by default? I could not find them in the messages from another connector we run for a different bucket.
  2. What are the possible scenarios that can lead to older DCP messages being read? How can we debug this?
  3. As I understand it, these messages are generated from DCP. Can someone clarify what happens behind the connector read? The connector docs say that if multiple updates happen on one bucket, the connector may see only the last event. Why?

No, those fields are not added by the Couchbase Kafka connector. Could some other downstream process be enriching the Kafka records?

A DCP rollback could lead to old Couchbase documents being streamed again. You’ll see a warning message containing “rollback” in the connector logs if this happens.
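If the connector logs are large, a small filter makes it quicker to check for rollbacks. This is a minimal sketch that assumes only what the answer states, namely that the warning contains the word "rollback"; the exact message wording, capitalization, and the log file path (passed on the command line here) are assumptions, so the match is a case-insensitive substring check.

```python
import sys
from typing import Iterable, List


def find_rollback_warnings(lines: Iterable[str]) -> List[str]:
    """Return the log lines that mention a rollback.

    Assumption: the connector's rollback warning contains the word
    "rollback". The exact wording and capitalization are not assumed,
    so this is a case-insensitive substring match.
    """
    return [line.rstrip("\n") for line in lines if "rollback" in line.lower()]


if __name__ == "__main__":
    # Hypothetical usage: python find_rollbacks.py /path/to/connect.log
    log_path = sys.argv[1]
    with open(log_path, encoding="utf-8", errors="replace") as log_file:
        for match in find_rollback_warnings(log_file):
            print(match)
```

A plain `grep -i rollback` over the log file does the same job; the function form just makes it easy to reuse in other tooling.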

For debugging I would recommend upgrading the connector to version 4.1.6 or later, and enabling the couchbase.enable.dcp.trace and couchbase.log.document.lifecycle connector config options.

Here’s the migration guide if you decide to upgrade. You’ll still see the “rollback” warning if you don’t upgrade; the config options I mentioned just make it easier to see what the connector is doing.
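As a sketch of the config change, the helper below adds the two debugging options to a connector config held as a map of string values, like the one in the question. Only the two option names come from the answer; the helper itself is hypothetical, and the placeholder base config is not a working 4.x config (other properties have to be migrated per the migration guide).

```python
import json
from typing import Dict

# The two options named in the answer (recommended with connector 4.1.6 or later).
DEBUG_OPTIONS = {
    "couchbase.enable.dcp.trace": "true",
    "couchbase.log.document.lifecycle": "true",
}


def with_debug_logging(config: Dict[str, str]) -> Dict[str, str]:
    """Return a copy of a connector config with both debugging options enabled."""
    updated = dict(config)  # leave the caller's config untouched
    updated.update(DEBUG_OPTIONS)
    return updated


if __name__ == "__main__":
    # Hypothetical placeholder values; a real 4.x config needs its other
    # properties migrated as described in the migration guide.
    base = {"name": "xxxx"}
    print(json.dumps(with_debug_logging(base), indent=2))
```

Returning a copy rather than mutating the input keeps the original config available, which makes it easy to switch the extra logging off again once the investigation is done.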

The docs are referring to changes to a single document. The connector sees only the latest version of a document because Couchbase Server does not retain a complete history of all changes, since that would require unbounded storage. If multiple changes to the same document happen while the connector is not running, or the changes happen very quickly, the DCP protocol delivers only the latest version.
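The effect can be illustrated with a toy model. This is not how Couchbase Server implements DCP internally; it only simulates the observable result described above, where a consumer catching up after a pause receives one event per document (the newest version) instead of every intermediate change. The `Mutation` type and `latest_versions` function are hypothetical names for illustration.

```python
from typing import Dict, List, NamedTuple


class Mutation(NamedTuple):
    key: str    # document ID
    seqno: int  # sequence number assigned when the change was made
    value: str  # document body, simplified to a string


def latest_versions(mutations: List[Mutation]) -> List[Mutation]:
    """Toy model: keep only the newest mutation for each document ID.

    The result is ordered by sequence number, so a document that changed
    several times appears once, at the position of its latest change.
    """
    newest: Dict[str, Mutation] = {}
    for m in mutations:
        current = newest.get(m.key)
        if current is None or m.seqno > current.seqno:
            newest[m.key] = m
    return sorted(newest.values(), key=lambda m: m.seqno)


if __name__ == "__main__":
    history = [
        Mutation("doc1", 1, "v1"),
        Mutation("doc2", 2, "a"),
        Mutation("doc1", 3, "v2"),
        Mutation("doc1", 4, "v3"),
    ]
    for m in latest_versions(history):
        print(m)
```

In this model `doc1` changes three times but only its last version (`v3`) is delivered, which mirrors why the connector may see just the latest event for a document.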