Kafka source connector and sequence numbers

As per this documentation, I see that the event message from Couchbase contains bySeqNo and revSeqNo, but it’s not very clear what these numbers correspond to. I understand that Couchbase DCP events carry an increasing sequence number for every vBucket.

Questions

  1. Is bySeqNo related to the event sequence number from Couchbase?
  2. And what is the significance of revSeqNo?

I use the Kafka source connector to consume the DCP events, and I want to know the order of events for every vBucket. That is, I need to get the latest event for a document (a document with key 100 may go through any number of changes, but I only want the latest). I want to know if I can use bySeqNo to get the latest event for a document.

  1. Is bySeqNo related to the event sequence number from Couchbase?

Yes, bySeqNo is the DCP sequence number for that event. When comparing these values, it’s important to interpret them as unsigned 64-bit integers. There may be gaps (unassigned numbers) in the sequence. If two events have the same partition and vBucketUuid fields, it’s certain the event with the lower bySeqNo occurred first.

If two events have the same partition but different vBucketUuid values, it means they occurred on different history branches, and their sequence numbers might not be directly comparable. This is related to the “rollback” and “failover log” concepts, which you can read about in the DCP protocol documentation.
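To make that rule concrete, here’s a minimal Java sketch. The `DcpEvent` class and its field names are hypothetical stand-ins for however you deserialize the connector’s messages; only the comparison logic is the point:

```java
// Hypothetical holder for the fields discussed above; not a connector class.
final class DcpEvent {
    final int partition;    // vBucket number
    final long vBucketUuid; // identifies the history branch
    final long bySeqNo;     // DCP sequence number (unsigned 64-bit)

    DcpEvent(int partition, long vBucketUuid, long bySeqNo) {
        this.partition = partition;
        this.vBucketUuid = vBucketUuid;
        this.bySeqNo = bySeqNo;
    }

    /** True only when we can be certain this event occurred before the other. */
    boolean definitelyBefore(DcpEvent other) {
        return partition == other.partition
                && vBucketUuid == other.vBucketUuid // same history branch
                && Long.compareUnsigned(bySeqNo, other.bySeqNo) < 0;
    }
}
```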

Enabling persistence polling in the connector config means that in most cases (the notable exception being total cluster failure with rollback to zero) the connector will hide events from phantom history branches, allowing you to directly compare sequence numbers within a vbucket even if they have different vBucketUuid values.

  2. And what is the significance of revSeqNo?

The “rev” stands for “revision.” It’s essentially the “version” of the document. I think it’s used by some DCP consumers for conflict resolution in bi-directional replication scenarios. I’m not sure how useful it is for Kafka consumers, but the information is available if you want it. All the same caveats about history branches apply here as well.


@david.nault, thanks! A few more questions:

  1. " When comparing these values, it’s important to interpret them as an unsigned 64-bit integers." - In the kafka connector i get this as long anyways. Hope i can directly use the long value for comparison.

  2. I get that the same sequence numbers can be reassigned in a failover scenario (when a vBucket moves from one node to another), but you mentioned that they might not be directly comparable. Is there any alternative, or does it have to be handled in the application logic?

  3. “Enabling persistence polling in the connector config means that…” - a few questions on this:

3.1. What do you mean by persistence polling? I don’t see any reference to it in the connector documentation. How do I enable it?
3.2. And does this mean the connector has a built-in way to adjust the sequence numbers so that ordering is maintained and we can directly compare them?

  4. While reading the DCP link, I came across snapshots. If my understanding is correct, the connector becomes resilient to Connect cluster failure only if ‘use_snapshots’ is set to true. So if ‘use_snapshots’ is false and the cluster goes down, does it read the DCP stream from the beginning? I thought the Kafka topics configured for the connector would help it recover from the failure. Yes, I understand that if the Kafka brokers are also gone then those offsets are not there anymore, so it reads from the beginning. But in that case, enabling snapshots would have the same problem.
  1. " When comparing these values, it’s important to interpret them as an unsigned 64-bit integers." - In the kafka connector i get this as long anyways. Hope i can directly use the long value for comparison.

If you’re using Java, you’ll want to use Long.compareUnsigned(x, y) to see which of two sequence numbers is greater. If the language you’re working with has native support for unsigned 64-bit integers then this isn’t an issue at all.
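For example (the huge value is contrived; real seqnos are unlikely to exceed Long.MAX_VALUE anytime soon, but unsigned comparison is the safe habit):

```java
public class SeqnoCompareDemo {
    public static void main(String[] args) {
        long small = 1L;
        long huge = 0xFFFFFFFFFFFFFFFFL; // enormous as unsigned, but -1 as a signed long

        System.out.println(small > huge);                          // true  -- wrong conclusion for seqnos
        System.out.println(Long.compareUnsigned(small, huge) < 0); // true  -- small really is the earlier seqno
    }
}
```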

  2. I get that the same sequence numbers can be reassigned in a failover scenario (when a vBucket moves from one node to another), but you mentioned that they might not be directly comparable. Is there any alternative, or does it have to be handled in the application logic?

Without persistence polling, the application logic would need to look at the failover log. It might get complicated, which is why I tried to gloss over it :sweat_smile:

When persistence polling is enabled you should be able to simply compare the sequence numbers.

3.1. What do you mean by persistence polling? I don’t see any reference to it in the connector documentation. How do I enable it?

Persistence polling is a rollback mitigation strategy where the DCP client waits for changes to be persisted to all replicas before telling the connector about the change. It’s enabled by setting the couchbase.persistence_polling_interval connector config property to a non-zero duration.
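For illustration, the relevant part of the connector configuration might look like this. The property name is the one mentioned above; the connector class name and the “100ms” duration format are my assumptions, so double-check them against the connector documentation for your version:

```java
import java.util.HashMap;
import java.util.Map;

public class PersistencePollingConfig {
    public static void main(String[] args) {
        // Sketch of the settings you'd POST to the Kafka Connect REST API
        // (or put in the connector's .properties file). Required properties
        // like the bucket and connection settings are omitted here.
        Map<String, String> config = new HashMap<>();
        config.put("connector.class", "com.couchbase.connect.kafka.CouchbaseSourceConnector"); // assumed class name
        config.put("couchbase.persistence_polling_interval", "100ms"); // non-zero enables polling; "0" disables it
        System.out.println(config);
    }
}
```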

3.2. And does this mean the connector has a built-in way to adjust the sequence numbers so that ordering is maintained and we can directly compare them?

That’s persistence polling, yes.

  4. While reading the DCP link, I came across snapshots. If my understanding is correct, the connector becomes resilient to Connect cluster failure only if ‘use_snapshots’ is set to true.

The Kafka connector’s use_snapshots config property doesn’t do anything except cause OutOfMemoryErrors :slightly_frowning_face: It will be removed in a future release; in the meantime I’d recommend setting this to false.

When the same Couchbase document is modified twice, the DCP protocol allows the server to “de-duplicate” the event stream and send only the second version of the document. For example, let’s say an application creates document A, then document B, and finally updates document A. The “real” sequence of events looks like this:

A1 B1 A2

The DCP protocol allows the server to de-duplicate the modifications to document A and send this instead:

B1 A2

If you’re reading the stream one event at a time, there’s a period when you would know about only document B, even though document A was created first. Snapshots are a way to retain a consistent view of all documents. In this case, the server presents B1 A2 in the same snapshot. The idea is that if you process an entire snapshot at once, you know you’re looking at documents that all existed together at the same point in time.

For the Kafka connector, snapshots don’t provide any value, since we send the events to the topic one at a time. The only thing the use_snapshots setting does is buffer an entire snapshot into memory before sending the messages to the topic. The messages are still published one event at a time, without any indication that they belong to the same DCP snapshot.

Incidentally, there’s an open enhancement request MB-26908 to allow disabling de-duplication (and eliminating the need for snapshots). This would be a boon for the connectors, but it’s not clear whether a high-performance solution is feasible.

@david.nault, I see that persistence polling is available from connector version 3.4. Not sure if I understand it right. It’s enabled by default, with a 100 ms polling interval and 128 MB as the flow control buffer size.

What does this mean?

In the case of a failure, if one of the replicas becomes active, there is a possibility that the replica was not caught up with the previously active vBucket when the failover happened. Now the other replicas have to roll back. I understand this. Hopefully :slight_smile:

Now, with persistence polling enabled, at what point does the connector buffer the data? Does it always buffer, or only in the case of a failover?
If it’s only in the case of a failover, is 128 MB the amount of data the connector buffers before it learns that the data is persisted on all replicas? So if persistence is so slow that it goes beyond the point where the connector has buffered 128 MB, could that lead to the old issue of sequence numbers being reassigned? Not sure if I understand this right. It would be great if you could throw more light on this.

With the default settings, the DCP client will always buffer up to 128 MB of events. Every 100 ms it will ask all of the online Couchbase Server nodes to report the highest sequence number they have persisted for each vBucket. For each vBucket, the DCP client calculates the lowest seqno that has been persisted across all of that vBucket’s replicas. All events in the buffer with seqno <= that value are then released to the connector and published to the Kafka topic.
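A simplified Java sketch of that release logic for a single vBucket (illustration only; the real DCP client internals are more involved):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustration only: buffers seqnos for one vBucket and releases the prefix
// that every replica has persisted. Seqnos are unsigned 64-bit longs.
final class PersistencePollingBuffer {
    private final Queue<Long> bufferedSeqnos = new ArrayDeque<>(); // events in stream order

    void buffer(long seqno) {
        bufferedSeqnos.add(seqno);
    }

    /**
     * Called every polling interval with the highest persisted seqno
     * reported by each replica of this vBucket (assumed non-empty).
     */
    List<Long> releasePersisted(long[] highestPersistedPerReplica) {
        // The stream is only safe up to the seqno that EVERY replica has persisted.
        long safeSeqno = highestPersistedPerReplica[0];
        for (long seqno : highestPersistedPerReplica) {
            if (Long.compareUnsigned(seqno, safeSeqno) < 0) {
                safeSeqno = seqno;
            }
        }
        // Release everything at or below the safe point to the connector.
        List<Long> released = new ArrayList<>();
        while (!bufferedSeqnos.isEmpty()
                && Long.compareUnsigned(bufferedSeqnos.peek(), safeSeqno) <= 0) {
            released.add(bufferedSeqnos.poll());
        }
        return released;
    }
}
```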

if persistence is so slow that it goes beyond the point where the connector has buffered 128 MB, could that lead to the old issue of sequence numbers being reassigned?

No, something different happens. If it takes a very long time for Couchbase to report that it has persisted the changes, the client’s buffer will fill up and the server will pause the DCP stream until the connector consumes the events it has already received (this is the normal flow control behavior). This does mean that if the polling interval is too long or the buffer is too small then the connector may be effectively idle while it waits for the changes to be persisted. And of course, waiting for persistence introduces latency, which on average is half the polling interval, I’d guess.

The cost of persistence polling is this latency, as well as the additional network traffic from polling, and the increased load on the server because it has to answer the polling requests. In return, the chance of rolled-back changes ever reaching the connector and being published to the Kafka topic is dramatically reduced.

@david.nault I think I got it, but the sequence number you are talking about is the bySeqNo in the Kafka connector, right? So my understanding is that if I enable persistence polling, I can compare the bySeqNo in the messages to get the latest version of the document.

That is, if there is a JSON document with key 100 and I change the document 5 times, those are 5 different mutation events in the Kafka connector. So I can use bySeqNo to get the latest, and persistence polling would help in the failover scenario as well. Hope this understanding is correct. Something like the sketch below is what I have in mind.
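A minimal sketch of that consumer-side bookkeeping (class and field names are placeholders, not connector classes; it assumes events are processed per vBucket after persistence polling has hidden phantom branches):

```java
import java.util.HashMap;
import java.util.Map;

// Keeps only the latest mutation per document key, using unsigned
// comparison of bySeqNo values. Illustration only.
final class LatestVersionTracker {
    private final Map<String, Long> latestSeqnoByKey = new HashMap<>();

    /** Returns true if this event is newer than anything seen so far for the key. */
    boolean isLatest(String documentKey, long bySeqNo) {
        Long previous = latestSeqnoByKey.get(documentKey);
        if (previous == null || Long.compareUnsigned(bySeqNo, previous) > 0) {
            latestSeqnoByKey.put(documentKey, bySeqNo);
            return true;
        }
        return false;
    }
}
```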

I am not sure why you are talking about revSeqNo in the other thread, though. I always thought that the sequence number Couchbase increases for every change in the document (the per-vBucket increasing seq number) is the same as bySeqNo. I am a little confused by the point on revSeqNo in the other thread.

@david.nault Can you please reply?