@simonbasle Yes, your guess is correct. However, the performance has dropped drastically, from 25k-30k ops/sec to around 7k ops/sec, since I removed the Stale.FALSE parameter. I will give you the read function anyway. You can have a look at it and suggest ways to improve it.
private static void getKeysAndRead(ExecutorService executorService)
        throws Exception {
    BinaryBO theBo = new BinaryBO(config.getBucketName());
    Set<Future<List<BinaryDocument>>> totalDocumentsFutures = new HashSet<Future<List<BinaryDocument>>>();
    while (counter <= docsInBucket) { // docsInBucket is the number of documents in the bucket
        if (docsInBucket - counter < nDocs) {
            // Last (partial) page: read the remaining documents on the current thread
            nDocs = docsInBucket - counter;
            keys = theBo.getKeys(lastKey, nDocs);
            List<Customer> documents = theBo.bulkReadUsingKeys(keys);
            counter += documents.size();
        }
        else {
            // Full page: split the keys into batches and submit each batch to the executor
            keys = theBo.getKeys(lastKey, nDocs);
            int keysUsed = 0;
            int batchNumber = 0;
            while (keysUsed < keys.size()) {
                List<String> threadKeys = getThreadKeys(batchNumber++,
                        config.getBatchSize(), keys);
                @SuppressWarnings("unchecked")
                Future<List<BinaryDocument>> documents = executorService
                        .submit(new BulkReadBinaryDocumentsRunner(threadKeys));
                totalDocumentsFutures.add(documents);
                keysUsed += threadKeys.size();
            }
        }
        // Wait for every batch, count the documents and release their buffers
        for (Future<List<BinaryDocument>> doc : totalDocumentsFutures) {
            counter += doc.get().size();
            for (BinaryDocument d : doc.get()) {
                d.content().release();
            }
            doc.get().clear();
        }
        totalDocumentsFutures.clear();
        lastKey = keys.get(keys.size() - 1);
        System.out.println("Total Size: " + counter);
        if (counter >= docsInBucket) {
            break;
        }
    }
    keys.clear();
}
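For context, here is roughly how I drive this method. The pool size of 5 and the hypothetical main method below are only for illustration (they are not my exact setup):
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public static void main(String[] args) throws Exception {
    // Illustrative only: 5 worker threads, matching the setup described at the end of this post
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    try {
        getKeysAndRead(executorService);
    } finally {
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.MINUTES);
    }
}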
This is the function which determines the keys for the threads.
private static List<String> getThreadKeys(int number, Integer batchSize,
        List<String> keys) {
    int startIndex = number * batchSize;
    int endIndex = (number + 1) * batchSize;
    if (startIndex > keys.size() - 1) {
        startIndex = keys.size() - 1;
    }
    if (endIndex > keys.size()) {
        endIndex = keys.size(); // clamp to size(), otherwise the last key would be dropped
    }
    return keys.subList(startIndex, endIndex);
}
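As a worked example of the index arithmetic (assuming the numbers I mention at the end of this post, 10000 keys with a batch size of 2000):
// Purely illustrative: how one page of 10000 keys is split with batchSize = 2000
List<String> keys = new ArrayList<String>();
for (int i = 0; i < 10000; i++) {
    keys.add("key-" + i);
}
for (int batch = 0; batch * 2000 < keys.size(); batch++) {
    // batch 0 -> indexes [0, 2000), batch 1 -> [2000, 4000), ..., batch 4 -> [8000, 10000)
    List<String> threadKeys = getThreadKeys(batch, 2000, keys);
    System.out.println("batch " + batch + " gets " + threadKeys.size() + " keys");
}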
This is the function that deserializes the documents once they have been fetched from the database.
public ArrayList<Customer> bulkReadUsingKeys(List<String> keys) throws Exception {
    List<BinaryDocument> documents = theDao.readBinaryDocumentsUsingKeys(keys);
    ArrayList<Customer> customers = new ArrayList<Customer>();
    ByteBuffer buffer = null;
    for (int i = 0; i < documents.size(); i++) {
        try {
            buffer = documents.get(i).content().nioBuffer();
            Customer cust = (Customer) Utilities.byteBufferToJavaObject(
                    Utilities.getSerializer(), buffer, "com.ht.TestingWithMaven.Customer");
            customers.add(cust);
            buffer.clear();
        }
        catch (DecoderException e) {
            System.exit(0);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
    // Release the Netty buffers only after all documents have been deserialized
    for (BinaryDocument d : documents) {
        d.content().release();
    }
    documents.clear();
    return customers;
}
This is the function that actually fetches the data from the database and returns it to the function given above.
public List<BinaryDocument> readBinaryDocumentsUsingKeys(Collection<String> keys) throws DecoderException {
    try {
        return Observable
                .from(keys)
                .flatMap(new Func1<String, Observable<BinaryDocument>>() {
                    public Observable<BinaryDocument> call(String id) {
                        return theBucket.async().get(id, BinaryDocument.class)
                                .retryWhen(RetryBuilder
                                        .anyOf(BackpressureException.class)
                                        .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                                        .max(10)
                                        .build());
                    }
                })
                .toList()
                .toBlocking()
                .single();
    }
    catch (DecoderException e) {
        return null;
    }
}
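I have not pasted BulkReadBinaryDocumentsRunner above. As a rough sketch (this is an assumption about its shape, not the exact class), it would be a Callable that wraps this DAO call for its slice of keys:
import java.util.List;
import java.util.concurrent.Callable;
import com.couchbase.client.java.document.BinaryDocument;

// Minimal sketch only; the real BulkReadBinaryDocumentsRunner may differ.
// "BinaryDAO" is a placeholder name for whatever class holds readBinaryDocumentsUsingKeys().
public class BulkReadBinaryDocumentsRunner implements Callable<List<BinaryDocument>> {

    private final List<String> threadKeys;
    private final BinaryDAO theDao = new BinaryDAO(); // placeholder wiring

    public BulkReadBinaryDocumentsRunner(List<String> threadKeys) {
        this.threadKeys = threadKeys;
    }

    public List<BinaryDocument> call() throws Exception {
        // Each worker reads only its own slice of keys
        return theDao.readBinaryDocumentsUsingKeys(threadKeys);
    }
}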
Here is the getKeys() function:
public Iterable<ViewRow> getKeys(String lastKey, Integer nDocs) {
    if (lastKey.isEmpty()) {
        // First page: take the first nDocs rows from the view
        ViewQuery query = ViewQuery.from("Couchbase", "numDocs")
                .stale(Stale.TRUE)
                .reduce(false)
                .limit(nDocs);
        ViewResult result = theBucket.query(query);
        return result;
    }
    else {
        // Subsequent pages: start from the last key of the previous page and skip it
        ViewQuery query = ViewQuery.from("Couchbase", "numDocs")
                .stale(Stale.TRUE)
                .reduce(false)
                .limit(nDocs)
                .startKey(lastKey)
                .skip(1);
        ViewResult result = theBucket.query(query);
        return result;
    }
}
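The snippets above do not show how the view rows are turned into the List<String> of document IDs that the read functions take; a sketch of that conversion (assuming the document ID is taken straight from each row) would be:
// Sketch: collect the document IDs from the view rows returned by getKeys()
List<String> keyStrings = new ArrayList<String>();
for (ViewRow row : theBo.getKeys(lastKey, nDocs)) {
    keyStrings.add(row.id()); // the row id is the document key
}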
Basically, what I do is get a batch of keys (say 10000 at a time) and then split them among the threads (with 5 threads, each thread gets 2000 keys), and each thread reads the documents for its own keys. This is how I try to read the entire bucket. I don't know whether this is efficient, but performance-wise this read approach works better than the other ways I tried to read the entire bucket.