Upserting with async bucket on java sdk 2.7.15

I assume your application exits right after the call to batchUpsert() and never has a chance to apply the upsert.
If you cluster.disconnect() the upsert will be applied. Or wait a few hundredths of a second.

  import com.couchbase.client.core.BackpressureException;
  import com.couchbase.client.core.time.Delay;
  import com.couchbase.client.java.AsyncBucket;
  import com.couchbase.client.java.Bucket;
  import com.couchbase.client.java.CouchbaseCluster;
  import com.couchbase.client.java.document.JsonDocument;
  
  import com.couchbase.client.java.document.json.JsonObject;
  import com.couchbase.client.java.util.retry.RetryBuilder;
  
  import rx.Observable;
  
  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicLong;
  import java.util.function.Function;
  import java.util.logging.Logger;
  
  public class ObservableExample {
  
    Logger log = Logger.getLogger("mylogger");
    long RETRY_DELAY_CEILING = 1000;
    int MAX_RETRIES = 2;
    String connectionString = "localhost";
    String username = "Administrator";
    String password = "password";
    String bucketName = "travel-sample";
  
    CouchbaseCluster cluster;
    Bucket bucket;
    AtomicLong counter = new AtomicLong();
  
  
    private ObservableExample init() {
      cluster = CouchbaseCluster.create(connectionString);
      cluster.authenticate(username, password);
      bucket = cluster.openBucket(bucketName);
      return this;
    }
  
    public static void main(String[] args){
        ObservableExample obj = new ObservableExample().init();
        List<String> items=new ArrayList<>();
        items.add("one");
        obj.bucket.remove("one");
        obj.batchUpsert(items, obj::creator, obj.bucket);
        obj.cluster.disconnect();
        System.out.println("count: "+obj.counter.get());
        obj.init();
        System.out.println( obj.bucket.get("one"));
        obj.cluster.disconnect();
    }
  
    public JsonDocument creator(String s){
      JsonDocument jdoc = JsonDocument.create(s, JsonObject.create().put("a", "a"));
      return jdoc;
    }
  
    public <T> long  batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {
  
      AsyncBucket asyncBucket = couchbaseBucket.async();
      Observable<JsonDocument> observableFromDocs =
          Observable
              .from(items)
              .map(elem -> docCreator.apply(elem))
              .filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
              .flatMap(elem -> upsertDocument(elem, asyncBucket))
              .retryWhen(
                  RetryBuilder.anyOf(BackpressureException.class, Exception.class)
                      .doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
                          log.severe("Retrying load. Attempt {} For exception {} " + integer +" "+throwable.toString())
                      )
                      .delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
                      .max(MAX_RETRIES)
                      .build()
              );
  
      observableFromDocs.subscribe(
          (elem)-> {System.out.println("inserted");},
          elem -> log.severe("Document insertion failure " +elem),
          () -> {counter.incrementAndGet();log.fine("Completed ASYNC load ");});
      return counter.get();
    }
  
    private Observable<JsonDocument> upsertDocument(JsonDocument document, AsyncBucket asyncBucket){
      Observable<JsonDocument> observableDoc = asyncBucket.upsert(document);
      return observableDoc;
    }
  
  }
2 Likes