Observable Utility Operators
This section explains various utility operators for working with Observables.
- toList( ): collect all items from an Observable and emit them as a single List
- toSortedList( ): collect all items from an Observable and emit them as a single, sorted List
- materialize( ): convert an Observable into a list of Notifications
- dematerialize( ): convert a materialized Observable back into its non-materialized form
- timestamp( ): attach a timestamp to every item emitted by an Observable
- all( ): determine whether all items emitted by an Observable meet some criteria
- exists( ) and isEmpty( ): determine whether an Observable emits any items or not
- sequenceEqual( ): test the equality of pairs of items emitted by two Observables
- synchronize( ): force an Observable to make synchronous calls and to be well-behaved
- cache( ): remember the sequence of items emitted by the Observable and emit the same sequence to future Observers
- observeOn( ): specify on which Scheduler an Observer should observe the Observable
- subscribeOn( ): specify which Scheduler an Observable should use when its subscription is invoked
- parallel( ): split the work done on the emissions from an Observable into multiple Observables each operating on its own parallel thread
- finallyDo( ): register an action to take when an Observable completes
- delay( ): shift the emissions from an Observable forward in time by a specified amount
Normally, an Observable that emits multiple items does so by invoking its Observer’s onNext method once for each item. To change this behavior, call the Observable’s toList( ) method before calling its subscribe( ) method. The Observable then composes a list of these items and invokes the Observer’s onNext method once, passing it the entire list. For example:
Observable.toList(myObservable).subscribe({ myListOfSomething -> /* do something useful with the list */ });
The following rather pointless code takes a list of integers, converts it into an Observable, then converts that Observable into one that emits the original list as a single item:
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
Observable.toList(numbers).subscribe(
{ println(it); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
In addition to calling toList( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the example above, instead of
Observable.toList(numbers) ...
you could instead write
numbers.toList() ...
If you pass to toList( ) an Observable that invokes onCompleted before emitting any items, toList( ) will emit an empty list before invoking onCompleted. If the Observable you pass to toList( ) invokes onError, toList( ) will in turn invoke the onError methods of its Observers.
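The three cases described above (items then completion, completion with no items, and an error) can be modeled without the library. The following is only a sketch of the contract in plain Java, not RxJava's implementation; the `ToListSketch` class, its nested `Observer` interface, and the `run` helper are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Library-free model of the toList( ) contract; every name here is hypothetical. */
final class ToListSketch {

    /** Minimal stand-in for an Observer. */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Wraps an observer of lists in an observer of single items. */
    static <T> Observer<T> toList(Observer<List<T>> downstream) {
        List<T> buffer = new ArrayList<>();
        return new Observer<T>() {
            @Override public void onNext(T item) { buffer.add(item); }
            // An upstream error is passed straight through; no list is emitted.
            @Override public void onError(Throwable error) { downstream.onError(error); }
            @Override public void onCompleted() {
                downstream.onNext(new ArrayList<>(buffer)); // one list, possibly empty
                downstream.onCompleted();
            }
        };
    }

    /** Feeds the items through toList and records what the downstream observer sees. */
    static List<String> run(List<Integer> items, boolean fail) {
        List<String> log = new ArrayList<>();
        Observer<Integer> upstream = toList(new Observer<List<Integer>>() {
            @Override public void onNext(List<Integer> list) { log.add("onNext " + list); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        for (Integer item : items) upstream.onNext(item);
        if (fail) upstream.onError(new RuntimeException("boom")); else upstream.onCompleted();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(java.util.Arrays.asList(1, 2, 3), false));
        System.out.println(run(new ArrayList<>(), false));
        System.out.println(run(java.util.Arrays.asList(1, 2), true));
    }
}
```

In this model the buffered items are simply discarded when an error arrives, which matches the statement that toList( ) invokes onError in turn rather than emitting a partial list.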
The toSortedList( ) method behaves much like toList( ) except that it sorts the resulting list. By default it sorts the list in ascending natural order by means of the Comparable interface. If any item emitted by the Observable does not implement Comparable with respect to the type of every other item emitted by the Observable, toSortedList( ) will throw an exception. You can change this default behavior by also passing toSortedList( ) a function that takes two items as parameters and returns a number (negative if the first item sorts before the second, zero if they tie, positive otherwise); toSortedList( ) will then use that function instead of Comparable to sort the items.
For example, the following code takes a list of unsorted integers, converts it into an Observable, then converts that Observable into one that emits the original list in sorted form as a single item:
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);
Observable.toSortedList(numbers).subscribe(
{ println(it); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[1, 2, 3, 4, 5, 6, 7, 8, 9]
Sequence complete
Here is an example that provides its own sorting function, in this case, one that sorts numbers according to how close to the number 5 they are:
numbers = Observable.from([8, 6, 4, 2, 1, 3, 5, 7, 9]);
Observable.toSortedList(numbers, { n, m -> Math.abs(5-n) - Math.abs(5-m) }).subscribe(
{ println(it); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[5, 6, 4, 3, 7, 8, 2, 1, 9]
Sequence complete
In addition to calling toSortedList( )
as a stand-alone method, you can also call it as a method of an Observable, so, in the examples above, instead of
Observable.toSortedList(numbers) ...
or
Observable.toSortedList(numbers, { n, m -> Math.abs(5-n) - Math.abs(5-m) }) ...
you could instead write
numbers.toSortedList() ...
or
numbers.toSortedList({ n, m -> Math.abs(5-n) - Math.abs(5-m) }) ...
- javadoc: toSortedList()
- javadoc: toSortedList(sortingFunction)
A well-formed Observable will invoke its Observer’s onNext method zero or more times, and then will invoke either the onCompleted or onError method exactly once. The materialize( ) method converts this series of invocations into a series of items emitted by an Observable, where it emits each such invocation as a Notification object.
For example:
numbers = Observable.from([1, 2, 3]);
Observable.materialize(numbers).subscribe(
{ if(rx.Notification.Kind.OnNext == it.kind) { println("Next: " + it.value); }
else if(rx.Notification.Kind.OnCompleted == it.kind) { println("Completed"); }
else if(rx.Notification.Kind.OnError == it.kind) { println("Error: " + it.exception); } },
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Next: 1
Next: 2
Next: 3
Completed
Sequence complete
In addition to calling materialize( )
as a stand-alone method, you can also call it as a method of an Observable, so that instead of
Observable.materialize(numbers) ...
in the above example, you could also write
numbers.materialize() ...
- javadoc: materialize()
- Linq: Materialize
You can undo the effects of materialize( ) by means of the dematerialize( ) method, which will emit the items from the Observable as though materialize( ) had not been applied to it. The following example dematerializes the materialized Observable from the previous section:
numbers = Observable.from([1, 2, 3]);
Observable.materialize(numbers).dematerialize().subscribe(
{ println(it); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
2
3
Sequence complete
- javadoc: dematerialize()
- RxJS: dematerialize
- Linq: Dematerialize
The timestamp( ) method converts an Observable that emits items of type T into one that emits objects of type Timestamped<T>, where each such object is stamped with the time at which it was emitted.
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 200000) });
myObservable.timestamp().subscribe(
{ println(it.toString()); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252582698, value = 200000)
Timestamped(timestampMillis = 1369252582740, value = 400000)
Timestamped(timestampMillis = 1369252582782, value = 600000)
Timestamped(timestampMillis = 1369252582823, value = 800000)
Timestamped(timestampMillis = 1369252582864, value = 1000000)
Sequence complete
- javadoc: timestamp()
- Linq: Timestamp
Pass a function to all( ) that accepts an item emitted by the source Observable and returns a boolean value based on an evaluation of that item, and all( ) will emit true if and only if that function returned true for every item emitted by the source Observable.
numbers = Observable.from([1, 2, 3, 4, 5]);
println("all even?" )
numbers.all({ 0 == (it % 2) }).subscribe({ println(it); });
println("all positive?");
numbers.all({ 0 < it }).subscribe({ println(it); });
all even?
false
all positive?
true
- javadoc: all(predicate)
- RxJS: all
- Linq: All
- Introduction to Rx: All
When you apply the exists( ) operator to a source Observable, the resulting Observable will emit true and complete if the source Observable emits one or more items before completing, or it will emit false and complete if the source Observable completes without emitting any items.
The inverse of this is the isEmpty( ) operator. Apply it to a source Observable and the resulting Observable will emit true and complete if the source Observable completes without emitting any items, or it will emit false and complete if the source Observable emits any item before completing.
- RxJS: any
- RxJS: isEmpty
- Linq: Any
- Introduction to Rx: Any
Pass sequenceEqual( ) two Observables, and it will compare the items emitted by each Observable pair by pair, emitting true for each pair if and only if both items are the same, and false otherwise. You can optionally pass a third parameter: a function that accepts two items and returns true if they are equal according to a standard of your choosing.
def firstfour = Observable.from([1, 2, 3, 4]);
def firstfouragain = Observable.from([1, 2, 3, 4]);
def firstfive = Observable.from([1, 2, 3, 4, 5]);
def firstfourscrambled = Observable.from([3, 2, 1, 4]);
println('firstfour == firstfive?');
Observable.sequenceEqual(firstfour, firstfive).subscribe({ println(it); });
println('firstfour == firstfouragain?');
Observable.sequenceEqual(firstfour, firstfouragain).subscribe({ println(it); });
println('firstfour == firstfourscrambled?');
Observable.sequenceEqual(firstfour, firstfourscrambled).subscribe({ println(it); });
firstfour == firstfive?
true
true
true
true
firstfour == firstfouragain?
true
true
true
true
firstfour == firstfourscrambled?
false
true
false
true
- javadoc: sequenceEqual(observable1, observable2)
- javadoc: sequenceEqual(observable1, observable2, equalityFunction)
- Linq: SequenceEqual
- Introduction to Rx: SequenceEqual
It is possible for an Observable to invoke its Observers' methods asynchronously, perhaps on different threads. This could make an Observable poorly behaved, in that it might invoke onCompleted or onError before one of its onNext invocations. You can force such an Observable to be well-behaved and synchronous by applying the synchronize( ) method to it.
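One way to picture what "well-behaved and synchronous" means is a wrapper that lets only one call through at a time and ignores anything arriving after a terminal call. The sketch below is a plain-Java model under that assumption, not the library's implementation; `SynchronizeSketch`, its `Observer` interface, and `demo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Library-free model of a serializing Observer wrapper; every name here is hypothetical. */
final class SynchronizeSketch {

    /** Minimal stand-in for an Observer. */
    interface Observer<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Serializes calls and drops anything that arrives after onCompleted or onError. */
    static <T> Observer<T> synchronize(Observer<T> downstream) {
        return new Observer<T>() {
            private boolean terminated = false; // guarded by this wrapper's monitor

            @Override public synchronized void onNext(T item) {
                if (!terminated) downstream.onNext(item);
            }
            @Override public synchronized void onError(Throwable error) {
                if (!terminated) { terminated = true; downstream.onError(error); }
            }
            @Override public synchronized void onCompleted() {
                if (!terminated) { terminated = true; downstream.onCompleted(); }
            }
        };
    }

    /** Two threads emit concurrently; late calls after completion are ignored. */
    static List<String> demo() {
        List<String> log = new ArrayList<>(); // only written while the wrapper's lock is held
        Observer<Integer> safe = synchronize(new Observer<Integer>() {
            @Override public void onNext(Integer item) { log.add("onNext " + item); }
            @Override public void onError(Throwable error) { log.add("onError " + error.getMessage()); }
            @Override public void onCompleted() { log.add("onCompleted"); }
        });
        Runnable emit = () -> { for (int i = 0; i < 1000; i++) safe.onNext(i); };
        Thread a = new Thread(emit);
        Thread b = new Thread(emit);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        safe.onCompleted();
        safe.onNext(-1);    // ignored: arrives after the terminal call
        safe.onCompleted(); // ignored: second terminal call
        return log;
    }

    public static void main(String[] args) {
        List<String> log = demo();
        System.out.println(log.size() + " calls, last = " + log.get(log.size() - 1));
    }
}
```

The unsynchronized ArrayList is safe here only because every write to it happens inside one of the wrapper's synchronized methods, which is the point of the exercise.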
- javadoc: synchronize(observable)
- Linq: Synchronize
By default, an Observable will generate its sequence of emitted items afresh for each new Observer that subscribes. You can force it to generate its sequence only once and then to emit this identical sequence to every Observer by using the cache( ) method. Compare the behavior of the following two sets of sample code, the first of which does not use cache( ) and the second of which does:
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp();
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252832871, value = 400000)
Timestamped(timestampMillis = 1369252832951, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252833074, value = 400000)
Timestamped(timestampMillis = 1369252833154, value = 800000)
Sequence complete
def myObservable = Observable.range(1, 1000000).filter({ 0 == (it % 400000) }).timestamp().cache();
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
myObservable.subscribe(
{ println(it.toString()); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete
Timestamped(timestampMillis = 1369252924548, value = 400000)
Timestamped(timestampMillis = 1369252924630, value = 800000)
Sequence complete
Note that in the second example the timestamps are identical for both of the observers, whereas in the first example they differ.
- javadoc: cache()
To specify on which Scheduler (thread) the Observable should invoke the Observers' onNext( ), onCompleted( ), and onError( ) methods, call the Observable's observeOn( ) method, passing it the appropriate Scheduler.
- javadoc: observeOn(scheduler)
- RxJS: observeOn
- Linq: ObserveOn
- Rx Workshop: Schedulers
To specify that the work done by the Observable should be done on a particular Scheduler (thread), call the Observable's subscribeOn( ) method, passing it the appropriate Scheduler. By default (that is, unless you also modify the Observable with observeOn( )) the Observable will invoke the Observers' onNext( ), onCompleted( ), and onError( ) methods on this same thread.
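The division of labor between subscribeOn( ) and observeOn( ) can be pictured with two single-threaded executors: one where the items are generated, one where the Observer's methods run. This is only an analogy in plain Java, with each executor standing in for a Scheduler; `SchedulerSketch`, `demo`, and the thread names are hypothetical, and no RxJava API is used.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Library-free analogy for subscribeOn( ) and observeOn( ); every name here is hypothetical. */
final class SchedulerSketch {

    /** Returns log lines recording which thread produced and which thread observed each item. */
    static List<String> demo() {
        ExecutorService producer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "subscribe-thread"));
        ExecutorService consumer =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "observe-thread"));
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(3);
        try {
            // Analogue of subscribeOn: the work of generating items runs on the producer.
            producer.execute(() -> {
                for (int i = 1; i <= 3; i++) {
                    final int item = i;
                    log.add("produced " + item + " on " + Thread.currentThread().getName());
                    // Analogue of observeOn: each notification is handed to the consumer.
                    consumer.execute(() -> {
                        log.add("observed " + item + " on " + Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for notifications");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            producer.shutdown();
            consumer.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) System.out.println(line);
    }
}
```

If the inner `consumer.execute` hand-off were removed and the Observer were called directly, both the production and the observation would happen on `subscribe-thread`, which corresponds to the default described above.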
- javadoc: subscribeOn(scheduler)
- RxJS: subscribeOn
- Linq: SubscribeOn
- Rx Workshop: Schedulers
You can use the parallel( ) method to split an Observable into as many Observables as there are available processors, and to do work in parallel on each of these Observables. parallel( ) will then merge the results of these parallel computations back into a single, well-behaved Observable sequence.
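The split, work, and merge steps can be sketched in plain Java with a thread pool sized to the number of available processors. This is a model of the idea rather than of the parallel( ) signature, which is not shown in this section; `ParallelSketch` and `parallelMap`, and the choice to apply a per-item function, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Library-free model of split / work in parallel / merge; every name here is hypothetical. */
final class ParallelSketch {

    static <T, R> List<R> parallelMap(List<T> items, Function<T, R> work) {
        int lanes = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(lanes);
        try {
            // Split: deal the items round-robin into one sub-sequence per processor.
            List<List<T>> parts = new ArrayList<>();
            for (int i = 0; i < lanes; i++) parts.add(new ArrayList<>());
            for (int i = 0; i < items.size(); i++) parts.get(i % lanes).add(items.get(i));

            // Work: each sub-sequence is processed as its own task on the pool.
            List<Future<List<R>>> futures = new ArrayList<>();
            for (List<T> part : parts) {
                futures.add(pool.submit(() -> {
                    List<R> out = new ArrayList<>();
                    for (T item : part) out.add(work.apply(item));
                    return out;
                }));
            }

            // Merge: collect everything into one sequence; source order is not preserved.
            List<R> merged = new ArrayList<>();
            for (Future<List<R>> future : futures) merged.addAll(future.get());
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> numbers = java.util.Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(parallelMap(numbers, n -> n * n));
    }
}
```

Because the items are dealt out to separate lanes, the merged sequence generally does not keep the source order, so code that depends on ordering should not rely on this pattern.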
You can use the finallyDo( ) method of an Observable to register an action (a function that implements Action0) that RxJava will invoke when that Observable invokes either the onCompleted( ) or onError( ) method of its Observer.
class TestFinally
{
static class myActionClass implements rx.util.functions.Action0 {
void call() { println('Finally'); }
}
static main() {
def myAction = new myActionClass();
def numbers = Observable.from([1, 2, 3, 4, 5]);
numbers.finallyDo(myAction).subscribe(
{ println(it); }, // onNext
{ println("Error encountered"); }, // onError
{ println("Sequence complete"); } // onCompleted
);
}
}
new TestFinally().main();
1
2
3
4
5
Sequence complete
Finally
- javadoc: finallyDo(action)
- RxJS: finallyAction
- Linq: Finally
The delay( ) operator modifies its source Observable by pausing for an increment of time that you specify before emitting each of the source Observable's items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that increment.
Note that delay( ) will not time-shift an onError( ) call in this fashion; instead, it forwards such a call immediately to its subscribers.
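The difference between shifted items and an unshifted error is easiest to see on a virtual timeline. The sketch below models notifications as timestamped events in plain Java. It makes two assumptions that the text above does not state: onCompleted is shifted along with the items, and items still waiting out their delay when an error arrives are dropped. `DelaySketch` and `Event` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Library-free, virtual-time model of delay( ); every name here is hypothetical. */
final class DelaySketch {

    /** One notification at a virtual time; kind is "onNext", "onCompleted", or "onError". */
    static final class Event {
        final long timeMillis;
        final String kind;
        final Object value;

        Event(long timeMillis, String kind, Object value) {
            this.timeMillis = timeMillis;
            this.kind = kind;
            this.value = value;
        }

        @Override public String toString() {
            return timeMillis + ":" + kind + (value == null ? "" : "(" + value + ")");
        }
    }

    /** Shifts onNext and onCompleted by delayMillis; forwards onError at its original time. */
    static List<Event> delay(List<Event> source, long delayMillis) {
        List<Event> out = new ArrayList<>();
        for (Event e : source) {
            if (e.kind.equals("onError")) {
                // Assumption: notifications still pending when the error arrives are dropped.
                out.removeIf(pending -> pending.timeMillis > e.timeMillis);
                out.add(e); // not time-shifted
                break;      // nothing follows a terminal notification
            }
            out.add(new Event(e.timeMillis + delayMillis, e.kind, e.value));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> ok = new ArrayList<>();
        ok.add(new Event(0, "onNext", "a"));
        ok.add(new Event(100, "onNext", "b"));
        ok.add(new Event(200, "onCompleted", null));
        System.out.println(delay(ok, 50));     // [50:onNext(a), 150:onNext(b), 250:onCompleted]

        List<Event> failing = new ArrayList<>();
        failing.add(new Event(0, "onNext", "a"));
        failing.add(new Event(100, "onNext", "b"));
        failing.add(new Event(120, "onError", "boom"));
        System.out.println(delay(failing, 50)); // [50:onNext(a), 120:onError(boom)]
    }
}
```

In the second timeline the item `b` would have been due at 150 ms, but the error at 120 ms is forwarded first, so under the stated assumption `b` never reaches the subscriber.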