Skip to content

Commit b145dca

Browse files
committed
feat(exhaust): add higher-order lettable version of exhaust
1 parent 13f3503 commit b145dca

File tree

4 files changed

+95
-142
lines changed

4 files changed

+95
-142
lines changed

src/operator/exhaust.ts

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
1-
import { Operator } from '../Operator';
1+
22
import { Observable } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { Subscription, TeardownLogic } from '../Subscription';
5-
import { OuterSubscriber } from '../OuterSubscriber';
6-
import { subscribeToResult } from '../util/subscribeToResult';
3+
import { exhaust as higherOrder } from '../operators';
74

85
/**
96
* Converts a higher-order Observable into a first-order Observable by dropping
@@ -41,47 +38,5 @@ import { subscribeToResult } from '../util/subscribeToResult';
4138
* @owner Observable
4239
*/
4340
export function exhaust<T>(this: Observable<T>): Observable<T> {
44-
return this.lift(new SwitchFirstOperator<T>());
45-
}
46-
47-
class SwitchFirstOperator<T> implements Operator<T, T> {
48-
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
49-
return source.subscribe(new SwitchFirstSubscriber(subscriber));
50-
}
51-
}
52-
53-
/**
54-
* We need this JSDoc comment for affecting ESDoc.
55-
* @ignore
56-
* @extends {Ignored}
57-
*/
58-
class SwitchFirstSubscriber<T> extends OuterSubscriber<T, T> {
59-
private hasCompleted: boolean = false;
60-
private hasSubscription: boolean = false;
61-
62-
constructor(destination: Subscriber<T>) {
63-
super(destination);
64-
}
65-
66-
protected _next(value: T): void {
67-
if (!this.hasSubscription) {
68-
this.hasSubscription = true;
69-
this.add(subscribeToResult(this, value));
70-
}
71-
}
72-
73-
protected _complete(): void {
74-
this.hasCompleted = true;
75-
if (!this.hasSubscription) {
76-
this.destination.complete();
77-
}
78-
}
79-
80-
notifyComplete(innerSub: Subscription): void {
81-
this.remove(innerSub);
82-
this.hasSubscription = false;
83-
if (this.hasCompleted) {
84-
this.destination.complete();
85-
}
86-
}
41+
return higherOrder()(this);
8742
}

src/operator/exhaustMap.ts

Lines changed: 3 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
import { Operator } from '../Operator';
1+
22
import { Observable, ObservableInput } from '../Observable';
3-
import { Subscriber } from '../Subscriber';
4-
import { Subscription } from '../Subscription';
5-
import { OuterSubscriber } from '../OuterSubscriber';
6-
import { InnerSubscriber } from '../InnerSubscriber';
7-
import { subscribeToResult } from '../util/subscribeToResult';
3+
import { exhaustMap as higherOrder } from '../operators';
84

95
/* tslint:disable:max-line-length */
106
export function exhaustMap<T, R>(this: Observable<T>, project: (value: T, index: number) => ObservableInput<R>): Observable<R>;
@@ -58,92 +54,5 @@ export function exhaustMap<T, I, R>(this: Observable<T>, project: (value: T, ind
5854
*/
5955
export function exhaustMap<T, I, R>(this: Observable<T>, project: (value: T, index: number) => ObservableInput<I>,
6056
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R> {
61-
return this.lift(new SwitchFirstMapOperator(project, resultSelector));
62-
}
63-
64-
class SwitchFirstMapOperator<T, I, R> implements Operator<T, R> {
65-
constructor(private project: (value: T, index: number) => ObservableInput<I>,
66-
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
67-
}
68-
69-
call(subscriber: Subscriber<R>, source: any): any {
70-
return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector));
71-
}
72-
}
73-
74-
/**
75-
* We need this JSDoc comment for affecting ESDoc.
76-
* @ignore
77-
* @extends {Ignored}
78-
*/
79-
class SwitchFirstMapSubscriber<T, I, R> extends OuterSubscriber<T, I> {
80-
private hasSubscription: boolean = false;
81-
private hasCompleted: boolean = false;
82-
private index: number = 0;
83-
84-
constructor(destination: Subscriber<R>,
85-
private project: (value: T, index: number) => ObservableInput<I>,
86-
private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) {
87-
super(destination);
88-
}
89-
90-
protected _next(value: T): void {
91-
if (!this.hasSubscription) {
92-
this.tryNext(value);
93-
}
94-
}
95-
96-
private tryNext(value: T): void {
97-
const index = this.index++;
98-
const destination = this.destination;
99-
try {
100-
const result = this.project(value, index);
101-
this.hasSubscription = true;
102-
this.add(subscribeToResult(this, result, value, index));
103-
} catch (err) {
104-
destination.error(err);
105-
}
106-
}
107-
108-
protected _complete(): void {
109-
this.hasCompleted = true;
110-
if (!this.hasSubscription) {
111-
this.destination.complete();
112-
}
113-
}
114-
115-
notifyNext(outerValue: T, innerValue: I,
116-
outerIndex: number, innerIndex: number,
117-
innerSub: InnerSubscriber<T, I>): void {
118-
const { resultSelector, destination } = this;
119-
if (resultSelector) {
120-
this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex);
121-
} else {
122-
destination.next(innerValue);
123-
}
124-
}
125-
126-
private trySelectResult(outerValue: T, innerValue: I,
127-
outerIndex: number, innerIndex: number): void {
128-
const { resultSelector, destination } = this;
129-
try {
130-
const result = resultSelector(outerValue, innerValue, outerIndex, innerIndex);
131-
destination.next(result);
132-
} catch (err) {
133-
destination.error(err);
134-
}
135-
}
136-
137-
notifyError(err: any): void {
138-
this.destination.error(err);
139-
}
140-
141-
notifyComplete(innerSub: Subscription): void {
142-
this.remove(innerSub);
143-
144-
this.hasSubscription = false;
145-
if (this.hasCompleted) {
146-
this.destination.complete();
147-
}
148-
}
57+
return higherOrder(project, resultSelector)(this);
14958
}

src/operators/exhaust.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { Operator } from '../Operator';
2+
import { Observable } from '../Observable';
3+
import { Subscriber } from '../Subscriber';
4+
import { Subscription, TeardownLogic } from '../Subscription';
5+
import { OuterSubscriber } from '../OuterSubscriber';
6+
import { subscribeToResult } from '../util/subscribeToResult';
7+
import { MonoTypeOperatorFunction } from '../interfaces';
8+
9+
/**
10+
* Converts a higher-order Observable into a first-order Observable by dropping
11+
* inner Observables while the previous inner Observable has not yet completed.
12+
*
13+
* <span class="informal">Flattens an Observable-of-Observables by dropping the
14+
* next inner Observables while the current inner is still executing.</span>
15+
*
16+
* <img src="./img/exhaust.png" width="100%">
17+
*
18+
* `exhaust` subscribes to an Observable that emits Observables, also known as a
19+
* higher-order Observable. Each time it observes one of these emitted inner
20+
* Observables, the output Observable begins emitting the items emitted by that
21+
* inner Observable. So far, it behaves like {@link mergeAll}. However,
22+
* `exhaust` ignores every new inner Observable if the previous Observable has
23+
* not yet completed. Once that one completes, it will accept and flatten the
24+
* next inner Observable and repeat this process.
25+
*
26+
* @example <caption>Run a finite timer for each click, only if there is no currently active timer</caption>
27+
* var clicks = Rx.Observable.fromEvent(document, 'click');
28+
* var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5));
29+
* var result = higherOrder.exhaust();
30+
* result.subscribe(x => console.log(x));
31+
*
32+
* @see {@link combineAll}
33+
* @see {@link concatAll}
34+
* @see {@link switch}
35+
* @see {@link mergeAll}
36+
* @see {@link exhaustMap}
37+
* @see {@link zipAll}
38+
*
39+
* @return {Observable} An Observable that takes a source of Observables and propagates the first observable
40+
* exclusively until it completes before subscribing to the next.
41+
* @method exhaust
42+
* @owner Observable
43+
*/
44+
export function exhaust<T>(): MonoTypeOperatorFunction<T> {
45+
return (source: Observable<T>) => source.lift(new SwitchFirstOperator<T>());
46+
}
47+
48+
class SwitchFirstOperator<T> implements Operator<T, T> {
49+
call(subscriber: Subscriber<T>, source: any): TeardownLogic {
50+
return source.subscribe(new SwitchFirstSubscriber(subscriber));
51+
}
52+
}
53+
54+
/**
55+
* We need this JSDoc comment for affecting ESDoc.
56+
* @ignore
57+
* @extends {Ignored}
58+
*/
59+
class SwitchFirstSubscriber<T> extends OuterSubscriber<T, T> {
60+
private hasCompleted: boolean = false;
61+
private hasSubscription: boolean = false;
62+
63+
constructor(destination: Subscriber<T>) {
64+
super(destination);
65+
}
66+
67+
protected _next(value: T): void {
68+
if (!this.hasSubscription) {
69+
this.hasSubscription = true;
70+
this.add(subscribeToResult(this, value));
71+
}
72+
}
73+
74+
protected _complete(): void {
75+
this.hasCompleted = true;
76+
if (!this.hasSubscription) {
77+
this.destination.complete();
78+
}
79+
}
80+
81+
notifyComplete(innerSub: Subscription): void {
82+
this.remove(innerSub);
83+
this.hasSubscription = false;
84+
if (this.hasCompleted) {
85+
this.destination.complete();
86+
}
87+
}
88+
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export { distinctUntilChanged } from './distinctUntilChanged';
2121
export { distinctUntilKeyChanged } from './distinctUntilKeyChanged';
2222
export { elementAt } from './elementAt';
2323
export { every } from './every';
24+
export { exhaust } from './exhaust';
2425
export { filter } from './filter';
2526
export { ignoreElements } from './ignoreElements';
2627
export { map } from './map';

0 commit comments

Comments
 (0)