Strange errors with views

Hi all!

I’m having strange issues with CB views:

bucket
    .query(query)
    .flatMap(AsyncViewResult::rows)
    .map((AsyncViewRow arg00) -> arg00.id())
    .subscribe(new Subscriber<String>() {

        @Override
        public void onCompleted() {
            _response.resume(computations.toString());
        }

        @Override
        public void onError(Throwable thrwbl) {
            _response.cancel();
        }

        @Override
        public void onNext(String t) {
            computations.add(t);
        }
    });

I’m using Jersey for my API, and the code above does not work: the connection is closed without any output.

But this version works:

Observable.from(bucket
        .query(query)
        .flatMap(AsyncViewResult::rows)
        .map((AsyncViewRow arg00) -> arg00.id())
        .toList()
        .toBlocking()
        .single())
    .subscribe(new Subscriber<String>() {

        @Override
        public void onCompleted() {
            _response.resume(computations.toString());
        }

        @Override
        public void onError(Throwable thrwbl) {
            _response.cancel();
        }

        @Override
        public void onNext(String t) {
            computations.add(t);
        }
    });

Could someone help me?

Is your onError() getting called? It doesn’t look like you’d know from the code you’ve posted. Also, check the logs on the server side. In particular, the map reduce errors log.
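One way to find out whether onError() fires is to record the Throwable before cancelling the response. The sketch below is plain Java and uses a hypothetical helper named FailureLog (it is not part of RxJava, Jersey, or the Couchbase SDK); the idea is to call FailureLog.report(thrwbl) as the first statement of onError().

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper, not part of RxJava, Jersey, or the Couchbase SDK.
final class FailureLog {
    private static final Logger LOG = Logger.getLogger(FailureLog.class.getName());

    // Logs the throwable at SEVERE level and returns its stack trace as text,
    // so a failure cannot pass unnoticed when the response is cancelled.
    static String report(Throwable error) {
        StringWriter buffer = new StringWriter();
        error.printStackTrace(new PrintWriter(buffer, true));
        LOG.log(Level.SEVERE, "view query failed", error);
        return buffer.toString();
    }

    // Small demonstration with a simulated failure.
    public static void main(String[] args) {
        String trace = report(new IllegalStateException("simulated failure"));
        System.out.println(trace);
    }
}
```

With this in place, an empty log after a failed request suggests that onError() was never reached, which narrows the problem to onCompleted() or to the response handling.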

No error in onError(), nor on the server. What I don’t understand is why the first piece of code does not work, while the second one works like a charm but isn’t really async.

To clarify, when you say “connection closed without output”, you mean the connection to the Jersey web service? Hmmm. I don’t immediately spot the error; maybe @simonbasle can.

Yes, that’s what I mean. But collecting the view’s rows into a list and then creating an observable from that list works.

Also, I noticed that if Schedulers.io() is used anywhere in the chain, it stops working.

So if I understand correctly, you want to wait for the ids and put them in a list that you output to Jersey in one go at the end?
What about avoiding the external collection (computations is a Collection, I’m guessing?) in the async code? Something like:

// with bucket being an AsyncBucket
bucket.query(query)
  .flatMap(AsyncViewResult::rows)
  .map(row -> row.id())
  .toList() // this will collect the results internally in the RxJava operator
  .subscribe(
    // onNext lambda will only be invoked once with toList()
    listOfIds -> _response.resume(listOfIds.toString()),
    // then onError lambda will be invoked if something goes wrong
    error -> _response.cancel()
  );

I was doing it this way because computations is a (big) JsonArray and I wanted to avoid calling JsonArray.from(list), since that operation iterates over the whole list.

By the way, I tried it and it doesn’t work either. I really can’t understand what’s happening.

If you take Jersey’s AsyncResponse out of the picture and replace it with a System.out.println(...) in the onNext, do you see some output? Do the timings look correct (i.e. items arriving regularly and without pauses)?
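To make the timings easier to read than with a bare println, here is a minimal sketch in plain Java. ArrivalProbe is a hypothetical helper (not part of RxJava or the Couchbase SDK): call probe.record(t) from onNext, and it prints the emitting thread, the time since the probe was created, and the gap since the previous item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava or the Couchbase SDK.
final class ArrivalProbe {
    private final long startNanos = System.nanoTime();
    private long lastNanos = startNanos;
    private final List<String> lines = new ArrayList<>();

    // Call from onNext(...): prints the thread name, elapsed time, and the
    // gap since the previous item, then keeps the line for later inspection.
    synchronized void record(String item) {
        long now = System.nanoTime();
        long sinceStartMs = TimeUnit.NANOSECONDS.toMillis(now - startNanos);
        long gapMs = TimeUnit.NANOSECONDS.toMillis(now - lastNanos);
        lastNanos = now;
        String line = String.format("[%s] +%d ms (gap %d ms) %s",
                Thread.currentThread().getName(), sinceStartMs, gapMs, item);
        lines.add(line);
        System.out.println(line);
    }

    // Number of items seen so far.
    synchronized int count() {
        return lines.size();
    }

    // Demonstration with simulated ids arriving about 50 ms apart.
    public static void main(String[] args) throws InterruptedException {
        ArrivalProbe probe = new ArrivalProbe();
        for (String id : new String[] {"doc-1", "doc-2", "doc-3"}) {
            Thread.sleep(50);
            probe.record(id);
        }
        System.out.println("items seen: " + probe.count());
    }
}
```

The thread name is included because the behaviour reportedly changes once Schedulers.io() is in the chain, so it helps to see which thread actually delivers each item.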