Regarding the `couchbase.seed.nodes` property

Hi team,

For the source connector, could you please clarify what the value of couchbase.seed.nodes should be?

We are running Couchbase behind a cluster VIP. Should we still list the individual nodes (and if so, which ones: query? data?), or is configuring the cluster VIP on its own enough?

We are seeing gaps in data updates and are trying to rule out potential causes. At the moment we have the cluster VIP configured.

Thank you

I don’t know what you mean by cluster VIP. Is that something like the couchbases://cb.jvkvfzgdirlced2q.cloud.couchbase.com address that is shown in the Capella UI? If it has DNS SRV records for _couchbases._tcp, then that on its own is sufficient (and preferred, actually). Otherwise, a list of the data nodes is sufficient.
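For illustration (hostnames are made up; substitute your own), the two options would look something like this in the connector config:

	"couchbase.seed.nodes": "cb.example.cloud.couchbase.com"

or, listing the data nodes explicitly:

	"couchbase.seed.nodes": "cb-data-01.example.com,cb-data-02.example.com,cb-data-03.example.com"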

We are seeing gaps in data updates

If you could provide details about the gaps, maybe we can figure it out.

Hi @mreiche

From what I understand, it is a load balancer that resolves to any healthy Couchbase node. Would that be fine?

Regarding the data loss: we have two separate connectors listening to updates from the same Couchbase cluster and publishing to two separate Kafka topics.

Configuration examples
(some properties were omitted for brevity)

Connector 1:
{
	"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
	"producer.override.compression.type": "gzip",
	"couchbase.bootstrap.timeout": "600s",
	"couchbase.compression": "ENABLED",
	"tasks.max": "6",
	"couchbase.log.document.lifecycle": "true",
	"couchbase.source.handler": "com.custom.handler",
	"couchbase.bucket": "bucket",
	"couchbase.stream.from": "SAVED_OFFSET_OR_BEGINNING",
	"couchbase.log.redaction": "FULL",
	"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
	"couchbase.xattrs": "true",
	"couchbase.topic": "topic1"
}

Connector 2:

{
	"connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector",
	"couchbase.bootstrap.timeout": "900s",
	"tasks.max": "10",
	"couchbase.compression": "ENABLED",
	"transforms": "deserializeJson",
	"couchbase.log.document.lifecycle": "true",
	"couchbase.source.handler": "com.custom.handler",
	"couchbase.bucket": "bucket",
	"couchbase.username": "user",
	"couchbase.stream.from": "SAVED_OFFSET_OR_NOW",
	"value.converter.schemas.enable": "false",
	"value.converter": "org.apache.kafka.connect.json.JsonConverter",
	"transforms.deserializeJson.type": "com.couchbase.connect.kafka.transform.DeserializeJson",
	"couchbase.topic": "topic2"
}

We noticed that the two topics were not in sync.

When we looked at the logs, we noticed that connector 2 had failed to publish a large number of updates (3M+, in fact).

Both connectors are using the same Kafka Connect deployment and underlying Kafka infrastructure

The only log entry that seems suspicious is the one below, but I do not think it is enough on its own to explain data loss:

level=WARN connector_context=[cb-connect|task-0] [Producer clientId=connector-producer-cb-connect] Received invalid metadata error in produce request on partition topic due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now class=org.apache.kafka.clients.producer.internals.Sender line=650

I have couchbase.log.document.lifecycle enabled, but I do not see anything in the logs to suggest that connector 2 persisted the records to Kafka. The first connector did persist them.

connector2 streams from SAVED_OFFSET_OR_NOW, so it will only publish events for mutations that occur while the connector is running, plus mutations that occurred while the connector was not running on partitions that already had saved offsets (a partition only gets a saved offset when a mutation occurred on it while the connector was running earlier).
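To make that concrete, here is a minimal sketch in Java (not the connector’s actual code; the names are made up for illustration) of how the start point would be resolved independently for each Couchbase partition under SAVED_OFFSET_OR_NOW:

import java.util.Map;

// Sketch only: resolve where to start streaming for one Couchbase partition
// under SAVED_OFFSET_OR_NOW semantics.
public class StartOffsetSketch {

	// savedOffsets: offsets the connector committed earlier (keyed by partition)
	// currentSeqnos: the server-side "current" sequence number for each partition
	static long resolveStart(int partition,
	                         Map<Integer, Long> savedOffsets,
	                         Map<Integer, Long> currentSeqnos) {
		Long saved = savedOffsets.get(partition);
		if (saved != null) {
			return saved;                      // resume: mutations after this offset are replayed
		}
		return currentSeqnos.get(partition);   // no saved offset: everything before "now" is skipped
	}

	public static void main(String[] args) {
		Map<Integer, Long> saved = Map.of(0, 42L);          // only partition 0 has a saved offset
		Map<Integer, Long> now = Map.of(0, 100L, 1, 77L);   // current seqnos on the server

		System.out.println(resolveStart(0, saved, now));    // 42 -> mutations 43..100 are replayed
		System.out.println(resolveStart(1, saved, now));    // 77 -> older mutations on partition 1 are never published
	}
}

In other words, a partition that never saw a mutation while the connector was running has no saved offset, and any history on that partition is skipped.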


Both connectors have been running for a long time and the “gap” was identified sometime this week

Are new partitions created in Couchbase, or is the number of partitions fixed?

We are currently operating under the assumption that no new partitions are created on Couchbase’s side, and that using SAVED_OFFSET_OR_NOW means “start from the current point in time, or from the latest state of the bucket for all documents”.

Namely, when we say “gap” we mean that the latest version of a document was not received by that connector instance (but it was received at some point in the past by that same connector)

Offsets are keyed by the vbuuid of a partition (and by topic), and are only saved when there is a mutation on that partition while the connector is running.
If you are an enterprise customer, please open a support ticket for investigation. @david.nault

Thank you @mreiche. The terminology you mentioned is a bit confusing to me at this point, so any clarification would be helpful in general.

We will open a ticket in the meantime

Within a bucket there are 1024 shards (64 on macOS), which are known as partitions or vBuckets. The partitions are assigned to data nodes to more or less balance the data among the nodes. Documents are assigned to partitions based on a hash of the document key: when an operation on a document is performed, the document key is hashed to determine which partition the document is in, and the partition is then used to look up, in the bucket configuration, which node holds that partition. Partitions are independent: the Kafka Connector streams DCP from each partition separately and stores the offset for each partition as it receives mutations. DCP allows it to specify the offset to start streaming from (e.g. when the Kafka Connector is restarted). The Kafka Connector saves the offsets keyed by the vbuuid.
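As a rough illustration of that key-to-partition mapping, here is a small Java sketch (assumption: the usual scheme hashes the key with CRC32 and folds the result into the partition range; the real SDK implementation may differ in detail):

import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Sketch only: map a document key to one of the bucket's partitions (vBuckets).
public class VbucketSketch {

	static int partitionForKey(String documentKey, int numPartitions) {
		CRC32 crc = new CRC32();
		crc.update(documentKey.getBytes(StandardCharsets.UTF_8));
		long folded = (crc.getValue() >> 16) & 0x7fff;   // fold the 32-bit CRC down to 15 bits
		return (int) (folded % numPartitions);           // numPartitions is 1024 (or 64 on macOS)
	}

	public static void main(String[] args) {
		System.out.println(partitionForKey("airline_10", 1024));
		System.out.println(partitionForKey("airline_10", 64));
	}
}

The point is that every operation on the same key always lands on the same partition, which is why the connector can track a separate stream offset per partition.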

By the way, it looks like the behavior of SAVED_OFFSET_OR_NOW has recently been changed in [KAFKAC-353] - Couchbase Cloud

Thank you for the detailed response. It helps me understand how things work internally

Within a bucket there are 1024 (64 for MacOS) shards

In my environment, we only have 15 partitions in the destination topic. Would creating a topic whose partition count matches the number of internal Couchbase partitions be beneficial?

[KAFKAC-353] - Couchbase Cloud is an interesting issue, because the connector that is having problems uses SAVED_OFFSET_OR_NOW, and in our environment we have constant restarts due to this issue: Constant com.couchbase.client.dcp.error.BootstrapException errors · Issue #41 · couchbase/kafka-connect-couchbase · GitHub

That fix is yet to be released but I’ll definitely try it when it is out
