Combining Observables

DavidMGross edited this page Nov 21, 2013 · 32 revisions

This section explains operators you can use to combine multiple Observables.

  • startWith( ) — emit a specified sequence of items before beginning to emit the items from the Observable
  • concat( ) — concatenate two or more Observables sequentially
  • merge( ) — combine multiple Observables into one
  • mergeDelayError( ) — combine multiple Observables into one, allowing error-free Observables to continue before propagating errors
  • parallelMerge( ) — combine multiple Observables into a smaller number of Observables, to facilitate parallelism
  • zip( ) — combine sets of items emitted by two or more Observables together via a specified function and emit items based on the results of this function
  • and( ), then( ), and when( ) — combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries
  • combineLatest( ) — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
  • join( ) — combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable
  • switchOnNext( ) — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables
  • takeUntil( ) — emits the items from the source Observable until a second Observable emits an item
  • amb( ) — given two or more source Observables, emits all of the items from the first of these Observables to emit an item

startWith( )

emit a specified sequence of items before beginning to emit the items from the Observable

If you want an Observable to immediately begin emitting a specific sequence of items before it begins emitting the items normally expected from it, pass that specific sequence of items into that Observable's startWith( ) method, as in the following example:

def myObservable = Observable.from([1, 2, 3]);

myObservable.startWith(-3, -2, -1, 0).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
-3
-2
-1
0
1
2
3
Sequence complete


concat( )

concatenate two or more Observables sequentially

You can concatenate the output of multiple Observables so that they act like a single Observable, with all of the items emitted by the first Observable being emitted before any of the items emitted by the second Observable, by using the concat( ) method:

myConcatenatedObservable = Observable.concat(observable1, observable2, ... );

For example, the following code concatenates the 'odds' and 'evens' Observables into a single Observable:

odds  = Observable.from([1, 3, 5, 7]);
evens = Observable.from([2, 4, 6]);

Observable.concat(odds, evens).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
)
1
3
5
7
2
4
6
Sequence complete

Instead of passing multiple Observables into concat( ), you could also pass in a List<> of Observables, or even an Observable that emits Observables, and concat( ) will concatenate their output into the output of a single Observable.


merge( )

combine multiple Observables into one

You can combine the output of multiple Observables so that they act like a single Observable, by using the merge( ) method:

myMergedObservable = Observable.merge(observable1, observable2, ... )

For example, the following code merges the odds and evens Observables into a single Observable:

odds  = Observable.from([1, 3, 5, 7]);
evens = Observable.from([2, 4, 6]);

Observable.merge(odds,evens).subscribe(
  { println(it); },                  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);
1
3
2
5
4
7
6
Sequence complete

Items from the different source Observables may be interleaved in the merged output in any order, depending on when each source emits them.

Instead of passing multiple Observables into merge( ), you could also pass in a List<> of Observables, or even an Observable that emits Observables, and merge( ) will merge their output into the output of a single Observable.

If any of the individual Observables passed into merge( ) aborts by invoking onError, the merge( ) call itself will immediately abort and invoke onError. If you would prefer a merge that continues emitting the results of the remaining, error-free Observables before reporting the error, use mergeDelayError( ) instead.


mergeDelayError( )

combine multiple Observables into one but delay errors until completion

mergeDelayError( ) behaves much like merge( ), except when one of the Observables being merged issues an error notification. With merge( ), the merged Observable immediately passes that error along (that is, it invokes the onError method of its Observer) and stops. mergeDelayError( ), on the other hand, holds off on reporting the error until the other, non-error-producing Observables it is merging have had a chance to finish emitting their items. It emits those items itself, and invokes onError only when all of the other merged Observables have finished.

Because it is possible that more than one of the merged observables encountered an error, mergeDelayError( ) may pass information about multiple errors to the onError method (which it will never invoke more than once). For this reason, if you want to know the nature of these errors, you should write your onError method so that it accepts a parameter of the class CompositeException.
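As a minimal sketch of this behavior, assuming RxJava and its Groovy adaptor are on the classpath as in the other examples on this page, the following merges an Observable that fails immediately with one that emits three items. The names failing, healthy, received, and errors are hypothetical. Depending on the RxJava version, the Throwable passed to onError may be a CompositeException or the original error itself, so the sketch only prints its class name:

```groovy
import rx.Observable

def failing = Observable.error(new RuntimeException("boom")); // fails right away
def healthy = Observable.from([1, 2, 3]);

def received = []; // items passed to onNext (hypothetical helper list)
def errors = [];   // Throwables passed to onError (hypothetical helper list)

Observable.mergeDelayError(failing, healthy).subscribe(
  { received << it; println(it); },                                                  // onNext
  { errors << it; println("Error encountered: " + it.getClass().getSimpleName()); }, // onError
  { println("Sequence complete"); }                                                  // onCompleted
);
```

Here the three items from healthy are delivered before onError is invoked, and onCompleted is never invoked.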


parallelMerge( )

combine multiple Observables into a smaller number of Observables, to facilitate parallelism

Use the parallelMerge( ) method to take an Observable that emits a large number of Observables and reduce it to an Observable that emits a particular, smaller number of Observables. Together, these emit the same set of items as the original, larger set of Observables. For instance, you might choose a number of Observables that matches the number of parallel processes you want to use when processing the emissions from the complete set of Observables.


zip( )

combine Observables together via a specified function and emit items based on the results of this function

The zip( ) method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. It applies this function in strict sequence, so the first item emitted by the new zip-Observable will be the result of the function applied to the first item emitted by Observable #1 and the first item emitted by Observable #2; the second item emitted by the new zip-Observable will be the result of the function applied to the second item emitted by Observable #1 and the second item emitted by Observable #2; and so forth.

myZipObservable = Observable.zip(observable1, observable2, { response1, response2 -> some operation on those responses } );

There are also versions of zip( ) that accept three or four Observables:

myZip3Observable = Observable.zip(observable1, observable2, observable3, { response1, response2, response3 -> some operation on those responses });
myZip4Observable = Observable.zip(observable1, observable2, observable3, observable4, { response1, response2, response3, response4 -> some operation on those responses });

For example, the following code zips together two Observables, one of which emits a series of odd integers and the other of which emits a series of even integers:

odds  = Observable.from([1, 3, 5, 7, 9]);
evens = Observable.from([2, 4, 6]);

Observable.zip(odds, evens, {o, e -> [o, e]}).subscribe(
  { odd, even -> println("odd: " + odd + ", even: " + even); }, // onNext
  { println("Error encountered"); },                            // onError
  { println("Sequence complete"); }                             // onCompleted
)
odd: 1, even: 2
odd: 3, even: 4
odd: 5, even: 6
Sequence complete

Note that the zipped Observable completes normally after emitting three items, which is the number of items emitted by the shorter of the two component Observables (evens, which emits three even integers).


and( ), then( ), and when( )

combine sets of items emitted by two or more Observables by means of Pattern and Plan intermediaries

The and( ), then( ), and when( ) methods together operate much like zip( ), but they do so by means of intermediary data structures. and( ) accepts two or more Observables and combines the emissions from each, one set at a time, into Pattern objects. then( ) operates on such Pattern objects, transforming them into a Plan. when( ) then transforms these Plan objects into emissions from an Observable.


combineLatest( )

when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

combineLatest( ) behaves in a similar way to zip( ), but while zip( ) emits items only when all of the zipped source Observables have emitted a previously unzipped item, combineLatest( ) emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). When any of the source Observables emits an item, combineLatest( ) combines the most recently emitted item from each of the source Observables, using the function you provide, and emits the return value from that function.
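The following sketch illustrates this, assuming RxJava and its Groovy adaptor are on the classpath. It uses two PublishSubject instances (the hypothetical names letters and numbers) so that the order of emissions is explicit. The combining function simply concatenates the two latest strings:

```groovy
import rx.Observable
import rx.subjects.PublishSubject

def letters = PublishSubject.create();
def numbers = PublishSubject.create();
def combined = []; // results passed to onNext (hypothetical helper list)

Observable.combineLatest(letters, numbers, { l, n -> l + n }).subscribe(
  { combined << it; println(it); },  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);

letters.onNext("a"); // nothing emitted: numbers has not emitted yet
numbers.onNext("1"); // combines "a" and "1"
letters.onNext("b"); // combines "b" and the latest number, "1"
numbers.onNext("2"); // combines the latest letter, "b", and "2"
```

Unlike zip( ), the item "1" is used twice here, because it is still the latest item from numbers when letters emits "b".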


join( )

combine the items emitted by two Observables whenever one item from one Observable falls within a window of duration specified by an item emitted by the other Observable

The join( ) method combines the items emitted by two Observables, and selects which items to combine based on duration-windows that you define on a per-item basis. You implement these windows as Observables whose lifespans begin with each item emitted by either Observable. When such a window-defining Observable either emits an item or completes, the window for the item it is associated with closes. So long as an item's window is open, it will combine with any item emitted by the other Observable. You define the function by which the items combine.
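As a rough sketch of how such windows behave, assuming RxJava and its Groovy adaptor are on the classpath, the following joins two PublishSubject instances. All names here (left, right, leftWindow, joined) are hypothetical. The window for each left item stays open until leftWindow emits, and the window for each right item never closes:

```groovy
import rx.Observable
import rx.subjects.PublishSubject

def left = PublishSubject.create();
def right = PublishSubject.create();
def leftWindow = PublishSubject.create(); // emitting on this closes open left windows
def joined = [];                          // results passed to onNext (hypothetical helper list)

left.join(
  right,
  { item -> leftWindow },         // window for each item from left
  { item -> Observable.never() }, // window for each item from right: never closes
  { l, r -> l + r }               // how a left item and a right item combine
).subscribe(
  { joined << it; println(it); },    // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);

left.onNext("A");
right.onNext("1");           // the window for "A" is open, so "A" and "1" combine
leftWindow.onNext("close");  // closes the window for "A"
right.onNext("2");           // no left window is open, so nothing is emitted
```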


switchOnNext( )

convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently emitted of those Observables

switchOnNext( ) subscribes to an Observable that emits Observables. Each time it observes one of these emitted Observables, the Observable returned by switchOnNext( ) begins emitting items from that Observable. When a new Observable is emitted, switchOnNext( ) stops emitting items from the earlier-emitted Observable and begins emitting items from the new one.
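A small sketch of this switching, assuming RxJava and its Groovy adaptor are on the classpath, might look as follows. The subjects outer, first, and second are hypothetical names, and PublishSubject is used only to make the timing of each emission explicit:

```groovy
import rx.Observable
import rx.subjects.PublishSubject

def outer = PublishSubject.create();  // emits Observables
def first = PublishSubject.create();
def second = PublishSubject.create();
def received = [];                    // items passed to onNext (hypothetical helper list)

Observable.switchOnNext(outer).subscribe(
  { received << it; println(it); },  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);

outer.onNext(first);
first.onNext("first-1");   // passed through
outer.onNext(second);      // switch to the newer Observable
first.onNext("first-2");   // dropped: first is no longer the most recent
second.onNext("second-1"); // passed through
```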


takeUntil( )

emits the items from the source Observable until another Observable emits an item
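For illustration, here is a minimal sketch that assumes RxJava and its Groovy adaptor are on the classpath. The names source, stopper, received, and completed are hypothetical. Items from source pass through until stopper emits its first item, at which point the resulting Observable completes:

```groovy
import rx.subjects.PublishSubject

def source = PublishSubject.create();
def stopper = PublishSubject.create();
def received = [];     // items passed to onNext (hypothetical helper list)
def completed = false; // set when onCompleted is invoked

source.takeUntil(stopper).subscribe(
  { received << it; println(it); },                   // onNext
  { println("Error encountered"); },                  // onError
  { completed = true; println("Sequence complete"); } // onCompleted
);

source.onNext(1);
source.onNext(2);
stopper.onNext("stop"); // the resulting Observable completes here
source.onNext(3);       // not passed through
```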


amb( )

given two or more source Observables, emits all of the items from the first of these Observables to emit an item

When you pass a number of source Observables to amb( ), it will pass through the emissions and messages of exactly one of these Observables: the first one that emits an item to amb( ). It will ignore and discard the emissions of all of the other source Observables.
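The following sketch, which assumes RxJava and its Groovy adaptor are on the classpath, shows amb( ) choosing between two PublishSubject sources. The names slow, fast, and received are hypothetical:

```groovy
import rx.Observable
import rx.subjects.PublishSubject

def slow = PublishSubject.create();
def fast = PublishSubject.create();
def received = []; // items passed to onNext (hypothetical helper list)

Observable.amb(slow, fast).subscribe(
  { received << it; println(it); },  // onNext
  { println("Error encountered"); }, // onError
  { println("Sequence complete"); }  // onCompleted
);

fast.onNext("fast-1"); // fast emits first, so amb( ) mirrors fast from now on
slow.onNext("slow-1"); // ignored
fast.onNext("fast-2"); // passed through
```

The order of the arguments does not decide the winner here; what matters is which source emits first.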
