JavaSDK and RxJava contract

Hello,
could you please clarify whether the Java SDK implementation follows the RxJava contract (ReactiveX - The Observable Contract), specifically this statement:

Observables must issue notifications to observers serially (not in parallel). They may issue these notifications from different threads, but there must be a formal happens-before relationship between the notifications.

i.e., is it safe to collect items inside onNext() without any synchronization (that is, without atomic*/concurrent* classes, explicit sync structures, or the “synchronized” keyword)?

Yes, the SDK follows this contract. Note that the contract only governs where and how onNext() calls are triggered by the SDK-produced Observable, not what you can or cannot do in your own Observer (where you’d define the onNext behavior).

So to answer your main question, whether it is safe to use a non-synchronized approach, the answer is: “avoid it if you can and don’t make assumptions”.

Generally, it is frowned upon to use external AtomicXXX variables or concurrent collections from outside the Observable chain. It is almost always a better option to collect by using operators such as toList, toMap, collect, or reduce.
That said, as long as these external variables are thread safe, it’s ugly and messy, but it should work (e.g. for tests, that is good enough).

The big problem with these (and I think there’s a high chance it is even worse with a synchronized block) is that long-running code that blocks the execution of the Observable chain can grind everything to a halt.

@simonbasle,

uf… :slight_smile: too tough for me: “Yes” and “No” together.

Generally, it is frowned upon to use external AtomicXXX variables or concurrent collections from outside the Observable chain.

… frowned upon by whom, or by what? OK, I agree with you that “synchronized” (and, less likely, ReentrantLock/StampedLock too) could block RxJava’s scheduler threads when used carelessly. But what is the problem with atomic/concurrent classes (in the case of non-blocking implementations)?

 final ConcurrentLinkedQueue<JsonDocument> q = new ConcurrentLinkedQueue<>();
 ...
 Observable
     .from(docids)
     .flatMap(b::get)
     .subscribe((d) -> { q.add(d); }, ..., ...);

My gut feeling says there can be no side effects here. Any objections?

The question is “could we safely replace ConcurrentLinkedQueue with LinkedList” to avoid unnecessary synchronization. And here (as I understand it) you say “don’t make assumptions” … brr!

By the RxJava community, regular users and contributors alike.

What I meant by “don’t make assumptions” is that even if an Observable doesn’t in fact call onNext from multiple threads today, it wouldn’t be considered a breaking change if it started doing so tomorrow.

So data structures outside of the Observable chain should be thread safe. But there is a better alternative where you never leave the chain, and that is the dedicated toList(), toMap() and collect() operators.

These will perform the collection in a way that is optimized for RxJava, and safe to use.

So use toList() after your flatMap, if a List is good enough. If you absolutely need a Queue, use collect instead.
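To illustrate the “collect inside the chain” idea without depending on RxJava, here is a minimal JDK-only sketch. It uses java.util.stream as a stand-in for an Observable chain, which is an assumption of this example and not the SDK’s API; the class and method names are hypothetical. The three-argument collect plays a role similar to the collect operator mentioned above: the pipeline owns the Queue, so no external variable is touched, even when the stream runs in parallel.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical example class; java.util.stream stands in for an RxJava chain.
class CollectInsideChain {

    // Gathers string lengths into a Queue without touching any variable
    // outside the pipeline. The container is created, filled and merged by
    // collect() itself, so a plain (non-thread-safe) ArrayDeque is sufficient.
    static Queue<Integer> lengths(List<String> in) {
        ArrayDeque<Integer> queue = in.parallelStream()
                .map(String::length)
                .collect(ArrayDeque::new, ArrayDeque::add, ArrayDeque::addAll);
        return queue;
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("a", "bb", "ccc")));
    }
}
```

The same shape (a container factory plus an accumulator function) is what lets an operator such as collect stay safe regardless of which thread delivers each item.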

@simonbasle,
thanks for the answer.
It seems you’re closely involved with the RxJava community.

P.S. The following is nothing more than off-topic, so if you would like to join a “useless exchange of opinions” (I dislike the term “flame”), you are welcome :wink:

What I meant by “don’t make assumptions” is that even if an Observable doesn’t in fact call onNext from multiple threads today, it wouldn’t be considered a breaking change if it started doing so tomorrow.

Breaking the contract? I think we both understand that this is impossible under the “RxJava” brand.

But there is a better alternative where you never leave the chain …

“Thinking globally”, it’s a good idea to create “pure RxJava programs” (simply because any program is nothing more than a chain of data transformations). But what about legacy code, or alternatives (like http://projectreactor.io/ ), or lovers of the plain old programming style (toList(), toMap(), collect() … and toBlocking() for the latter)? Is it reasonable to always “never leave the chain”?

About your last comment, let me clarify: I think it is fine to leave the chain at some point (especially in legacy programs), in the sense of going from an asynchronous flow back to a synchronous one.

But while inside the Observable chain, you should try to avoid impacting variables that are outside the chain, in favor of using adapted operators.

So the following (contrived) example shows an acceptable pattern:

public List<Integer> lengths(Observable<String> in) {
    List<Integer> result = in
        .map(String::length)
        .toList()
        .toBlocking().single();

    Collections.sort(result);
    return result;
}

Elements are collected into a list as they arrive by an RxJava operator, but we block at the end so that once everything is collected, we hold a populated List. We then synchronously sort that list and return it to legacy code that doesn’t know how to deal with an Observable.

On the other hand, I wouldn’t recommend the following pattern:

public List<Integer> lengths(Observable<String> in) {
    List<Integer> result = Collections.synchronizedList(new ArrayList<>());
    
    in
        .map(s -> {
            int i = s.length();
            result.add(i); //"side effect"
            return i;
        })
        .toBlocking().lastOrDefault(null); // block until completion; single() would throw if more than one item is emitted

    Collections.sort(result);
    return result;
}

It should work though… But it has more moving parts and encourages bad habits. The result of blocking is ignored, the side effect working might depend on the collection being synchronized, etc…

Using toBlocking().forEach() falls somewhere between the two, and its execution would depend a lot on the source Observable chain.

And then there is this pattern, which is plain wrong:

public List<Integer> lengths(Observable<String> in) {
    List<Integer> result = Collections.synchronizedList(new ArrayList<>());
    
    in
        .map(String::length)
        .subscribe(i -> result.add(i));

    Collections.sort(result);
    return result;
}

Here we don’t block, so the code jumps straight to sorting (an empty collection?) and returning the collection. The collection is then populated asynchronously as more data arrives in the “in” Observable, all while the caller thinks they hold a populated list that isn’t supposed to mutate. This could manifest as a ConcurrentModificationException some time later, for example when the caller iterates over the collection.
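The failure mode can be reproduced with JDK classes alone. In this sketch an ExecutorService stands in for the asynchronous Observable, and a CountDownLatch (the hypothetical “gate”) holds the producer back until the method has returned, so the outcome is deterministic. None of this is RxJava or SDK code; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example class; the executor stands in for an async Observable.
class ReturnBeforePopulated {

    // Mirrors the "plain wrong" pattern: start asynchronous population,
    // then sort and return immediately without waiting for it.
    static List<Integer> lengthsWithoutBlocking(List<String> in,
                                                ExecutorService executor,
                                                CountDownLatch gate) {
        List<Integer> result = Collections.synchronizedList(new ArrayList<>());
        executor.execute(() -> {
            try {
                gate.await(); // the "data" only arrives once the gate opens
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            for (String s : in) {
                result.add(s.length());
            }
        });
        Collections.sort(result); // sorts an empty list: nothing has arrived yet
        return result;
    }

    // Returns { size when the method returned, size after the producer finished }.
    static int[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        List<Integer> result =
                lengthsWithoutBlocking(Arrays.asList("ccc", "a", "bb"), executor, gate);
        int sizeAtReturn = result.size();
        gate.countDown(); // let the producer run now
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { sizeAtReturn, result.size() };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo()));
    }
}
```

The list the caller receives is empty at first and grows behind the caller’s back, and it is never sorted because the sort ran before any element arrived.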

@simonbasle,

It should work though… But it has more moving parts and encourages bad habits. The result of blocking is ignored, the side effect working might depend on the collection being synchronized, etc…

Sure, it will work fine (but “yuck! never do it like this”).
It seems our “flame” is terminological (the definition of “side effect”).

This one:

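public List<Integer> lengths(Observable<String> in) {
    final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<>();
    in
        .map(s -> {
            int i = s.length();
            q.add(i); // non-blocking insertion into the external queue
            return i;
        })
        .toBlocking().lastOrDefault(null); // subscribe and block until completion
    // Collections.sort returns void, so sort a copy and return it
    List<Integer> result = new ArrayList<>(q);
    Collections.sort(result);
    return result;
}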

is almost identical, but reduces the potential side effect of “implicit mutual blocking” (it is a non-blocking implementation, see ConcurrentLinkedQueue (Java Platform SE 8); I assume it is CAS-based). “Mutual blocking” is impossible even in the previous example, but we can imagine a newbie who wraps an already synchronized collection in “synchronized(result) { result.add(); }” inside the chain in your example, or who uses a more complicated object that implements List and uses synchronized(this) { } within its add() call. I should also mention that I assume Collections.synchronizedList() does nothing more than create a “synchronized(this)” wrapper.

So, in my terminology, “side effect” == “implicit, potentially troublesome behavior”, while in yours “side effect” == “an effect on something outside the chain” (am I right?).

About

But it has more moving parts and encourages bad habits.

… a man asked a camel, “Why is your back so crooked?” The camel thought and answered, “And where am I straight at all?” …
That is real life, and “bad habits” are part of it. But this statement, of course, should not be used as a guide or an excuse for “conceptually ugly coding”.

P.S. It seems your last example, with simultaneous collection access from the current thread and an RxJava scheduler thread, was a reaction to my initial out-of-context example :wink: