From ee3b593996096c90a07670b728835424717e9936 Mon Sep 17 00:00:00 2001 From: Justin Woo Date: Thu, 17 Dec 2015 01:52:25 -0600 Subject: [PATCH] feat(race): add race operator --- doc/index.md | 2 + spec/observables/race-spec.js | 193 ++++++++++++++++++++++++++++++++ spec/operators/race-spec.js | 149 ++++++++++++++++++++++++ src/Observable.ts | 3 + src/Rx.KitchenSink.ts | 2 + src/Rx.ts | 2 + src/add/operator/race-static.ts | 5 + src/add/operator/race.ts | 5 + src/operator/race-static.ts | 23 ++++ src/operator/race-support.ts | 62 ++++++++++ src/operator/race.ts | 20 ++++ 11 files changed, 466 insertions(+) create mode 100644 spec/observables/race-spec.js create mode 100644 spec/operators/race-spec.js create mode 100644 src/add/operator/race-static.ts create mode 100644 src/add/operator/race.ts create mode 100644 src/operator/race-static.ts create mode 100644 src/operator/race-support.ts create mode 100644 src/operator/race.ts diff --git a/doc/index.md b/doc/index.md index e3602ca32a..8a4fcef4c0 100644 --- a/doc/index.md +++ b/doc/index.md @@ -17,6 +17,7 @@ - [merge](function/index.html#static-function-merge) - [never](function/index.html#static-function-never) - [of](function/index.html#static-function-of) +- [race](function/index.html#static-function-race) - [range](function/index.html#static-function-range) - [throw](function/index.html#static-function-throw) - [timer](function/index.html#static-function-timer) @@ -62,6 +63,7 @@ - [publish](function/index.html#static-function-publish) - [publishBehavior](function/index.html#static-function-publishBehavior) - [publishReplay](function/index.html#static-function-publishReplay) +- [race](function/index.html#static-function-race) - [reduce](function/index.html#static-function-reduce) - [repeat](function/index.html#static-function-repeat) - [retry](function/index.html#static-function-retry) diff --git a/spec/observables/race-spec.js b/spec/observables/race-spec.js new file mode 100644 index 0000000000..47fd883a69 --- /dev/null +++ b/spec/observables/race-spec.js @@ -0,0 +1,193 @@ +/* globals expect, it, describe, hot, cold, expectObservable, expectSubscriptions */ + +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.race(...observables)', function () { + it('should race a single observable', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = Observable.race(e1); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should race cold and cold', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and hot', function () { + var e1 = hot('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and cold', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race 2nd and 1st', function () { + var e1 = cold('------x-----y-----z----|'); + var e1subs = '^ !'; + var e2 = cold('---a-----b-----c----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race emit and complete', function () { + var e1 = cold('-----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '-----|'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow unsubscribing early and explicitly', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b---'; + var unsub = ' !'; + + var result = Observable.race(e1, e2); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not break unsubscription chains when unsubscribed explicitly', function () { + var e1 = hot('--a--^--b--c---d-| '); + var e1subs = '^ ! '; + var e2 = hot('---e-^---f--g---h-|'); + var e2subs = '^ ! '; + var expected = '---b--c--- '; + var unsub = ' ! '; + + var result = Observable.race( + e1.mergeMap(function (x) { return Observable.of(x); }), + e2.mergeMap(function (x) { return Observable.of(x); }) + ).mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should never emit when given non emitting sources', function () { + var e1 = cold('---|'); + var e2 = cold('---|'); + var e1subs = '^ !'; + var expected = '---|'; + + var source = Observable.race(e1, e2); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw when error occurs mid stream', function () { + var e1 = cold('---a-----#'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----#'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should throw when error occurs before a winner is found', function () { + var e1 = cold('---#'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---#'; + + var result = Observable.race(e1, e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('handle empty', function () { + var e1 = cold('|'); + var e1subs = '(^!)'; + var expected = '|'; + + var source = Observable.race(e1); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('handle never', function () { + var e1 = cold('-'); + var e1subs = '^'; + var expected = '-'; + + var source = Observable.race(e1); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('handle throw', function () { + var e1 = cold('#'); + var e1subs = '(^!)'; + var expected = '#'; + + var source = Observable.race(e1); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); diff --git a/spec/operators/race-spec.js b/spec/operators/race-spec.js new file mode 100644 index 0000000000..58c90ab12d --- /dev/null +++ b/spec/operators/race-spec.js @@ -0,0 +1,149 @@ +/* globals expect, it, describe, hot, cold, expectObservable, expectSubscriptions */ + +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('...race(observables)', function () { + it('should race cold and cold', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and hot', function () { + var e1 = hot('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race hot and cold', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race 2nd and 1st', function () { + var e1 = cold('------x-----y-----z----|'); + var e1subs = '^ !'; + var e2 = cold('---a-----b-----c----|'); + var e2subs = '^ !'; + var expected = '---a-----b-----c----|'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should race emit and complete', function () { + var e1 = cold('-----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '-----|'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should allow unsubscribing early and explicitly', function () { + var e1 = cold('---a-----b-----c----|'); + var e1subs = '^ !'; + var e2 = hot('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----b---'; + var unsub = ' !'; + + var result = e1.race(e2); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should not break unsubscription chains when unsubscribed explicitly', function () { + var e1 = hot('--a--^--b--c---d-| '); + var e1subs = '^ ! '; + var e2 = hot('---e-^---f--g---h-|'); + var e2subs = '^ ! '; + var expected = '---b--c--- '; + var unsub = ' ! '; + + var result = e1 + .mergeMap(function (x) { return Observable.of(x); }) + .race(e2) + .mergeMap(function (x) { return Observable.of(x); }); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should never emit when given non emitting sources', function () { + var e1 = cold('---|'); + var e2 = cold('---|'); + var e1subs = '^ !'; + var expected = '---|'; + + var source = e1.race(e2); + + expectObservable(source).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw when error occurs mid stream', function () { + var e1 = cold('---a-----#'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---a-----#'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); + + it('should throw when error occurs before a winner is found', function () { + var e1 = cold('---#'); + var e1subs = '^ !'; + var e2 = cold('------x-----y-----z----|'); + var e2subs = '^ !'; + var expected = '---#'; + + var result = e1.race(e2); + + expectObservable(result).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); +}); diff --git a/src/Observable.ts b/src/Observable.ts index 3f61663add..30fa29b8e9 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -27,6 +27,7 @@ import {FromEventPatternObservable} from './observable/fromEventPattern'; import {PromiseObservable} from './observable/fromPromise'; import {IntervalObservable} from './observable/interval'; import {TimerObservable} from './observable/timer'; +import {race as raceStatic} from './operator/race-static'; import {RangeObservable} from './observable/range'; import {InfiniteObservable} from './observable/never'; import {ErrorObservable} from './observable/throw'; @@ -175,6 +176,7 @@ export class Observable implements CoreOperators { static merge: typeof mergeStatic; static never: typeof InfiniteObservable.create; static of: typeof ArrayObservable.of; + static race: typeof raceStatic; static range: typeof RangeObservable.create; static throw: typeof ErrorObservable.create; static timer: typeof TimerObservable.create; @@ -238,6 +240,7 @@ export class Observable implements CoreOperators { publishBehavior: (value: any) => ConnectableObservable; publishReplay: (bufferSize?: number, windowTime?: number, scheduler?: Scheduler) => ConnectableObservable; publishLast: () => ConnectableObservable; + race: (...observables: Array>) => Observable; reduce: (project: (acc: R, x: T) => R, seed?: R) => Observable; repeat: (count?: number) => Observable; retry: (count?: number) => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index d979963d64..b7fa71bf3d 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -26,6 +26,7 @@ export interface KitchenSinkOperators extends CoreOperators { import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; +import './add/operator/race-static'; import './add/observable/bindCallback'; import './add/observable/defer'; import './add/observable/empty'; @@ -96,6 +97,7 @@ import './add/operator/publish'; import './add/operator/publishBehavior'; import './add/operator/publishReplay'; import './add/operator/publishLast'; +import './add/operator/race'; import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; diff --git a/src/Rx.ts b/src/Rx.ts index 1b450334d1..266017d208 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -11,6 +11,7 @@ import {Observable} from './Observable'; import './add/operator/combineLatest-static'; import './add/operator/concat-static'; import './add/operator/merge-static'; +import './add/operator/race-static'; import './add/observable/bindCallback'; import './add/observable/defer'; import './add/observable/empty'; @@ -71,6 +72,7 @@ import './add/operator/publish'; import './add/operator/publishBehavior'; import './add/operator/publishReplay'; import './add/operator/publishLast'; +import './add/operator/race'; import './add/operator/reduce'; import './add/operator/repeat'; import './add/operator/retry'; diff --git a/src/add/operator/race-static.ts b/src/add/operator/race-static.ts new file mode 100644 index 0000000000..da5596e14a --- /dev/null +++ b/src/add/operator/race-static.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../Observable'; +import {race} from '../../operator/race-static'; +Observable.race = race; + +export var _void: void; diff --git a/src/add/operator/race.ts b/src/add/operator/race.ts new file mode 100644 index 0000000000..1afedd6f07 --- /dev/null +++ b/src/add/operator/race.ts @@ -0,0 +1,5 @@ +import {Observable} from '../../Observable'; +import {race} from '../../operator/race'; +Observable.prototype.race = race; + +export var _void: void; diff --git a/src/operator/race-static.ts b/src/operator/race-static.ts new file mode 100644 index 0000000000..5c4028f852 --- /dev/null +++ b/src/operator/race-static.ts @@ -0,0 +1,23 @@ +import {Observable} from '../Observable'; +import {ArrayObservable} from '../observable/fromArray'; +import {RaceOperator} from './race-support'; +import {isArray} from '../util/isArray'; + +/** + * Returns an Observable that mirrors the first source Observable to emit an item. + * @param {...Observables} ...observables sources used to race for which Observable emits first. + * @returns {Observable} an Observable that mirrors the output of the first Observable to emit an item. + */ +export function race(...observables: Array | Array>>): Observable { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1) { + if (isArray(observables[0])) { + observables = >>observables[0]; + } else { + return >observables[0]; + } + } + + return new ArrayObservable(observables).lift(new RaceOperator()); +} diff --git a/src/operator/race-support.ts b/src/operator/race-support.ts new file mode 100644 index 0000000000..c0dd84d509 --- /dev/null +++ b/src/operator/race-support.ts @@ -0,0 +1,62 @@ +import {Operator} from '../Operator'; +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {OuterSubscriber} from '../OuterSubscriber'; +import {subscribeToResult} from '../util/subscribeToResult'; + +export class RaceOperator implements Operator { + call(subscriber: Subscriber): Subscriber { + return new RaceSubscriber(subscriber); + } +} + +export class RaceSubscriber extends OuterSubscriber { + private hasFirst: boolean = false; + private observables: Observable[] = []; + private subscriptions: Subscription[] = []; + + constructor(destination: Subscriber) { + super(destination); + } + + _next(observable: any): void { + this.observables.push(observable); + } + + _complete() { + const observables = this.observables; + const len = observables.length; + if (len === 0) { + this.destination.complete(); + } else { + for (let i = 0; i < len; i++) { + let observable = observables[i]; + let subscription = subscribeToResult(this, observable, observable, i); + + this.subscriptions.push(subscription); + this.add(subscription); + } + this.observables = null; + } + } + + notifyNext(observable: any, value: R, outerIndex: number): void { + if (!this.hasFirst) { + this.hasFirst = true; + + for (let i = 0; i < this.subscriptions.length; i++) { + if (i !== outerIndex) { + let subscription = this.subscriptions[i]; + + subscription.unsubscribe(); + this.remove(subscription); + } + } + + this.subscriptions = null; + } + + this.destination.next(value); + } +} diff --git a/src/operator/race.ts b/src/operator/race.ts new file mode 100644 index 0000000000..409e32c8e4 --- /dev/null +++ b/src/operator/race.ts @@ -0,0 +1,20 @@ +import {Observable} from '../Observable'; +import {race as raceStatic} from './race-static'; +import {isArray} from '../util/isArray'; + +/** + * Returns an Observable that mirrors the first source Observable to emit an item + * from the combination of this Observable and supplied Observables + * @param {...Observables} ...observables sources used to race for which Observable emits first. + * @returns {Observable} an Observable that mirrors the output of the first Observable to emit an item. + */ +export function race(...observables: Array | Array>>): Observable { + // if the only argument is an array, it was most likely called with + // `pair([obs1, obs2, ...])` + if (observables.length === 1 && isArray(observables[0])) { + observables = >>observables[0]; + } + + observables.unshift(this); + return raceStatic.apply(this, observables); +}