ReactiveCollection.unlock hangs

The program below does the following:

  1. creates a document
  2. gets and locks the document
  3. unlocks the document

It hangs at the unlock call (line 22 of the original file).
Am I doing something wrong, or is it a bug?
I use Java SDK 3.4.6 and Couchbase Server 7.1.0 on a Mac.

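package dr.couchbase;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;

public class Test {
	public static void main(final String[] args) throws InterruptedException {
		final CountDownLatch latch = new CountDownLatch(1);
		// The host part of the connection string is missing in the original post.
		final Cluster cluster = Cluster.connect(
			      "couchbase://", "Administrator", "cbadmin");
		final Bucket bucket = cluster.bucket("Aggregation");
		final ReactiveCollection reactiveColl = bucket.reactive().defaultCollection();
		reactiveColl.upsert("k", "hello").subscribe(r1 -> {
			reactiveColl.getAndLock("k", Duration.ofSeconds(10)).subscribe(r2 -> {
				reactiveColl.unlock("k", r2.cas()).subscribe(v -> {
					// The original listing is cut off here. Going by the reply
					// below, this callback is where latch.countDown() was called.
					latch.countDown();
				});
			});
		});
		latch.await(); // going by the reply below, this is where the program blocks
	}
}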

Hi @anders_eliasson

No, there’s no bug here. You just need to chain the reactive operations together using concatMap rather than calling them inside the subscribe in this way.
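A minimal sketch of what such a chain could look like, assuming the same bucket and key as in the question. The class name ChainedUnlock, the host 127.0.0.1 and the password are placeholders and not taken from the thread. Each step here returns a Mono, so the sketch uses flatMap; concatMap is the corresponding operator on Flux. It needs the Couchbase Java SDK on the classpath and a reachable cluster.

```java
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;

import java.time.Duration;

public class ChainedUnlock {
    public static void main(final String[] args) {
        // Placeholder host and credentials; substitute your own.
        final Cluster cluster = Cluster.connect("couchbase://127.0.0.1", "Administrator", "password");
        final Bucket bucket = cluster.bucket("Aggregation");
        final ReactiveCollection reactiveColl = bucket.reactive().defaultCollection();

        reactiveColl.upsert("k", "hello")
            // Runs only after the upsert has emitted its result.
            .flatMap(r1 -> reactiveColl.getAndLock("k", Duration.ofSeconds(10)))
            // Uses the CAS from the locked read to release the lock.
            .flatMap(r2 -> reactiveColl.unlock("k", r2.cas()))
            // Waits for the whole chain to complete; an error is rethrown here.
            .block();

        System.out.println("unlocked");
        cluster.disconnect();
    }
}
```

With a single chain there is one subscription (made by block()), so no latch is needed, and an error in any step surfaces in one place instead of being lost inside a nested callback.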

Thanks for the reply!
If I change unlock to remove, it works fine with all the nested subscribes. Do you know why?

Don’t use this construct, as Graham said. This Stack Overflow post explains it a bit further: "reactive programming - Function now executing properly after subscribe".

It doesn’t hang on unlock(), it hangs on latch.await() because latch.countDown() is never called.

It works with remove because remove returns a Mono<MutationResult>, which emits a (next) object, and you’re specifying the consumer for emitted objects.

unlock returns Mono<Void> - there is no (next) object. So the consumer for objects is never called. Use the subscribe() method that takes a completeConsumer (the last argument).

    reactiveColl.unlock("k", r2.cas()).subscribe(t -> {}, e -> {}, () -> {
      System.out.println("unlocked ");
      latch.countDown(); // release the latch on completion, since no item is emitted
    });
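
The same signal behaviour can be reproduced without Couchbase or Reactor. The sketch below is only an analogy built on the JDK's java.util.concurrent.Flow interfaces: emptyPublisher() stands in for a Mono<Void>, and the names VoidSignalDemo, emptyPublisher and observe are made up for this illustration. It records which subscriber callbacks fire.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

// JDK-only analogy; this is not the Couchbase SDK or Reactor.
class VoidSignalDemo {

    // A publisher that completes without emitting any item, similar to Mono<Void>.
    static Flow.Publisher<Void> emptyPublisher() {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* nothing to deliver */ }
                @Override public void cancel() { /* nothing to cancel */ }
            });
            subscriber.onComplete();
        };
    }

    // Subscribes and records the name of every callback that is invoked.
    static List<String> observe(Flow.Publisher<Void> publisher) {
        List<String> events = new CopyOnWriteArrayList<>();
        publisher.subscribe(new Flow.Subscriber<Void>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Void item) { events.add("next"); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(observe(emptyPublisher())); // prints [complete]
    }
}
```

Only the completion callback runs. A latch.countDown() placed in the onNext position would never execute, which matches the hang described above.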

Thanks a lot, now I get it.

