Issue with reactive bulk get operation

Hi,

I am seeing issues with a bulk get using the reactive API. This is the code:

List<JsonDocument> documents = Observable.from(subList)
    .flatMap(new Func1<String, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(String metaId) {
            if (metaId != null) {
                System.out.println("Sending: " + metaId);
                return asyncBucket.get(metaId);
            }
            return Observable.empty();
        }
    }, 100)
    .doOnNext(t -> {
        System.out.println("Received: " + t.id());
    })
    .toList()
    .toBlocking()
    .single();

The sublist contains 100 elements. The "Sending" output shows that requests were sent for all 100 items, all of which exist in the database, but I only ever receive responses for 90 of them, and then I get this warning:

Jul 12, 2018 1:35:26 PM com.couchbase.client.core.endpoint.AbstractGenericHandler$KeepAliveResponseAction onError
WARNING: [/127.0.0.1:8093][QueryEndpoint]: Got error while consuming KeepAliveResponse.
java.util.concurrent.TimeoutException
	at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber.onTimeout(OnSubscribeTimeoutTimedWithFallback.java:166)
	at rx.internal.operators.OnSubscribeTimeoutTimedWithFallback$TimeoutMainSubscriber$TimeoutTask.call(OnSubscribeTimeoutTimedWithFallback.java:191)
	at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Am I doing something incorrectly?

I am using java-client 2.5.9 and rx-java 1.3.8. Here are my Couchbase configuration details:

INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslTruststoreFile='null', sslKeystorePassword=false, sslTruststorePassword=false, sslKeystore=null, sslTruststore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=3, computationPoolSize=3, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, configPollInterval=2500, configPollFloorInterval=50, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.5.9 (git: 2.5.9, core: 1.5.9), retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, continuousKeepAliveEnabled=true, keepAliveErrorThreshold=4, keepAliveTimeout=2500, autoreleaseAfter=75000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=30000, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$2@43b6a5a6, certAuthEnabled=false, coreSendHook=null, forceSaslPlain=false, queryTimeout=75000, viewTimeout=75000, searchTimeout=75000, analyticsTimeout=75000, kvTimeout=2500, connectTimeout=30000, dnsSrvEnabled=false}

I fixed it by using RetryHandler.

If it's only slow every once in a while, you might try setting a higher timeout for the operation instead. That would be less work for the system than retrying operations that time out. Any other thoughts, @subhashni or @daschl?

@subhashni @daschl @ingenthr

Hi,

I tried 2.6.0; here are the OpenTracing logs from connecting to a Couchbase instance on localhost:

https://pastebin.com/sM5dmdcw

Can you suggest what might be causing the issue? I am doing a bulk get of 500 IDs from Couchbase using the reactive API, and it is taking 3297 milliseconds.

Thanks for providing the log, helpful to look at.

One thing to consider is that even if you are fetching 100 in parallel, technically you are not, because you only connect to localhost (1 machine) over 1 socket. The requests are therefore serialized over the network, even if the JVM code is non-blocking. Can you try your code on a cluster of, let's say, 3 nodes with the kv service enabled and see if it performs better? For a test environment, you could also try increasing kvEndpoints to 2 or 3 and see if it makes a difference. Note that I recommend this only for the single-node localhost environment; for prod I think adding more nodes will help here too.
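For the test-environment suggestion, a minimal sketch of raising the KV endpoint count through the 2.x environment builder might look like the following. The node address is taken from the logs in this thread; the bucket name "default" and the class name are assumptions for illustration only, and the value 2 is just the lower of the two values suggested above.

```java
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

// Hypothetical class name; only the builder call is the point of the sketch.
public class KvEndpointsSketch {
    public static void main(String[] args) {
        // Open 2 KV sockets per node instead of the default 1
        // (kvServiceEndpoints=1 in the environment dump above).
        CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
                .kvEndpoints(2)
                .build();

        // "default" is an assumed bucket name; replace it with your own.
        Cluster cluster = CouchbaseCluster.create(env, "127.0.0.1");
        AsyncBucket asyncBucket = cluster.openBucket("default").async();

        // ... run the bulk get against asyncBucket and compare timings ...

        cluster.disconnect();
    }
}
```

Comparing the bulk-get timing with 1 and then 2 or 3 endpoints should show whether the single socket is the limiting factor on localhost.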

Also note that some operations take a really long time to complete, e.g.:

{
  "operation_name" : "get",
  "server_us" : 11,
  "last_local_id" : "7E661647ABBEAB37/0000000018E10958",
  "last_local_address" : "127.0.0.1:51526",
  "last_remote_address" : "127.0.0.1:11210",
  "last_dispatch_us" : 59930,
  "last_operation_id" : "0x7",
  "total_us" : 3171838
}

It spent 60 ms in the server and network (which on localhost is negligible), but then it took 3,171,838 µs to complete, which is over 3 seconds. And the server reported 11 µs vs. 60 ms dispatch time... All this suggests that your JVM and/or OS might be overloaded, doing GC and whatnot. Have you profiled the system and/or run with GC logs enabled to rule that out?
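To make the arithmetic behind that reading explicit, here is a small sketch using the three timing fields from the trace entry above. The helper class and method names are hypothetical, and it assumes, as the reply does, that last_dispatch_us covers the server plus network round trip while total_us covers the whole operation as seen by the client.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the Couchbase SDK.
final class TraceBreakdown {

    // Time not explained by the dispatch round trip: total_us - last_dispatch_us.
    static long outsideDispatchMicros(long totalUs, long lastDispatchUs) {
        return totalUs - lastDispatchUs;
    }

    // Part of the dispatch round trip not spent inside the server:
    // last_dispatch_us - server_us.
    static long dispatchMinusServerMicros(long lastDispatchUs, long serverUs) {
        return lastDispatchUs - serverUs;
    }

    public static void main(String[] args) {
        // Values copied from the trace entry quoted in this thread.
        long serverUs = 11;
        long lastDispatchUs = 59_930;
        long totalUs = 3_171_838;

        long outside = outsideDispatchMicros(totalUs, lastDispatchUs);
        long nonServer = dispatchMinusServerMicros(lastDispatchUs, serverUs);

        System.out.println("Outside dispatch: " + outside + " us (~"
                + TimeUnit.MICROSECONDS.toMillis(outside) + " ms)");
        System.out.println("Dispatch minus server: " + nonServer + " us");
    }
}
```

Under that assumption, about 3,111,908 µs of the 3,171,838 µs total falls outside the dispatch window, which is why the client side (JVM, GC, OS scheduling) is the first place to look.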

I am getting similar results when running it on a cluster of 3 nodes. If you want, I can deploy it as a Docker application and share the results from a cluster run. It will take a while to do that, but let me share those results too, as we are seeing the same behaviour there. I will also attach the code so that you can check whether I am doing something incorrectly.

I have shared the logs with you via your Couchbase forum inbox.