Inconsistent result while using bucket.async()

Hello,

I have the following code

HashMap<String, String> codes = new HashMap<>();

                bucket
                .async()
                .query(Select.select("mappings, id")
                        .fromCurrentBucket()
                        .where("type='X'"))
                .timeout(5, TimeUnit.MINUTES)
                .flatMap(AsyncN1qlQueryResult::rows)
                .subscribe(new Subscriber<AsyncN1qlQueryRow>() {

                    @Override
                    public void onCompleted() {
                        log.info("Populating codes lookup map, size {} - FINISHED", codes.size());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.error("Problem while initializing codes lookup map", throwable);
                    }

                    @Override
                    public void onNext(AsyncN1qlQueryRow asyncN1qlQueryRow) {
                        JsonObject value = asyncN1qlQueryRow.value();
                        String id = value.getString("id");
                        JsonObject providers = value.getObject("mappings");
                        Map<String, Object> providersMap = providers.toMap();

                        for(Map.Entry<String, Object> mapEntry : providersMap.entrySet()) {
                            String providerName = mapEntry.getKey();
                            ArrayList<HashMap> codesList = (ArrayList<HashMap>) mapEntry.getValue();

                            for(HashMap code : codesList) {
                                String key = providerName + Constants.LOOKUP_KEY_SEPERATOR + code.get("code");
                                String val = codes.get(key);

                                codes.put(key, id);
                            }
                        }
                    }
                });

Each time I run this codes.size() is inconsistent… I’m not getting any errors. What could be the reason that I’m missing some records?

Edit:

I discovered why each time the code is ran it yields an inconsistent # of results. It’s due to the timeout configured on the environment here

    DefaultCouchbaseEnvironment defaultCouchbaseEnvironment =
            DefaultCouchbaseEnvironment
                    .builder()
                    .connectTimeout(60000)
                    .kvTimeout(60000)
                    .socketConnectTimeout(60000)
                    .autoreleaseAfter(5000)
                    .queryTimeout(60000)  <-------
                    .build();

If I increase it then I start getting consistent results.

I’m confused if queryTimeout is configured on the environment, is the timeout that I define on the query itself (on the code above) useless and does not take effect? Can anyone clarify please?

How long does your query take? the .timeout() operator does not change the timeout we send to the server. It looks like it runs into a timeout on the server which is when the query ends.

Also, your code is not thread safe which can also be a problem. the subscriber callback inserts into the codes hashmap from a different thread - please use the ConcurrentHashMap instead.

1 Like

Short answer: yes, it’s useless.

That timeout isn’t “on the query itself”… it’s an operator on the RxJava Observable which is independent of the code that executes the query and handles low-level timeouts. The only way the timeout operator on the observable would have any effect is if it were set to a duration shorter than the queryTimeout on the environment.