Bulk with sub-document API

Hi,

I’m currently using the bulk API in the Java SDK to persist many documents in batches.

I would like to use the Sub-document API so I don’t spend bandwidth fetching each document, merging in the new values, and sending it back to Couchbase.

But I don’t see any way to combine bulk operations with the Sub-document API.

Can you help me figure out the best way to append data to an array in each of many documents in one batch?

Thanks

Hi @mickaelhemri,

AsyncBucket, which is what you use for bulk operations, supports the sub-document operations lookupIn and mutateIn.

This example, without error handling, updates two arrays in multiple documents, using JUnit and RxJava’s TestSubscriber for convenience:

  // Assumed setup (not shown in the original test):
  //   Bucket bucket = ...;                       // synchronous bucket, used for setup and checks
  //   AsyncBucket asyncBucket = bucket.async();  // asynchronous view of the same bucket
  //   SubdocOptionsBuilder subdocOptions = SubdocOptionsBuilder.builder();
  //
  // Create documents like this:
  //   {
  //     "field1": ["a", "b", "c"],
  //     "field2": {
  //       "field3": [1, 2, 3]
  //     }
  //   }
  List<String> docIds = new ArrayList<>(Arrays.asList("doc01", "doc02", "doc03"));
  for (String docId : docIds) {
    JsonObject content = JsonObject.create()
        .put("field1", JsonArray.from("a", "b", "c"))
        .put("field2", JsonObject.create().put("field3", JsonArray.from(1, 2, 3)));
    bucket.upsert(JsonDocument.create(docId, content));
  }

  // Create an Observable over the document IDs to operate on
  Observable<String> docIdsObs = Observable.from(docIds);

  // Mutate each document by appending one item to each of two arrays.
  // After the changes, each document looks like this:
  //   {
  //     "field1": ["a", "b", "c", "e"],
  //     "field2": {
  //       "field3": [1, 2, 3, 4]
  //     }
  //   }
  // flatMap issues all mutateIn requests concurrently instead of waiting
  // for each response in turn.
  Observable<DocumentFragment<Mutation>> subdocAppendObs = docIdsObs
      .flatMap(docId -> asyncBucket
          .mutateIn(docId)
          .arrayAppend("field1", "e", subdocOptions)
          .arrayAppend("field2.field3", 4, subdocOptions)
          .execute());

  // Subscribe to the Observable and wait for completion
  TestSubscriber<DocumentFragment<Mutation>> subscriber = new TestSubscriber<>();
  subdocAppendObs.subscribe(subscriber);
  subscriber.awaitTerminalEvent();

  // Assert the expected results
  subscriber.assertCompleted();
  subscriber.assertValueCount(3);
  List<DocumentFragment<Mutation>> mutations = subscriber.getOnNextEvents();
  for (DocumentFragment<Mutation> mutation : mutations) {
    // Each mutation result should carry one of the document IDs
    assertTrue("Unexpected document ID " + mutation.id(),
        docIds.contains(mutation.id()));
  }

  // Retrieve one of the documents and confirm the change
  assertEquals("Wrong number of elements", 4,
      bucket.get("doc02").content().getObject("field2").getArray("field3").size());

Since any mutation may fail, your code must handle errors and recover, either by retrying the operation or by tracking progress and retrying later.
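As a rough sketch of the "retry, then track what still failed" approach, here is the policy in plain, blocking Java so the logic is easy to follow. `RetryTracker`, `runWithRetry`, and `backoffMillis` are hypothetical names, not part of the SDK; `op` stands in for one per-document mutation that returns `true` on success (for example after catching the exception thrown by the SDK call). In an RxJava pipeline you would more likely attach a similar policy to each per-document Observable inside the `flatMap`, for example with `retryWhen`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper (not part of the Couchbase SDK): retries a per-document
// operation with exponential backoff and records the IDs that still fail,
// so they can be retried in a later pass.
final class RetryTracker {

    // Delay before retry number `attempt` (0-based): base, 2*base, 4*base, ...
    static long backoffMillis(int attempt, long baseMillis) {
        return baseMillis << attempt;
    }

    // Runs `op` for each ID, trying at most maxAttempts times per ID.
    // `op` returns true on success. Returns the IDs that never succeeded.
    static List<String> runWithRetry(List<String> docIds, Predicate<String> op,
                                     int maxAttempts, long baseMillis) {
        List<String> failed = new ArrayList<>();
        for (String id : docIds) {
            boolean ok = false;
            for (int attempt = 0; attempt < maxAttempts && !ok; attempt++) {
                if (attempt > 0) {
                    try {
                        Thread.sleep(backoffMillis(attempt - 1, baseMillis));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve interrupt status
                        break; // give up on this ID; it is recorded as failed below
                    }
                }
                ok = op.test(id);
            }
            if (!ok) {
                failed.add(id); // track for a later retry pass
            }
        }
        return failed;
    }
}
```

Keep in mind that blindly retrying a non-idempotent append can apply it twice, which is why the acknowledgement and CAS caveats matter here.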

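Also, there is no guarantee that you will receive an acknowledgement of a successful mutation: the mutation can be applied on the server even if the response never reaches your application.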

If applying the same mutation twice would have an undesired effect (appending to an array is not idempotent, so a retried append could add "e" a second time), pass the document’s CAS value along with its ID when mutating it. See MutateInBuilder.withCas(long).
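To show why the CAS check prevents a double append, here is a sketch using a hypothetical in-memory `CasStore` (not the Couchbase API) that mimics the relevant semantics: every successful write changes the document’s CAS, and a write carrying a stale CAS is rejected. With the real SDK, the CAS would come from an earlier read of the document and be passed via `withCas(long)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for a bucket, used only to illustrate CAS
// semantics; it is NOT the Couchbase API. Every successful write changes the CAS.
final class CasStore {
    static final class CasMismatchException extends RuntimeException {
        CasMismatchException(String id) {
            super("CAS mismatch for " + id);
        }
    }

    private final Map<String, List<String>> arrays = new HashMap<>();
    private final Map<String, Long> casValues = new HashMap<>();
    private long nextCas = 1;

    void upsert(String id, List<String> array) {
        arrays.put(id, new ArrayList<>(array));
        casValues.put(id, nextCas++);
    }

    long cas(String id) {
        return casValues.get(id);
    }

    List<String> get(String id) {
        return new ArrayList<>(arrays.get(id));
    }

    // Appends `value` only if the caller's CAS still matches the stored one,
    // roughly what withCas(long) adds to a real sub-document append.
    long arrayAppendWithCas(String id, String value, long expectedCas) {
        if (casValues.get(id) != expectedCas) {
            throw new CasMismatchException(id);
        }
        arrays.get(id).add(value);
        long newCas = nextCas++;
        casValues.put(id, newCas); // the write changes the CAS
        return newCas;
    }
}
```

If the first append succeeded but its acknowledgement was lost, the retry still carries the old CAS and is rejected instead of appending "e" a second time. A CAS mismatch can also mean another client changed the document, so on a mismatch re-read the document and decide whether your value is already there.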

I’m using Couchbase Java SDK 1.5.1 and RxJava 1.3.2.

Regards,

Jeff


Thank you so much, this is exactly what I was looking for!

Do you know why there is no guarantee of receiving an acknowledgement for each successful mutation?

Best

Sure, the missed acknowledgment scenario is described in the Generic Error Types section of the documentation:

One exception that you want to plan for is the RequestCancelledException. It comes up if a request is in-flight but the socket gets closed or the SDK can’t schedule the request for longer than the maximum configurable lifetime. In any event, if the operation is idempotent it makes sense to retry with backoff, but if a mutation operation has been canceled extra care needs to be taken to make sure state is not lost. For example, if a replace operation got canceled it makes sense to check again with a get to see if it took place or not and then react based on the result.
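Applied to the array-append case, "check again to see if it took place" could look like the sketch below: re-read the array and re-issue the append only if the value is missing. `CancelledAppendRecovery` and `recover` are hypothetical names; `readArray` and `append` stand in for the real SDK calls (for example a `lookupIn(docId).get("field1")` to fetch the array and a `mutateIn(docId).arrayAppend(...)` to re-apply it).

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper (not part of the Couchbase SDK): decides what to do after
// an append was cancelled (e.g. RequestCancelledException) and its outcome is unknown.
final class CancelledAppendRecovery {
    enum Outcome { ALREADY_APPLIED, REAPPLIED }

    // readArray: re-reads the current array from the document
    // append:    re-issues the append of `marker`
    static <T> Outcome recover(T marker, Supplier<List<T>> readArray, Consumer<T> append) {
        if (readArray.get().contains(marker)) {
            return Outcome.ALREADY_APPLIED; // the cancelled request did reach the server
        }
        append.accept(marker); // the first attempt was lost, so apply it now
        return Outcome.REAPPLIED;
    }
}
```

This check is only sound if the appended value uniquely identifies your write (no other client appends the same value) and nothing changes the document between the read and the re-append; combining it with the CAS check closes that second gap.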

@jkurtz, can we do the same with the Go SDK?