Error with Couchbase SDK Reactive GET Operation in Spring Boot Application

I’m encountering an intermittent issue in my Spring Boot application while using the Couchbase SDK. Here’s the scenario:

  • Spring Boot version: 3.1.6
  • Couchbase SDK version: 3.6.0
  • Description: I’m performing a simple “GET” operation from the bucket using the following code:

This error seems to be related to Reactor Core’s SinkManyUnicast, indicating that multiple subscribers are attempting to connect to it, whereas it only allows a single subscriber.

It’s important to note that this error doesn’t occur consistently; rather, it happens only from time to time.

I’m not even sure if this issue is directly related to the Couchbase SDK, but it’s noteworthy that the error occurs exclusively during the GET operation in various parts of my application.

mybucket.defaultCollection().reactive().get(id)

java.lang.IllegalStateException: Sinks.many().unicast() sinks only allow a single Subscriber
	at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:426) ~[reactor-core-3.6.4.jar:3.6.4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ Handler XXXX#get(UUID, Boolean) [DispatcherHandler]
Original Stack Trace:
		at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:426) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxPublish.connect(FluxPublish.java:106) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:88) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1865) ~[reactor-core-3.6.4.jar:3.6.4]
		at com.couchbase.client.core.Reactor$SilentMonoCompletionStage.lambda$subscribe$0(Reactor.java:213) ~[core-io-2.6.0.jar:?]
		at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) ~[?:?]
		at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:887) ~[?:?]
		at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2357) ~[?:?]
		at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:144) ~[?:?]
		at com.couchbase.client.core.Reactor$SilentMonoCompletionStage.subscribe(Reactor.java:168) ~[core-io-2.6.0.jar:?]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:85) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxRepeatWhen$RepeatWhenMainSubscriber.resubscribe(FluxRepeatWhen.java:185) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxRepeatWhen$RepeatWhenOtherSubscriber.onNext(FluxRepeatWhen.java:241) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:259) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:865) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68) ~[reactor-core-3.6.4.jar:3.6.4]
		at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28) ~[reactor-core-3.6.4.jar:3.6.4]
		at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
		at java.lang.Thread.run(Thread.java:1583) [?:?]

Thank you in advance!

Can you please show the rest of your code? It sounds like you have multiple subscribers to the result of get().