Java SDK 3.0 threading/scheduler question

I have this piece of code using SDK 2:

    bucket.async().query(N1qlQuery.simple("SELECT META().id FROM example"))
            .flatMap(AsyncN1qlQueryResult::rows)
            .flatMap(row -> bucket.async().get(row.value().getString("id"))
                    .map(d -> {
                        System.out.println("map: " + Thread.currentThread().getName());
                        return d;
                    }))
            .toList()
            .toBlocking()
            .single();

This prints:

    map: cb-computations-2
    map: cb-computations-1
    map: cb-computations-3
    map: cb-computations-4
    map: cb-computations-8
    ...

So the map function automatically runs on multiple computation threads.

I tried to translate this code to SDK 3 (3.0.8):

    cluster.reactive().query("SELECT META().id FROM example")
            .flatMapMany(ReactiveQueryResult::rowsAsObject)
            .flatMap(row -> bucket.reactive().defaultCollection().get(row.getString("id"))
                    .map(d -> {
                        System.out.println("map: " + Thread.currentThread().getName());
                        return d;
                    })).collectList().block();

And this prints:

    map: cb-io-kv-5-2
    map: cb-io-kv-5-2
    map: cb-io-kv-5-2
    map: cb-io-kv-5-2
    ...

Why is map executed on I/O threads, and only on a single thread? How can I get the same behaviour as with SDK 2?

Hi Mico,

By default, downstream operators run on the Netty event loop thread that handles the socket connected to the Couchbase Server node. If you had more than one Couchbase Server node, and the documents you were fetching lived on different nodes, you might see different threads being used.
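To see this default behaviour without a cluster, here is a minimal sketch in plain Reactor, assuming reactor-core 3.3 or later on the classpath. FAKE_IO, fakeGet and threadsUsedByMap are hypothetical names: a single daemon thread stands in for the one event loop thread serving a single-node cluster, and fakeGet stands in for collection.get(...). Because no operator switches threads, the map callback runs on whichever thread emitted the value, so every call lands on the same thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class EmittingThreadDemo {

    // Hypothetical stand-in for one Netty event loop: a single daemon thread.
    static final Scheduler FAKE_IO = Schedulers.newSingle("fake-io", true);

    // Hypothetical stand-in for collection.get(id): the result is emitted on FAKE_IO.
    static Mono<String> fakeGet(String id) {
        return Mono.fromCallable(() -> "doc-" + id).subscribeOn(FAKE_IO);
    }

    // Records the name of every thread that executes the map callback.
    static Set<String> threadsUsedByMap() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, 20)
            .map(i -> "id-" + i)
            .flatMap(id -> fakeGet(id)
                // No publishOn: this map runs on the thread that emitted the value.
                .map(doc -> {
                    threads.add(Thread.currentThread().getName());
                    return doc;
                }))
            .collectList()
            .block();
        return threads;
    }

    public static void main(String[] args) {
        // Prints a set containing a single fake-io thread name.
        System.out.println(threadsUsedByMap());
    }
}
```

With only one emitting thread, the set contains exactly one name, mirroring the repeated cb-io-kv-5-2 lines in the question.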

If your processing is expensive and you have more CPUs than Couchbase nodes, it might make sense to distribute the work among all CPUs. You can do this by applying the publishOn operator to the get result. This specifies the scheduler to use for executing all downstream operators.

    import reactor.core.scheduler.Schedulers;

    cluster.reactive().query("SELECT META().id FROM `travel-sample`")
        .flatMapMany(ReactiveQueryResult::rowsAsObject)
        .flatMap(row -> bucket.reactive().defaultCollection().get(row.getString("id"))
            .publishOn(Schedulers.boundedElastic()) // <-- ** ADD THIS **
            .map(d -> {
              System.out.println("map: " + Thread.currentThread().getName());
              return d;
            })).collectList().block();

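That example uses the "bounded elastic" scheduler (Schedulers.boundedElastic()). There's also a "parallel" scheduler (Schedulers.parallel()) that might be more appropriate if your work is CPU-bound. Alternatively, you could create a scheduler backed by an existing Executor with Schedulers.fromExecutor(executor) or Schedulers.fromExecutorService(executorService).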
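These scheduler choices can be compared side by side in another plain-Reactor sketch, again assuming reactor-core 3.3 or later. FAKE_IO, fakeGet, fixedPoolScheduler and threadsUsedByMap are hypothetical helpers, not Couchbase SDK API. The only change from the default pipeline is a publishOn(target) call on each get result; fixedPoolScheduler wraps an ordinary ExecutorService with Schedulers.fromExecutorService.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {

    // Hypothetical stand-in for the SDK's single event loop thread.
    static final Scheduler FAKE_IO = Schedulers.newSingle("fake-io", true);

    // Hypothetical stand-in for collection.get(id): the result is emitted on FAKE_IO.
    static Mono<String> fakeGet(String id) {
        return Mono.fromCallable(() -> "doc-" + id).subscribeOn(FAKE_IO);
    }

    // A scheduler backed by an existing executor: a fixed pool of daemon threads.
    static Scheduler fixedPoolScheduler(int size) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(size, r -> {
            Thread t = new Thread(r, "worker-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        return Schedulers.fromExecutorService(pool);
    }

    // Records which threads run the map callback when publishOn(target) is applied.
    static Set<String> threadsUsedByMap(Scheduler target) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, 200)
            .map(i -> "id-" + i)
            .flatMap(id -> fakeGet(id)
                .publishOn(target) // operators below this line run on `target`
                .map(doc -> {
                    threads.add(Thread.currentThread().getName());
                    return doc;
                }))
            .collectList()
            .block();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("parallel:       " + threadsUsedByMap(Schedulers.parallel()));
        System.out.println("boundedElastic: " + threadsUsedByMap(Schedulers.boundedElastic()));
        System.out.println("fixed pool:     " + threadsUsedByMap(fixedPoolScheduler(4)));
    }
}
```

With the fixed pool, the map calls are spread across the pool's four worker threads instead of the single emitting thread. Schedulers.parallel() is sized to the number of CPU cores, while boundedElastic() creates threads on demand up to a cap, which is why it is usually suggested for blocking work.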

Here’s an article about the Reactor threading model which describes how the publishOn and subscribeOn operators work:

Thanks,
David

Hi Mico,

I’ve filed an enhancement request for a configuration option that would make SDK 3 behave like SDK 2 for the threading.

Thanks,
David