Document not saved on disconnect with asyncbucket?

Hey,

I am currently trying to save/load some documents via AsyncBucket and am running into problems.
After narrowing it down I managed to write this simple test code that loses data:

// 1. Create connection
// 2. Store a new Document via AsyncBucket
// 3. Disconnect from the Cluster
If the asyncBucket call is not converted into a blocking call (e.g. via toBlocking()), the document is lost in this test.

What am I doing/using wrong here?
Version: “5.1.1-5723-community”.
java-client 2.6.2 and 2.7.1

private void asynctesting() {
	Cluster cluster = CouchbaseCluster.create(environment, Configs.A.getDatabaseName());
	cluster.authenticate(Configs.A.getCouchbaseUsername(), Configs.A.getBucketPassword());
	Bucket bucket = cluster.openBucket(Configs.A.getBucketName(), 30, TimeUnit.SECONDS);
	AsyncBucket asyncBucket = bucket.async();
	String id = "asynctestobject";

	RawJsonDocument doc = RawJsonDocument.create(id, "testdata");
	
	// Document is never saved/visible in Couchbase Console
	asyncBucket.upsert(doc, 10, TimeUnit.SECONDS);
	
	// Document is saved/visible in Couchbase Console
	//asyncBucket.upsert(doc, 10, TimeUnit.SECONDS).toBlocking().last();

	asyncBucket.close();
	bucket.close(5, TimeUnit.SECONDS);
	cluster.disconnect(10, TimeUnit.SECONDS);

}

(I am obviously having the same problem when saving a doc via asyncBucket and instantly trying to retrieve it via asyncBucket. The result is always empty)

Thanks,
Kademlia

:upside_down_face: Well as so often you find the problem directly after posting.

The test works by closing the Environment as well.
environment.shutdown(10, TimeUnit.SECONDS);
After that call the document will be persisted and visible in the console.

This leaves my initial problem:

Why is it not possible to do something like this from the same client, and what do I need to do to get close to this?

asyncBucket.upsert(doc, 10, TimeUnit.SECONDS);
asyncBucket.get(docid).toBlocking().last().content(); // does not get the object from above

Thanks,
kademlia

Hey @Kademlia

In the first post, it’s failing because you kick off an asynchronous operation and then immediately your application ends before it’s actually sent.

In the second, it’s basically the same - you’re kicking off an asynchronous operation and then immediately reading the doc before it’s complete.

If you want to do the get after the upsert, you can do this:

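    asyncBucket.upsert(doc, 10, TimeUnit.SECONDS)
            .flatMap(v -> asyncBucket.get(docid))
            // "found" rather than "doc": a lambda parameter cannot reuse the name of the outer local variable
            .map(found -> found.content()); // then do something with the content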

asyncBucket uses the reactive programming library RxJava. In general with reactive programming you want to avoid blocking if at all possible, and keep the reactive chain going with flatMap and other operators, before ultimately calling subscribe. You may find this tutorial useful.
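
To make the ordering point concrete without pulling in the SDK, here is a minimal sketch using only the JDK's `CompletableFuture`, where `thenCompose` plays a role analogous to `flatMap`. The `ChainedWrite` class, the in-memory `STORE` map, and the `upsertAsync`/`getAsync` helpers are hypothetical stand-ins for illustration; they are not Couchbase or RxJava APIs.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ChainedWrite {
    // Hypothetical in-memory stand-in for a bucket; not part of any Couchbase API.
    static final Map<String, String> STORE = new ConcurrentHashMap<>();

    // Simulated asynchronous write: the value lands in the store on another thread.
    static CompletableFuture<Void> upsertAsync(String id, String content) {
        return CompletableFuture.runAsync(() -> STORE.put(id, content));
    }

    // Simulated asynchronous read.
    static CompletableFuture<String> getAsync(String id) {
        return CompletableFuture.supplyAsync(() -> STORE.get(id));
    }

    // The read is composed onto the write, so it only starts once the write has completed.
    static String writeThenRead(String id, String content) {
        return upsertAsync(id, content)
                .thenCompose(ignored -> getAsync(id))
                .join(); // block only at the very edge, to hand back a result
    }

    public static void main(String[] args) {
        System.out.println("Got: " + writeThenRead("asynctestobject", "testdata"));
    }
}
```

Calling `getAsync` on its own right after `upsertAsync`, without composing the two, would race with the write in the same way the original upsert/get pair does.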

@graham.pople

Thanks for the response. I got it to work with a Subscriber, which is necessary as far as I can tell.

	String id = "asynctestobject";
	RawJsonDocument doc = RawJsonDocument.create(id, "testdata");
	asyncBucket.upsert(doc, 10, TimeUnit.SECONDS)
		.flatMap(v -> asyncBucket.get(id, RawJsonDocument.class))
		.subscribe(raw -> System.out.println("Got: " + raw.content()));

Just so I understand this correctly, there are basically only two options for these scenarios:

  1. Insert data synchronously
  2. Insert data asynchronously and keep track of the specific Observable if the data is going to be used somewhere else in the immediate future.
    (3. There is no data caching from the asyncBucket that is accessible in the same environment)

Are there any other options (maybe something like a .flush() operation)?

Thanks,
Kademlia

Hi @Kademlia

Correct, you need a .subscribe() at the end of the chain ultimately (or a toBlocking or similar, which does a subscribe under the hood). Ah and in fact I’d missed that in your original examples you weren’t subscribing to the upsert operations, so in fact they would not have been started at all.
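
The "nothing starts until you subscribe" behaviour can be illustrated with a tiny plain-Java stand-in for a cold Observable. This is only a sketch of the idea: `ColdSource` and `ColdDemo` are hypothetical names made up for this example, and the class is far simpler than RxJava's real `Observable` (no errors, no scheduling, a single value).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, heavily simplified stand-in for a cold Observable.
class ColdSource<T> {
    private final Supplier<T> work;

    ColdSource(Supplier<T> work) {
        this.work = work;
    }

    // The wrapped work runs only here, when somebody subscribes.
    void subscribe(Consumer<T> onNext) {
        onNext.accept(work.get());
    }
}

class ColdDemo {
    // Returns {writes before subscribing, writes after subscribing}.
    static int[] run() {
        AtomicInteger writes = new AtomicInteger();
        ColdSource<String> upsert = new ColdSource<>(() -> {
            writes.incrementAndGet(); // stands in for actually sending the mutation
            return "asynctestobject";
        });
        int before = writes.get(); // the source exists, but no work has run yet
        upsert.subscribe(id -> System.out.println("Stored: " + id));
        int after = writes.get();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " -> " + counts[1]);
    }
}
```

Creating the `ColdSource` corresponds to calling `asyncBucket.upsert(...)` and discarding the result: the description of the work exists, but nothing has been asked to run it.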

I don’t really follow your other questions, sorry. What would you like a flush() operation to do?

In the first case, a complete shutdown of the connection to Couchbase, the client tries to insert all the missing async data; if it did not, this would result in random data loss. Under the hood Couchbase should have a “wait for all async tasks to finish” or “force all outstanding tasks to finish” operation. This would be a third possibility to solve said problem (similar to flushing/committing in an RDBMS; probably not the best word to use when “Flush a Bucket” exists in this context :)).

Ah I see. Well for one, there’s no way for the SDK to guarantee that it sends queued mutations out. What if your application simply crashes? Second, if we ignore that scenario, I don’t think it’s possible for us to do it anyway. It’s your application that owns the Subscription object, and at the SDK layer we don’t know anything about it, so we can’t block until all active subscriptions are finished. Sorry but I think this is something you need to manage at the application layer.
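
One way to manage this at the application layer is to register every asynchronous operation you start and wait for all of them before disconnecting. The sketch below shows the pattern with the JDK's `CompletableFuture` only; `PendingWrites`, `track`, `awaitAll`, and `ShutdownDemo` are hypothetical names for illustration, and with the SDK you would apply the same idea to your own Observables and Subscriptions rather than to futures.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical application-level registry of in-flight asynchronous operations.
class PendingWrites {
    private final List<CompletableFuture<?>> inFlight = new CopyOnWriteArrayList<>();

    // Register every asynchronous operation the application starts.
    <T> CompletableFuture<T> track(CompletableFuture<T> op) {
        inFlight.add(op);
        return op;
    }

    // Call before disconnecting. Returns true only if every tracked operation
    // completed successfully within the timeout.
    boolean awaitAll(long timeout, TimeUnit unit) {
        CompletableFuture<Void> all =
                CompletableFuture.allOf(inFlight.toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeout, unit);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}

class ShutdownDemo {
    // Starts three simulated writes and returns how many landed before "shutdown".
    static int run() {
        Map<String, String> store = new ConcurrentHashMap<>(); // stand-in for a bucket
        PendingWrites pending = new PendingWrites();
        for (int i = 0; i < 3; i++) {
            String id = "doc-" + i;
            pending.track(CompletableFuture.runAsync(() -> store.put(id, "testdata")));
        }
        boolean done = pending.awaitAll(10, TimeUnit.SECONDS);
        // Only at this point would the application close the bucket and disconnect.
        return done ? store.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println("Persisted before shutdown: " + run());
    }
}
```

This does not help if the process crashes, as noted above; it only covers an orderly shutdown.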