Hi!
I am experiencing some update problems with the new ‘Java DCP Client’ (https://github.com/couchbaselabs/java-dcp-client). It appears as though updates to documents on the server eventually stop being pushed to the client. Upon inspecting the logs, I see nothing indicating that anything has gone wrong. Am I using the client correctly?
Version info
com.couchbase.client:java-client version: '2.3.4'
com.couchbase.client:dcp-client version: '0.4.0'
Server Version: 3.1.3-1823 Enterprise Edition (build-1823)
Please see my code below.
@Slf4j
public class DcpClient {
private final Client client;
private ScheduledExecutorService executorService;
private ScheduledFuture<?> taskHandle;
private BiConsumer<String, JsonObject> onNext;
private Consumer<Throwable> onError;
boolean initialized;
boolean running;
private AtomicLong eventsLastMinute;
public DcpClient(CouchbaseConfig couchbaseConfig) {
client = Client.configure()
.hostnames(couchbaseConfig.getNodes())
.bucket(couchbaseConfig.getBucketName())
.password(couchbaseConfig.getPassword())
.connectionNameGenerator(couchbaseConfig::getDcpConnectionName)
.controlParam(DcpControl.Names.CONNECTION_BUFFER_SIZE, couchbaseConfig.getDcpBufferSize())
.bufferAckWatermark(couchbaseConfig.getDcpAckWatermark())
.build();
executorService = Executors.newSingleThreadScheduledExecutor();
eventsLastMinute = new AtomicLong(0);
}
public boolean initialize(BiConsumer<String, JsonObject> onNext, Consumer<Throwable> onError) {
this.onNext = onNext;
this.onError = onError;
client.controlEventHandler(event -> {
if (RollbackMessage.is(event)) {
log.debug("RollbackMessage");
short partition = RollbackMessage.vbucket(event);
long seqNo = RollbackMessage.seqno(event);
client.rollbackAndRestartStream(partition, seqNo)
.doOnCompleted(() -> log.info("Rollback for partition {} complete", partition))
.doOnError(onError::accept)
.await();
} else if (DcpOpenStreamResponse.is(event)) {
log.debug("Unhandled 'DcpOpenStreamResponse' event");
} else if (DcpStreamEndMessage.is(event)) {
log.debug("Unhandled 'DcpStreamEndMessage' event");
} else if (DcpSnapshotMarkerMessage.is(event)) {
log.debug("Unhandled 'DcpSnapshotMarkerMessage' event");
} else if (DcpFailoverLogResponse.is(event)) {
log.debug("Unhandled 'DcpFailoverLogResponse' event");
} else if (DcpCloseStreamResponse.is(event)) {
log.debug("Unhandled 'DcpCloseStreamResponse' event");
} else if (DcpGetPartitionSeqnosResponse.is(event)) {
log.debug("Unhandled 'DcpGetPartitionSeqnosResponse' event");
} else {
log.debug("Unhandled DCP event of unknown type");
}
eventsLastMinute.getAndIncrement();
client.acknowledgeBuffer(event);
event.release();
});
client.dataEventHandler(event -> {
if (DcpMutationMessage.is(event)) {
String key = DcpMutationMessage.keyString(event);
String content = DcpMutationMessage.content(event).toString(CharsetUtil.UTF_8);
JsonObject jsonObject = JsonObject.fromJson(content);
onNext.accept(key, jsonObject);
}
eventsLastMinute.getAndIncrement();
client.acknowledgeBuffer(event);
event.release();
});
client.connect()
.doOnCompleted(() -> log.info("DCP client successfully connected"))
.doOnError(onError::accept)
.await();
byte[] blob = InitialStateService.fetchState();
boolean initialStateAvailable = blob != null && blob.length > 0;
if (!initialStateAvailable) {
client.initializeState(StreamFrom.NOW, StreamTo.INFINITY)
.doOnCompleted(() -> log.info("DCP state successfully initialized"))
.doOnError(onError::accept)
.await();
} else {
client.recoverState(StateFormat.JSON, blob)
.doOnCompleted(() -> log.info("DCP state successfully recovered"))
.doOnError(onError::accept)
.await();
}
initialized = true;
return initialStateAvailable;
}
public void start() {
if (!initialized) {
throw new IllegalStateException("Not initialized");
}
persistState();
client.startStreaming()
.doOnCompleted(() -> log.info("DCP stream successfully started"))
.doOnError(onError::accept)
.await();
taskHandle = executorService.scheduleWithFixedDelay(() -> {
try {
for (short vbid = 0; vbid < client.numPartitions(); vbid++) {
boolean open = client.streamIsOpen(vbid);
if (!open) {
log.warn("Stream is not open for vBucket: {}", vbid);
}
}
long events = eventsLastMinute.getAndSet(0);
if (events > 0) {
log.info("Received {} DCP events received in the last minute", events);
persistState();
} else {
log.error("No DCP events received in the last minute");
}
} catch (Throwable t) {
t.printStackTrace();
}
}, 60, 60, TimeUnit.SECONDS);
running = true;
}
public void stop() {
if (!running) {
throw new IllegalStateException("Not running");
}
taskHandle.cancel(true);
taskHandle = null;
client.stopStreaming()
.doOnCompleted(() -> log.info("DCP stream successfully stopped"))
.doOnError(onError::accept)
.await();
client.disconnect()
.doOnCompleted(() -> log.info("DCP client successfully disconnected"))
.doOnError(onError::accept)
.await();
running = false;
}
private void persistState() {
log.info("Persisting current DCP state");
byte[] blob = client.sessionState().export(StateFormat.JSON);
InitialStateService.persistState(blob);
}
}
BR
Alex