-
Notifications
You must be signed in to change notification settings - Fork 0
Observable Utility Operators
This section explains various utility operators for working with Observables.
-
materialize( )
— convert an Observable into a list of Notifications -
dematerialize( )
— convert a materialized Observable back into its non-materialized form -
timestamp( )
— attach a timestamp to every item emitted by an Observable -
synchronize( )
— force an Observable to make synchronous calls and to be well-behaved -
cache( )
— remember the sequence of items emitted by the Observable and emit the same sequence to future Observers -
observeOn( )
— specify on which Scheduler an Observer should observe the Observable -
subscribeOn( )
— specify which Scheduler an Observable should use when its subscription is invoked -
parallel( )
— split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread -
doOnEach( )
— register an action to take whenever an Observable emits an item -
doOnCompleted( )
— register an action to take when an Observable completes successfully -
doOnError( )
— register an action to take when an Observable completes with an error -
finallyDo( )
— register an action to take when an Observable completes -
delay( )
— shift the emissions from an Observable forward in time by a specified amount -
delaySubscription( )
— hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable -
timeInterval( )
— emit the time lapsed between consecutive emissions of a source Observable -
using( )
— create a disposable resource that has the same lifespan as an Observable -
single( )
— if the Observable completes after emitting a single item, return that item, otherwise throw an exception -
singleOrDefault( )
— if the Observable completes after emitting a single item, return that item, otherwise return a default item
A well-formed Observable will invoke its Observer’s onNext
method zero or more times, and then will invoke either the onCompleted
or onError
method exactly once. The materialize( )
method converts this series of invocations into a series of items emitted by an Observable, where it emits each such invocation as a Notification
object.
For example:
numbers = Observable.from([1, 2, 3]);
numbers.materialize().subscribe(
{ if(rx.Notification.Kind.OnNext == it.kind) { println("Next: " + it.value); }
else if(rx.Notification.Kind.OnCompleted == it.kind) { println("Completed"); }
else if(rx.Notification.Kind.OnError == it.kind) { println("Error: " + it.exception); } },
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Next: 1
Next: 2
Next: 3
Completed
Sequence complete
You can undo the effects of materialize( )
by means of the dematerialize( )
method, which will emit the items from the Observable as though materialize( )
had not been applied to it. The following example dematerializes the materialized Observable from the previous section:
numbers = Observable.from([1, 2, 3]);
numbers.materialize().dematerialize().subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
2
3
Sequence complete
- javadoc:
dematerialize()
- RxJS:
dematerialize
- Linq:
Dematerialize
- Introduction to Rx: Materialize and Dematerialize
The timestamp( )
method converts an Observable that emits items of type T into one that emits objects of type Timestamped<T>
, where each such object is stamped with the time at which it was emitted.
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 200000) });
myObservable.timestamp().subscribe(
{ println(it.toString()); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252582698, value = 200000)
Timestamped(timestampMillis = 1369252582740, value = 400000)
Timestamped(timestampMillis = 1369252582782, value = 600000)
Timestamped(timestampMillis = 1369252582823, value = 800000)
Timestamped(timestampMillis = 1369252582864, value = 1000000)
Sequence complete
- javadoc:
timestamp()
- Linq:
Timestamp
- RxJS:
timestamp
- Introduction to Rx: TimeStamp and TimeInterval
It is possible for an Observable to invoke its Observers' methods asynchronously, perhaps in different threads. This could make an Observable poorly-behaved, in that it might invoke onCompleted
or onError
before one of its onNext
invocations. You can force such an Observable to be well-behaved and synchronous by applying the synchronize( )
method to it.
- javadoc:
synchronize(observable)
- Linq:
Synchronize
remember the sequence of items emitted by the Observable and emit the same sequence to future Observers
By default, an Observable will generate its sequence of emitted items afresh for each new Observer that subscribes. You can force it to generate its sequence only once and then to emit this identical sequence to every Observer by using the cache( )
method. Compare the behavior of the following two sets of sample code, the first of which does not use cache( )
and the second of which does:
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp();
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error:" + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error:" + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252832871, value = 400000)
Timestamped(timestampMillis = 1369252832951, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252833074, value = 400000)
Timestamped(timestampMillis = 1369252833154, value = 800000)
Sequence complete
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp().cache();
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error:" + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error:" + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete
Note that in the second example the timestamps are identical for both of the observers, whereas in the first example they differ.
The cache( )
method will not itself trigger the execution of the source Observable; an initial observer must subscribe to the Observable returned from cache( )
before it will begin emitting items.
- javadoc:
cache()
To specify in which Scheduler (thread) the Observable should invoke the Observers' onNext( )
, onCompleted( )
, and onError( )
methods, call the Observable's observeOn( )
method, passing it the appropriate Scheduler
.
- javadoc:
observeOn(scheduler)
- RxJS:
observeOn
- Linq:
ObserveOn
- Rx Workshop: Schedulers
- Introduction to Rx: SubscribeOn and ObserveOn
To specify that the work done by the Observable should be done on a particular Scheduler (thread), call the Observable's subscribeOn( )
method, passing it the appropriate Scheduler
. By default (that is, unless you modify the Observable also with observeOn( )
) the Observable will invoke the Observers' onNext( )
, onCompleted( )
, and onError( )
methods in this same thread.
- javadoc:
subscribeOn(scheduler)
- RxJS:
subscribeOn
- Linq:
SubscribeOn
- Rx Workshop: Schedulers
- Introduction to Rx: SubscribeOn and ObserveOn
split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread
You can use the parallel( )
method to split an Observable into as many Observables as there are available processors, and to do work in parallel on each of these Observables. parallel( )
will then merge the results of these parallel computations back into a single, well-behaved Observable sequence.
- javadoc:
parallel(function)
- javadoc:
parallel(function,scheduler)
Use the doOnEach( )
method to register an Action
that RxJava will perform each time the Observable emits an item. This action takes the item as a parameter.
There are also doOnEach( )
variants that allow you to register actions to perform if the Observable completes or informs of a throwable. The doOnNext( )
method is equivalent to the version of doOnEach( )
that registers an action to be performed only for each emitted item.
- javadoc:
doOnEach(action)
- javadoc:
doOnEach(observer)
- Linq:
Do
- RxJS:
do
anddoAction
Use the doOnCompleted( )
method to register an Action
that RxJava will perform if the Observable completes normally (not by means of an error).
- javadoc:
doOnCompleted(action)
- Linq:
Do
- RxJS:
do
anddoAction
Use the doOnError( )
method to register an Action
that RxJava will perform if the Observable terminates with an error. This action takes the Throwable representing the error as a parameter.
- javadoc:
doOnError(action)
- Linq:
Do
- RxJS:
do
anddoAction
You can use the finallyDo( )
method of an Observable to register an action that RxJava will invoke after that Observable invokes either the onCompleted( )
or onError( )
method of its Observer.
def numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.finallyDo({ println('Finally'); }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
2
3
4
5
Sequence complete
Finally
- javadoc:
finallyDo(action)
- RxJS:
finally
/finallyAction
- Linq:
Finally
The delay( )
operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable's items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.
Note that delay( )
will not time-shift an onError( )
call in this fashion but it will forward such a call immediately to its subscribers.
- javadoc:
delay(delay,unit)
anddelay(delay,unit,scheduler)
- RxJS:
delay
- Linq:
Delay
- Introduction to Rx: Delay
hold an Observer's subscription request for a specified amount of time before passing it on to the source Observable
The delaySubscription( )
operator shifts waits for a specified period of time after receiving a subscription request before subscribing to the source Observable.
The timeInterval( )
operator converts a source Observable into an Observable that emits the amount of time lapsed between consecutive emissions of the source Observable. The first emission is the amount of time lapsed between the time the Observer subscribed to the Observable and the time the source Observable emitted its first item. There is no corresponding emission marking the amount of time lapsed between the last emission of the source Observable and the subsequent call to onCompleted( )
.
- javadoc:
timeInterval()
- javadoc:
timeInterval(scheduler)
- RxJS:
timeInterval
- Linq:
TimeInterval
- Introduction to Rx: TimeStamp and TimeInterval
Pass the using( )
method two factory functions: the first creates a disposable resource, the second creates an Observable. When an observer subscribes to the resulting Observable, using( )
will use the Observable factory function to create the Observable the observer will observe, while at the same time using the resource factory function to create a resource. When the Observer unsubscribes from the Observable, or when the Observable terminates (normally or with an error), using( )
will dispose of the resource it created.
- javadoc:
using(resourceFactory,observableFactory)
- RxJS:
using
if the Observable completes after emitting a single item, return that item, otherwise throw an exception (or return a default item)
Use the single( )
method to retrieve the only item emitted by an Observable. single( )
will notify of an exception if the source Observable does not emit exactly one item.
You can also use this method to retrieve the only item emitted by an Observable that meets some particular condition (or null
if the Observable method emits no such item). To do this, pass a function to single( )
that returns true
if the item meets the condition. In such a case, single( )
will again notify of an exception unless the source Observable emits exactly one item that meets the condition.
The singleOrDefault( )
method is similar, except that while it will still notify of an exception if the underlying Observable emits more than one item, if the underlying Observable does not emit any items at all, rather than notifying of an exception, the Observable returned by singleOrDefault( )
will emit a default item that you specify. Specify that default item by passing it as the first parameter to singleOrDefault( )
.
- Table of similar blocking and non-blocking operators
- javadoc:
single( )
- javadoc:
single(predicate)
- javadoc:
singleOrDefault(default)
- javadoc:
singleOrDefault(default,predicate)
- RxJS:
single
andsingleOrDefault
- Linq:
singleAsync
andsingleOrDefaultAsync
- Introduction to Rx: Single
A Netflix Original Production
Tech Blog | Twitter @NetflixOSS | Twitter @RxJava | Jobs