GetAllMutationTokensRequest generates incomplete response

Hi!

When sending a GetAllMutationTokensRequest, I expect to get MutationTokens for all 1024 partitions in return, but in some situations I only get 512 tokens. The code that fetches the MutationTokens looks as follows:

List<MutationToken> stateList = core.<GetAllMutationTokensResponse>send(new GetAllMutationTokensRequest(bucket))
    .flatMap(resp -> Observable.<MutationToken>from(resp.mutationTokens()))
    .toList()
    .toBlocking()
    .single();

I’ve tested this against two different servers and got two different results:

Server                                     | #nodes | #returned tokens
-------------------------------------------+--------+-----------------
3.1.3-1823 Enterprise Edition (build-1823) |   2    | 512
4.0.0-4051 Community Edition (build-4051)  |   1    | 1024

The main difference between the servers, besides the version numbers, is the number of nodes. The 3.1.3 server, which has two nodes, only returns 512 tokens in its response, while the 4.0.0 server, which has one node, returns 1024 tokens. Does the number of nodes play a role here, or is that just a coincidence?

#nodes * #returned tokens = 1024

What’s the proper method of getting all the MutationTokens? I’m using Java SDK version 2.2.7 and I’m aware that these APIs are experimental.

I’m thankful for any help!
Alex

@alexm your observation is correct. The reason is that the message only gets directed to one node, so you only get that node’s response. Since each of the two nodes holds half of the partitions, you only get 512 back in the 2-node case. On 4 nodes it would be 256.

So right now this only seems to work with 1 node, maybe @avsej can shed more light on alternative ways of doing this.
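To make the arithmetic above concrete, here is a minimal sketch of how many tokens a single node would return. It assumes the partitions are spread evenly across the nodes, as in the two examples in this thread; the class and method names are hypothetical and not part of the SDK.

```java
public class PartitionShare {
    // Hypothetical helper: number of tokens returned when only one node answers.
    // Assumes an even distribution of partitions across nodes.
    static int tokensFromOneNode(int totalPartitions, int nodes) {
        if (nodes <= 0) {
            throw new IllegalArgumentException("nodes must be positive");
        }
        return totalPartitions / nodes;
    }

    public static void main(String[] args) {
        int total = 1024;
        for (int nodes : new int[] {1, 2, 4}) {
            // Prints 1024, 512 and 256 for 1, 2 and 4 nodes respectively
            System.out.println(nodes + " node(s): " + tokensFromOneNode(total, nodes) + " tokens");
        }
    }
}
```

This matches the table in the first post: one node returns all 1024 tokens, two nodes return 512 each.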

For what reason do you need the tokens? If you want to use DCP there are other APIs that do this properly I think…

Hi and thank you for your quick reply.

In a broader context, this is what I want to do:

  1. Obtain a snapshot of the sequence numbers from each vBucket
  2. Use view queries to fetch a data set which represents the history or initial state of my data
  3. Start a DCP stream, using the previously obtained snapshot as a starting point, in order to capture upserts made to the data set.

Essentially I’m using Couchbase as a streaming service after obtaining an initial state. I’m monitoring a large set of documents that are updated frequently, and the “push behavior” offered by DCP suits my needs better than the traditional view queries.

For what reason do you need the tokens? If you want to use DCP there are other APIs that do this properly I think…

I used to rely on BucketStreamAggregator.getCurrentState() and BucketStreamAggregator.feed(), but they were removed in 2.2.6. Now there’s DCPConnection.getCurrentState(), but it seems to have a different idea of what the current state is compared to BucketStreamAggregator.getCurrentState().

It is different in implementation, but the idea is the same. It uses a combination of GetFailoverLogRequest and GetLastCheckpointRequest, because the latter does not have the vbucket UUID. The code below is basically an extraction from DCPConnection.getCurrentState():

List<MutationToken> stateList = core
        .<GetClusterConfigResponse>send(new GetClusterConfigRequest())
        .map(response -> {
            CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
            return config.numberOfPartitions();
        })
        .flatMap((Func1<Integer, Observable<MutationToken>>) numPartitions ->
                Observable.range(0, numPartitions)
                        .flatMap((Func1<Integer, Observable<GetFailoverLogResponse>>) partition -> core.send(new GetFailoverLogRequest(partition.shortValue(), bucket)))
                        .flatMap((Func1<GetFailoverLogResponse, Observable<MutationToken>>) failoverLogsResponse -> {
                            final FailoverLogEntry entry = failoverLogsResponse.failoverLog().get(0);
                            return core.<GetLastCheckpointResponse>send(new GetLastCheckpointRequest(failoverLogsResponse.partition(), bucket))
                                    .map(lastCheckpointResponse -> new MutationToken(
                                            failoverLogsResponse.partition(),
                                            entry.vbucketUUID(),
                                            lastCheckpointResponse.sequenceNumber(),
                                            bucket));
                        }))
        .toList().toBlocking().single();

Thank you!

If I do this

List<MutationToken> stateList = ... //as described in the previous post

CountDownLatch latch = new CountDownLatch(stateList.size());

Observable.from(stateList)
    .flatMap(mutationToken -> {
        return dcpConnection.addStream(
            (short) mutationToken.vbucketID(),
            mutationToken.vbucketUUID(),
            mutationToken.sequenceNumber(), end,
            mutationToken.sequenceNumber(), end);
    })
    .subscribe(response -> latch.countDown());

latch.await();

dcpConnection.subject()
    .subscribe(req -> handle(req));

then I should only get changes that occurred between the calls to dcpConnection.addStream() and dcpConnection.subject().subscribe()? But what I see is a huge number of MutationMessages even though nothing has happened to the documents in the bucket between those calls. Is this the expected behavior and if so, is it possible to get notifications with respect to only those documents that have changed between a certain point in time and now?

Thanks!
Alex

In general you should not. But I will take a look and try to reproduce it.

Hi,

Here’s a class that reproduces the behavior

import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.config.CouchbaseBucketConfig;
import com.couchbase.client.core.endpoint.dcp.DCPConnection;
import com.couchbase.client.core.message.cluster.*;
import com.couchbase.client.core.message.dcp.*;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import rx.Observable;
import rx.functions.Func1;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class Concept {
    public static void test() throws InterruptedException {
        CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
                .dcpEnabled(true)
                .build();

        CouchbaseCore core = new CouchbaseCore(env);

        String[] nodes = {"localhost"};
        String bucket = "beer-sample";
        String password = "";

        DCPConnection connection = core.<SeedNodesResponse>send(new SeedNodesRequest(nodes))
                .flatMap(seedNodesResponse -> core.<OpenBucketResponse>send(new OpenBucketRequest(bucket, password)))
                .flatMap(openBucketResponse -> core.<OpenConnectionResponse>send(new OpenConnectionRequest("TestConnection", bucket)))
                .toBlocking()
                .single()
                .connection();

        List<MutationToken> stateList = core
                .<GetClusterConfigResponse>send(new GetClusterConfigRequest())
                .map(response -> {
                    CouchbaseBucketConfig config = (CouchbaseBucketConfig) response.config().bucketConfig(bucket);
                    return config.numberOfPartitions();
                })
                .flatMap((Func1<Integer, Observable<MutationToken>>) numPartitions ->
                        Observable.range(0, numPartitions)
                                .flatMap((Func1<Integer, Observable<GetFailoverLogResponse>>) partition -> core.send(new GetFailoverLogRequest(partition.shortValue(), bucket)))
                                .flatMap((Func1<GetFailoverLogResponse, Observable<MutationToken>>) failoverLogsResponse -> {
                                    final FailoverLogEntry entry = failoverLogsResponse.failoverLog().get(0);
                                    return core.<GetLastCheckpointResponse>send(new GetLastCheckpointRequest(failoverLogsResponse.partition(), bucket))
                                            .map(lastCheckpointResponse -> new MutationToken(
                                                    failoverLogsResponse.partition(),
                                                    entry.vbucketUUID(),
                                                    lastCheckpointResponse.sequenceNumber(),
                                                    bucket));
                                }))
                .toList().toBlocking().single();

        long end = 0xffffffff;

        final CountDownLatch stateLatch = new CountDownLatch(stateList.size());
        Observable.from(stateList)
                .flatMap(mutationToken -> connection.addStream(
                        (short) mutationToken.vbucketID(),
                        mutationToken.vbucketUUID(),
                        mutationToken.sequenceNumber(), end,
                        mutationToken.sequenceNumber(), end))
                .subscribe(response -> stateLatch.countDown());

        stateLatch.await();

        // Never counted down: keeps the main thread blocked so incoming messages keep printing
        final CountDownLatch messageLatch = new CountDownLatch(1);
        connection.subject()
                .subscribe(dcpRequest ->
                        {
                            System.out.println(dcpRequest);
                            if (dcpRequest instanceof DCPMessage) {
                                connection.consumed((DCPMessage) dcpRequest);
                            }
                        }
                );
        messageLatch.await();
    }

    public static void main(String[] args) throws InterruptedException {
        test();
    }
}
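
A side note on the line `long end = 0xffffffff;` in the class above: in Java, `0xffffffff` without an `L` suffix is an int literal with the value -1, and assigning it to a long widens it to -1L (all 64 bits set), not 4294967295. The sketch below only illustrates that language rule; the class and method names are hypothetical, and whether the all-ones value is what addStream treats as "no end" is not confirmed in this thread.

```java
public class EndSequenceLiteral {
    // int literal 0xffffffff is -1; widening to long keeps the value -1
    static long fromIntLiteral() {
        return 0xffffffff;
    }

    // long literal 0xffffffffL is 4294967295 (only the lower 32 bits set)
    static long fromLongLiteral() {
        return 0xffffffffL;
    }

    public static void main(String[] args) {
        System.out.println(Long.toHexString(fromIntLiteral()));  // ffffffffffffffff
        System.out.println(Long.toHexString(fromLongLiteral())); // ffffffff
    }
}
```

If a specific end sequence number is intended, writing it with an explicit `L` suffix makes the value unambiguous.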