AsyncBucket.getAndTouch() vs Bucket.getAndTouch() when called through RxJava

Hi guys, I am currently working on some custom code that fetches the IDs of documents older than a certain date and then, using RxJava, issues calls that change each document's TTL so that it expires about 30 seconds from the current time.

The problem: if the bucket in use is a plain Bucket (which I assume is synchronous by nature), the code executes fine (if slowly), the documents expire, and I can confirm in CB that they are no longer available.

However, when bucket.async() is used, the code executes much faster (as expected) and all of the calls to change the TTL are issued, but on closer inspection on the CB side I see that nothing happens: the documents never expire in Couchbase.
This is some sample code to illustrate:

public <T> Long batchOperation(Bucket couchbaseBucket, Iterable<T> items,
        BiFunction<T, Bucket, Observable<String>> batchProcessor, int batchLength) {

    AtomicLong counter = new AtomicLong();

    Observable<String> observableFromDocs =
            Observable
                .from(items)
                .buffer(batchLength)
                .flatMap(observableLists -> Observable.from(observableLists))
                .flatMap(batchElem -> batchProcessor.apply(batchElem, couchbaseBucket))
                .retryWhen(
                        RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                                .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                                        log.error("Retrying load. Attempt {} For exception {}", integer, throwable.toString())
                                )
                                .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                                .max(MAX_RETRIES)
                                .build()
                );

    observableFromDocs.subscribe(
            (elem) -> {},
            elem -> log.error("Error during batch item operation", elem),
            () -> { counter.incrementAndGet(); log.debug("Completed ASYNC load "); });

    return counter.get();
}

and this is the batch processor code:

public static BiFunction<String, Bucket, Observable<String>> purgeBatchOperationActionResolver =
        (String docId, Bucket bucket) -> {
            // The result of getAndTouch is not used; only the document id is emitted.
            bucket.getAndTouch(docId, 30);
            return Observable.just(docId);
        };

Notice that changing the bucket to bucket.async() makes the code run as fast as it should, and the calls to:

    bucket.getAndTouch(docId, 30);

are all made, but nothing happens on the CB side.

Has anyone seen a similar problem before? Maybe the main thread dies before the SDK emits these calls?

Maybe the main thread dies before the SDK emits these calls?

It’s certainly possible. There doesn’t seem to be any code in batchOperation that waits for the Observable to emit the final element before returning the counter value.
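To make that point concrete, here is a minimal sketch that uses only the JDK as a stand-in for the Rx pipeline (the class and method names are hypothetical, and CompletableFuture plays the role of the Observable). It contrasts reading the counter right after starting the asynchronous work with waiting on a CountDownLatch that the completion callback releases. In RxJava 1 a similar wait could be achieved by counting down a latch in the completion and error callbacks of subscribe, or by going through toBlocking(); treat this as an illustration of the idea rather than the SDK's prescribed approach.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// JDK-only stand-in for the Rx pipeline; all names here are hypothetical.
class WaitForCompletion {

    // Simulates an asynchronous batch that finishes after a short delay,
    // then bumps the counter and notifies the caller.
    static CompletableFuture<Void> startAsyncWork(AtomicLong counter, Runnable onDone) {
        return CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(200); // pretend network round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            counter.incrementAndGet();
            onDone.run();
        });
    }

    // Mirrors batchOperation: reads the counter right after starting the work.
    static long returnImmediately() {
        AtomicLong counter = new AtomicLong();
        startAsyncWork(counter, () -> {});
        return counter.get();
    }

    // Blocks on a latch that the completion callback releases.
    static long waitThenReturn() {
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(1);
        startAsyncWork(counter, done::countDown);
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("immediate: " + returnImmediately());
        System.out.println("after waiting: " + waitThenReturn());
    }
}
```

The asynchronous work here runs on daemon threads of the common pool, so if main returned without waiting, the JVM could exit before the work finished, which is the scenario raised in the question.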

mreiche, I feel this is a similar scenario to another question you helped me figure out a while back. Can you please take a look at this one?
If the bucket is async, all of the code is executed for every observable but nothing happens on Couchbase.
If the bucket is synchronous, or if every observable blocks, then I see the effect, but it takes forever because everything seems to run synchronously.

I was able to solve this on my own after a few changes to the code above.
The code was changed as follows:

public <T> Long batchOperation(Bucket couchbaseBucket, Iterable<T> items,
        BiFunction<T, AsyncBucket, Observable<JsonDocument>> batchProcessor, int batchLength) {

    AtomicLong counter = new AtomicLong();
    // Use the asynchronous view of the same bucket.
    AsyncBucket asBucket = couchbaseBucket.async();

    // The processor now emits JsonDocument, so the pipeline type changes accordingly.
    Observable<JsonDocument> observableFromDocs =
            Observable
                .from(items)
                .buffer(batchLength)
                .flatMap(observableLists -> Observable.from(observableLists))
                .flatMap(batchElem -> batchProcessor.apply(batchElem, asBucket))
                .retryWhen(
                        RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                                .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                                        log.error("Retrying load. Attempt {} For exception {}", integer, throwable.toString())
                                )
                                .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                                .max(MAX_RETRIES)
                                .build()
                );

    observableFromDocs.subscribe(
            (elem) -> {},
            elem -> log.error("Error during batch item operation", elem),
            () -> { counter.incrementAndGet(); log.debug("Completed ASYNC load "); });

    // Note: subscribe() does not block, so this may be read before the pipeline completes.
    return counter.get();
}

and this:

public static BiFunction<String, AsyncBucket, Observable<JsonDocument>> purgeBatchOperationActionResolver =
        // Return the Observable itself so that flatMap subscribes to it.
        (String docId, AsyncBucket asBucket) -> asBucket.getAndTouch(docId, 30);

It is now working as expected: it executes really fast and asynchronously, and the thread remains active long enough to actually complete the Couchbase round trip.
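One plausible reason the change helps, assuming the Observable returned by AsyncBucket.getAndTouch() is cold (nothing is sent until something subscribes to it): in the first version the call's result was discarded, so nobody ever subscribed, while in the second version the Observable is returned and flatMap subscribes to it. The sketch below illustrates that difference with JDK types only. A Supplier of CompletableFuture stands in for a cold Observable, and every name in it is hypothetical rather than part of the Couchbase SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

// JDK-only stand-in; a Supplier<CompletableFuture> plays the role of a cold Observable.
// All names are hypothetical.
class LazyOperationDemo {

    // Requests that actually reached the (fake) server.
    static final List<String> sent = Collections.synchronizedList(new ArrayList<>());

    // Describes a touch request but sends nothing until get() is called.
    static Supplier<CompletableFuture<String>> touch(String docId) {
        return () -> CompletableFuture.supplyAsync(() -> {
            sent.add(docId);
            return docId;
        });
    }

    // First version: the lazy operation is created and dropped,
    // and an unrelated, already completed result is returned.
    static Supplier<CompletableFuture<String>> discarding(String docId) {
        touch(docId); // never triggered
        return () -> CompletableFuture.completedFuture(docId);
    }

    // Second version: the operation itself is returned, so the pipeline triggers it.
    static Supplier<CompletableFuture<String>> returning(String docId) {
        return touch(docId);
    }

    // Plays the role of flatMap plus subscribe: triggers whatever the resolver
    // returns, waits for all of it, and reports how many requests were sent.
    static int run(List<String> ids, Function<String, Supplier<CompletableFuture<String>>> resolver) {
        sent.clear();
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String id : ids) {
            inFlight.add(resolver.apply(id).get());
        }
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        return sent.size();
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("doc1", "doc2", "doc3");
        System.out.println("discarding: " + run(ids, LazyOperationDemo::discarding));
        System.out.println("returning:  " + run(ids, LazyOperationDemo::returning));
    }
}
```

With the discarding resolver no request is ever sent, even though the resolver runs for every id, which matches the "all of the calls are issued but nothing happens" symptom described earlier.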