We are in the process of refactoring a benchmark tool migrating from Couchbase Client 2 to new CouchBase SDK 2.
Previous version has following “bulk get” logic to retrive keys in bulk and if it fails reading from the master, there is a failover to read from the “replicas”
Legacy code :
List<Map.Entry<String, OperationFuture<CASValue<JsonNode>>>> futures = new java.util.ArrayList<>(keys.size());
for (String key : keys) {
futures.add(new AbstractMap.SimpleImmutableEntry<>(key, client.asyncGets(key, transcoder)));
}
Map<String, Long> casValues = new java.util.HashMap<>(keys.size(), 1f);
for (Map.Entry<String, OperationFuture<CASValue<JsonNode>>> e : futures) {
String key = e.getKey();
OperationFuture<CASValue<JsonNode>> future = e.getValue();
try {
CASValue<JsonNode> casVal = future.get();
if (checkStatus(future.getStatus(), errIfNotFound) == OK) {
result.put(key, JsonByteIterator.asMap(casVal.getValue()));
casValues.put(key, casVal.getCas());
} else {
return ERROR;
}
} catch (RuntimeException te) {
if (te.getCause() instanceof CheckedOperationTimeoutException) { ///READ FROM REPLICA
log.warn("Reading from Replica as reading from master has timed out.");
// This is a timeout operation on a read, let's try to read from slave
ReplicaGetFuture<JsonNode> futureReplica = client.asyncGetFromReplica(key, transcoder);
result.put(key, JsonByteIterator.asMap(futureReplica.get()));
} else {
throw te;
}
}
}
Using new Couchbase SDK2
According to the new Couchbase 2 SDK docs ,
http://docs.couchbase.com/developer/java-2.0/documents-bulk.html
I have following logic to retrieve in bulk.But I am not quite sure where to add the failover mechanism to read from “replicas” using
bucket.async().getFromReplica(key, ReplicaMode.ALL);
List<RawJsonDocument> rawDocs = idObs.flatMap((keys)->{
Observable<RawJsonDocument> rawJsonObs = bucket.async().get(key, RawJsonDocument.class);
return rawJsonObs;
}).toList()
.toBlocking()
.single();
How can I implement this “read from replica” failover mechanism with the new RxJava based CouchBase SDK ?