diff --git a/src/operator/take.ts b/src/operator/take.ts index 22153e10a7..c622c55713 100644 --- a/src/operator/take.ts +++ b/src/operator/take.ts @@ -1,9 +1,5 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; -import { EmptyObservable } from '../observable/EmptyObservable'; import { Observable } from '../Observable'; -import { TeardownLogic } from '../Subscription'; +import { take as higherOrder } from '../operators/take'; /** * Emits only the first `count` values emitted by the source Observable. @@ -39,46 +35,5 @@ import { TeardownLogic } from '../Subscription'; * @owner Observable */ export function take(this: Observable, count: number): Observable { - if (count === 0) { - return new EmptyObservable(); - } else { - return this.lift(new TakeOperator(count)); - } -} - -class TakeOperator implements Operator { - constructor(private total: number) { - if (this.total < 0) { - throw new ArgumentOutOfRangeError; - } - } - - call(subscriber: Subscriber, source: any): TeardownLogic { - return source.subscribe(new TakeSubscriber(subscriber, this.total)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class TakeSubscriber extends Subscriber { - private count: number = 0; - - constructor(destination: Subscriber, private total: number) { - super(destination); - } - - protected _next(value: T): void { - const total = this.total; - const count = ++this.count; - if (count <= total) { - this.destination.next(value); - if (count === total) { - this.destination.complete(); - this.unsubscribe(); - } - } - } + return higherOrder(count)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index df149921ff..174e493419 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -69,6 +69,7 @@ export { subscribeOn } from './subscribeOn'; export { switchAll } from './switchAll'; export { switchMap } from './switchMap'; export { switchMapTo } from './switchMapTo'; +export { take } from './take'; export { takeLast } from './takeLast'; export { tap } from './tap'; export { timestamp } from './timestamp'; diff --git a/src/operators/take.ts b/src/operators/take.ts new file mode 100644 index 0000000000..7f42a58341 --- /dev/null +++ b/src/operators/take.ts @@ -0,0 +1,87 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; +import { EmptyObservable } from '../observable/EmptyObservable'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; +import { MonoTypeOperatorFunction } from '../interfaces'; + +/** + * Emits only the first `count` values emitted by the source Observable. + * + * Takes the first `count` values from the source, then + * completes. + * + * + * + * `take` returns an Observable that emits only the first `count` values emitted + * by the source Observable. If the source emits fewer than `count` values then + * all of its values are emitted. After that, it completes, regardless if the + * source completes. + * + * @example Take the first 5 seconds of an infinite 1-second interval Observable + * var interval = Rx.Observable.interval(1000); + * var five = interval.take(5); + * five.subscribe(x => console.log(x)); + * + * @see {@link takeLast} + * @see {@link takeUntil} + * @see {@link takeWhile} + * @see {@link skip} + * + * @throws {ArgumentOutOfRangeError} When using `take(i)`, it delivers an + * ArgumentOutOrRangeError to the Observer's `error` callback if `i < 0`. + * + * @param {number} count The maximum number of `next` values to emit. + * @return {Observable} An Observable that emits only the first `count` + * values emitted by the source Observable, or all of the values from the source + * if the source emits fewer than `count` values. + * @method take + * @owner Observable + */ +export function take(count: number): MonoTypeOperatorFunction { + return (source: Observable) => { + if (count === 0) { + return new EmptyObservable(); + } else { + return source.lift(new TakeOperator(count)); + } + }; +} + +class TakeOperator implements Operator { + constructor(private total: number) { + if (this.total < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + return source.subscribe(new TakeSubscriber(subscriber, this.total)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class TakeSubscriber extends Subscriber { + private count: number = 0; + + constructor(destination: Subscriber, private total: number) { + super(destination); + } + + protected _next(value: T): void { + const total = this.total; + const count = ++this.count; + if (count <= total) { + this.destination.next(value); + if (count === total) { + this.destination.complete(); + this.unsubscribe(); + } + } + } +}