Incorrect behavior of upsert.PreserveExpiry()

Couchbase Server 7.0.4
Java SDK v3.4.3

We perform the following operations, in order, on a document (say D1) in a bucket:
A. transactional insert of D1 (transaction 1)
B. transactional delete of D1 (transaction 2)
C. non-transactional upsert of D1 with the preserveExpiry() option and without CAS

The operations are all serial, so writes do not intersect.

In step A no expiry is set, so the expiry reads as 0 after the insert.
When the preserveExpiry() option is used in step C, D1 is created either with a very short expiry (1-2 min) or already expired.
This happens consistently on a bucket where the RPS is high (5-10k) and the data is around 100M documents.

The following DCP events were captured via Kafka Connect:

{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":1701330344943484928,"revSeqno":300042,"bySeqno":133245,"vBucketUuid":199233759794413,"event":"deletion","key":"abcdefghij123456"}}
{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":1701330345548513280,"revSeqno":300043,"lockTime":0,"bySeqno":133249,"vBucketUuid":199233759794413,"flags":0,"expiration":0,"event":"mutation","key":"abcdefghij123456","content":{"id":"abcdefghij123456","name":{"fname":"abcd","sname":"pqrs","classesPassed":[1]},"age":1}}}
{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":1701330356860092416,"revSeqno":300044,"lockTime":0,"bySeqno":133255,"vBucketUuid":199233759794413,"flags":0,"expiration":0,"event":"mutation","key":"abcdefghij123456","content":{"id":"abcdefghij123456","name":{"fname":"abcd","sname":"pqrs","classesPassed":[1]},"age":1}}}
{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":1701330357623914496,"revSeqno":300045,"bySeqno":133259,"vBucketUuid":199233759794413,"event":"deletion","key":"abcdefghij123456"}}
{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":1701330368500531200,"revSeqno":300046,"lockTime":0,"bySeqno":133262,"vBucketUuid":199233759794413,"flags":33554432,"expiration":1701330358,"event":"mutation","key":"abcdefghij123456","content":{"id":"abcdefghij123456","name":{"fname":"abcd","sname":"pqrs","classesPassed":[1]},"age":1}}}
{"schema":null,"payload":{"bucket":"ce","partition":798,"cas":17013303
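
To make the numbers in these events easier to compare, here is a minimal sketch that converts them to readable timestamps. It rests on two assumptions that are not stated in the events themselves: that the CAS is a hybrid logical clock value in nanoseconds since the Unix epoch, and that the DCP "expiration" field is an absolute time in seconds since the Unix epoch. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

final class DcpTimestamps {
    // Assumption: the CAS is a clock value in nanoseconds since the Unix epoch.
    static Instant casToInstant(long cas) {
        return Instant.ofEpochSecond(cas / 1_000_000_000L, cas % 1_000_000_000L);
    }

    // Assumption: "expiration" is an absolute time in seconds since the Unix epoch.
    static Instant expirationToInstant(long expiration) {
        return Instant.ofEpochSecond(expiration);
    }

    public static void main(String[] args) {
        // Values copied from the events with revSeqno 300045 and 300046 above.
        Instant deletion = casToInstant(1701330357623914496L);
        Instant upsert = casToInstant(1701330368500531200L);
        Instant expiry = expirationToInstant(1701330358L);

        System.out.println("deletion CAS:       " + deletion);
        System.out.println("upsert CAS:         " + upsert);
        System.out.println("upsert expiration:  " + expiry);
        System.out.println("deletion -> expiry: " + Duration.between(deletion, expiry));
        System.out.println("expiry -> upsert:   " + Duration.between(expiry, upsert));
    }
}
```

Under those assumptions, the expiration on the last complete mutation falls about 10 seconds before that mutation's own CAS and less than a second after the preceding deletion's CAS, which is consistent with the document appearing to expire immediately.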

Hi @Ankur-Shukl
That does indeed seem unexpected… I would expect preserveExpiry() on a document that does not have any expiry set, to be a no-op.

Purely to simplify the problem, what happens if you only do step (C)? Steps (A) and (B) will result in a tombstone document, which shouldn’t affect step (C) at all.

Can you provide the code for A, B and C?

I attempted to reproduce with the Kafka connector 4.1.15-SNAPSHOT against an 8.0.0 dev server. I added logging of the expiration, and it remains 0.

    TransactionResult result = cluster.transactions().run((ctx) -> {
      ctx.insert(collection, "k", JsonObject.create().put(id, "k"));
      TransactionGetResult docA = ctx.get(collection, id);
      ctx.remove(docA);
    });

    collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().preserveExpiry(true));
[2023-11-30 11:38:09,486] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":21,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":40,"type":"mutation","partition":34,"sequenceNumber":3339,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":49139,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,487] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":22,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":41,"type":"deletion","partition":34,"sequenceNumber":3341,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":14619,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,588] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":23,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":42,"type":"deletion","partition":34,"sequenceNumber":3343,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":101725,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,588] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":24,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":41,"type":"mutation","partition":34,"sequenceNumber":3345,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":92878,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,589] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":25,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":42,"type":"mutation","partition":34,"sequenceNumber":3347,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":90208,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:38:09,589] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":26,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":43,"type":"mutation","partition":34,"sequenceNumber":3348,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":76642,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)

Or maybe the insert/delete were in separate transactions…

    TransactionResult result1 = cluster.transactions().run((ctx) -> {
      ctx.insert(collection, "k", JsonObject.create().put(id, "k"));
    });

    Thread.sleep(2000); // to delineate the two transactions

    TransactionResult result2 = cluster.transactions().run((ctx) -> {
      TransactionGetResult docA = ctx.get(collection, id);
      ctx.remove(docA);
    });

    collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().preserveExpiry(true));
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":67,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:client-record","connectTaskId":"1","revision":4,"type":"mutation","partition":62,"sequenceNumber":3310,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":32404,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":68,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":61,"type":"mutation","partition":34,"sequenceNumber":3417,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":92960,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,432] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":69,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":63,"type":"deletion","partition":34,"sequenceNumber":3419,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":55408,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":70,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":62,"type":"mutation","partition":34,"sequenceNumber":3421,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":43939,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":71,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":64,"type":"mutation","partition":34,"sequenceNumber":3423,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":32815,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:37,433] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":72,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":63,"type":"mutation","partition":34,"sequenceNumber":3425,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":26458,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,443] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":73,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":64,"type":"mutation","partition":34,"sequenceNumber":3427,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":17425,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,444] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":74,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":65,"type":"mutation","partition":34,"sequenceNumber":3429,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":14951,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":75,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":65,"type":"mutation","partition":34,"sequenceNumber":3431,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":13199,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":76,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":66,"type":"deletion","partition":34,"sequenceNumber":3433,"sizeInBytes":0,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":7104,"dueToExpiration":false} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,445] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":77,"context":"[test-couchbase-source|task-1]","documentId":"_default._default._txn:atr-98-#1bc7","connectTaskId":"1","revision":66,"type":"mutation","partition":34,"sequenceNumber":3435,"sizeInBytes":1,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":5531,"expiration":0,"flags":0,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
[2023-11-30 11:57:39,546] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":78,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":67,"type":"mutation","partition":34,"sequenceNumber":3436,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":94603,"expiration":0,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)

Just to make sure the logging for expiry is working…

    collection.upsert(id, JsonObject.create().put(id, "k"), UpsertOptions.upsertOptions().expiry(Duration.ofSeconds(1)));
[2023-11-30 12:12:27,969] INFO [test-couchbase-source|task-1] {"milestone":"RECEIVED_FROM_COUCHBASE","tracingToken":92,"context":"[test-couchbase-source|task-1]","documentId":"_default._default.k","connectTaskId":"1","revision":74,"type":"mutation","partition":34,"sequenceNumber":3459,"sizeInBytes":9,"usSinceCouchbaseChange(might be inaccurate before Couchbase 7)":89698,"expiration":1701375148,"flags":33554432,"lockTime":0} (com.couchbase.connect.kafka.SourceDocumentLifecycle:202)
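
For a longer capture it can be tedious to eyeball every line, so here is a rough sketch that scans connector log lines and reports any event with a non-zero expiration. It assumes the "documentId" and "expiration" fields appear in the same form as in the log lines above; the class name, method name, and the abbreviated sample lines are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ExpiryLogScan {
    // Field patterns as they appear in the connector's JSON log payload.
    private static final Pattern DOC_ID = Pattern.compile("\"documentId\":\"([^\"]*)\"");
    private static final Pattern EXPIRATION = Pattern.compile("\"expiration\":(\\d+)");

    /** Returns "documentId -> expiration" for every line whose expiration is non-zero. */
    static List<String> nonZeroExpirations(List<String> logLines) {
        List<String> hits = new ArrayList<>();
        for (String line : logLines) {
            Matcher id = DOC_ID.matcher(line);
            Matcher exp = EXPIRATION.matcher(line);
            // Deletion events carry no "expiration" field and are skipped.
            if (id.find() && exp.find() && Long.parseLong(exp.group(1)) != 0L) {
                hits.add(id.group(1) + " -> " + exp.group(1));
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        // Abbreviated sample lines, not full connector output.
        List<String> sample = List.of(
            "{\"documentId\":\"_default._default.k\",\"type\":\"mutation\",\"expiration\":0}",
            "{\"documentId\":\"_default._default.k\",\"type\":\"deletion\",\"dueToExpiration\":false}",
            "{\"documentId\":\"_default._default.k\",\"type\":\"mutation\",\"expiration\":1701375148}");
        nonZeroExpirations(sample).forEach(System.out::println);
    }
}
```

Run against the preserveExpiry(true) captures above, a scan like this should report nothing, while the expiry(Duration.ofSeconds(1)) line should be reported.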

Hey @mreiche, n.b. small issue with that test code: unless id == "k", the insert transaction is inserting a different doc than the delete transaction and the upsert. I don’t think it should matter, but it is a small difference from @Ankur-Shukl's description.

It is. And I later fixed it to use id everywhere.

The issue is not deterministic in the sense that this is not how preserveExpiry() behaves in normal scenarios. It was consistently seen and reproducible on a bucket where we have more than 200M documents and an RPS in the 5-10k range.

Can you please open a ticket at issues.couchbase.com and provide a reproducer?
Thanks.