About your last comment, let me clarify: I think it is fine to leave the chain at some point (especially in legacy programs), in the sense of going from an asynchronous flow back to a synchronous one.
But while inside the Observable
chain, you should try to avoid impacting variables that are outside the chain, in favor of using adapted operators.
So for example the following (stupid) example shows an acceptable pattern:
public List<Integer> lengths(Observable<String> in) {
List<Integer> result = in
.map(String::length)
.toList()
.toBlocking().single();
Collections.sort(result);
return result;
}
Each element is collected into a list as they arrive by using an RxJava operator, but we block at the end so that once everything is collected, we hold a populated List
. We then synchronously sort that list and return it to legacy code that doesn’t know how to deal with Observable
.
On the other hand, I wouldn’t recommend the following pattern:
public List<Integer> lengths(Observable<String> in) {
List<Integer> result = Collections.synchronizedList(new ArrayList<>());
in
.map(s -> {
int i = s.length();
result.add(i); //"side effect"
return i;
})
.toBlocking().single();
Collections.sort(result);
return result;
}
It should work though… But it has more moving parts and encourages bad habits. The result of blocking is ignored, the side effect working might depend on the collection being synchronized, etc…
Using toBlocking().forEach()
is a bit between the two, and its execution would depend a lot on the source Observable
chain.
And then there is this pattern, which is plain wrong:
public List<Integer> lengths(Observable<String> in) {
List<Integer> result = Collections.synchronizedList(new ArrayList<>());
in
.map(String::length)
.subscribe(i -> result.add(i));
Collections.sort(result);
return result;
}
Here we don’t block, so the code jumps straight to sorting (an empty collection?) and returning the collection. Then the collection will be asynchronously populated as more data arrives in the in
observable. All while the caller thinks he holds a populated list that isn’t supposed to mutate. It could manifest with a ConcurrentModificationException
some time later when the caller iterates over the collection, for example.