Thread blocked issue with reactive APIs - Java

We are seeing issues with basic reactive calls. The logs below show it for get and touch, but we see the same issue with the other reactive APIs (get, upsert, delete, …). We have added timeouts as well as retries, and we still get thread-blocked warnings.

Java SDK version: 2.7.4
Couchbase version: 5.1
Source Code:

JsonDocument jsonDocumentPagination = couchbaseRead.getAndTouch(messageIdKey, msgIdDocumentExpiryTime).switchIfEmpty(Flowable.just(JsonDocument.create("GARBAGE"))).blockingSingle(); // Line # 1195
if (!jsonDocumentPagination.id().equals("GARBAGE")) { // Pagination key exists
…
}


/**
 * @param key The key to fetch from couchbase and touch the document
 * @return The JsonDocument for the provided key
 */
public Flowable<JsonDocument> getAndTouch(String key, int documentExpiryTime) {
    long startTime = System.currentTimeMillis();
    // Get the bucket based on region (IPC1 / IPC2)
    return getAndTouchPrimary(key, documentExpiryTime)
            .lift(circuitBreakerOperatorForKey)
            .onErrorResumeNext(throwable -> {
                return getAndTouchSecondary(key, documentExpiryTime);
            })
            .doOnEach(action -> {
                if (action.isOnComplete()) {
                    logger.info("{}", new GPCLogEvent("getAndTouch", "fetch key", new GPCEntry<>("key", key), new GPCEntry<>("Time Taken", System.currentTimeMillis() - startTime)));
                }
            });
}

// couchbaseRead.retryCountForKey=5
// couchbaseRead.timeoutForKey=1000
// couchbaseRead.delayBetweenRetriesForKey=10

/**
 * @param key The key to fetch from near couchbase instance with document expiry time
 * @return The JsonDocument
 */
private Flowable<JsonDocument> getAndTouchPrimary(String key, int documentExpiryTime) {
    //logger.info("{}", new GPCLogEvent("getAndTouchPrimary", "IN", new GPCEntry<>("STATE", circuitBreakerForCouchbaseReadKey.getState().name()), new GPCEntry<>("Call Permitted", circuitBreakerForCouchbaseReadKey.isCallPermitted())));
    if (circuitBreakerForCouchbaseReadKey.isCallPermitted()) {
        if (currentBucket.getBucketType() != null && currentBucket.getBucketType().equals(BucketType.MAIN)) {
            return RxJavaInterop.toV2Flowable(readPrimaryMainAsyncBucket
                    .getAndTouch(key, documentExpiryTime, timeoutForKey, TimeUnit.MILLISECONDS)
                    .retryWhen(RetryBuilder.anyOf(TimeoutException.class).delay(Delay.fixed(delayBetweenRetriesForKey, TimeUnit.MICROSECONDS)).max(retryCountForKey).build()));
        } else if (currentBucket.getBucketType() != null && currentBucket.getBucketType().equals(BucketType.BACKUP)) {
            return RxJavaInterop.toV2Flowable(readPrimaryBackupAsyncBucket
                    .getAndTouch(key, documentExpiryTime, timeoutForKey, TimeUnit.MILLISECONDS)
                    .retryWhen(RetryBuilder.anyOf(TimeoutException.class).delay(Delay.fixed(delayBetweenRetriesForKey, TimeUnit.MICROSECONDS)).max(retryCountForKey).build()));
        } else {
            logger.error("{}", new GPCLogEvent("getAndTouchPrimary", "ERROR"));
            return Flowable.error(new DetermineBucketException("getAndTouchPrimary: " + RUN_TIME_BUCKET_CANNOT_BE_DETERMINED));
        }
    } else {
        return Flowable.error(new GenericException("getAndTouchPrimary: Circuit is OPEN"));
    }
}

Logs

05:57:35.295 [vertx-blocked-thread-checker] WARN  i.v.core.impl.BlockedThreadChecker - Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 1729768 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
                at sun.misc.Unsafe.park(Native Method)
                at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
                at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
                at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
                at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
                at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
                at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:85)
                at io.reactivex.Single.blockingGet(Single.java:2835)
                at io.reactivex.Flowable.blockingSingle(Flowable.java:5910)
                at com.test.gpc.spd.verticle.SPDServerVerticle.getAllProduct(SPDServerVerticle.java:1195)
                at com.test.gpc.spd.verticle.SPDServerVerticle$$Lambda$304/807571894.handle(Unknown Source)
                at io.vertx.ext.web.impl.RouteImpl.handleContext(RouteImpl.java:227)
                at io.vertx.ext.web.impl.RoutingContextImplBase.iterateNext(RoutingContextImplBase.java:121)
                at io.vertx.ext.web.impl.RoutingContextImpl.next(RoutingContextImpl.java:134)
                at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.doEnd(BodyHandlerImpl.java:296)
                at io.vertx.ext.web.handler.impl.BodyHandlerImpl$BHandler.end(BodyHandlerImpl.java:276)
                at io.vertx.ext.web.handler.impl.BodyHandlerImpl.lambda$handle$0(BodyHandlerImpl.java:87)
                at io.vertx.ext.web.handler.impl.BodyHandlerImpl$$Lambda$412/503098263.handle(Unknown Source)
                at io.vertx.core.http.impl.HttpServerRequestImpl.doEnd(HttpServerRequestImpl.java:537)
                at io.vertx.core.http.impl.HttpServerRequestImpl.handleEnd(HttpServerRequestImpl.java:526)
                at io.vertx.core.http.impl.Http1xServerConnection.handleEnd(Http1xServerConnection.java:167)
                at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:136)
                at io.vertx.core.net.impl.ConnectionBase.handleRead(ConnectionBase.java:390)
                at io.vertx.core.net.impl.VertxHandler$$Lambda$380/266170253.handle(Unknown Source)
                at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
                at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
                at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:188)
                at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:174)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
                at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
                at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
                at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
                at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1476)
                at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1225)
                at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1272)
                at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
                at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
                at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
                at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
                at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
                at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
                at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
                at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
                at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:421)
                at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:321)
                at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
                at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
                at java.lang.Thread.run(Thread.java:748)

Hey @himanshu.mps

I don’t know much about Vertx, but just going from the stacktrace it seems to be complaining about the blockingSingle() call on line 1195. Unless I’m missing something, it’s simply a legitimate complaint that you’re doing a blocking call, I think? I can’t see any problems with the rest of the code.

Most IO and web stacks now have some way of ‘feeding’ a reactive publisher back into the response, without blocking.
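
To illustrate what I mean by not blocking, here is a minimal sketch using only the JDK. CompletableFuture stands in for your Flowable, and fetchAsync, handle, and the respond callback are hypothetical names I made up for the example, not Couchbase or Vert.x APIs. The point is only the shape: the handler registers a callback and returns immediately, and the response is written when the result arrives, with the "GARBAGE" fallback applied on error.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class NonBlockingHandlerSketch {
    static final String FALLBACK = "GARBAGE";

    // Hypothetical stand-in for an async getAndTouch: completes later on another thread.
    // An empty key simulates a failed lookup.
    static CompletableFuture<String> fetchAsync(String key, ScheduledExecutorService io) {
        CompletableFuture<String> result = new CompletableFuture<>();
        io.schedule(() -> {
            if (key.isEmpty()) {
                result.completeExceptionally(new IllegalStateException("lookup failed"));
            } else {
                result.complete("doc:" + key);
            }
        }, 100, TimeUnit.MILLISECONDS);
        return result;
    }

    // The handler never waits: it wires up the callback and returns to the caller.
    static void handle(String key, ScheduledExecutorService io, Consumer<String> respond) {
        fetchAsync(key, io)
                .exceptionally(error -> FALLBACK) // same role as the "GARBAGE" placeholder
                .thenAccept(respond);             // write the response once the value is ready
    }

    public static void main(String[] args) {
        ScheduledExecutorService io = Executors.newSingleThreadScheduledExecutor();
        handle("messageIdKey", io, body -> {
            System.out.println("response: " + body);
            io.shutdown();
        });
        System.out.println("handler returned without blocking");
    }
}
```

With RxJava the equivalent would be to subscribe to the Flowable and finish the HTTP response inside the subscriber, rather than calling blockingSingle() on the event loop thread.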

@graham.pople

Vert.x uses an event loop similar to Node.js. What we are trying to do is use the reactive API to get and touch the document. It is similar to the blocking getAndTouch call. The issue I am facing is that Couchbase doesn’t send a response for such a long time that it blocks the event loop.

@himanshu.mps

What is Vert.x’s time limit for blocking? getAndTouch should be a very fast call; I’d think the main limits on its speed are your network latency to the cluster and the size of the document.

@graham.pople

The Vert.x thread checker starts complaining that the thread is blocked after the default time of 2 ms. We see this issue during performance tests, where there are tons of get, insert, update, and getAndTouch calls. Sometimes the test runs fine with 1000 users, and sometimes it blocks the thread even with 10 users. There is no change in the test or application functionality for 1000 users vs. 10 users.

Well, 2 ms is an extremely tight SLA. Depending on your network setup it’s very possible to achieve that with Couchbase, and considerably faster in fact - but 100% of the time for every single call? That’s very tricky. In fact I’d suggest it’s pretty much impossible to achieve on the JVM, given random garbage collection costs.

I googled and found this reactive streams integration with vertx: https://vertx.io/docs/vertx-reactive-streams/java/. I think that’ll probably solve your woes.

@graham.pople … My mistake… It is not 2 ms but 2 seconds.
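
For reference, a rough back-of-the-envelope check of the settings posted above against that 2-second limit. This is only a sketch: it assumes max(retryCountForKey) means up to 5 retries on top of the first attempt and that every attempt runs to the full timeout, and worstCaseMillis is a hypothetical helper name, not part of any SDK.

```java
final class WorstCaseWait {
    // Upper bound on how long one blocking call could wait on the primary path,
    // assuming each attempt times out and the delay is applied between retries.
    static long worstCaseMillis(long timeoutMs, int maxRetries, long delayMicros) {
        long attempts = maxRetries + 1L;                    // first attempt plus retries
        long delaysMs = (maxRetries * delayMicros) / 1000;  // microseconds to milliseconds
        return attempts * timeoutMs + delaysMs;
    }

    public static void main(String[] args) {
        // timeoutForKey=1000 ms, retryCountForKey=5, delayBetweenRetriesForKey=10 (microseconds)
        System.out.println(worstCaseMillis(1000, 5, 10)); // prints 6000
    }
}
```

Under those assumptions a single blockingSingle() can hold the event loop for about 6 seconds on the primary path alone, before any fallback to the secondary, which is already three times the 2000 ms limit in the log.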