BucketStreamAggregator.feed() produces nothing

I can’t seem to get the BucketStreamAggregator.feed() method to do anything. Here’s what I’m doing, more or less:

CoreEnvironment env = DefaultCoreEnvironment
	.builder()
	.dcpEnabled(true)
	.build();

CouchbaseCore core = new CouchbaseCore(env);

CouchbaseRequest seedRequest = new SeedNodesRequest(couchbaseConfig.getNodes());
core.send(seedRequest)
	.timeout(10, TimeUnit.SECONDS)
	.toBlocking()
	.single();
				
CouchbaseRequest openRequest = new OpenBucketRequest(couchbaseConfig.getBucket(), couchbaseConfig.getPassword());
core.send(openRequest)
	.timeout(10, TimeUnit.SECONDS)
	.toBlocking()
	.single();

BucketStreamAggregator aggregator = new BucketStreamAggregator("testconnection", core, "testbucket");
aggregator.feed()
	.forEach(dcpr -> System.out.println("ding!"));

What happens is: the SeedNodesRequest and OpenBucketRequest are sent off successfully, but then aggregator.feed().forEach(...) returns immediately and nothing is ever printed. I can’t help but feel that I’m missing something. Any help is appreciated.

Alex

EDIT:
I’m using java-client:2.5.5 and connecting to a cluster with a single node running a 3.1.1-1807 Enterprise Edition (build-1807) server.

Could it be that the application doesn’t live long enough for any event to be received? (Like doing this in a main() and forgetting to sleep or block at the end.)
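For example (just a sketch, assuming feed() hands back a plain RxJava Observable<DCPRequest>): forEach() on an Observable only subscribes and returns immediately, so if this is the last statement in main() the JVM can exit before anything arrives. Blocking on the stream, or parking the main thread, would rule that out:

// Variant 1: block the calling thread on the stream itself
aggregator.feed()
	.toBlocking()
	.forEach(dcpr -> System.out.println("ding!"));

// Variant 2: subscribe asynchronously and keep the JVM alive
CountDownLatch latch = new CountDownLatch(1); // java.util.concurrent.CountDownLatch
aggregator.feed()
	.subscribe(dcpr -> System.out.println("ding!"));
latch.await(); // never counted down here; it only keeps main() from returning (throws InterruptedException)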

Is there traffic on the cluster? Did you try sending e.g. an UpsertRequest?
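Something like this should generate a mutation to observe (just a sketch; I’m writing the core UpsertRequest constructor from memory as (key, content, bucket), so double-check the exact signature against your core-io version — the payload is a Netty ByteBuf):

// Hypothetical traffic generator, reusing the timeout/blocking pattern from your snippet
CouchbaseRequest upsert = new UpsertRequest(
	"test-key",
	Unpooled.copiedBuffer("{\"hello\":\"world\"}", CharsetUtil.UTF_8), // io.netty.buffer / io.netty.util
	couchbaseConfig.getBucket()); // assumed argument order: key, content, bucket
core.send(upsert)
	.timeout(10, TimeUnit.SECONDS)
	.toBlocking()
	.single();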

@avsej will be able to help more there, but keep in mind that this is an @Experimental API that for now is only expected to be usable internally, in things like the Kafka connector. I know this part of the implementation/API is being heavily reworked for 2.2.6, for instance…

Hi, and thank you for the quick and informative reply.

I have a running example where I perform all the required steps manually; it goes something like this:

core.send(new SeedNodesRequest(nodes))
	.timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
	.toBlocking()
	.single();

core.send(new OpenBucketRequest(bucket, password))
	.timeout(CONNECT_TIMEOUT, TimeUnit.SECONDS)
	.toBlocking()
	.single();

core.send(new OpenConnectionRequest(STREAM_NAME, bucket))
	.flatMap(response ->
		core.<GetClusterConfigResponse>send(new GetClusterConfigRequest())
		   .map(configResponse -> {
			   CouchbaseBucketConfig bucketCfg = (CouchbaseBucketConfig)configResponse.config().bucketConfig(bucket);
			   return bucketCfg.numberOfPartitions();
		   }))
	.flatMap(nVBuckets -> {
		// Observable.range takes (start, count), so convert the [from, to) interval into a count
		Func2<Integer, Integer, Observable<DCPRequest>> requestRange = (from, to) -> Observable.range(from, to - from)
			.flatMap(vBucket -> core.<StreamRequestResponse>send(new StreamRequestRequest(vBucket.shortValue(), bucket)))
			.flatMap(streamRequestResponse -> {
				/* I'm unsure about this part */
				ConnectableObservable<DCPRequest> stream = streamRequestResponse.connection().subject().publish();
				stream.connect();
				return stream;
			});

		/* A bug in couchbase inhibits us from getting all the streams in one go, see
		 * http://review.couchbase.org/#/c/61512/
		 */
		return Observable.merge(
			requestRange.call(0, 256),
			requestRange.call(256, 512),
			requestRange.call(512, 768),
			requestRange.call(768, 1024));
	})
	.toBlocking()
	.forEach(dcpr -> System.out.println("ding!"));

And it works (although I realize that it’s not complete), so I know that there’s traffic on the cluster, and I’m also sure that my program doesn’t exit early. I have not performed any UpsertRequests, and I’m new to both RxJava and Couchbase, so it’s likely that I’m going about this the wrong way. I’m grateful for any help.

Alex

Hi again!

I’ve tested the BucketStreamAggregator against a few other servers. This is what I’ve discovered:

Server version                              | Works
--------------------------------------------+-------
3.1.1-1807 Enterprise Edition (build-1807)  | No
3.1.3-1823 Enterprise Edition (build-1823)  | Yes
4.0.0-4051 Community Edition (build-4051)   | Yes

So maybe this was fixed somewhere between 3.1.1 and 3.1.3. Let me know if I can help you in any way.

Alex