Only half of the binary documents are getting inserted during bulk insert

Hi, I am having a weird problem during insertion. I have two types of documents - JSON and BinaryDocument. I am performing bulk insert operation restricted to a batch size.

The operation works fine for JSON documents. But if I upload, say 100 documents, then only 50 are getting upload in the case of BinaryDocument. Every time only half the number of documents are getting loaded in the database.

Here is my code for JSON document insertion:

public void createMultipleCustomerDocuments(String docId, Customer myCust, long numDocs, int batchSize) {

        Gson gson = new GsonBuilder().create();
        JsonObject content = JsonObject.fromJson(gson.toJson(myCust));
        JsonDocument document = JsonDocument.create(docId, content);
        jsonDocuments.add(document);
        documentCounter.incrementAndGet();
        System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
        if(documentCounter.get() >= batchSize){
            System.out.println("Document counter: " + documentCounter.get());
            Observable
            .from(jsonDocuments)
            .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
                public Observable<JsonDocument> call(final JsonDocument docToInsert) {
                    return theBucket.async().upsert(docToInsert);
                }
            })
            .last()
            .toList()
            .toBlocking()
            .single();
            jsonDocuments.clear();
            documentCounter.set(0);
        }


    }

This works completely fine. I have no problem in insertion.

Here is the code for my BinaryDocument insertion:

public void createMultipleCustomerDocuments(final String docId, ByteBuffer myCust, long numDocs, int batchSize) throws BackpressureException, InterruptedException {
        ByteBuf buffer = Unpooled.wrappedBuffer(myCust);
        binaryDocuments.add(buffer);
        documentCounter.incrementAndGet();
        
        System.out.println("Batch size: " + batchSize + " Document Counter: " + documentCounter.get());
        
        if(documentCounter.get() >= batchSize){
            System.out.println("Document counter: " + documentCounter.get() + " Binary Document list size: " + binaryDocuments.size());
            Observable
            .from(binaryDocuments)
            .flatMap(new Func1<ByteBuf, Observable<BinaryDocument>>() {
                public Observable<BinaryDocument> call(final ByteBuf docToInsert) {
                    //docToInsert.retain();
                    
                    return theBucket.async().upsert(BinaryDocument.create(docId, docToInsert));
                    
                } 
            })
            .last()
            .toList()
            .toBlocking()
            .single();
            
            binaryDocuments.clear();
            
            documentCounter.set(0);
        }
     }   

This fails. Exactly half the number of documents get inserted. Even the numbers are printed in exactly the same manner as of JSON document’s function’s numbers. The documentCounter shows the correct number. But the number of documents that get inserted in the DB is only the half of what it is shown.

Can someone please help me this?

This has been cross-posted and answered on StackOverflow (see this answer by johnwowus).

The batch was badly built, as it buffered ByteBuf instead of directly creating and buffering BinaryDocument. This resulted in all documents in the batch being created with the same docId (the last docId in the batch, triggering flush to db), effectively overwriting one another.