Releases: Reactive-Extensions/RxJS
RxJS v4.1.0
We are happy to announce the release of RxJS version 4.1. With this release came a few new additions as well as a new system for pulling in what you want.
Some of the changes are the following:
- Build What You Want with
@rxjs/rx
- Adding
repeatWhen
- Parity with RxJS v5 names
- Other changes
Build What You Want with @rxjs/rx
One of the biggest requests with RxJS was to build only what you wanted. In previous attempts, we looked at a CLI to build what you wanted, but that was suboptimal experience. Instead, we have ported the existing code base to CommonJS format so that it works right out of the box with Node.js, or with your favorite bundler whether it is Browserify, Webpack, Rollup, etc.
By default, this brings in all of RxJS when you require the @rxjs/rx
module:
const Rx = require('@rxjs/rx');
const subscription = Rx.Observable.from([1,2,3])
.filter(x => x % 2 === 0)
.map(x => x + 2)
.subscribe(x => console.log(`The answer is ${x}`));
// => The answer is 4
Now it is possible to bring in as much or as little as you want by only including the operators you want:
const fromArray = require('@rxjs/rx/observable/fromArray');
const filter = require('@rxjs/rx/observable/filter');
const map = require('@rxjs/rx/observable/map');
const source = fromArray([1,2,3]);
const filtered = filter(source, x => x % 2 === 0);
const mapped = map(filtered, x => x + 2);
const subscription = mapped.subscribe( x => console.log(`The answer is ${x}`) );
// => The answer is 4
Not only can you bring in certain operators, but you can also add each one to the prototype if you want the nice chaining feature as well.
const Observable = require('@rxjs/rx/observable');
// Add class methods
Observable.addToObject({
fromArray: require('@rxjs/rx/observable/fromarray')
});
// Add instance methods
Observable.addToPrototype({
filter: require('@rxjs/rx/observable/filter'),
map: require('@rxjs/rx/observable/map')
});
const subscription = Observable.fromArray([1,2,3])
.filter(x => x % 2 === 0)
.map(x => x + 2)
.subscribe(x => console.log(`The answer is ${x}`));
In addition, we also added some distributions that you will find under our src/modular/dist
folder which contains all of RxJS built in a UMD style as well as a "lite" version as well. This should give you the building blocks for creating your own builds of RxJS and taking the only the operators you need.
Adding repeatWhen
We often try to keep parity with other Rx versions such as RxJava. To that end, we've added repeatWhen
which complements the retryWhen
we've had for quite some time. Rather than buffering and replaying the sequence from the source Observable, the repeatWhen
resubscribes to and mirrors the source Observable, but only conditionally based upon the Observable you return from the call.
Here is an example where we can repeat a sequence twice with a delay of 200ms in between time.
const source = Rx.Observable.just(42)
.repeatWhen(function(notifications) {
return notifications.scan((acc, x) => return acc + x, 0)
.delay(200)
.takeWhile(count => count < 2);
});
var subscription = source.subscribe(
x => console.log(`Next ${x}`,
err => console.log(`Error ${err}`),
() => console.log('Completed')
);
// => Next: 42
// 200 ms pass
// => Next: 42
// 200 ms pass
// => Completed
Parity with RxJS v5 names
Given the differences between RxJS v4 and RxJS v5, we've made the migration path easier by creating aliases for you such as the following:
// Prototype methods
Observable.prototype.race = Observable.prototype.amb;
Observable.prototype.mergeMap = Observable.prototype.flatMap;
Observable.prototype.switchMap = Observable.prototype.flatMapLatest;
Observable.prototype.exhaustMap = Observable.prototype.flatMapFirst;
Observable.prototype.exhaust = Observable.prototype.switchFirst;
Observable.prototype.publishReplay = Observable.prototype.replay;
// Object methods
Observable.bindCallback = Observable.fromCallback;
Observable.bindNodeCallback = Observable.fromNodeCallback;
Observable.race = Observable.amb;
Other Changes
- Bug fixes such as to
ConnectableObservable
so that if the underlyingSubject
has been disposed, we will no longer attempt to resubscribe to it. - The
startWith
operator no longer usesScheduler.currentThread
and now usesScheduler.immediate
, as that caused issues with the backpressure operators such aspausable
andpausableBuffered
. - Documentation bug fixes
RxJS Version 4.0.6
This is a bug fix release of the Reactive Extensions for JavaScript (RxJS) for version 4.0 to fix a number of issues. The most prominent being the issue with fromPromise(promise)
was swallowing errors from Observable
instances which is now fixed. Looking forward, we will continue to work on performance as well as the modular design for those who want to pick and choose which pieces from NPM they want to use.
Performance Work
Work continued on performance with Rx.Observable.onErrorResumeNext
, Rx.Observable.mergeDelayError
as well as our join patterns. Expect this to continue throughout the lifecycle of v4.x.
Bugs Fixed:
These were the bugs fixed during this release since 4.0.0:
- #969 - fix for
timeout
without otherObservable
- #964 - fixed shared state for
zip
,combineLatest
andwithLatestFrom
insubscribeCore
- #963 - Angular broken with latest release
- #957 - fix issue with
fromEvent
not firing - #955 -
rx.d.ts
compilation issue fix - #949 - add null handling for
isIterable
check - #947 - add
initialValue
topublishValue.md
- #941 - fix for
timer
which was firing immediately - #939 - documentation fix for
find
- #938 - fix
defaultIfEmpty
with default value. - #936 - fix
fromPromise
behavior not to swallow errors when used withRx.Observable.spawn
- #934 - fix
BehaviorSubject
inheritance fromObserver
- #932 - include
zip
in TypeScript exports - #931 - include
merge
in TypeScript exports
RxJS Version 4.0
This is another release in terms of cleaning up our technical debt by simplifying a number of our infrastructure, including our schedulers both for regular usage as well as testing. There were will be a few more point releases from this repository before a switch over the more modern RxJS vNext, including modularity, expression trees, and so forth.
Before we go further, it's worth mentioning that since Microsoft Edge supports ES 2016 Async Functions, you can take advantage of them in whole new ways in RxJS, because as we've had support for returning Promises, we support async functions as well.
With a very quick example, we can do the following:
const source = Rx.Observable.of(1,2,3)
.flatMap(async function (x, i) {
var result = await Promise.resolve(x * i);
return result;
});
source.subscribe((x) => console.log(`Next: ${x}`))
// => Next: 0
// => Next: 2
// => Next: 6
Included in this release are the following:
- A More Complete
rx.all.js
- Scheduler rewrite
- Testing rewrite
- Collapsing of operators
- Performance upgrades
What's coming next in this release?
- Modularity
- More Performance Upgrades
A More Complete rx.all.js
In previous releases, rx.all.js
and its compat counterpart rx.all.compat.js
contained all of the operators in RxJS, but did not include any of the testing infrastructure. This has been changed so that you no longer need to bring in rx.testing
in order to write your own tests.
Scheduler Rewrite
The schedulers have long had a long of technical debt when brought over directly from the .NET world. To simplify this, we will now have the following contract on the Scheduler
as written in TypeScript so that it makes sense as an interface. You will notice that the previous versions which did not have state associated with them are no longer supported. This caused too much overhead to support both, so if you have no state to pass, simply pass null
for the state.
interface IDisposable {
dispose(): void
}
interface IScheduler {
// Current time
now(): number;
// Schedule immediately
schedule<TState>(state: TState, action: (scheduler: IScheduler, state: TState) => IDisposable) : IDisposable;
// Schedule relative
scheduleFuture<TState>(state: TState, dueTime: number, action: (scheduler: IScheduler, state: any) => IDisposable) : IDisposable;
// Schedule absolute
scheduleFuture<TState>(state: TState, dueTime: Date, action: (scheduler: IScheduler, state: TState) => IDisposable) : IDisposable;
// Schedule recursive
scheduleRecursive<TState>(state: TState, action: (state: TState, action: (state: TState) => void) => void): IDisposable;
// Schedule recursive relative
scheduleRecursiveFuture<TState>(state: TState, dueTime: number, action: (state: TState, action: (state: TState, dueTime: number) => void) => void): IDisposable;
// Schedule recursive absolute
scheduleRecursiveFuture<TState>(state: TState, dueTime: Date, action: (state: TState, action: (state: TState, dueTime: Date) => void) => void): IDisposable;
// Schedule periodic
schedulePeriodic<TState>(state: TState, period: number, action: (state: TState) => TState): IDisposable;
}
Now, to schedule something immediately, you must follow the following code snippet. The return value is optional as we will automatically fix it to be a Disposable
if you do not provide us with one.
var d = scheduler.schedule(null, function (scheduler, state) {
console.log('scheduled ASAP');
return Rx.Disposable.empty;
});
Same applies to scheduling in the future:
// Scheduled 5 seconds in the future with absolute time
var d = scheduler.scheduleFuture(null, new Date(Date.now() + 5000), function (scheduler, state) {
console.log('scheduled using absolute time');
return Rx.Disposable.empty;
});
// Scheduled 5 seconds in the future with relative time
var d = scheduler.scheduleFuture(null, 5000 function (scheduler, state) {
console.log('scheduled using relative time');
return Rx.Disposable.empty;
});
You will also notice that the recursive scheduling as well as periodic scheduling removed the versions where no state was involved. Also, it is necessary to enforce that with scheduleRecursiveFuture
determines the relative or absolute timing by the return value from the recurse
call for example. If you don't wish to use state for the recurse
call, simply use recurse(null, dueTime)
.
// Absolute scheduling
scheduler.scheduleRecursiveFuture(1, new Date(Date.now() + 5000), function (state, recurse) {
if (state < 10) {
recurse(state + 1, new Date(Date.now() + (state * 1000));
}
});
// Relative scheduling
scheduler.scheduleRecursiveFuture(1, 5000, function (state, recurse) {
if (state < 10) {
recurse(state + 1, state * 1000);
}
});
Testing Rewrite
One of the biggest undertakings in this release was to standardize and clean up our unit tests. Over the past releases, there was a bit of technical debt that needed to be paid off. In this release, our virtual time system as well as our test scheduling was rewritten as to put it more in line with the regular schedulers. We rid ourselves of the WithState
operators, simply renaming them to their basic operators such as scheduleAsbolute
and scheduleRelative
.
With the TestScheduler
, we cleaned it up so that you can easily specify when a particular timing for the creation, subscription and disposal of the Observable sequence. This is a quick example of a test in action using timing where in the scheduler.startScheduler
method as the second parameter, you can pass in an object with some timings for creation, subscription disposal. If you omit this, it will default to the normal timings of 100
for created
, 200
for subscribed
and 1000
for disposed
.
test('first default', function () {
var scheduler = new TestScheduler();
var xs = scheduler.createHotObservable(
onNext(150, 1),
onCompleted(250)
);
var res = scheduler.startScheduler(
function () {
return xs.first({defaultValue: 42});
},
{ created: 100, subscribed: 200, disposed: 1000 }
);
res.messages.assertEqual(
onNext(250, 42),
onCompleted(250)
);
xs.subscriptions.assertEqual(
subscribe(200, 250)
);
});
All tests should look like this now making them much easier to read going forward.
Collapsing of Operators
Previously, we had a number of operators such as debounceWithSelector
and timeoutWithSelector
that were simply overloads of their respective debounce
and timeout
methods. To avoid confusion having more named operators, we have simply condensed those into debounce
and timeout` so that they look like the following:
Debounce with relative due time:
Rx.Observable.prototype.debounce(dueTime, [scheduler])
Debounce with selector:
Rx.Observable.prototype.debounce(durationSelector)
Timeout with relative or absolute due time:
Rx.Observable.prototype.timeout(dueTime, [other], [scheduler])
Timeout with selector and optional first timeout:
Rx.Observable.prototype.timeout([firstTimeout], timeoutDurationSelector, [other])
Performance Upgrades
In this version, we addressed more performance as we rewrote many operators to minimize chained scopes in addition to writing operators from the bottom up instead of relying on composition from other operators. This had some significant increases in some areas. In addition, we also did some shortcuts for example if the Rx.Scheduler.immediate
was used, we could swap that out for an inline call with returning an empty disposable.
In RxJS vNext, many of the performance concerns will be addressed and have shown great progress.
What's Next
There will be a number of more releases for 4.x until vNext is ready which will address a number of issues including:
- Modularity
- Performance
Modularity
Now that the operators and schedulers have largely stabilized for performance, we're going to fix more of the modularity story, allowing you to bring in only what you need. Work had already begun on this part of the project, but now that the majority of the technical debt has been paid, this makes for a much easier transition.
Performance
Although many operators have been rewritten to minimized chained scopes, there are a number of operators that have not. In this release, we intend to get those operators such as timeout
to be optimized for performance and making it more optimized for the GC.
RxJS Version 3.1.1
This is a minor release with a patch for a regression for zip
when given an array of values and closes issue #867
RxJS Version 3.1
This is a slight update to RxJS since 3.0 which contains a number of fixes and new features. This should be the last of the 3.x releases moving forward with 4.0 to include the large scheduler changes to improve performance as well as get rid of some more technical debt.
Some of the features included in this release include:
- Result Selectors now optional on
zip
andcombineLatest
- Changes to
zip
- Added
zipIterable
- Introduction of
Rx.Observable.wrap
- New TypeScript Definitions
- Bug fixes
Result Selectors Now Optional
In previous releases of RxJS, the zip
and combineLatest
took either an array of Observables or Promises and followed by a resultSelector
function which took the values projected from each sequence and allowed you to combine them any which way you wanted. With version 3.1, the resultSelector
has been made completely optional.
Previously, if you wanted to combine three sequences, you had to have a result selector as well, even if it were as trivial as adding all of the values to an array.
var source = Rx.Observable.zip(
Rx.Observable.of(1,2,3),
Rx.Observable.of(4,5,6),
Rx.Observable.of(7,8,9),
function (x, y, z) { return [x, y, z]; });
// => [1, 4, 7]
// => [2, 5, 8]
// => [3, 6, 9]
With the result selector being optional you can now omit the last parameter and the aforementioned behavior will be done automatically for you. This applies to all versions of zip
and combineLatest
whether it is on the prototype or the Observable itself.
var source = Rx.Observable.zip(
Rx.Observable.of(1,2,3),
Rx.Observable.of(4,5,6),
Rx.Observable.of(7,8,9));
// => [1, 4, 7]
// => [2, 5, 8]
// => [3, 6, 9]
Changes to zip
The zip
function is very useful and very flexible about its input. So much so, that we tried to allow for iterables such as Map
, Set
, and Array
, but making this a solid API was too difficult, so instead zip
now only accepts Observables and Promises so now you can have full parity between the method signatures of combineLatest
and zip
with either arguments or an array of Observables or Promises.
/* Arguments version */
var source = Rx.Observable.zip(
Rx.Observable.of(1,2,3),
Rx.Observable.of(4,5,6),
Rx.Observable.of(7,8,9)
);
/* Array version */
var source = Rx.Observable.zip([
Rx.Observable.of(1,2,3),
Rx.Observable.of(4,5,6),
Rx.Observable.of(7,8,9)
]);
Introducing zipIterable
As noted above there were some issues with zip
trying to accept iterables as well as Observable and Promises given that we accepted both arguments and arrays. To fix this, we have introduced zipIterable
which allows you to zip an Observable with any iterable whether it be an Array
, Map
, Set
or even Generator
.
var source = Rx.Observable.of(1,2,3).zipIterable(
[4, 5, 6],
[7, 8, 9]
);
// => [1, 4, 7]
// => [2, 5, 8]
// => [3, 6, 9]
With a generator it can be even more fun such as:
var source = Rx.Observable.of(1,2,3).zipIterable(
(function* () { yield 4; yield 5; yield 6; })();
(function* () { yield 7; yield 8; yield 9; })();
);
// => [1, 4, 7]
// => [2, 5, 8]
// => [3, 6, 9]
Introduction to Rx.Observable.wrap
In the previous release, we redid Rx.Observable.spawn
so that it could accept a number of yieldable arguments such as Observable, Promises, Arrays and Objects and returned an Observable. In this release we went a bit further by adding Rx.Observable.wrap
which creates an Observable from a function which now takes arguments.
var fn = Rx.Observable.wrap(function* (value) {
return [
yield Rx.Observable.just(value + 2).delay(2000),
yield Rx.Observable.just(value)
];
});
fn(1000).subscribe(next => console.log('Next %s', next));
// => [1002, 1000]
With some fixes with this release and wrap
we can create pretty complex objects and have it yield properly.
var fn = Rx.Observable.wrap(function* (v) {
return {
1: {
3: yield [
Rx.Observable.just(20).delay(5000), [
Rx.Observable.just(30).delay(2000), Rx.Observable.just(40).delay(3000)
]
]
},
2: yield Promise.resolve(v)
};
});
fn(1000).subscribe(next => console.log('Next %s', next));
Many thanks to @xgrommx for his help on this effort!
New TypeScript Definitions
Many people have a lot of interest in the TypeScript definitions for RxJS. Many thanks to @david-driscoll for stepping up to the plate and completely rewriting them from scratch!
Bug Fixes
RxJS Version 3.0
Here is the long awaited RxJS version 3.0! This has many changes since our last release of v2.5.3. This release contains a great number of changes for the better including rx.core.js
and the Rx-Core family as well as the rx.lite.js
and the Rx-Lite family of libraries. Rx-Core allows you to implement the bare bones of the Observable contract so that your library is compatible with RxJS with additions for multicast operations via rx.core.binding.js
and testing with rx.core.testing.js
. In addition, the Rx-Lite family allows you to stick with rx.lite.js
and then require only the lite
packages you need as you need them.
In this release, we tried to pay down some of our technical debt that has incurred over the years since we started releasing RxJS five years ago. To that end, there are some breaking changes, hence the major version bump to 3.0 most of which are dealing with ridding ourselves of aliases that did not properly reflect the code and has reduced to a single name.
Here are some of the highlights:
- Rx Core Packages
- Rx Lite Packages
- New Operators
Rx.Observable.fromEvent
ChangesRx.Observable.spawn
ChangesRx.Observable.prototype.scan
ChangesOrDefault
Method Changes- Deprecations
- Performance Improvements
- Bug Fixes
What's next for us? We're working on the following items during the 3.x timeframe including:
- Bonsai Tree serialization to allow RxJS to cross boundaries
- Single module exports per operator
- Moving towards a new testing system
Rx Core Packages
There have been many questions over the years as to a minimal implementation of RxJS so that developers can provide shims for Observables instead of shipping the entire library. To that end, we have shipped a minimal implementation of Rx Observables along with a base scheduler and some disposables. We also ship for those who want multicast behavior on their created Observables we also ship that library as well. And finally, we also have a shim for testing just in case you want to test your shims in the exact same style that RxJS uses.
We ship three main files to support this:
rx.core.js
rx.core.binding.js
rx.core.testing.js
Each of these can be found in their own respective NPM packages
We'd like to get them as lean and mean as possible so any suggestions and we're all ears.
Rx Lite Packages
In previous releases of RxJS, we shipped Lite packages which were meant to be a stripped down version of RxJS to the most commonly used operators. To stay more productive, developers would eventually bring in more operators as they need them with such things as time-based operators, virtual-time, testing and so forth. Not only that, but they would need to support older environments as well as newer ones. To fix this, the team has introduced a number of new NPM packages to support the Lite workflow. Each package has a compat counterpart so all the way up the chain it uses the right version of RxJS for you.
We have created the following NPM packages to support Rx Lite:
rx-lite
-rx-lite-compat
rx-lite-extras
-rx-lite-extras-compat
rx-lite-aggregates
-rx-lite-aggregates-compat
rx-lite-async
-rx-lite-async-compat
rx-lite-backpressure
-rx-lite-backpressure-compat
rx-lite-coincidence
-rx-lite-coincidence-compat
rx-lite-experimental
-rx-lite-experimental-compat
rx-lite-joinpatterns
-rx-lite-joinpatterns-compat
rx-lite-testing
-rx-lite-testing-compat
rx-lite-time
-rx-lite-time-compat
rx-lite-virtualtime
-rx-lite-virtualtime-compat
New Operators
People are always looking for new ways to combine sequences using variations of flatMap
, whether it is to control the amount of concurrency to whether the first or the last value is cut and disposed. RxJS version 3.0 has added the following operators to help with those efforts.
Rx.Observable.fromEvent
Changes
In previous releases, Rx.Observable.fromEvent
, if given a selector function, would pass the raw arguments to the selector function to allow the developer to pick which items to project. This has been found to be clumsy so instead, the function is applied with the given arguments, thus making it a more natural experience.
For example, previously, you would have to do the following:
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;
var e = new EventEmitter();
var changes = Rx.Observable.fromEvent(e, 'changes', function (args) {
return { first: args[0], second: args[1] };
});
This has been simplified to instead so that the arguments are directly applied on the selector function:
var Rx = require('rx');
var EventEmitter = require('events').EventEmitter;
var e = new EventEmitter();
var changes = Rx.Observable.fromEvent(e, 'changes', function (first, second) {
return { first: first, second: second };
});
Rx.Observable.spawn
Changes
Originally, RxJS shipped with a very primitive version of handling generator functions through Rx.spawn
. This has been changed to Rx.Observable.spawn
which now instead of returning a Node.js style callback, will now return an Observable
for consistency with the rest of RxJS.
This enables a number of exciting scenarios mixing in support for Promises, Arrays, Generators, Node-style callbacks, and yes of course, Observables.
var Rx = require('rx');
var nodeCallBack = function (val) {
return function (cb) {
cb(null, val);
};
};
var spawned = Rx.Observable.spawn(function* () {
var v = yield nodeCallBack(12);
var w = yield [24];
var x = yield Rx.Observable.just(42);
var y = yield Rx.Observable.just(56);
var z = yield Promise.resolve(78);
return v + w[0] + x + y + z;
});
spawned.subscribe(
function (x) { console.log('next %s', x); },
function (e) { console.log('error %s', e); },
function () { console.log('completed'); }
);
// => next 212
// => completed
- Rx.Observable.prototype.scan
Changes
In previous releases, the scan
method followed the now removed aggregate
method syntax which had the seed first, followed by the aggregate function. For RxJS 3.0, this has been reversed to match the reduce
signature which better matches the natures of JavaScript and the Array#extras.
Previously, the signature was the following:
observable.scan(0 /*seed*/, function (acc, x) {
return acc + x;
});
Now has been changed to the following:
observable.scan(function (acc, x) {
return acc + x;
}, 0 /*seed*/
OrDefault
Method Changes
In previous releases of RxJS, we had methods for a number of operators such as first
, last
, elementAt
and single
with OrDefault
suffixes on them which determined whether they would throw on an error or return some default value. Instead of having more operators, this has been condensed into one single operator for each:
Each of those APIs have changed to allow a much nicer workflow. For example, in previous releases, we wanted to have a default value if there was no first element, then we'd have to provide the argument for the default value as the third parameter.
observable.firstOrDefault(null /*selector*/, null /*thisArg*/, 42 /*defaultValue*/);
Instead, this has been radically simplified with the following:
observable.first({defaultValue: 42});
Note that if a default value is omitted, then the function will throw upon not meeting the condition or being empty.
Deprecations
Note that the following method aliases have had their accompanied aliases removed:
...Method | Removed Aliases |
---|---|
catch | catchError, catchException |
do | doAction |
every | all |
finally |
RxJS version 2.5
This is the first stable release of RxJS version 2.5. Once again with version 2.4, we've focused on performance, modularity among other topics.
To that end, here is what is included in the latest release.
- Moving node.js Specific Bindings
- Improved Performance of Creation Operators
- Addition of More Rx-Lite NPM Modules
Moving node.js Specific Bindings
In previous versions, index.js
included node.js specific bindings to such things as the EventEmitter
, Streams, and more. This added extra bloat to RxJS, especially for those using browserify to create their appplications when many of these functions were not needed. This also caused issues for users of React and React Native. The functionality that was previously in index.js
has been moved to a node.js specific NPM module called rx-node.
For those wishing to still have Rx.Node
still in your application can simply do the following:
var Rx = require('rx');
Rx.Node = require('rx-node');
Existing methods that were added to Rx.Observable
have been moved directly into RxJS itself such as Rx.Observable.prototype.pipe
which handles node.js style streams and interoperates with such libraries such as Highland.js.
Improved Performance Of Creation Operators
In previous releases of RxJS, we focused on improving the performance of the most common used instance operators such as map
, filter
, mergeAll
, concatAll
, and various creation operators such as fromArray
, from
, of
, range
among others. In this release, we focused on some of the more basic operators such as just
, throw
, empty
, never
, and pairs
.
Some of the performance gains are immense. This numbers come from node.js v0.12.2 running on a MacBook Pro with 16GB RAM.
Operator just
/return
$ node just.js
old x 330,062 ops/sec ±2.25% (86 runs sampled)
new x 747,616 ops/sec ±1.60% (90 runs sampled)
Fastest is new
Operator throw
/throwError
$ node throw.js
old x 336,647 ops/sec ±2.37% (85 runs sampled)
new x 867,807 ops/sec ±2.18% (76 runs sampled)
Fastest is new
Operator empty
$ node empty.js
old x 312,889 ops/sec ±2.47% (80 runs sampled)
new x 844,512 ops/sec ±1.72% (91 runs sampled)
Fastest is new
Operator never
$ node never.js
old x 350,545 ops/sec ±2.49% (81 runs sampled)
new x 1,307,236 ops/sec ±1.65% (62 runs sampled)
Fastest is new
Operator pairs
$ node pairs.js
old x 125,360 ops/sec ±1.36% (90 runs sampled)
new x 178,085 ops/sec ±1.08% (95 runs sampled)
Fastest is new
We're not even close to finishing performance optimization as we're looking at zip
, combineLatest
, withLatestFrom
, as well as more aggregate operations such as scan
in the upcoming releases.
Addition of Rx-Lite Modules
In the previous release, we released a smaller version of RxJS with the rx-lite
, rx-lite-compat
and its extras of rx-lite-extras
and rx-lite-extras-compat
which allowed you to use only rx.lite.js
and rx.lite.extras.js
and its older browser compatible versions. To that end, we also added the capability to add in every other module into rx-lite
so that you can pick and choose to bring in certain functionality instead of getting all of the rx
module. This is due mostly that people are using NPM to create their client-side applications so keeping the footprint of RxJS low is ideal.
These include:
rx-lite-aggregates
/rx-lite-aggregates-compat
rx-lite-async
/rx-lite-async-compat
rx-lite-backpressure
/rx-lite-backpressure-compat
rx-lite-coincidence
/rx-lite-coincidence-compat
rx-lite-experimental
/rx-lite-experimental-compat
rx-lite-joinpatterns
/rx-lite-joinpatterns-compat
rx-lite-testing
/rx-lite-testing-compat
rx-lite-time
/rx-lite-time-compat
rx-lite-virtualtime
/rx-lite-virtualtime-compat
RxJS version 2.4
This is the first stable release of RxJS 2.4! There have been several things at the forefront of our mind which is performance and modularity.
To that end, we focused on the following for this release:
- Performance Enhancements
- New NPM Packages
- New operators/methods
- Non-breaking Changes
- Breaking Changes
Performance Enhancements
Performance has been a key aspect of the past RxJS releases and for 2.4 will continue to be a work in progress. You will note that we have included performance tests using benchmark.js which tests 2.4.x against previous 2.3.x releases in the tests/perf/operators
folder.
You can run them to see what kinds of gains we were able to get for example, this is a check of the new map
implementation versus the previous version.
function square(x) { return x * x; }
function double(x) { return x + x; }
Rx.Observable.range(0, 50)
.map(square)
.map(double).subscribe();
Running this code between versions, we can see a definite speedup:
$ node map.js
old x 12,968 ops/sec ±3.44% (89 runs sampled)
new x 15,079 ops/sec ±4.34% (81 runs sampled)
Fastest is new
We can also test the two different versions of range
to see what kinds of gains we were able to make:
Rx.Observable.range(0, 25).subscribe();
Running that sample, we can find the difference pretty staggering.
$ node range.js
old x 26,711 ops/sec ±3.81% (87 runs sampled)
new x 37,831 ops/sec ±4.33% (83 runs sampled)
Fastest is new
What Changes Did We Make?
There were a number of rules that we followed that allowed us to get faster code. They include:
- Do not let
arguments
leave the scope of the method - Do not perform
Array.prototype.slice
onarguments
and instead do a local copy - Avoid try/catch/finally if possible such as the work Bluebird did
- Use the state based methods for scheduling which reduced scope chains
- Use classes instead of anonymous classes created by scope chains.
- Use method fusion when possible to chain together
map
withmap
calls, andfilter
withfilter
calls.
To cover some of them in more detail, we can for example reduce chained scopes by using the scheduler, let's look at the following example. Let's just use scheduleRecursive
which does not allow for state to be passed in. Instead, we would have to close over it in our inner scope.
Observable.range = function (start, count, scheduler) {
isScheduler(scheduler) || (scheduler = currentThreadScheduler);
return new AnonymousObservable(function (observer) {
var i = 0;
return scheduler.scheduleRecursive(function (self) {
if (i < count) {
observer.onNext(start + (i++));
self();
} else {
observer.onCompleted();
}
});
});
};
Instead, we could use our state based which will rid ourselves of the i
variable and capture the state solely within our scheduler.
Observable.range = function (start, count, scheduler) {
isScheduler(scheduler) || (scheduler = currentThreadScheduler);
return new AnonymousObservable(function (observer) {
return scheduler.scheduleRecursiveWithState(0, function (i, self) {
if (i < count) {
observer.onNext(start + i);
self(i + 1);
} else {
observer.onCompleted();
}
});
});
};
Of course we went further by turning the range
operator into its own class. You can find the optimized operators in the src/core/perf/operators
folder.
We also looked into method fusion, for example, if the current operation is a map
and the next operation is also a map
, then we can shortcut it so that we only use a single Observer
for them both.
return this instanceof MapObservable ?
this.internalMap(selectorFn, thisArg) :
new MapObservable(this, selectorFn, thisArg);
Then the internalMap
function might look like the following where we have the two selector functions chained together for our new selector function.
MapObservable.prototype.internalMap = function (selector, thisArg) {
var self = this;
return new MapObservable(
this.source,
function (x, i, o) { return selector(self.selector(x, i, o), i, o); }, thisArg);
};
This is by no means the end of our optimizations. We must carefully weigh the changes made here with the performance gains obtained versus the code size that we produce and that we are going for larger optimizations and not micro-optimizations.
New NPM Packages
Many users of RxJS want better modularity and smaller builds of RxJS using only what they need. One of the ways we have done this is by creating rx.lite.js
which is a subset of the complete RxJS which has the most used operators in a single package.
We have now made the following packages also available on NPM:
rx.lite
- Includesrx.lite.js
rx.lite.compat
- Includesrx.lite.js
for browsers older than IE9rx.lite.extras
- Includes extras inrx.js
not inrx.lite.js
rx.lite.extras.compat
- Includes extras inrx.js
not inrx.lite.js
for browsers older than IE9
New Operators/Methods
We have introduced two new operators/methods in this release:
Rx.Observable.mergeDelayError(..args)
Rx.BehaviorSubject.prototype.getValue()
Rx.Observable.mergeDelayError(..args)
The first operator, mergeDelayError
flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to receive all successfully emitted items from all of the source Observables without being interrupted by an error notification from one of them.
This behaves like Observable.prototype.mergeAll
except that if any of the merged Observables notify of an error via the Observer's onError
, mergeDelayError
will refrain from propagating that error notification until all of the merged Observables have finished emitting items.
var source1 = Rx.Observable.of(1,2,3);
var source2 = Rx.Observable.throwError(new Error('woops'));
var source3 = Rx.Observable.of(4,5,6);
var source = Rx.Observable.mergeDelayError(source1, source2, source3);
var subscription = source.subscribe(
function (x) {
console.log('Next: %s', x);
},
function (err) {
console.log('Error: %s', err);
},
function () {
console.log('Completed');
});
// => 1
// => 2
// => 3
// => 4
// => 5
// => 6
// => Error: Error: woops
Rx.BehaviorSubject.prototype.getValue()
Another operator that was added was to properly get the current value from a BehaviorSubject
. In previous releases, you could use the value
field, but it was not guarded against improper access. This value is frozen in time once onCompleted
has been called. If onError
has been called, calling this method will cause the Error
from the onError
call to be thrown.
var b = new Rx.BehaviorSubject(42);
console.log(b.getValue());
// => 42
b.onNext(56);
console.log(b.getValue());
try {
b.onError(new Error('woops'));
var x = b.getValue();
} catch (e) {
console.log(e.message);
}
// => woops
Non-Breaking Changes
Sometimes we feel that in a previous release, we did not consider naming and now we're stuck with it, whether we like it or not.
Aliasing Rx.Scheduler.timeout
to Rx.Scheduler.default
Such is the case for Rx.Scheduler.timeout
which in the old days of RxJS used setTimeout
indeed to schedule items. But, it is much more than that now that it should be the default scheduler used for asynchronous operations. To that, we have created an alias to it for Rx.Scheduler.default
to indicate its new status and we have changed our documentation appropriately.
Breaking Changes
For point releases, RxJS will keep any changes to a minimum until we get to version 3 and beyond. There are some instances where there are breaking changes on internal code, not meant for public consumption.
Current Thread and Immediate Scheduler Changes
In previous releases, the Rx.Scheduler.immediate
and Rx.Scheduler.currentThread
allowed for blocking on the main thread for future scheduling with either relative or absolute time. This is now disallowed and will throw an Error
if it is attempted. The affected APIs are:
Rx.Scheduler.immediate.scheduleWithRelative
Rx.Scheduler.immediate.scheduleWithRelativeAndState
Rx.Scheduler.currentThread.scheduleWithRelative
Rx.Scheduler.currentThread.scheduleWithRelativeAndState
Note that these were internal APIs not meant for public consumption, so none of your code should be affected. We wanted to ensure the proper usage of schedulers, hence disallowing anything that might block the main thread.
RxJS version 2.3.25
This release was mainly focused on performance as we continue to push RxJS forward.
In this release, there were the following changes:
- Performance Upgrades
- Method Fusion
Observable.prototype.pluck
Changes- Fixes
Performance Upgrades
RxJS has been committed to bringing the best performance possible. Lately we have been making changes to make it even faster while keeping the memory footprint low. To that end, we've been making strides little by little to identify bottlenecks and eliminate them.
We've been doing the following to ensure performance:
- Add benchmarks versus previous versions
- Optimize scheduler usage
- Reducing scope chains
- Method Fusion
- Enhance for Engine Optimizations
Add Benchmarks
To measure our progress, we have set up benchmark tests for performance in the tests/perf/operators
folder using benchmark.js which compares the previous version of RxJS to the current edge version. This will also give you insight into what changes we are making and how much of a difference it is making.
To give a concrete example, we were able to make the following improvements to operators such as Observable.prototype.map
:
Using node.js v0.10.36 on a Surface 2 Pro as your mileage may vary based upon your machine.
λ node map.js
old x 5,471 ops/sec ±5.04% (82 runs sampled)
new x 7,529 ops/sec ±4.61% (87 runs sampled)
Fastest is new
As you'll notice, the operations per second increase is rather dramatic in some cases, especially for map
and filter
where we reduced method scopes and created classes for both an Observable
and Observer
.
Optimize Scheduler Usage
Another optimization we made is to use schedulers more efficiently, especially around using scheduleWithState
, and scheduleRecursiveWithState
which are more efficient than their non-state counterparts of schedule
and scheduleRecursive
. One such example is how Observable.fromArray
was optimized. Previously, we had an index outside the scope of the scheduler which was incremented, and chained scopes can be expensive if you mutate variables. In this case, we are no longer doing that and instead, start with a state of 0 and then incrementing it via the recursive call to self(i + 1)
.
Observable.fromArray = function (array, scheduler) {
var len = array.length;
isScheduler(scheduler) || (scheduler = currentThreadScheduler);
return new AnonymousObservable(function (observer) {
return scheduler.scheduleRecursiveWithState(0, function (i, self) {
if (i < len) {
observer.onNext(array[i]);
self(i + 1);
} else {
observer.onCompleted();
}
});
});
};
And we're not stopping here to make schedulers more efficient since they are the heart of Rx.
Reducing Scope Chains
If you looked at previous implementations of map
, you'll notice there are nested scope chains, especially with mutation of the count
variable.
Observable.prototype.map = function (selector, thisArg) {
var selectorFn = isFunction(selector) ? bindCallback(selector, thisArg, 3) : function () { return selector; },
source = this;
return new AnonymousObservable(function (o) {
var count = 0;
return source.subscribe(function (value) {
try {
var result = selectorFn(value, count++, source);
} catch (e) {
return o.onError(e);
}
o.onNext(result);
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
};
To get around this we created classes for map
with an Observable
and Observer
here. This allows us also future optimizations such as method fusion which we will get into later.
Method Fusion
Another way we could squeeze more performance through the technique of method fusion, meaning the ability to fuse two calls into one. For example, if you have two filter
calls in a row, we could easily flatten that into a single filter
call, and the same applies to map
as well.
Observable.prototype.map = function (selector, thisArg) {
var selectorFn = isFunction(selector) ? selector : function () { return selector; };
return this instanceof MapObservable ?
this.internalMap(selector, thisArg) :
new MapObservable(this, selectorFn, thisArg);
};
Here we are detecting if there are two map
calls in a row, and then can flatten them into a single call:
MapObservable.prototype.internalMap = function (selector, thisArg) {
var self = this;
return new MapObservable(
this.source,
function (x, i, o) { return selector(self.selector(x, i, o), i, o); }, thisArg)
};
Enhance For Engine Optimizations
One bit more involved is to ensure that we are optimizing for the ways that JavaScript engines work. For example, the Bluebird library team has a good document on optimization killers for V8.
One simple example was to ensure that we were not leaking arguments anywhere. To fix that is simple, simply a call such as the following where instead of calling slice
on arguments
which can be expensive, we can simply inline the arguments
into a new Array
with the proper size.
Observable.of = function() {
var a = new Array(arguments.length), len = a.length;
for(var i = 0; i < len; i++) { a.push(arguments[i]); }
observableOf(null, a);
};
We are looking into the next version of fixing all functions with try/catch
and try/finally
to ensure that we have faster speeds which will come in the next release.
Observable.prototype.pluck
Changes
One request that we've had for some time is having the ability to get nested properties using the Observable.prototype.pluck
operator. To that and, we have added the ability to get to nested properties by giving an argument list of properties to pluck. So, for example, we could pull out the event.target.value
from the event
argument.
var element = document.querySelector('#input');
var source = Rx.Observable.fromEvent(element, 'keyup')
.pluck('target', 'value');
This is equivalent to using map
in this way:
var element = document.querySelector('#input');
var source = Rx.Observable.fromEvent(element, 'keyup')
.map(function(event) { return event.target.value; });
Why didn't we just allow a single string to pluck the values like some other libraries have done? That's because there's no easy way to break them up, after all, having a dot in the name of a variable is acceptable, thus leading to unpredictable behavior.
var foo = {
'.bar': {
'.baz': 42
}
};
If you try to use a string like target.value
it will not work in that case.
Many thanks to @sergi for his help in creating this operator.
Fixes
- Issue #495 - Fix links - @38elements
- Issue #496 - Fix documentation - @38elements
- Issue #497 - fixed typo - @g00fy-
- Issue #500 - fix link in observable.md - @38elements
- Issue #502 - fix sample code - @38elements
- Issue #505 - Fix trailing comma in
intro.js
- @mattpodwysocki - Issue #506 - fix links - @38elements
- Issue #507 - documentation fixes - @38elements
- Issue #509 - Rename
contains
toincludes
- Issue #510 - Fix valueOf for hash calls - @wasphub
- Issue #511 - Documentation fixes - @38elements
- Issue #513 - Regression in
AnonymousSubject.prototype.subscribe
- @mattpodwysocki - Issue #514 - Fixing documentation links - @zebulonj
- Issue #516 - Minor sample code fix - @zebulonj
- Issue #517 - Fix example in document - @38elements
- Issue #518 - Fix callbacks.md - @varunmc
- Issue #520 - Fix scheduler.md - @38elements
- Issue #525 - Documentation fixes - @38elements
- Issue #526 - Implement
deepPluck
- @sergi - Issue #530 - Fix parentheses in
spawn
- @cultofmetatron - Issue #534 - Fix html - @rutsky
- Issue #535 -
subscribe
ignoresthisArg
- @mattpodwysocki - Issue #578 - Fixing small issue in docs - @unao
RxJS version 2.3.23
This release has a number of new features as well as improvements to the existing codebase.
Of note, there are the following items changed:
- New Operators
- Performance Enhancements
- Bug Fixes
- Documentation fixes
New Operators
In this version, a number of new operators are now available
Observable.pairs
One question that was posed to the team, what if we could take an object and turn it into an observable of key/value pairs? To that end, we have implemented Observable.pairs
, which is an implementation which mirrors Lo-Dash and Underscore.js but instead of returning an array of key/value arrays, it returns an Observable
of key/value arrays.
Using this together with ES6 produces quite a nice result with destructuring of the arrays.
let obj = {
foo: 42,
bar: 56,
baz: 78
};
let source = Rx.Observable.pairs(obj);
let subscription = source.subscribe(
[key, value] => {
console.log('Key:', key, 'Value:', value);
},
err => {
console.log('Error: %s', err);
},
=> () {
console.log('Completed');
});
// => Key: 'foo' Value: 42
// => Key: 'bar' Value: 56
// => Key: 'baz' Value: 78
// => Completed
This is a great alternative to using a much longer form of:
var obj = {
foo: 42,
bar: 56,
baz: 78
};
var source = Observable.from(Object.keys(obj))
.map(function (x) { return [x, obj[x]]; });
Observable.prototype.retryWhen
In previous releases of RxJS, for retrying behavior, we had a single operator of Observable.prototype.retry
which would try running the observable the specified number of times. This is acceptable behavior for immediate retries, but for more complex scenarios, we want this behavior to be more configurable. To that end, we have implemented Observable.prototype.retryWhen
which comes from RxJava.
The retryWhen
operator is similar to retry
but decides whether or not to resubscribe to and mirror the source Observable
by passing the Error
from the onError
notification to a function that generates a second Observable
, and observes its result to determine what to do. If that result is an emitted item, retryWhen
resubscribes to and mirrors the source and the process repeats; if that result is an onError
notification, retryWhen
passes this notification on to its observers and terminates.
This allows for an eventual back-off strategy to handle failures such as the following:
var source = Observable.create(function (o) {
console.log('subscribing');
o.onError(new Error('always fails'));
})
.retryWhen(function(attempts) {
return attempts
.zip(Observable.range(1, 3), function (n, i) { return i; })
.flatMap(function(i) {
console.log('delay retry by', i, 'second(s)');
return Observable.timer(i * 1000 /*ms*/);
});
});
source.subscribe();
/*
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
*/
Many thanks to @Blesh for the implementation!
Observable.prototype.withLatestFrom
RxJS has many ways of combining observable sequences whether it is zip
which waits for pairs, which could penalize the faster of the observables which produce values. RxJS also has combineLatest
which allows you to combine the latest value from each of the observable sequences which allows you to no longer be penalized by a slower observable sequence, instead, you will get each one in turn, as part of the pair.
There may be times, however, when you only want the latest values from the other sequences, produced with the source sequence. To that end, we have introduced the Observable.prototype.withLatestFrom
method which merges the specified observable sequences into one observable sequence by using the selector function only when the source observable sequence (the instance) produces an element.
/* Have staggering intervals */
var source1 = Rx.Observable.interval(140)
.map(function (i) { return 'First: ' + i; });
var source2 = Rx.Observable.interval(50)
.map(function (i) { return 'Second: ' + i; });
// When source1 emits a value, combine it with the latest emission from source2.
var source = source1.withLatestFrom(
source2,
function (s1, s2) { return s1 + ', ' + s2; }
).take(4);
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
// => Next: First: 0, Second: 1
// => Next: First: 1, Second: 4
// => Next: First: 2, Second: 7
// => Next: First: 3, Second: 10
// => Completed
Many thanks to @staltz for the implementation!
Performance Improvements
RxJS is committed to being as high performance as possible. To fix some issues, we have reverted the usage of Function.prototype.bind
which is incredibly slow as of this date. In addition, we have removed some composition from some operators and implementing them as standalone operators.
The following operators should have much better performance including:
Observable.prototype.concatAll
Observable.prototype.concatMap
Observable.prototype.filter
Observable.prototype.flatMap
Observable.prototype.map
Observable.prototype.merge
Observable.prototype.mergeAll
Observable.prototype.reduce
In the coming point releases, we will be continuously striving for better performance.
Bug Fixes
The following bugs were fixed:
- #484 - Fix Long Stack Traces with Subjects - @trxcllnt
- #485 - Add current time to Immediate Scheduler loop - @trxcllnt
Documentation Fixes
As always, we are adding more documentation for RxJS to help you better understand the library and why you should use RxJS versus other libraries, as well as the mapping concepts between the libraries.
Going forward, you will be able to find the RxJS documentation along with all other languages supported on the reactivex.io home page, which is the home for all implementations of Rx.
We'd like to thank the following for submitting bug fixes for our documentation: @scouten, @artemyarulin, @lazaruslarue, @andrewk, @adm72, @eladb, @YuvalZiegler, @jca02266, @erykpiast, @saraid, @paddlefish, @kt3k, and @38elements! You are what make RxJS so awesome!