Skip to content

Commit b134e0c

Browse files
committed
feat(exhaustMap): add higher-order lettable exhaustMap
1 parent b145dca commit b134e0c

File tree

2 files changed

+153
-0
lines changed

2 files changed

+153
-0
lines changed

src/operators/exhaustMap.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import { Operator } from '../Operator';
2+
import { Observable, ObservableInput } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription } from '../Subscription';
5+
import { OuterSubscriber } from '../OuterSubscriber';
6+
import { InnerSubscriber } from '../InnerSubscriber';
7+
import { subscribeToResult } from '../util/subscribeToResult';
8+
import { OperatorFunction } from '../interfaces';
9+
10+
/* tslint:disable:max-line-length */
11+
export function exhaustMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
12+
export function exhaustMap<T, I, R>(project: (value: T, index: number) => ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
13+
/* tslint:enable:max-line-length */
14+
15+
/**
16+
* Projects each source value to an Observable which is merged in the output
17+
* Observable only if the previous projected Observable has completed.
18+
*
19+
* <span class="informal">Maps each value to an Observable, then flattens all of
20+
* these inner Observables using {@link exhaust}.</span>
21+
*
22+
* <img src="./img/exhaustMap.png" width="100%">
23+
*
24+
* Returns an Observable that emits items based on applying a function that you
25+
* supply to each item emitted by the source Observable, where that function
26+
* returns an (so-called "inner") Observable. When it projects a source value to
27+
* an Observable, the output Observable begins emitting the items emitted by
28+
* that projected Observable. However, `exhaustMap` ignores every new projected
29+
* Observable if the previous projected Observable has not yet completed. Once
30+
* that one completes, it will accept and flatten the next projected Observable
31+
* and repeat this process.
32+
*
33+
* @example <caption>Run a finite timer for each click, only if there is no currently active timer</caption>
34+
* var clicks = Rx.Observable.fromEvent(document, 'click');
35+
* var result = clicks.exhaustMap((ev) => Rx.Observable.interval(1000).take(5));
36+
* result.subscribe(x => console.log(x));
37+
*
38+
* @see {@link concatMap}
39+
* @see {@link exhaust}
40+
* @see {@link mergeMap}
41+
* @see {@link switchMap}
42+
*
43+
* @param {function(value: T, ?index: number): ObservableInput} project A function
44+
* that, when applied to an item emitted by the source Observable, returns an
45+
* Observable.
46+
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector]
47+
* A function to produce the value on the output Observable based on the values
48+
* and the indices of the source (outer) emission and the inner Observable
49+
* emission. The arguments passed to this function are:
50+
* - `outerValue`: the value that came from the source
51+
* - `innerValue`: the value that came from the projected Observable
52+
* - `outerIndex`: the "index" of the value that came from the source
53+
* - `innerIndex`: the "index" of the value from the projected Observable
54+
* @return {Observable} An Observable containing projected Observables
55+
* of each item of the source, ignoring projected Observables that start before
56+
* their preceding Observable has completed.
57+
* @method exhaustMap
58+
* @owner Observable
59+
*/
60+
export function exhaustMap<T, I, R>(
61+
project: (value: T, index: number) => ObservableInput<I>,
62+
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R
63+
): OperatorFunction<T, R> {
64+
return (source: Observable<T>) => source.lift(new SwitchFirstMapOperator(project, resultSelector));
65+
}
66+
67+
class SwitchFirstMapOperator<T, I, R> implements Operator<T, R> {
68+
constructor(private project: (value: T, index: number) => ObservableInput<I>,
69+
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
70+
}
71+
72+
call(subscriber: Subscriber<R>, source: any): any {
73+
return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector));
74+
}
75+
}
76+
77+
/**
78+
* We need this JSDoc comment for affecting ESDoc.
79+
* @ignore
80+
* @extends {Ignored}
81+
*/
82+
class SwitchFirstMapSubscriber<T, I, R> extends OuterSubscriber<T, I> {
83+
private hasSubscription: boolean = false;
84+
private hasCompleted: boolean = false;
85+
private index: number = 0;
86+
87+
constructor(destination: Subscriber<R>,
88+
private project: (value: T, index: number) => ObservableInput<I>,
89+
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
90+
super(destination);
91+
}
92+
93+
protected _next(value: T): void {
94+
if (!this.hasSubscription) {
95+
this.tryNext(value);
96+
}
97+
}
98+
99+
private tryNext(value: T): void {
100+
const index = this.index++;
101+
const destination = this.destination;
102+
try {
103+
const result = this.project(value, index);
104+
this.hasSubscription = true;
105+
this.add(subscribeToResult(this, result, value, index));
106+
} catch (err) {
107+
destination.error(err);
108+
}
109+
}
110+
111+
protected _complete(): void {
112+
this.hasCompleted = true;
113+
if (!this.hasSubscription) {
114+
this.destination.complete();
115+
}
116+
}
117+
118+
notifyNext(outerValue: T, innerValue: I,
119+
outerIndex: number, innerIndex: number,
120+
innerSub: InnerSubscriber<T, I>): void {
121+
const { resultSelector, destination } = this;
122+
if (resultSelector) {
123+
this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex);
124+
} else {
125+
destination.next(innerValue);
126+
}
127+
}
128+
129+
private trySelectResult(outerValue: T, innerValue: I,
130+
outerIndex: number, innerIndex: number): void {
131+
const { resultSelector, destination } = this;
132+
try {
133+
const result = resultSelector(outerValue, innerValue, outerIndex, innerIndex);
134+
destination.next(result);
135+
} catch (err) {
136+
destination.error(err);
137+
}
138+
}
139+
140+
notifyError(err: any): void {
141+
this.destination.error(err);
142+
}
143+
144+
notifyComplete(innerSub: Subscription): void {
145+
this.remove(innerSub);
146+
147+
this.hasSubscription = false;
148+
if (this.hasCompleted) {
149+
this.destination.complete();
150+
}
151+
}
152+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ export { distinctUntilKeyChanged } from './distinctUntilKeyChanged';
2222
export { elementAt } from './elementAt';
2323
export { every } from './every';
2424
export { exhaust } from './exhaust';
25+
export { exhaustMap } from './exhaustMap';
2526
export { filter } from './filter';
2627
export { ignoreElements } from './ignoreElements';
2728
export { map } from './map';

0 commit comments

Comments
 (0)