-
Notifications
You must be signed in to change notification settings - Fork 0
Combining Observables
This section explains operators you can use to combine multiple Observables.
-
startWith( )
— emit a specified sequence of items before beginning to emit the items from the Observable -
merge( )
— combine multiple Observables into one -
mergeDelayError( )
— combine multiple Observables into one, allowing error-free Observables to continue before propagating errors -
parallelMerge( )
— combine multiple Observables into a smaller number of Observables, to facilitate parallelism -
zip( )
— combine sets of items emitted by two or more Observables together via a specified function and emit items based on the results of this function -
and( )
,then( )
, andwhen( )
— combine sets of items emitted by two or more Observables by means ofPattern
andPlan
intermediaries -
combineLatest( )
— when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function -
join( )
andgroupJoin( )
— combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable -
switchOnNext( )
— convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables
If you want an Observable to immediately begin emitting a specific sequence of items before it begins emitting the items normally expected from it, pass that specific sequence of items into that Observable's startWith( )
method, as in the following example:
def myObservable = Observable.from([1, 2, 3]);
myObservable.startWith(-3, -2, -1, 0).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
-3
-2
-1
0
1
2
3
- javadoc:
startWith(x, y, ...)
- RxJS:
startWith
- Linq:
StartWith
- Introduction to Rx: StartWith
You can combine the output of multiple Observables so that they act like a single Observable, by using the merge( )
method:
myMergedObservable = Observable.merge(observable1, observable2, ... )
For example, the following code merges the odds
and evens
Observables into a single Observable:
odds = Observable.from([1, 3, 5, 7]);
evens = Observable.from([2, 4, 6]);
Observable.merge(odds,evens).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
3
2
5
4
7
6
Sequence complete
The items emitted by the merged Observable may appear in any order, regardless of which source Observable they came from.
Instead of passing multiple Observables into merge( )
, you could also pass in a List<>
of Observables, or even an Observable that emits Observables, and merge( )
will merge their output into the output of a single Observable.
If any of the individual Observables passed into merge( )
aborts by invoking onError
, the merge( )
call itself will immediately abort and invoke onError
. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError( )
instead.
- javadoc:
merge(listOfObservables)
- javadoc:
merge(observableThatEmitsObservables)
- javadoc:
merge(observable1, observable2, ...)
- RxJS:
merge
andmergeObservable
- Linq:
Merge
- Introduction to Rx: Merge
mergeDelayError( )
behaves much like merge( )
. The exception is when one of the Observables being merged throws an error. If this happens with merge( )
, the merged Observable will immediately throw an error itself (that is, it will invoke the onError
method of its Subscriber). mergeDelayError( )
, on the other hand, will hold off on reporting the error until it has given any other non-error-producing Observables that it is merging a chance to finish emitting their items, and it will emit those itself, and will only invoke onError
when all of the other merged Observables have finished.
Because it is possible that more than one of the merged Observables encountered an error, mergeDelayError( )
may pass information about multiple errors to the onError
method (which it will never invoke more than once). For this reason, if you want to know the nature of these errors, you should write your onError
method so that it accepts a parameter of the class CompositeException
.
- javadoc:
mergeDelayError(listOfObservables)
- javadoc:
mergeDelayError(observableThatEmitsObservables)
- javadoc:
mergeDelayError(observable1, observable2, ...)
Use the parallelMerge( )
method to take an Observable that emits a large number of Observables and to reduce it to an Observable that emits a particular, smaller number of Observables that emit the same set of items as the original larger set of Observables: for instance a number of Observables that matches the number of parallel processes that you want to use when processing the emissions from the complete set of Observables.
combine Observables together via a specified function and emit items based on the results of this function
The zip( )
method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. It applies this function in strict sequence, so the first item emitted by the new zip-Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth.
myZipObservable = Observable.zip(observable1, observable2, { response1, response2 -> some operation on those responses } );
For example, the following code zips together two Observables, one of which emits a series of odd integers and the other of which emits a series of even integers:
odds = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);
Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[1, 2]
[3, 4]
[5, 6]
Sequence complete
Note: that the zipped Observable completes normally after emitting three items, which is the number of items emitted by the smaller of the two component Observables (evens
, which emits three even integers).
There are also versions of zip( )
that accept three or more Observables, as well as a version that accepts an Observable that emits Observables and zips together the emissions of these emitted Observables:
- javadoc:
zip()
(several versions) - RxJS:
zip
- Linq:
Zip
- Introduction to Rx: Zip
combine sets of items emitted by two or more Observables by means of Pattern
and Plan
intermediaries
The combination of and( )
, then( )
, and when( )
methods operate much like zip( )
but they do so by means of intermediary data structures. and( )
accepts two or more Observables and combines the emissions from each, one set at a time, into Pattern
objects. then( )
operates on such Pattern
objects, transforming them in a Plan
. when( )
then transforms these various Plan
objects into emissions from an Observable.
- javadoc:
and( )
- javadoc:
then(selector)
- javadoc:
when( )
(multiple varieties) - Linq:
And
/Then
/When
- Intro to Rx: And-Then-When
when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
combineLatest( )
behaves in a similar way to zip( )
, but while zip( )
emits items only when all of the zipped source Observables have emitted a previously unzipped item, combineLatest( )
emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). When any of the source Observables emits an item, combineLatest( )
combines the most recently emitted items from each of the other source Observables, using the function you provide, and emits the return value from that function.
- javadoc:
combineLatest(observable1, observable2, combineFunction)
(along with versions that take anywhere from three to nine observables) - RxJS:
combineLatest
- Linq:
CombineLatest
- Introduction to Rx: CombineLatest
combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable
The join( )
method combines the items emitted by two Observables, and selects which items to combine based on duration-windows that you define on a per-item basis. You implement these windows as Observables whose lifespans begin with each item emitted by either Observable. When such a window-defining Observable either emits an item or completes, the window for the item it is associated with closes. So long as an item's window is open, it will combine with any item emitted by the other Observable. You define the function by which the items combine.
The groupJoin( )
method is similar, except that the function you define to combine items emitted by the two Observables pairs individual items emitted by the source Observable with an Observable that emits items from the second Observable that fall in the same window as that item.
- javadoc:
join( )
- javadoc:
groupJoin( )
- Linq:
Join
- Linq:
GroupJoin
- RxJS:
join
- RxJS:
groupJoin
- Introduction to Rx: Join
- Introduction to Rx: GroupJoin
convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables
switchOnNext( )
subscribes to an Observable that emits Observables. Each time it observes one of these emitted Observables, the Observable returned by switchOnNext( )
begins emitting items from that Observable. When a new Observable is emitted, switchOnNext( )
stops emitting items from the earlier-emitted Observable and begins emitting items from the new one.
- javadoc:
switchOnNext(sequenceOfSequences)
- RxJS:
switchLatest
- Linq:
Switch
- Introduction to Rx: Switch
A Netflix Original Production
Tech Blog | Twitter @NetflixOSS | Twitter @RxJava | Jobs