I assume your application exits right after the call to batchUpsert() and never has a chance to apply the upsert.
If you cluster.disconnect() the upsert will be applied. Or wait a few hundredths of a second.
import com.couchbase.client.core.BackpressureException;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.util.retry.RetryBuilder;
import rx.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Logger;
public class ObservableExample {
Logger log = Logger.getLogger("mylogger");
long RETRY_DELAY_CEILING = 1000;
int MAX_RETRIES = 2;
String connectionString = "localhost";
String username = "Administrator";
String password = "password";
String bucketName = "travel-sample";
CouchbaseCluster cluster;
Bucket bucket;
AtomicLong counter = new AtomicLong();
private ObservableExample init() {
cluster = CouchbaseCluster.create(connectionString);
cluster.authenticate(username, password);
bucket = cluster.openBucket(bucketName);
return this;
}
public static void main(String[] args){
ObservableExample obj = new ObservableExample().init();
List<String> items=new ArrayList<>();
items.add("one");
obj.bucket.remove("one");
obj.batchUpsert(items, obj::creator, obj.bucket);
obj.cluster.disconnect();
System.out.println("count: "+obj.counter.get());
obj.init();
System.out.println( obj.bucket.get("one"));
obj.cluster.disconnect();
}
public JsonDocument creator(String s){
JsonDocument jdoc = JsonDocument.create(s, JsonObject.create().put("a", "a"));
return jdoc;
}
public <T> long batchUpsert(Iterable<T> items, Function<T, JsonDocument> docCreator, Bucket couchbaseBucket) {
AsyncBucket asyncBucket = couchbaseBucket.async();
Observable<JsonDocument> observableFromDocs =
Observable
.from(items)
.map(elem -> docCreator.apply(elem))
.filter(elem -> elem!=null)//skip creating problematic docs. logging their info for troubleshooting
.flatMap(elem -> upsertDocument(elem, asyncBucket))
.retryWhen(
RetryBuilder.anyOf(BackpressureException.class, Exception.class)
.doOnRetry((Integer integer, Throwable throwable, Long aLong, TimeUnit timeUnit) ->
log.severe("Retrying load. Attempt {} For exception {} " + integer +" "+throwable.toString())
)
.delay(Delay.exponential(TimeUnit.MILLISECONDS, RETRY_DELAY_CEILING))
.max(MAX_RETRIES)
.build()
);
observableFromDocs.subscribe(
(elem)-> {System.out.println("inserted");},
elem -> log.severe("Document insertion failure " +elem),
() -> {counter.incrementAndGet();log.fine("Completed ASYNC load ");});
return counter.get();
}
private Observable<JsonDocument> upsertDocument(JsonDocument document, AsyncBucket asyncBucket){
Observable<JsonDocument> observableDoc = asyncBucket.upsert(document);
return observableDoc;
}
}