Thread blocking during upsert in async operation

I am using Vert.x to insert data into Couchbase. I opened the bucket using the async RxJava API, but the thread blocks and fails with the error below:

SEVERE: Unhandled exception
java.lang.RuntimeException: java.util.concurrent.TimeoutException
at rx.observables.BlockingObservable.blockForSingle(
at rx.observables.BlockingObservable.single(

Below is the code used for the insert; bucket is an Observable:

bucket.flatMap(bu -> bu.upsert(JsonDocument.create("token", data)))
.doOnNext(j -> System.out.println("Token insert SUCCESS"))
.timeout(1, TimeUnit.SECONDS)

It works when I run only the code above on its own, but in the application it blocks.

Hi @Arihant, thanks for trying Couchbase.

I have a few follow-up questions to start:

  • Which SDK version are you using, and can you share the logs of your run?
  • Is there some code I can run to reproduce?
  • If you increase the timeout, does it work? Or is it stuck forever in your code?

The SDK version is 2.1.4.
It's just simple code. I am using the async API to open the bucket and insert the data.
AsyncCluster cluster = CouchbaseAsyncCluster.create(config.couchbase.server);
Observable<AsyncBucket> bucket = cluster.openBucket(config.couchbase.bucketName, config.couchbase.bucketPassword);

Increasing the timeout does not help…

Since this is a database operation, the code needs to wait some time for the write to take effect in the database. But with async operations, the next piece of code starts executing before the first one has finished. Are there any best practices for Couchbase operations in a reactive, async application?


@Arihant do you have a standalone code sample that reproduces your issue?

I am facing the same issue: when I try to upsert a large number of documents, I get this exception for some of them.
The same code sometimes works for all the documents, but sometimes it fails for half of them.
I am upserting the documents one by one in a future, i.e. as soon as a document is built it must be upserted, but both building the document and upserting it happen in a future and are fully asynchronous. Please @daschl @simonbasle @geraldss help me with this!

@shiv4nsh can you share your code and stacktrace please?

I have made a sample project here, and now it sometimes gives me this exception:
at rx.internal.operators.OperatorMap$1.onNext(
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(
at rx.subjects.AsyncSubject.onCompleted(
at com.couchbase.client.core.endpoint.AbstractGenericHandler$
at java.util.concurrent.Executors$
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(
at java.util.concurrent.ScheduledThreadPoolExecutor$
at java.util.concurrent.ThreadPoolExecutor.runWorker(
at java.util.concurrent.ThreadPoolExecutor$
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.UpsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(
at rx.internal.operators.OperatorMap$1.onNext(
… 11 more

And sometimes it gives me a TimeoutException from the blocking call. @daschl, how should I avoid it?

@shiv4nsh tempfail is the server telling you “wait, you are going too fast on me!”. If you have an insert-heavy workload, you need to monitor the write queues on the target server. I bet that you are running out of memory and the server can’t persist/free up stuff fast enough - maybe it’s undersized?

So that is the root cause which you need to address, but in the client itself you can also apply intelligent retry logic and “retry a bit later” the same request with the hope that things clear up on the server. You can use async retry for this in an efficient manner, check out the docs here for example: (error handling section) and

In any case, I think you need to fix the root cause on the server. Also, the timeouts might come from the same issue, but it's hard to tell without more info.
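
To make the “retry a bit later” idea concrete, here is a minimal sketch of async retry with exponential backoff. It deliberately uses only JDK classes (CompletableFuture and a ScheduledExecutorService) so it runs without the SDK on the classpath; it is not the Couchbase API. The names AsyncRetry, TemporaryFailure, retryWithBackoff and the simulated upsert are all hypothetical, made up for illustration. With the SDK's RxJava observables, the same pattern would typically be expressed with RxJava's retryWhen operator instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class AsyncRetry {

    /** Hypothetical stand-in for the "temporary failure" error reported by the server. */
    static class TemporaryFailure extends RuntimeException {
        TemporaryFailure(String message) { super(message); }
    }

    /**
     * Runs the async operation and, on TemporaryFailure, schedules another attempt
     * after a delay that doubles each time. No thread is blocked while waiting.
     */
    static <T> CompletableFuture<T> retryWithBackoff(
            Supplier<CompletableFuture<T>> operation,
            int maxAttempts,
            long initialDelayMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, 1, maxAttempts, initialDelayMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int attemptNo,
            int maxAttempts,
            long delayMs,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        operation.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Unwrap the wrapper that CompletableFuture may add around the real cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (cause instanceof TemporaryFailure && attemptNo < maxAttempts) {
                // Retry later instead of immediately, giving the server time to drain its queues.
                scheduler.schedule(
                        () -> attempt(operation, attemptNo + 1, maxAttempts, delayMs * 2, scheduler, result),
                        delayMs, TimeUnit.MILLISECONDS);
            } else {
                // Non-retryable error, or retries exhausted: propagate to the caller.
                result.completeExceptionally(cause);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Simulated upsert: fails twice with a temporary failure, then succeeds.
        Supplier<CompletableFuture<String>> fakeUpsert = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new TemporaryFailure("server busy"));
            } else {
                f.complete("stored");
            }
            return f;
        };
        // Blocking here is only for the demo; in an event loop, chain a callback instead.
        String outcome = retryWithBackoff(fakeUpsert, 5, 10, scheduler).get(5, TimeUnit.SECONDS);
        System.out.println(outcome + " after " + calls.get() + " attempts");
        scheduler.shutdown();
    }
}
```

Note that the retry only papers over short bursts of back-pressure. If the server stays overloaded, every attempt fails and the error is still propagated once maxAttempts is reached, so the sizing problem on the server side still has to be addressed.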