From f86c862624b0784d7c984ebda173cb09c111948b Mon Sep 17 00:00:00 2001 From: Jason Aden Date: Thu, 7 Sep 2017 12:37:46 -0700 Subject: [PATCH] feat(takeWhile): add higher-order lettable version of takeWhile --- src/operator/takeWhile.ts | 50 +-------------------- src/operators/index.ts | 1 + src/operators/takeWhile.ts | 89 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 48 deletions(-) create mode 100644 src/operators/takeWhile.ts diff --git a/src/operator/takeWhile.ts b/src/operator/takeWhile.ts index a2a45b50c7..f17c01a59c 100644 --- a/src/operator/takeWhile.ts +++ b/src/operator/takeWhile.ts @@ -1,7 +1,5 @@ -import { Operator } from '../Operator'; import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { TeardownLogic } from '../Subscription'; +import { takeWhile as higherOrder } from '../operators/takeWhile'; /** * Emits values emitted by the source Observable so long as each value satisfies @@ -40,49 +38,5 @@ import { TeardownLogic } from '../Subscription'; * @owner Observable */ export function takeWhile(this: Observable, predicate: (value: T, index: number) => boolean): Observable { - return this.lift(new TakeWhileOperator(predicate)); -} - -class TakeWhileOperator implements Operator { - constructor(private predicate: (value: T, index: number) => boolean) { - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new TakeWhileSubscriber(subscriber, this.predicate)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class TakeWhileSubscriber extends Subscriber { - private index: number = 0; - - constructor(destination: Subscriber, - private predicate: (value: T, index: number) => boolean) { - super(destination); - } - - protected _next(value: T): void { - const destination = this.destination; - let result: boolean; - try { - result = this.predicate(value, this.index++); - } catch (err) { - destination.error(err); - return; - } - this.nextOrComplete(value, result); - } - - private nextOrComplete(value: T, predicateResult: boolean): void { - const destination = this.destination; - if (Boolean(predicateResult)) { - destination.next(value); - } else { - destination.complete(); - } - } + return higherOrder(predicate)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 76f5dcc37a..50cae12899 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -72,6 +72,7 @@ export { switchMapTo } from './switchMapTo'; export { take } from './take'; export { takeLast } from './takeLast'; export { takeUntil } from './takeUntil'; +export { takeWhile } from './takeWhile'; export { tap } from './tap'; export { timestamp } from './timestamp'; export { toArray } from './toArray'; diff --git a/src/operators/takeWhile.ts b/src/operators/takeWhile.ts new file mode 100644 index 0000000000..529b9f69db --- /dev/null +++ b/src/operators/takeWhile.ts @@ -0,0 +1,89 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { TeardownLogic } from '../Subscription'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Emits values emitted by the source Observable so long as each value satisfies + * the given `predicate`, and then completes as soon as this `predicate` is not + * satisfied. + * + * Takes values from the source only while they pass the + * condition given. When the first value does not satisfy, it completes. + * + * + * + * `takeWhile` subscribes and begins mirroring the source Observable. Each value + * emitted on the source is given to the `predicate` function which returns a + * boolean, representing a condition to be satisfied by the source values. The + * output Observable emits the source values until such time as the `predicate` + * returns false, at which point `takeWhile` stops mirroring the source + * Observable and completes the output Observable. + * + * @example Emit click events only while the clientX property is greater than 200 + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.takeWhile(ev => ev.clientX > 200); + * result.subscribe(x => console.log(x)); + * + * @see {@link take} + * @see {@link takeLast} + * @see {@link takeUntil} + * @see {@link skip} + * + * @param {function(value: T, index: number): boolean} predicate A function that + * evaluates a value emitted by the source Observable and returns a boolean. + * Also takes the (zero-based) index as the second argument. + * @return {Observable} An Observable that emits the values from the source + * Observable so long as each value satisfies the condition defined by the + * `predicate`, then completes. + * @method takeWhile + * @owner Observable + */ +export function takeWhile(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction { + return (source: Observable) => source.lift(new TakeWhileOperator(predicate)); +} + +class TakeWhileOperator implements Operator { + constructor(private predicate: (value: T, index: number) => boolean) { + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new TakeWhileSubscriber(subscriber, this.predicate)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class TakeWhileSubscriber extends Subscriber { + private index: number = 0; + + constructor(destination: Subscriber, + private predicate: (value: T, index: number) => boolean) { + super(destination); + } + + protected _next(value: T): void { + const destination = this.destination; + let result: boolean; + try { + result = this.predicate(value, this.index++); + } catch (err) { + destination.error(err); + return; + } + this.nextOrComplete(value, result); + } + + private nextOrComplete(value: T, predicateResult: boolean): void { + const destination = this.destination; + if (Boolean(predicateResult)) { + destination.next(value); + } else { + destination.complete(); + } + } +}