-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(takeWhile): add higher-order lettable version of takeWhile
- Loading branch information
Showing
3 changed files
with
92 additions
and
48 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
import { Operator } from '../Operator'; | ||
import { Observable } from '../Observable'; | ||
import { Subscriber } from '../Subscriber'; | ||
import { TeardownLogic } from '../Subscription'; | ||
import { MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/** | ||
* Emits values emitted by the source Observable so long as each value satisfies | ||
* the given `predicate`, and then completes as soon as this `predicate` is not | ||
* satisfied. | ||
* | ||
* <span class="informal">Takes values from the source only while they pass the | ||
* condition given. When the first value does not satisfy, it completes.</span> | ||
* | ||
* <img src="./img/takeWhile.png" width="100%"> | ||
* | ||
* `takeWhile` subscribes and begins mirroring the source Observable. Each value | ||
* emitted on the source is given to the `predicate` function which returns a | ||
* boolean, representing a condition to be satisfied by the source values. The | ||
* output Observable emits the source values until such time as the `predicate` | ||
* returns false, at which point `takeWhile` stops mirroring the source | ||
* Observable and completes the output Observable. | ||
* | ||
* @example <caption>Emit click events only while the clientX property is greater than 200</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var result = clicks.takeWhile(ev => ev.clientX > 200); | ||
* result.subscribe(x => console.log(x)); | ||
* | ||
* @see {@link take} | ||
* @see {@link takeLast} | ||
* @see {@link takeUntil} | ||
* @see {@link skip} | ||
* | ||
* @param {function(value: T, index: number): boolean} predicate A function that | ||
* evaluates a value emitted by the source Observable and returns a boolean. | ||
* Also takes the (zero-based) index as the second argument. | ||
* @return {Observable<T>} An Observable that emits the values from the source | ||
* Observable so long as each value satisfies the condition defined by the | ||
* `predicate`, then completes. | ||
* @method takeWhile | ||
* @owner Observable | ||
*/ | ||
export function takeWhile<T>(predicate: (value: T, index: number) => boolean): MonoTypeOperatorFunction<T> { | ||
return (source: Observable<T>) => source.lift(new TakeWhileOperator(predicate)); | ||
} | ||
|
||
class TakeWhileOperator<T> implements Operator<T, T> { | ||
constructor(private predicate: (value: T, index: number) => boolean) { | ||
} | ||
|
||
call(subscriber: Subscriber<T>, source: any): TeardownLogic { | ||
return source.subscribe(new TakeWhileSubscriber(subscriber, this.predicate)); | ||
} | ||
} | ||
|
||
/** | ||
* We need this JSDoc comment for affecting ESDoc. | ||
* @ignore | ||
* @extends {Ignored} | ||
*/ | ||
class TakeWhileSubscriber<T> extends Subscriber<T> { | ||
private index: number = 0; | ||
|
||
constructor(destination: Subscriber<T>, | ||
private predicate: (value: T, index: number) => boolean) { | ||
super(destination); | ||
} | ||
|
||
protected _next(value: T): void { | ||
const destination = this.destination; | ||
let result: boolean; | ||
try { | ||
result = this.predicate(value, this.index++); | ||
} catch (err) { | ||
destination.error(err); | ||
return; | ||
} | ||
this.nextOrComplete(value, result); | ||
} | ||
|
||
private nextOrComplete(value: T, predicateResult: boolean): void { | ||
const destination = this.destination; | ||
if (Boolean(predicateResult)) { | ||
destination.next(value); | ||
} else { | ||
destination.complete(); | ||
} | ||
} | ||
} |