I started abnormal-condition testing of the 2.0.2 client in dev,
and I ran into a CouchbaseCore exception while the cluster was rebalancing.
Can anybody help me with this issue?
Java SDK client version : 2.0.2
Server OS : Linux
Server Nodes : 40 (Ram Usage 30% per node)
Data Buckets : 4
Item Count
- easter : 1,133,215,267 (571GB)
- easter_ham : 1,170,013,272 (257GB)
ERROR LOG:
2014-12-08 22:50:10,056 WARN [com.couchbase.client.core.CouchbaseCore] - Exception while Handling Response Events ResponseEvent{message=GetResponse{bucket='easter', status=RETRY, cas=0, flags=0, request=GetRequest{observable=rx.subjects.AsyncSubject@68499df1, bucket='easter'}, content=I'm not responsible for this vbucket}, observable=rx.subjects.AsyncSubject@68499df1}, com.couchbase.client.core.CouchbaseException: Could not parse configuration
Spring context.xml
...
Java Code
/**
 * Fetches the documents for a comma-separated list of ids and collects them into a map.
 *
 * <p>Every id is passed through {@code argonGather}; the emitted documents are keyed with
 * {@code argonKeySelector} and converted with {@code argonValueSelector}. The call blocks
 * until the single resulting map has been emitted.
 *
 * @param dsids comma-separated document ids
 * @return the map assembled from the emitted documents
 */
private Map<String, JsonNode> getArgonMap(String dsids) {
    final String[] ids = dsids.split(",");
    final Observable<BinaryDocument> documents = Observable.from(ids).flatMap(argonGather);
    final Observable<Map<String, JsonNode>> collected =
            documents.toMap(argonKeySelector, argonValueSelector);
    return collected.toBlocking().single();
}
// Fetch function that getArgonMap applies to every id via flatMap; injected through @Resource.
@Resource
ArgonGather argonGather;
/** Key selector handed to {@code toMap}: keys each fetched document by its id. */
private final Func1<BinaryDocument, String> argonKeySelector = new Func1<BinaryDocument, String>() {
    @Override
    public String call(BinaryDocument document) {
        final String documentId = document.id();
        return documentId;
    }
};
/**
 * Value selector handed to {@code toMap}: Snappy-decompresses a document's content and
 * parses the result as JSON.
 *
 * <p>Returns {@code null} (after logging) when the document has no content, or when
 * decompression or JSON parsing fails with an {@link IOException}.
 */
private final Func1<BinaryDocument, JsonNode> argonValueSelector = new Func1<BinaryDocument, JsonNode>() {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public JsonNode call(BinaryDocument binaryDocument) {
        if (binaryDocument.content() == null) {
            logger.info(String.format("argon value is null, id(%s)", binaryDocument.id()));
            return null;
        }
        try {
            // Copy only the readable region instead of calling array(): the backing array can
            // be larger than (or offset from) the actual payload, and array() throws for
            // buffers without an accessible array. getBytes(index, dst) leaves the buffer's
            // reader index untouched.
            // NOTE(review): assumes content() is a Netty ByteBuf - confirm against the SDK version.
            final byte[] compressed = new byte[binaryDocument.content().readableBytes()];
            binaryDocument.content().getBytes(binaryDocument.content().readerIndex(), compressed);

            final byte[] bytes = Snappy.uncompress(compressed);
            // Hand Jackson the raw bytes so it detects the JSON encoding itself; going through
            // new String(bytes) would decode with the platform default charset.
            return mapper.readTree(bytes);
        } catch (IOException e) {
            logger.error(String.format("Snappy uncompress error, id(%s)", binaryDocument.id()), e);
        }
        return null;
    }
};
ArgonGather class
/**
 * Fetch function that reads a document by id and falls back to a replica read when the
 * primary read fails.
 *
 * <p>Both reads are bounded by {@link #TIMEOUT} milliseconds. An error from the primary read
 * is logged and triggers the replica read; an error from the replica read is logged and
 * replaced by an empty observable.
 */
@Component
public class ArgonGather implements Func1<String, Observable<BinaryDocument>> {
    private static final Logger logger = LoggerFactory.getLogger(ArgonGather.class);

    /**
     * TIMEOUT - unit : MILLISECONDS
     */
    private static final long TIMEOUT = 50L;

    @Resource(name="bucketArg")
    private Bucket bucket;

    /** Last-resort handler: logs the replica-read failure and emits nothing. */
    private final Func1<Throwable, Observable<? extends BinaryDocument>> empty =
            new Func1<Throwable, Observable<? extends BinaryDocument>>() {
        @Override
        public Observable<? extends BinaryDocument> call(Throwable throwable) {
            logger.error("FAIL_TO_REQUEST", throwable);
            return Observable.<BinaryDocument>empty();
        }
    };

    /**
     * Reads the document with the given id, switching to {@link Resumer} on any error
     * (including a timeout).
     *
     * @param dsid id of the document to read
     * @return observable of the document read
     */
    @Override
    public Observable<BinaryDocument> call(final String dsid) {
        return bucket.async()
                .get(dsid, BinaryDocument.class)
                .timeout(TIMEOUT, TimeUnit.MILLISECONDS)
                .onErrorResumeNext(new Resumer(dsid));
    }

    /** Error handler for the primary read: logs the failure and retries against a replica. */
    class Resumer implements Func1<Throwable, Observable<? extends BinaryDocument>> {
        private final String dsid;

        public Resumer(final String dsid) {
            this.dsid = dsid;
        }

        @Override
        public Observable<? extends BinaryDocument> call(Throwable throwable) {
            logger.error(dsid, throwable);
            return bucket.async()
                    .getFromReplica(dsid, ReplicaMode.FIRST, BinaryDocument.class)
                    .timeout(TIMEOUT, TimeUnit.MILLISECONDS)
                    .onErrorResumeNext(empty);
        }
    }
}