ReactiveCollection.unlock hangs

The program below does the following:

  1. creates a document
  2. gets and locks the document
  3. unlocks the document

It hangs on line 22, which is the unlock.
Am I doing something wrong, or is it a bug?
I use Java SDK 3.4.6 and Couchbase Server 7.1.0 on a Mac.

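package dr.couchbase;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ReactiveCollection;

public class Test {
	public static void main(final String[] args) throws InterruptedException {
		final CountDownLatch latch = new CountDownLatch(1);
		final Cluster cluster = Cluster.connect(
			      "couchbase://", "Administrator", "cbadmin");
		final Bucket bucket = cluster.bucket("Aggregation");
		final ReactiveCollection reactiveColl = bucket.reactive().defaultCollection();
		// 1. create the document
		reactiveColl.upsert("k", "hello").subscribe(r1 -> {
			// 2. get and lock it for up to 10 seconds
			reactiveColl.getAndLock("k", Duration.ofSeconds(10)).subscribe(r2 -> {
				// 3. unlock it with the CAS returned by getAndLock
				reactiveColl.unlock("k", r2.cas()).subscribe(v -> {
					latch.countDown();
				});
			});
		});
		// wait for the unlock callback before exiting
		latch.await();
	}
}
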
Hi @anders_eliasson

No, there’s no bug here. You just need to chain the reactive operations together using concatMap rather than calling each one inside the previous operation’s subscribe callback.
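
The chaining idea can be sketched without the SDK. The block below is a stand-in, not Couchbase code: it uses the JDK's `CompletableFuture.thenCompose`, which plays roughly the role that `flatMap` plays on a Reactor `Mono` (the usual chaining operator there). The class `ChainSketch`, the three stub methods, and the CAS value 42 are all hypothetical and exist only to show the shape of one pipeline instead of nested callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainSketch {
    // Records the order in which the stubbed operations ran.
    static final List<String> log = new ArrayList<>();

    static synchronized void record(String entry) {
        log.add(entry);
    }

    // Hypothetical stubs standing in for upsert / getAndLock / unlock.
    static CompletableFuture<Void> upsert(String id, String value) {
        return CompletableFuture.runAsync(() -> record("upsert " + id));
    }

    static CompletableFuture<Long> getAndLock(String id) {
        return CompletableFuture.supplyAsync(() -> {
            record("getAndLock " + id);
            return 42L; // made-up CAS value
        });
    }

    static CompletableFuture<Void> unlock(String id, long cas) {
        return CompletableFuture.runAsync(() -> record("unlock " + id + " cas=" + cas));
    }

    // One pipeline: each step starts only after the previous one completes.
    static CompletableFuture<Void> upsertLockUnlock(String id) {
        return upsert(id, "hello")
            .thenCompose(ignored -> getAndLock(id))
            .thenCompose(cas -> unlock(id, cas));
    }

    public static void main(String[] args) {
        // Wait on the pipeline itself; no latch is needed.
        upsertLockUnlock("k").join();
        System.out.println(log); // [upsert k, getAndLock k, unlock k cas=42]
    }
}
```

Because the whole sequence is a single composed value, the caller waits on that value (here with `join()`) and errors from any step surface in one place, which is the main reason to prefer chaining over subscribing inside a subscribe.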

Thanks for the reply!
If I change unlock to remove, it works fine with all the nested subscribes. Do you know why?

Don’t use this construct, as Graham said. This Stack Overflow post explains it a bit further: "reactive programming - Function now executing properly after subscribe".

It doesn’t hang on unlock(), it hangs on latch.await() because latch.countDown() is never called.

It works with remove because remove returns a (next) object, and you’re specifying the consumer for objects.

unlock returns Mono<Void>, so there is no (next) object and the consumer for objects is never called. Use the subscribe() method that takes a completeConsumer (the last argument).

    reactiveColl.unlock("k", r2.cas()).subscribe(t -> {}, e -> {}, () -> {
      System.out.println("unlocked ");
      latch.countDown();
    });
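
The difference between the two callbacks can be reproduced with the JDK alone. The sketch below is not Reactor or Couchbase code: it uses `java.util.concurrent.Flow` to build a publisher that completes without emitting an item, which is the signal pattern of a `Mono<Void>`. The class name `EmptyPublisherSketch` and the helper `subscribe(boolean)` are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

public class EmptyPublisherSketch {
    // A publisher that completes without ever emitting an item,
    // mimicking the signals of a Mono<Void>.
    static final Flow.Publisher<Void> EMPTY = subscriber -> {
        subscriber.onSubscribe(new Flow.Subscription() {
            public void request(long n) {}
            public void cancel() {}
        });
        subscriber.onComplete();
    };

    // Subscribes and counts the latch down on the chosen signal:
    // onComplete if countDownOnComplete is true, otherwise onNext.
    static CountDownLatch subscribe(boolean countDownOnComplete) {
        CountDownLatch latch = new CountDownLatch(1);
        EMPTY.subscribe(new Flow.Subscriber<Void>() {
            public void onSubscribe(Flow.Subscription s) { s.request(1); }
            public void onNext(Void item) {
                if (!countDownOnComplete) latch.countDown();
            }
            public void onError(Throwable t) {}
            public void onComplete() {
                if (countDownOnComplete) latch.countDown();
            }
        });
        return latch;
    }

    public static void main(String[] args) throws InterruptedException {
        // Counting down in onNext: never invoked, so the wait times out.
        System.out.println(subscribe(false).await(200, TimeUnit.MILLISECONDS)); // false
        // Counting down in onComplete: invoked, so the latch is released.
        System.out.println(subscribe(true).await(200, TimeUnit.MILLISECONDS));  // true
    }
}
```

The first case corresponds to the original program, where the latch is tied to an item that never arrives; the second corresponds to passing a completion callback as the last argument of subscribe().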

Thanks a lot, now I get it.
