Concurrency Issue with Couchbase Data

Hi All,

I’m using Couchbase 6.5.1 with Java Client SDK 2.7.1 and Flink to stream messages from Kafka to Couchbase.

Right now I’m using the async KV API to get each document and then upsert it back.
I’m having an issue with the ordering of concurrent writes:
the messages from Kafka arrive sequentially (a, b, c, d), and Couchbase needs to end up storing the latest data (d).

I plan to use CAS (Compare and Swap) with the following logic:
on a CASMismatchException, sleep 1 second and then retry.
My concern is that during the sleep the retried message loses its place in the Kafka sequence, so the final order will be wrong.
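For reference, here’s a rough sketch of the retry loop I have in mind, using the 2.x blocking API (the document id, the modification, and the retry limit are just placeholders):

```java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.CASMismatchException;

public class CasRetryExample {

    private static final int MAX_RETRIES = 10; // placeholder limit

    public static void updateWithCas(Bucket bucket, String id) throws InterruptedException {
        for (int attempt = 0; attempt < MAX_RETRIES; attempt++) {
            // Read the current document; its CAS value identifies this exact revision.
            JsonDocument current = bucket.get(id);
            JsonObject content = current.content();
            content.put("lastValue", "d"); // hypothetical modification

            try {
                // replace() sends the CAS from the fetched document and fails
                // if another writer changed the document in the meantime.
                bucket.replace(JsonDocument.create(id, content, current.cas()));
                return;
            } catch (CASMismatchException e) {
                Thread.sleep(1000); // sleep 1 second, then re-read and retry
            }
        }
        throw new IllegalStateException("Gave up after " + MAX_RETRIES + " CAS retries for " + id);
    }
}
```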

What’s the best practice for handling this kind of issue?
Is there any sample Java code for this?

Thanks

Hi Han,

Just checking – is there a reason you don’t want to use Kafka Connect Couchbase to stream data from Kafka into Couchbase?

The strategy used by that Kafka connector is to write messages in batches, one batch at a time. Each batch is pre-processed so that only the latest version of each document is written.
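Conceptually, that pre-processing is just a dedup that keeps the last message per document id. A minimal sketch in plain Java (JsonDocument stands in for whatever record type you carry through Flink):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.couchbase.client.java.document.JsonDocument;

public class BatchDedup {

    // Keep only the latest version of each document in a batch.
    // Later entries for the same id overwrite earlier ones, so with
    // a, b, c, d for one key only d survives.
    public static List<JsonDocument> latestPerKey(List<JsonDocument> batch) {
        Map<String, JsonDocument> latest = new LinkedHashMap<>();
        for (JsonDocument doc : batch) {
            latest.put(doc.id(), doc);
        }
        return new ArrayList<>(latest.values());
    }
}
```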

Thanks,
David

Hi @david.nault,

I’m using Flink because I have several transformations to apply before saving to Couchbase.
High level logic:

  1. Get the existing doc
  2. Modify the data
  3. Replace the doc

I have tried implementing CAS to handle the concurrency, but the resulting order is not what I expected.

Another option I have is switching from async to sync calls, but sync doesn’t match the throughput of async.

Let me know if you have another insight

Another option I have is switching from async to sync calls, but sync doesn’t match the throughput of async.

I’m not a Flink expert, so I don’t know whether this will work, but could you use a tumbling window to group the items into batches, filter out all but the latest version of each document in each batch, and then write one batch at a time using the Couchbase Java SDK’s async API?
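If the windowing works out on the Flink side, the Couchbase write could look roughly like this (RxJava 1 bulk pattern for the 2.x SDK, applied to a batch deduplicated like the sketch earlier in the thread; the timeout is just an example value):

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.document.JsonDocument;

import rx.Observable;

public class BatchWriter {

    // Upsert one deduplicated batch with the async API, blocking until the
    // whole batch has been written before the next batch starts.
    public static void writeBatch(Bucket bucket, List<JsonDocument> batch) {
        AsyncBucket async = bucket.async();
        Observable
            .from(batch)
            .flatMap(doc -> async.upsert(doc)) // upserts run concurrently
            .toList()
            .timeout(30, TimeUnit.SECONDS)     // example per-batch timeout
            .toBlocking()
            .single();                         // wait for the batch to complete
    }
}
```

Because earlier versions of a document were filtered out of the batch, the concurrent upserts within a batch never compete for the same key, and writing the batches one at a time preserves ordering across batches.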