From 0b732085000f4181c9b10afa1ee4de8104a9338d Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 26 Jun 2017 08:21:42 -0700 Subject: [PATCH] feat(windowWhen): add higher-order lettable version of windowWhen --- src/operator/windowWhen.ts | 100 +------------------------- src/operators/index.ts | 1 + src/operators/windowWhen.ts | 140 ++++++++++++++++++++++++++++++++++++ 3 files changed, 144 insertions(+), 97 deletions(-) create mode 100644 src/operators/windowWhen.ts diff --git a/src/operator/windowWhen.ts b/src/operator/windowWhen.ts index 8c45194da7..9839cd9087 100644 --- a/src/operator/windowWhen.ts +++ b/src/operator/windowWhen.ts @@ -1,15 +1,6 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { Observable } from '../Observable'; -import { Subject } from '../Subject'; -import { Subscription } from '../Subscription'; - -import { tryCatch } from '../util/tryCatch'; -import { errorObject } from '../util/errorObject'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { Observable } from '../Observable'; +import { windowWhen as higherOrder } from '../operators'; /** * Branch out the source Observable values as a nested Observable using a @@ -50,90 +41,5 @@ import { subscribeToResult } from '../util/subscribeToResult'; * @owner Observable */ export function windowWhen(this: Observable, closingSelector: () => Observable): Observable> { - return this.lift(new WindowOperator(closingSelector)); -} - -class WindowOperator implements Operator> { - constructor(private closingSelector: () => Observable) { - } - - call(subscriber: Subscriber>, source: any): any { - return source.subscribe(new WindowSubscriber(subscriber, this.closingSelector)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class WindowSubscriber extends OuterSubscriber { - private window: Subject; - private closingNotification: Subscription; - - constructor(protected destination: Subscriber>, - private closingSelector: () => Observable) { - super(destination); - this.openWindow(); - } - - notifyNext(outerValue: T, innerValue: any, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.openWindow(innerSub); - } - - notifyError(error: any, innerSub: InnerSubscriber): void { - this._error(error); - } - - notifyComplete(innerSub: InnerSubscriber): void { - this.openWindow(innerSub); - } - - protected _next(value: T): void { - this.window.next(value); - } - - protected _error(err: any): void { - this.window.error(err); - this.destination.error(err); - this.unsubscribeClosingNotification(); - } - - protected _complete(): void { - this.window.complete(); - this.destination.complete(); - this.unsubscribeClosingNotification(); - } - - private unsubscribeClosingNotification(): void { - if (this.closingNotification) { - this.closingNotification.unsubscribe(); - } - } - - private openWindow(innerSub: InnerSubscriber = null): void { - if (innerSub) { - this.remove(innerSub); - innerSub.unsubscribe(); - } - - const prevWindow = this.window; - if (prevWindow) { - prevWindow.complete(); - } - - const window = this.window = new Subject(); - this.destination.next(window); - - const closingNotifier = tryCatch(this.closingSelector)(); - if (closingNotifier === errorObject) { - const err = errorObject.e; - this.destination.error(err); - this.window.error(err); - } else { - this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); - } - } + return higherOrder(closingSelector)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index ba9aba97cd..5f60956cd1 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -26,3 +26,4 @@ export { window } from './window'; export { windowCount } from './windowCount'; export { windowTime } from './windowTime'; export { windowToggle } from './windowToggle'; +export { windowWhen } from './windowWhen'; diff --git a/src/operators/windowWhen.ts b/src/operators/windowWhen.ts new file mode 100644 index 0000000000..3b629044b9 --- /dev/null +++ b/src/operators/windowWhen.ts @@ -0,0 +1,140 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable } from '../Observable'; +import { Subject } from '../Subject'; +import { Subscription } from '../Subscription'; +import { tryCatch } from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OperatorFunction } from '../interfaces'; + +/** + * Branch out the source Observable values as a nested Observable using a + * factory function of closing Observables to determine when to start a new + * window. + * + * It's like {@link bufferWhen}, but emits a nested + * Observable instead of an array. + * + * + * + * Returns an Observable that emits windows of items it collects from the source + * Observable. The output Observable emits connected, non-overlapping windows. + * It emits the current window and opens a new one whenever the Observable + * produced by the specified `closingSelector` function emits an item. The first + * window is opened immediately when subscribing to the output Observable. + * + * @example Emit only the first two clicks events in every window of [1-5] random seconds + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks + * .windowWhen(() => Rx.Observable.interval(1000 + Math.random() * 4000)) + * .map(win => win.take(2)) // each window has at most 2 emissions + * .mergeAll(); // flatten the Observable-of-Observables + * result.subscribe(x => console.log(x)); + * + * @see {@link window} + * @see {@link windowCount} + * @see {@link windowTime} + * @see {@link windowToggle} + * @see {@link bufferWhen} + * + * @param {function(): Observable} closingSelector A function that takes no + * arguments and returns an Observable that signals (on either `next` or + * `complete`) when to close the previous window and start a new one. + * @return {Observable>} An observable of windows, which in turn + * are Observables. + * @method windowWhen + * @owner Observable + */ +export function windowWhen(closingSelector: () => Observable): OperatorFunction> { + return function windowWhenOperatorFunction(source: Observable) { + return source.lift(new WindowOperator(closingSelector)); + }; +} + +class WindowOperator implements Operator> { + constructor(private closingSelector: () => Observable) { + } + + call(subscriber: Subscriber>, source: any): any { + return source.subscribe(new WindowSubscriber(subscriber, this.closingSelector)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class WindowSubscriber extends OuterSubscriber { + private window: Subject; + private closingNotification: Subscription; + + constructor(protected destination: Subscriber>, + private closingSelector: () => Observable) { + super(destination); + this.openWindow(); + } + + notifyNext(outerValue: T, innerValue: any, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.openWindow(innerSub); + } + + notifyError(error: any, innerSub: InnerSubscriber): void { + this._error(error); + } + + notifyComplete(innerSub: InnerSubscriber): void { + this.openWindow(innerSub); + } + + protected _next(value: T): void { + this.window.next(value); + } + + protected _error(err: any): void { + this.window.error(err); + this.destination.error(err); + this.unsubscribeClosingNotification(); + } + + protected _complete(): void { + this.window.complete(); + this.destination.complete(); + this.unsubscribeClosingNotification(); + } + + private unsubscribeClosingNotification(): void { + if (this.closingNotification) { + this.closingNotification.unsubscribe(); + } + } + + private openWindow(innerSub: InnerSubscriber = null): void { + if (innerSub) { + this.remove(innerSub); + innerSub.unsubscribe(); + } + + const prevWindow = this.window; + if (prevWindow) { + prevWindow.complete(); + } + + const window = this.window = new Subject(); + this.destination.next(window); + + const closingNotifier = tryCatch(this.closingSelector)(); + if (closingNotifier === errorObject) { + const err = errorObject.e; + this.destination.error(err); + this.window.error(err); + } else { + this.add(this.closingNotification = subscribeToResult(this, closingNotifier)); + } + } +}