The onErrorFlatMap operator was added a few releases ago to allow "swallowing" errors and letting a stream continue.
It works in certain scenarios, such as this:
observable
    .map(t -> { function that fails })
    .onErrorFlatMap(e -> {swallow error})
    .subscribe();

In this case, the error comes from the user-provided map function, so the source Observable is not breaking its contract by continuing.
Not all scenarios are this clear, however.
observable
    .flatMap(t -> { user function })
    // error gets emitted
    .onErrorFlatMap(e -> {swallow error})
    .subscribe();

If flatMap emits an error, was it the function that failed, or a source Observable that it is merging? Should the merge function unsubscribe from the source Observable that failed in order to clean up resources, or should it not?
Issues have arisen from these nuances, and fixes were recently attempted in #1434, #1441 and #1457.
More to the point though is that onErrorFlatMap is working against (or outright breaking) the Rx contract by not treating an onError as a terminal state. This means any operator that "cleans up" after receiving an onError will stop onErrorFlatMap from working. It is completely okay for operators to move into a terminal state after receiving an onError since that is what onError signals, yet onErrorFlatMap is trying to override this. It results in non-deterministic behavior depending on the implementations of each operator in the sequence.
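To make the terminal-state problem concrete, here is a minimal sketch in plain Java with no RxJava dependency. The `Observer` interface, `TerminatingStage` and `SwallowingObserver` are hypothetical stand-ins written for this illustration, not RxJava classes. The sketch assumes an intermediate operator that, as the contract allows, stops forwarding events once it has seen an onError.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalStateSketch {
    /** Hypothetical minimal observer, standing in for an Rx Observer. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    /** An operator stage that treats onError as terminal, as the Rx contract permits. */
    static final class TerminatingStage<T> implements Observer<T> {
        private final Observer<T> downstream;
        private boolean terminated;

        TerminatingStage(Observer<T> downstream) {
            this.downstream = downstream;
        }

        @Override public void onNext(T value) {
            if (!terminated) downstream.onNext(value);
        }

        @Override public void onError(Throwable error) {
            if (terminated) return;
            terminated = true; // a real operator would also release resources here
            downstream.onError(error);
        }

        @Override public void onCompleted() {
            if (terminated) return;
            terminated = true;
            downstream.onCompleted();
        }
    }

    /** A downstream that "swallows" errors and expects the stream to continue. */
    static final class SwallowingObserver implements Observer<Integer> {
        final List<String> log = new ArrayList<>();

        @Override public void onNext(Integer value) { log.add("next:" + value); }
        @Override public void onError(Throwable error) { log.add("swallowed:" + error.getMessage()); }
        @Override public void onCompleted() { log.add("completed"); }
    }

    static List<String> run() {
        SwallowingObserver sink = new SwallowingObserver();
        Observer<Integer> stage = new TerminatingStage<>(sink);
        stage.onNext(1);
        stage.onError(new RuntimeException("boom"));
        stage.onNext(2);      // the source keeps emitting after the error
        stage.onCompleted();  // dropped as well: the stage is already terminal
        return sink.log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:1, swallowed:boom]
    }
}
```

The downstream swallows the error, but it never sees the value `2` because the upstream stage has already shut itself down. Whether the stream "continues" therefore depends on how each operator in the chain is implemented, which is the non-determinism described above.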
For this reason I suggest we remove onErrorFlatMap. It is confusing and breaks the Rx contract.
This leads us to what the replacement for onErrorFlatMap should be. It was added to handle use cases where a user wants to ignore errors so the source Observable can continue. In other words, there are use cases where users want an Observable to not follow the Rx contract. These are primarily "event bus" cases, where all the message passing semantics of Rx are wanted, but not the lifecycle (it should never terminate).
I suggest we need to treat this use case differently so we are not trying to break the Rx contract with operators like onErrorFlatMap.
The current Rx way of doing this is to materialize all events into Notification<T> so onError and onCompleted become notifications. The drawback is that each event now has an extra object allocation. In most cases that object allocation is going to be fine, as few applications attempt millions of events per second (the rate where this extra object allocation starts to matter).
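The idea of turning errors into data can be sketched without RxJava. The `Notification` class below is a hypothetical, simplified stand-in written for this illustration (it is not `rx.Notification`), and `mapMaterialized` is an assumed helper that works on a plain list rather than an Observable. It shows only the shape of the approach: a failure becomes one more item, and every event costs one wrapper allocation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class MaterializeSketch {
    /** Hypothetical stand-in for Notification<T>: either a value or an error, as data. */
    static final class Notification<T> {
        final T value;
        final Throwable error;

        private Notification(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        static <T> Notification<T> next(T value) { return new Notification<>(value, null); }
        static <T> Notification<T> error(Throwable error) { return new Notification<>(null, error); }

        boolean isError() { return error != null; }
    }

    /** Applies fn to each input and records a thrown exception as a Notification. */
    static <T, R> List<Notification<R>> mapMaterialized(List<T> inputs, Function<T, R> fn) {
        List<Notification<R>> out = new ArrayList<>();
        for (T input : inputs) {
            try {
                out.add(Notification.next(fn.apply(input))); // one extra allocation per event
            } catch (RuntimeException e) {
                out.add(Notification.error(e)); // the failure is an item, not a terminal signal
            }
        }
        return out;
    }

    static List<String> run() {
        List<Integer> inputs = Arrays.asList(1, 0, 2);
        List<Notification<Integer>> results = mapMaterialized(inputs, n -> 10 / n);
        List<String> log = new ArrayList<>();
        for (Notification<Integer> n : results) {
            log.add(n.isError() ? "error" : "value:" + n.value);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [value:10, error, value:5]
    }
}
```

The division by zero for the second input is recorded as an item, and processing continues with the third input. In a real operator chain the same wrapping has to happen at the point where the user function runs, otherwise the exception still travels as an onError.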
Even with materialize though, a map function that throws an Exception can result in the upstream being unsubscribed depending on the operators it is composed within.
I'd like to discuss what the solution should be for this that is idiomatic, contractually correct and deterministic.
I also propose we deprecate onErrorFlatMap in 0.20 and remove it in 1.0.