How to get the collection and scope details of a mutation from DCP CLIENT

I am trying to get the collection name and scope name (or the fully qualified name) of a mutated document using the DCP client. Can anyone here help me out with this?

Hi Karthikeyan. Welcome to the Couchbase Forums!

Obligatory disclaimer: The Java DCP client is not generally a supported way for developers to interact with the cluster. Now, with that out of the way…

To get scope and collection info, you’ll need to use the High Level API.

Once you have a DocumentChange (Mutation or Deletion), you can call:

String scopeName = documentChange.getCollection().scope().name();
String collectionName = documentChange.getCollection().name();

Thanks,
David

Thanks David. Since the Java DCP client is not generally a supported way for developers to interact with the cluster, is there any other way to do change data capture in Couchbase?

Hi Karthikeyan,

Is there any other way to do change data capture in Couchbase?

The officially supported ways to listen for changes are the Eventing service and the connectors. The Kafka connector in particular is often used for this. Make sure to understand the limitations described in the Kafka Source Connector Delivery Guarantees… most of the caveats apply to anything that uses the DCP protocol (this includes the Eventing service).

CC: Eventing guru @jon.strabala

Thanks,
David

Thanks a lot David, a few more questions:
→ As DCP captures mutations, which include insertions, deletions, and updates, does it maintain a sequence number to keep track of them? Is that at the bucket level, scope level, or collection level?
→ Is it possible to fetch the full mutated document using the High Level API?

As DCP captures mutations, which include insertions, deletions, and updates, does it maintain a sequence number to keep track of them? Is that at the bucket level, scope level, or collection level?

Sequence numbers are specific to a partition within the bucket. These partitions are sometimes called “virtual buckets”, or “vbuckets”. Each bucket has 1024 partitions (or 64 if running on macOS). A mutation/deletion can be uniquely identified by combining the partition UUID (an opaque 64-bit token that changes whenever there’s a rollback), plus the partition ID (a number in the range 0-1023), plus the mutation/deletion’s sequence number (an unsigned 64-bit number that increases each time a document in the partition is mutated or deleted).
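
To make the three-part identity concrete, here is a minimal Java sketch. The `MutationId` class and its method names are hypothetical and are not part of the DCP client; the sketch only illustrates that sequence numbers are meaningful within a single partition history, and that they should be compared as unsigned values.

```java
import java.util.Objects;

/** Hypothetical value type (not part of the DCP client): identifies one change. */
final class MutationId {
    final long partitionUuid; // opaque token; changes whenever there is a rollback
    final int partitionId;    // e.g. 0-1023 for a bucket with 1024 partitions
    final long seqno;         // unsigned 64-bit value stored in a signed long

    MutationId(long partitionUuid, int partitionId, long seqno) {
        this.partitionUuid = partitionUuid;
        this.partitionId = partitionId;
        this.seqno = seqno;
    }

    /** True only when both IDs belong to the same partition and the same UUID. */
    boolean sameHistory(MutationId other) {
        return partitionId == other.partitionId && partitionUuid == other.partitionUuid;
    }

    /** Orders two changes by sequence number; only valid within one partition history. */
    int compareSeqno(MutationId other) {
        if (!sameHistory(other)) {
            throw new IllegalArgumentException("sequence numbers are not comparable across partitions");
        }
        return Long.compareUnsigned(seqno, other.seqno);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MutationId)) return false;
        MutationId m = (MutationId) o;
        return partitionUuid == m.partitionUuid && partitionId == m.partitionId && seqno == m.seqno;
    }

    @Override
    public int hashCode() {
        return Objects.hash(partitionUuid, partitionId, seqno);
    }
}
```

Two changes with the same sequence number but different partition IDs are unrelated events, which is why `compareSeqno` refuses to order them.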

Is it possible to fetch the full mutated document using the High Level API?

Yes, by calling DocumentChange.getContent(). Both Mutation and Deletion extend DocumentChange. A deletion’s content is always an empty byte array.

Thanks,
David

Thank you David, and huge kudos to the Couchbase team for the kind responses in the forum.

Hey David, back with a few more questions.

Assuming vBuckets are assigned to documents using the hashed value of the document’s key: suppose there is a document in collection1 with key 001, and another document in collection2 with the same key 001. How is the vBucket assigned in this case? The output of the CRC32 algorithm will be the same for both keys, so both documents should get the same vBucket ID and be stored in the same vBucket. How are they uniquely identified when querying for a document?

Are there any relationships between scopes, collections, and vBuckets?

@Karthikeyan you can find more general information about vBuckets here, vBuckets | Couchbase Docs

Scopes and collections do not affect the way in which keys are allocated to vBuckets. However, each vBucket is aware, for each of its allocated keys, of the scope and collection with which that particular key is associated.
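
A small Java sketch may help picture this. It assumes the key-to-vBucket mapping is a CRC32 hash of the key bytes reduced to the partition count; the exact bit manipulation shown is an assumption based on how Couchbase SDKs are commonly described, not something stated in this thread. `VBucketMapper` and `QualifiedKey` are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.zip.CRC32;

/** Hypothetical helper: maps a document key to a partition (vBucket) ID. */
final class VBucketMapper {
    /** Assumed formula: CRC32 of the key bytes, upper bits reduced modulo the partition count. */
    static int vbucketFor(String key, int numPartitions) {
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        long hash = (crc.getValue() >> 16) & 0x7fff;
        return (int) (hash % numPartitions);
    }
}

/** Hypothetical fully qualified key: scope and collection are part of a document's identity. */
final class QualifiedKey {
    final String scope;
    final String collection;
    final String key;

    QualifiedKey(String scope, String collection, String key) {
        this.scope = scope;
        this.collection = collection;
        this.key = key;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof QualifiedKey)) return false;
        QualifiedKey q = (QualifiedKey) o;
        return scope.equals(q.scope) && collection.equals(q.collection) && key.equals(q.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scope, collection, key);
    }
}
```

With this sketch, key 001 in collection1 and key 001 in collection2 hash to the same partition because only the key is hashed, yet the two `QualifiedKey` values are not equal, so the partition can still tell the documents apart.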

thanks for the answer

Hey David, do we have a timestamp field for each document?
Is it possible to fetch the timestamp for each document?

While the DCP client is listening to an active bucket and the node carrying that active bucket fails, what effect will it have on the DCP client?

Hi Karthikeyan,

Is it possible to fetch the timestamp for each document?

Yes. DocumentChange has a getTimestamp() method. Make sure to read the Javadoc so you understand the limitations.

While the DCP client is listening to an active bucket and the node carrying that active bucket fails, what effect will it have on the DCP client?

The DCP client automatically reconnects and reopens the stream on the new active partition (vbucket). If the server determines a rollback is required, your listener might receive a Rollback event. If you’re using the High Level API, the default behavior is to simply resume streaming from the rollback point. (If you’re not using the High Level API, you should be :wink: )

Thanks,
David

Yes, got it!

Q1: Regarding the timestamp, my question was about the Couchbase Java client and not DCP. When fetching documents using the Java client, is it possible to get the timestamp?

Q2: Is it possible to resume DCP streaming from a particular position at the bucket level, scope level, or collection level instead of the vBucket level? Is there anything like a resume token?

As per my understanding, DCP streams can be resumed only at the vBucket level and not at the collection, scope, or bucket level. Am I right?

I hesitate to recommend it, since I’m not sure about our current stance on whether it’s supported, but you could take the CAS from a GetResult (or MutationResult) and feed it through the same algorithm as the DCP client’s DocumentChange.getTimestamp(). The same caveats would apply.

In the Java DCP client, every DocumentChange has a StreamOffset for its vbucket, which you can use later to resume the stream for that vbucket. A vbucket is the only thing you can stream from with DCP. The scope and collection parameters just limit which of the vbucket’s events you receive.

The way to resume “on Bucket level” is to resume streaming for each vbucket. There’s no single “resume token” because the partitions are completely independent. Instead, you have one StreamOffset for each partition.
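
One way to picture "one offset per partition" is a map keyed by partition ID. The sketch below is illustrative only: `PartitionOffset` and `OffsetTracker` are hypothetical names, not the DCP client's own `StreamOffset` type, and how the saved offsets are handed back to the client when resuming is left out because it depends on the client API.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-partition stream position. */
final class PartitionOffset {
    final long partitionUuid;
    final long seqno;

    PartitionOffset(long partitionUuid, long seqno) {
        this.partitionUuid = partitionUuid;
        this.seqno = seqno;
    }
}

/** Hypothetical tracker: remembers the most recent offset seen for each partition. */
final class OffsetTracker {
    private final Map<Integer, PartitionOffset> latest = new ConcurrentHashMap<>();

    /** Call after a change has been fully processed; the last recorded offset wins. */
    void record(int partitionId, PartitionOffset offset) {
        latest.put(partitionId, offset);
    }

    /** A "bucket-level" checkpoint is just the set of all per-partition offsets. */
    Map<Integer, PartitionOffset> checkpoint() {
        return Collections.unmodifiableMap(new TreeMap<>(latest));
    }
}
```

Persisting the map returned by `checkpoint()` and restoring every entry on restart is what resuming "on Bucket level" amounts to in this sketch. Partitions with no entry have not produced any processed change yet.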

Incidentally, what is your ultimate goal? (I should have asked earlier :slight_smile: )

Thanks,
David

Thanks for the answer, David. My goal is to build a data replication service :slight_smile:

Hey David, I think isDueToExpiry() on the deletion event is not working.

Hi Karthikeyan,

You are correct. Distinguishing between deletions and expirations requires setting a special DCP control that the Java DCP client currently doesn’t support.

I filed JDCP-222 to add this feature.

Thanks,
David

Hey David, back with a few more questions.

Q1: Is there any way to query all the documents in a collection in batches, say 1000 documents at a time? If I try to query 10M documents from a collection using a SELECT * query, I get an UnambiguousTimeoutException.

Q2: Is there any way to get a continuation token? For example, if I query 1000 documents, I should get the position of the 1001st document, which I can use to fire the next query and get the next batch of 1000.

Q3: Is there any way to query the mutations that have happened so far, whenever needed? Does the DCP client store the mutations somewhere we could access?

Thanks in advance.

Same answer for Q1 and Q2: consider using a technique called keyset pagination. See this StackOverflow post: How do I use paginate with ‘cursors’ in N1ql?

The team is also working on a new feature that would simplify this particular use case. We’ll say more when it’s released.
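
For readers unfamiliar with keyset pagination, here is a minimal in-memory Java sketch of the idea. It does not call Couchbase: a sorted set of document IDs stands in for the collection, and `KeysetPager` is a hypothetical name. The query text in `QUERY_SHAPE` is only an illustration of what the equivalent N1QL might look like, with placeholder bucket, scope, and collection names; check it against the linked post before relying on it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical sketch of keyset pagination over an ordered set of document IDs. */
final class KeysetPager {
    /** Illustrative query shape only; the names are placeholders, not from the thread. */
    static final String QUERY_SHAPE =
        "SELECT META(d).id, d.* FROM `bucket`.`scope`.`collection` AS d "
            + "WHERE META(d).id > $lastId ORDER BY META(d).id LIMIT $pageSize";

    private final NavigableSet<String> ids;

    KeysetPager(NavigableSet<String> ids) {
        this.ids = ids;
    }

    /** Returns up to pageSize IDs strictly greater than lastId (null starts from the beginning). */
    List<String> nextPage(String lastId, int pageSize) {
        NavigableSet<String> rest = (lastId == null) ? ids : ids.tailSet(lastId, false);
        List<String> page = new ArrayList<>();
        for (String id : rest) {
            if (page.size() == pageSize) break;
            page.add(id);
        }
        return page;
    }

    /** Walks every page in order; the last ID of each page acts as the continuation token. */
    static List<List<String>> allPages(KeysetPager pager, int pageSize) {
        List<List<String>> pages = new ArrayList<>();
        String lastId = null;
        while (true) {
            List<String> page = pager.nextPage(lastId, pageSize);
            if (page.isEmpty()) break;
            pages.add(page);
            lastId = page.get(page.size() - 1);
        }
        return pages;
    }

    /** Convenience factory used for quick experiments. */
    static KeysetPager of(String... docIds) {
        NavigableSet<String> set = new TreeSet<>();
        for (String id : docIds) set.add(id);
        return new KeysetPager(set);
    }
}
```

The last ID of each page plays the role of the continuation token asked about in Q2: each request is bounded by the page size, so no single query has to scan all 10M documents.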

Q3: Is there any way to query the mutations that have happened so far, whenever needed? Does the DCP client store the mutations somewhere we could access?

I’m afraid not. It’s up to you to store the mutations if you want to query them later.

Thanks,
David

Thank you so much, David.

Can you provide some links where I can learn more about the DCP client?