Skip to content

Commit e646851

Browse files
committed
feat(race): add higher-order lettable version of race
Refactors race static to live under `observables\/race`
1 parent 68286d4 commit e646851

File tree

5 files changed

+135
-8
lines changed

5 files changed

+135
-8
lines changed

src/add/observable/race.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Observable } from '../../Observable';
2-
import { raceStatic } from '../../operator/race';
2+
import { race as raceStatic } from '../../observable/race';
33

44
Observable.race = raceStatic;
55

src/observable/race.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { Observable } from '../Observable';
2+
import { isArray } from '../util/isArray';
3+
import { ArrayObservable } from '../observable/ArrayObservable';
4+
import { Operator } from '../Operator';
5+
import { Subscriber } from '../Subscriber';
6+
import { Subscription, TeardownLogic } from '../Subscription';
7+
import { OuterSubscriber } from '../OuterSubscriber';
8+
import { InnerSubscriber } from '../InnerSubscriber';
9+
import { subscribeToResult } from '../util/subscribeToResult';
10+
11+
/**
12+
* Returns an Observable that mirrors the first source Observable to emit an item.
13+
* @param {...Observables} ...observables sources used to race for which Observable emits first.
14+
* @return {Observable} an Observable that mirrors the output of the first Observable to emit an item.
15+
* @static true
16+
* @name race
17+
* @owner Observable
18+
*/
19+
export function race<T>(observables: Array<Observable<T>>): Observable<T>;
20+
export function race<T>(observables: Array<Observable<any>>): Observable<T>;
21+
export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): Observable<T>;
22+
export function race<T>(...observables: Array<Observable<any> | Array<Observable<any>>>): Observable<T> {
23+
// if the only argument is an array, it was most likely called with
24+
// `race([obs1, obs2, ...])`
25+
if (observables.length === 1) {
26+
if (isArray(observables[0])) {
27+
observables = <Array<Observable<any>>>observables[0];
28+
} else {
29+
return <Observable<any>>observables[0];
30+
}
31+
}
32+
33+
return new ArrayObservable<T>(<any>observables).lift(new RaceOperator<T>());
34+
}
35+
36+
export class RaceOperator<T> implements Operator<T, T> {
37+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
38+
return source.subscribe(new RaceSubscriber(subscriber));
39+
}
40+
}
41+
42+
/**
43+
* We need this JSDoc comment for affecting ESDoc.
44+
* @ignore
45+
* @extends {Ignored}
46+
*/
47+
export class RaceSubscriber<T> extends OuterSubscriber<T, T> {
48+
private hasFirst: boolean = false;
49+
private observables: Observable<any>[] = [];
50+
private subscriptions: Subscription[] = [];
51+
52+
constructor(destination: Subscriber<T>) {
53+
super(destination);
54+
}
55+
56+
protected _next(observable: any): void {
57+
this.observables.push(observable);
58+
}
59+
60+
protected _complete() {
61+
const observables = this.observables;
62+
const len = observables.length;
63+
64+
if (len === 0) {
65+
this.destination.complete();
66+
} else {
67+
for (let i = 0; i < len && !this.hasFirst; i++) {
68+
let observable = observables[i];
69+
let subscription = subscribeToResult(this, observable, observable, i);
70+
71+
if (this.subscriptions) {
72+
this.subscriptions.push(subscription);
73+
}
74+
this.add(subscription);
75+
}
76+
this.observables = null;
77+
}
78+
}
79+
80+
notifyNext(outerValue: T, innerValue: T,
81+
outerIndex: number, innerIndex: number,
82+
innerSub: InnerSubscriber<T, T>): void {
83+
if (!this.hasFirst) {
84+
this.hasFirst = true;
85+
86+
for (let i = 0; i < this.subscriptions.length; i++) {
87+
if (i !== outerIndex) {
88+
let subscription = this.subscriptions[i];
89+
90+
subscription.unsubscribe();
91+
this.remove(subscription);
92+
}
93+
}
94+
95+
this.subscriptions = null;
96+
}
97+
98+
this.destination.next(innerValue);
99+
}
100+
}

src/operator/race.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Subscription, TeardownLogic } from '../Subscription';
77
import { OuterSubscriber } from '../OuterSubscriber';
88
import { InnerSubscriber } from '../InnerSubscriber';
99
import { subscribeToResult } from '../util/subscribeToResult';
10+
import { race as higherOrder } from '../operators';
1011

1112
/* tslint:disable:max-line-length */
1213
export function race<T>(this: Observable<T>, observables: Array<Observable<T>>): Observable<T>;
@@ -24,13 +25,7 @@ export function race<T, R>(this: Observable<T>, ...observables: Array<Observable
2425
* @owner Observable
2526
*/
2627
export function race<T>(this: Observable<T>, ...observables: Array<Observable<T> | Array<Observable<T>>>): Observable<T> {
27-
// if the only argument is an array, it was most likely called with
28-
// `pair([obs1, obs2, ...])`
29-
if (observables.length === 1 && isArray(observables[0])) {
30-
observables = <Array<Observable<T>>>observables[0];
31-
}
32-
33-
return this.lift.call(raceStatic<T>(this, ...observables));
28+
return higherOrder(...observables)(this);
3429
}
3530

3631
/**

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export { mergeMap } from './mergeMap';
99
export { min } from './min';
1010
export { multicast } from './multicast';
1111
export { publish } from './publish';
12+
export { race } from './race';
1213
export { reduce } from './reduce';
1314
export { refCount } from './refCount';
1415
export { scan } from './scan';

src/operators/race.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { Observable } from '../Observable';
2+
import { isArray } from '../util/isArray';
3+
import { MonoTypeOperatorFunction, OperatorFunction } from '../interfaces';
4+
import { race as raceStatic } from '../observable/race';
5+
6+
/* tslint:disable:max-line-length */
7+
export function race<T>(observables: Array<Observable<T>>): MonoTypeOperatorFunction<T>;
8+
export function race<T, R>(observables: Array<Observable<T>>): OperatorFunction<T, R>;
9+
export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): MonoTypeOperatorFunction<T>;
10+
export function race<T, R>(...observables: Array<Observable<any> | Array<Observable<any>>>): OperatorFunction<T, R>;
11+
/* tslint:enable:max-line-length */
12+
13+
/**
14+
* Returns an Observable that mirrors the first source Observable to emit an item
15+
* from the combination of this Observable and supplied Observables.
16+
* @param {...Observables} ...observables Sources used to race for which Observable emits first.
17+
* @return {Observable} An Observable that mirrors the output of the first Observable to emit an item.
18+
* @method race
19+
* @owner Observable
20+
*/
21+
export function race<T>(...observables: Array<Observable<T> | Array<Observable<T>>>): MonoTypeOperatorFunction<T> {
22+
return function raceOperatorFunction(source: Observable<T>) {
23+
// if the only argument is an array, it was most likely called with
24+
// `pair([obs1, obs2, ...])`
25+
if (observables.length === 1 && isArray(observables[0])) {
26+
observables = <Array<Observable<T>>>observables[0];
27+
}
28+
29+
return source.lift.call(raceStatic<T>(source, ...observables));
30+
};
31+
}

0 commit comments

Comments
 (0)