Not receiving expected errors when trying to insert documents that already exist

I am working with version 2.7.13 of the Java SDK against a local instance of Couchbase Community Edition Server version 6.0. I have written a simple program to insert a number of test documents into a bucket. The program is similar to the one given at the end of the Batching Operations page of the SDK documentation except that, instead of adding the documents to a List and then creating the Observable from that List, I use the range() method to generate a series of integers from which the document identifiers are built:

    int start = 1;
    int count = 10;

    Observable<Contact> contacts = Observable
        .range(start, count)
        .map(Contact::new);

    contacts
        .map(Contact::getJsonDocument)
        .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
            @Override
            public Observable<JsonDocument> call(final JsonDocument jsonDocument) {
                return bucket.async().insert(jsonDocument);
            }
        })
        .toBlocking()
        .subscribe(
            result -> {
                logger.info(String.format("subscribe: onNext: %s", result));
            },
            error -> {
                logger.warning(String.format("subscribe: onError: %s", error));
                if (error instanceof CompositeException) {
                    CompositeException ce = (CompositeException) error;
                    for (Throwable t : ce.getExceptions()) {
                        logger.warning(String.format("+ %s", t));
                    }
                }
            }
        );

The Contact class constructor accepts an integer parameter which is used to construct the document’s id.

If I run it without first cleaning out the bucket (i.e. trying to insert ten documents that all already exist), I get:

    WARNING: subscribe: onError: rx.exceptions.CompositeException: 4 exceptions occurred.
    WARNING: + com.couchbase.client.java.error.DocumentAlreadyExistsException
    WARNING: + com.couchbase.client.java.error.DocumentAlreadyExistsException
    WARNING: + com.couchbase.client.java.error.DocumentAlreadyExistsException
    WARNING: + com.couchbase.client.java.error.DocumentAlreadyExistsException

The number of exceptions varies from run to run but always seems to be in the range 3-6. However, I would expect to get 10 exceptions as all the documents are already present in the bucket. I am hoping that someone might be able to explain this behavior or suggest how to investigate further.

Thanks!

Curious. I don’t spot anything, but @daschl or @graham.pople would probably know. By chance are you System.exit()ing before all the responses come back? I believe the thread handling the error will be a daemon thread.

@jsb here is what I think is happening: the Observable contract says that a single error terminates the stream, and a DocumentAlreadyExistsException also counts as an error in this case.

So if you want to “run through” all the 10 ops, you need to move some error handling inside your flatMap (onError*) and log in there and return an empty Observable instead so that the stream continues.

    return bucket.async().insert(jsonDocument).onErrorResumeNext(throwable -> {
        System.err.println(throwable);
        return Observable.empty();
    });
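
The principle behind this suggestion (recover from each failure at the level of the individual operation, so that one failure does not cut the whole batch short) can be sketched without RxJava or a Couchbase server. The following uses only the JDK's CompletableFuture as an analogy. `FakeBucket`, `PerOperationRecovery`, and the `contact::` id format are hypothetical names made up for this illustration and are not part of the Couchbase SDK; `exceptionally` plays roughly the role that `onErrorResumeNext` plays above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical in-memory stand-in for a bucket; NOT the Couchbase SDK.
class FakeBucket {
    private final Map<String, String> docs = new ConcurrentHashMap<>();

    // Completes exceptionally when the id is already present,
    // loosely mimicking an insert() of an existing document.
    CompletableFuture<String> insert(String id, String body) {
        return CompletableFuture.supplyAsync(() -> {
            if (docs.putIfAbsent(id, body) != null) {
                throw new IllegalStateException("already exists: " + id);
            }
            return id;
        });
    }
}

class PerOperationRecovery {
    // Inserts ids start..start+count-1 concurrently and returns the ids
    // whose insert failed. Each failure is handled on its own future,
    // so the remaining inserts are unaffected.
    static List<String> insertAll(FakeBucket bucket, int start, int count) {
        List<String> failed = new CopyOnWriteArrayList<>();
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = start; i < start + count; i++) {
            String id = "contact::" + i;
            CompletableFuture<String> recovered = bucket.insert(id, "{}")
                .exceptionally(error -> {
                    failed.add(id);   // record the failure ...
                    return null;      // ... and let the batch carry on
                });
            pending.add(recovered);
        }
        // Wait for every insert, successful or not.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return failed;
    }

    public static void main(String[] args) {
        FakeBucket bucket = new FakeBucket();
        System.out.println("first run failures:  " + insertAll(bucket, 1, 10).size());
        System.out.println("second run failures: " + insertAll(bucket, 1, 10).size());
    }
}
```

Because every failure is absorbed where it occurs, the second run reports a failure for each of the ten ids rather than however many happened to accumulate before the first error was noticed.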

@ingenthr that should be ok, he is using toBlocking() in the code.

Hi @daschl, many thanks for responding so quickly. Your suggestion works! However, there is still one thing that I do not understand clearly: if a single error terminates the stream, why was I seeing multiple errors? Is it because several inserts are run in parallel so that, by the time the first error is noticed downstream, several failures have accumulated?

To answer my own question, after some more thought, the single exception that stops the flow is the CompositeException instance which bundles up several DocumentAlreadyExistsException instances. Where is the CompositeException instance created? The throwable passed to the onErrorResumeNext method is a DocumentAlreadyExistsException.

Yeah, well, technically RxJava still raises only one error down the pipeline; it seems to wrap the outstanding errors into a CompositeException before terminating the stream. This does not happen in the Couchbase code; it is done purely in RxJava land.

You are 100% correct. For some reason, I had it in my head that CompositeException was a Couchbase thing.

> However, there is still one thing that I do not understand clearly: if a single error terminates the stream, why was I seeing multiple errors? Is it because several inserts are run in parallel so that, by the time the first error is noticed downstream, several failures have accumulated?

If you switch the .flatMap for a .concatMap, it should make it execute one insert at a time in serial.
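
The difference between running operations one at a time and letting them overlap can be illustrated with a JDK-only analogy. This is not RxJava code: chaining with `thenCompose` is used here as a rough stand-in for the serial behaviour of `.concatMap`, and launching every future up front as a rough stand-in for the concurrent behaviour of `.flatMap`. The class `SerialVersusConcurrent` and its methods are hypothetical names invented for this sketch, and the 20 ms sleep merely simulates a slow operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class SerialVersusConcurrent {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical slow async operation that records how many calls overlap.
    CompletableFuture<Integer> operation(int i) {
        return CompletableFuture.supplyAsync(() -> {
            maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet();
            return i;
        });
    }

    // Serial: each operation starts only after the previous one has completed.
    static int maxOverlapSerial(int count) {
        SerialVersusConcurrent s = new SerialVersusConcurrent();
        CompletableFuture<Integer> chain = CompletableFuture.completedFuture(0);
        for (int i = 1; i <= count; i++) {
            final int n = i;
            chain = chain.thenCompose(previous -> s.operation(n));
        }
        chain.join();
        return s.maxInFlight.get();
    }

    // Concurrent: all operations are started before any of them is awaited.
    static int maxOverlapConcurrent(int count) {
        SerialVersusConcurrent s = new SerialVersusConcurrent();
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            pending.add(s.operation(i));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return s.maxInFlight.get();
    }

    public static void main(String[] args) {
        System.out.println("serial max overlap:     " + maxOverlapSerial(5));
        System.out.println("concurrent max overlap: " + maxOverlapConcurrent(5));
    }
}
```

In the serial variant at most one operation is ever in flight, so at most one failure can be outstanding at a time. In the concurrent variant the amount of overlap depends on the thread pool and the machine, which fits the observation that the number of bundled exceptions varied from run to run.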

Indeed, however one of the reasons for using .flatMap was to get concurrent execution and thus better performance (hopefully).

In case anyone is interested, I came across a pair of blog entries that explained the workings of .flatMap in some detail:

@jsb sure, I was just answering why you were seeing multiple errors, because of the parallel inserts.