diff --git a/src/operator/exhaust.ts b/src/operator/exhaust.ts index c06675ca82..29c4154435 100644 --- a/src/operator/exhaust.ts +++ b/src/operator/exhaust.ts @@ -1,9 +1,6 @@ -import { Operator } from '../Operator'; + import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription, TeardownLogic } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { exhaust as higherOrder } from '../operators'; /** * Converts a higher-order Observable into a first-order Observable by dropping @@ -41,47 +38,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function exhaust(this: Observable): Observable { - return this.lift(new SwitchFirstOperator()); -} - -class SwitchFirstOperator implements Operator { - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new SwitchFirstSubscriber(subscriber)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SwitchFirstSubscriber extends OuterSubscriber { - private hasCompleted: boolean = false; - private hasSubscription: boolean = false; - - constructor(destination: Subscriber) { - super(destination); - } - - protected _next(value: T): void { - if (!this.hasSubscription) { - this.hasSubscription = true; - this.add(subscribeToResult(this, value)); - } - } - - protected _complete(): void { - this.hasCompleted = true; - if (!this.hasSubscription) { - this.destination.complete(); - } - } - - notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); - this.hasSubscription = false; - if (this.hasCompleted) { - this.destination.complete(); - } - } + return higherOrder()(this); } diff --git a/src/operator/exhaustMap.ts b/src/operator/exhaustMap.ts index be1f74710b..111a5a151c 100644 --- a/src/operator/exhaustMap.ts +++ b/src/operator/exhaustMap.ts @@ -1,10 +1,6 @@ -import { Operator } from '../Operator'; + import { Observable, ObservableInput } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { exhaustMap as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function exhaustMap(this: Observable, project: (value: T, index: number) => ObservableInput): Observable; @@ -58,92 +54,5 @@ export function exhaustMap(this: Observable, project: (value: T, ind */ export function exhaustMap(this: Observable, project: (value: T, index: number) => ObservableInput, resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable { - return this.lift(new SwitchFirstMapOperator(project, resultSelector)); -} - -class SwitchFirstMapOperator implements Operator { - constructor(private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new SwitchFirstMapSubscriber(subscriber, this.project, this.resultSelector)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class SwitchFirstMapSubscriber extends OuterSubscriber { - private hasSubscription: boolean = false; - private hasCompleted: boolean = false; - private index: number = 0; - - constructor(destination: Subscriber, - private project: (value: T, index: number) => ObservableInput, - private resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R) { - super(destination); - } - - protected _next(value: T): void { - if (!this.hasSubscription) { - this.tryNext(value); - } - } - - private tryNext(value: T): void { - const index = this.index++; - const destination = this.destination; - try { - const result = this.project(value, index); - this.hasSubscription = true; - this.add(subscribeToResult(this, result, value, index)); - } catch (err) { - destination.error(err); - } - } - - protected _complete(): void { - this.hasCompleted = true; - if (!this.hasSubscription) { - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const { resultSelector, destination } = this; - if (resultSelector) { - this.trySelectResult(outerValue, innerValue, outerIndex, innerIndex); - } else { - destination.next(innerValue); - } - } - - private trySelectResult(outerValue: T, innerValue: I, - outerIndex: number, innerIndex: number): void { - const { resultSelector, destination } = this; - try { - const result = resultSelector(outerValue, innerValue, outerIndex, innerIndex); - destination.next(result); - } catch (err) { - destination.error(err); - } - } - - notifyError(err: any): void { - this.destination.error(err); - } - - notifyComplete(innerSub: Subscription): void { - this.remove(innerSub); - - this.hasSubscription = false; - if (this.hasCompleted) { - this.destination.complete(); - } - } + return higherOrder(project, resultSelector)(this); } diff --git a/src/operators/exhaust.ts b/src/operators/exhaust.ts new file mode 100644 index 0000000000..d684516490 --- /dev/null +++ b/src/operators/exhaust.ts @@ -0,0 +1,88 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { Subscription, TeardownLogic } from '../Subscription'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Converts a higher-order Observable into a first-order Observable by dropping + * inner Observables while the previous inner Observable has not yet completed. + * + * Flattens an Observable-of-Observables by dropping the + * next inner Observables while the current inner is still executing. + * + * + * + * `exhaust` subscribes to an Observable that emits Observables, also known as a + * higher-order Observable. Each time it observes one of these emitted inner + * Observables, the output Observable begins emitting the items emitted by that + * inner Observable. So far, it behaves like {@link mergeAll}. However, + * `exhaust` ignores every new inner Observable if the previous Observable has + * not yet completed. Once that one completes, it will accept and flatten the + * next inner Observable and repeat this process. + * + * @example Run a finite timer for each click, only if there is no currently active timer + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var higherOrder = clicks.map((ev) => Rx.Observable.interval(1000).take(5)); + * var result = higherOrder.exhaust(); + * result.subscribe(x => console.log(x)); + * + * @see {@link combineAll} + * @see {@link concatAll} + * @see {@link switch} + * @see {@link mergeAll} + * @see {@link exhaustMap} + * @see {@link zipAll} + * + * @return {Observable} An Observable that takes a source of Observables and propagates the first observable + * exclusively until it completes before subscribing to the next. + * @method exhaust + * @owner Observable + */ +export function exhaust(): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new SwitchFirstOperator()); +} + +class SwitchFirstOperator implements Operator { + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new SwitchFirstSubscriber(subscriber)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SwitchFirstSubscriber extends OuterSubscriber { + private hasCompleted: boolean = false; + private hasSubscription: boolean = false; + + constructor(destination: Subscriber) { + super(destination); + } + + protected _next(value: T): void { + if (!this.hasSubscription) { + this.hasSubscription = true; + this.add(subscribeToResult(this, value)); + } + } + + protected _complete(): void { + this.hasCompleted = true; + if (!this.hasSubscription) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription): void { + this.remove(innerSub); + this.hasSubscription = false; + if (this.hasCompleted) { + this.destination.complete(); + } + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 6c8297cc23..07d2637091 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -21,6 +21,7 @@ export { distinctUntilChanged } from './distinctUntilChanged'; export { distinctUntilKeyChanged } from './distinctUntilKeyChanged'; export { elementAt } from './elementAt'; export { every } from './every'; +export { exhaust } from './exhaust'; export { filter } from './filter'; export { ignoreElements } from './ignoreElements'; export { map } from './map';