diff --git a/src/operator/defaultIfEmpty.ts b/src/operator/defaultIfEmpty.ts index cb33ff6b4e..7c0ef12cac 100644 --- a/src/operator/defaultIfEmpty.ts +++ b/src/operator/defaultIfEmpty.ts @@ -1,6 +1,6 @@ -import { Operator } from '../Operator'; + import { Observable } from '../Observable'; -import { Subscriber } from '../Subscriber'; +import { defaultIfEmpty as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function defaultIfEmpty(this: Observable, defaultValue?: T): Observable; @@ -38,40 +38,5 @@ export function defaultIfEmpty(this: Observable, defaultValue?: R): Obs * @owner Observable */ export function defaultIfEmpty(this: Observable, defaultValue: R = null): Observable { - return this.lift(new DefaultIfEmptyOperator(defaultValue)); -} - -class DefaultIfEmptyOperator implements Operator { - - constructor(private defaultValue: R) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class DefaultIfEmptySubscriber extends Subscriber { - private isEmpty: boolean = true; - - constructor(destination: Subscriber, private defaultValue: R) { - super(destination); - } - - protected _next(value: T): void { - this.isEmpty = false; - this.destination.next(value); - } - - protected _complete(): void { - if (this.isEmpty) { - this.destination.next(this.defaultValue); - } - this.destination.complete(); - } + return higherOrder(defaultValue)(this); } diff --git a/src/operator/max.ts b/src/operator/max.ts index eedcd7b97a..240d3d3b45 100644 --- a/src/operator/max.ts +++ b/src/operator/max.ts @@ -1,5 +1,5 @@ import { Observable } from '../Observable'; -import { ReduceOperator } from './reduce'; +import { max as higherOrderMax } from '../operators'; /** * The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), @@ -33,8 +33,5 @@ import { ReduceOperator } from './reduce'; * @owner Observable */ export function max(this: Observable, comparer?: (x: T, y: T) => number): Observable { - const max: (x: T, y: T) => T = (typeof comparer === 'function') - ? (x, y) => comparer(x, y) > 0 ? x : y - : (x, y) => x > y ? x : y; - return this.lift(new ReduceOperator(max)); + return higherOrderMax(comparer)(this); } diff --git a/src/operator/min.ts b/src/operator/min.ts index 997fb5dd3f..27eb2e331f 100644 --- a/src/operator/min.ts +++ b/src/operator/min.ts @@ -1,5 +1,5 @@ import { Observable } from '../Observable'; -import { ReduceOperator } from './reduce'; +import { min as higherOrderMin } from '../operators'; /** * The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), @@ -33,8 +33,5 @@ import { ReduceOperator } from './reduce'; * @owner Observable */ export function min(this: Observable, comparer?: (x: T, y: T) => number): Observable { - const min: (x: T, y: T) => T = (typeof comparer === 'function') - ? (x, y) => comparer(x, y) < 0 ? x : y - : (x, y) => x < y ? x : y; - return this.lift(new ReduceOperator(min)); + return higherOrderMin(comparer)(this); } diff --git a/src/operator/reduce.ts b/src/operator/reduce.ts index 9cbb638781..53d202c5b0 100644 --- a/src/operator/reduce.ts +++ b/src/operator/reduce.ts @@ -1,6 +1,5 @@ import { Observable } from '../Observable'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; +import { reduce as higherOrderReduce } from '../operators'; /* tslint:disable:max-line-length */ export function reduce(this: Observable, accumulator: (acc: T, value: T, index: number) => T, seed?: T): Observable; @@ -53,73 +52,14 @@ export function reduce(this: Observable, accumulator: (acc: R, value: T * @owner Observable */ export function reduce(this: Observable, accumulator: (acc: R, value: T, index?: number) => R, seed?: R): Observable { - let hasSeed = false; // providing a seed of `undefined` *should* be valid and trigger // hasSeed! so don't use `seed !== undefined` checks! // For this reason, we have to check it here at the original call site // otherwise inside Operator/Subscriber we won't know if `undefined` // means they didn't provide anything or if they literally provided `undefined` if (arguments.length >= 2) { - hasSeed = true; + return higherOrderReduce(accumulator, seed)(this); } - return this.lift(new ReduceOperator(accumulator, seed, hasSeed)); -} - -export class ReduceOperator implements Operator { - constructor(private accumulator: (acc: R, value: T, index?: number) => R, private seed?: R, private hasSeed: boolean = false) {} - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new ReduceSubscriber(subscriber, this.accumulator, this.seed, this.hasSeed)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class ReduceSubscriber extends Subscriber { - private index: number = 0; - private acc: T | R; - private hasValue: boolean = false; - - constructor(destination: Subscriber, - private accumulator: (acc: R, value: T, index?: number) => R, - seed: R, - private hasSeed: boolean) { - super(destination); - this.acc = seed; - - if (!this.hasSeed) { - this.index++; - } - } - - protected _next(value: T) { - if (this.hasValue || (this.hasValue = this.hasSeed)) { - this._tryReduce(value); - } else { - this.acc = value; - this.hasValue = true; - } - } - - private _tryReduce(value: T) { - let result: any; - try { - result = this.accumulator(this.acc, value, this.index++); - } catch (err) { - this.destination.error(err); - return; - } - this.acc = result; - } - - protected _complete() { - if (this.hasValue || this.hasSeed) { - this.destination.next(this.acc); - } - this.destination.complete(); - } + return higherOrderReduce(accumulator)(this); } diff --git a/src/operators/defaultIfEmpty.ts b/src/operators/defaultIfEmpty.ts new file mode 100644 index 0000000000..4d78419359 --- /dev/null +++ b/src/operators/defaultIfEmpty.ts @@ -0,0 +1,80 @@ +import { Operator } from '../Operator'; +import { Observable } from '../Observable'; +import { Subscriber } from '../Subscriber'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function defaultIfEmpty(defaultValue?: T): OperatorFunction; +export function defaultIfEmpty(defaultValue?: R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Emits a given value if the source Observable completes without emitting any + * `next` value, otherwise mirrors the source Observable. + * + * If the source Observable turns out to be empty, then + * this operator will emit a default value. + * + * + * + * `defaultIfEmpty` emits the values emitted by the source Observable or a + * specified default value if the source Observable is empty (completes without + * having emitted any `next` value). + * + * @example If no clicks happen in 5 seconds, then emit "no clicks" + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var clicksBeforeFive = clicks.takeUntil(Rx.Observable.interval(5000)); + * var result = clicksBeforeFive.defaultIfEmpty('no clicks'); + * result.subscribe(x => console.log(x)); + * + * @see {@link empty} + * @see {@link last} + * + * @param {any} [defaultValue=null] The default value used if the source + * Observable is empty. + * @return {Observable} An Observable that emits either the specified + * `defaultValue` if the source Observable emits no items, or the values emitted + * by the source Observable. + * @method defaultIfEmpty + * @owner Observable + */ +export function defaultIfEmpty(defaultValue: R = null): OperatorFunction { + return function defaultIfEmptyOperatorFunction(source: Observable) { + return source.lift(new DefaultIfEmptyOperator(defaultValue)); + }; +} + +class DefaultIfEmptyOperator implements Operator { + + constructor(private defaultValue: R) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new DefaultIfEmptySubscriber(subscriber, this.defaultValue)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class DefaultIfEmptySubscriber extends Subscriber { + private isEmpty: boolean = true; + + constructor(destination: Subscriber, private defaultValue: R) { + super(destination); + } + + protected _next(value: T): void { + this.isEmpty = false; + this.destination.next(value); + } + + protected _complete(): void { + if (this.isEmpty) { + this.destination.next(this.defaultValue); + } + this.destination.complete(); + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 245d6ff706..c852eaaf21 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -1,7 +1,11 @@ export { concatMap } from './concatMap'; +export { defaultIfEmpty } from './defaultIfEmpty'; export { filter } from './filter'; export { map } from './map'; +export { max } from './max'; export { mergeMap } from './mergeMap'; +export { min } from './min'; +export { reduce } from './reduce'; export { scan } from './scan'; export { switchMap } from './switchMap'; export { takeLast } from './takeLast'; diff --git a/src/operators/max.ts b/src/operators/max.ts new file mode 100644 index 0000000000..c3693fc577 --- /dev/null +++ b/src/operators/max.ts @@ -0,0 +1,41 @@ +import { reduce } from './reduce'; +import { OperatorFunction } from '../interfaces'; + +/** + * The Max operator operates on an Observable that emits numbers (or items that can be compared with a provided function), + * and when source Observable completes it emits a single item: the item with the largest value. + * + * + * + * @example Get the maximal value of a series of numbers + * Rx.Observable.of(5, 4, 7, 2, 8) + * .max() + * .subscribe(x => console.log(x)); // -> 8 + * + * @example Use a comparer function to get the maximal item + * interface Person { + * age: number, + * name: string + * } + * Observable.of({age: 7, name: 'Foo'}, + * {age: 5, name: 'Bar'}, + * {age: 9, name: 'Beer'}) + * .max((a: Person, b: Person) => a.age < b.age ? -1 : 1) + * .subscribe((x: Person) => console.log(x.name)); // -> 'Beer' + * } + * + * @see {@link min} + * + * @param {Function} [comparer] - Optional comparer function that it will use instead of its default to compare the + * value of two items. + * @return {Observable} An Observable that emits item with the largest value. + * @method max + * @owner Observable + */ +export function max(comparer?: (x: T, y: T) => number): OperatorFunction { + const max: (x: T, y: T) => T = (typeof comparer === 'function') + ? (x, y) => comparer(x, y) > 0 ? x : y + : (x, y) => x > y ? x : y; + + return reduce(max); +} diff --git a/src/operators/min.ts b/src/operators/min.ts new file mode 100644 index 0000000000..75e9f58ac6 --- /dev/null +++ b/src/operators/min.ts @@ -0,0 +1,40 @@ +import { reduce } from './reduce'; +import { OperatorFunction } from '../interfaces'; + +/** + * The Min operator operates on an Observable that emits numbers (or items that can be compared with a provided function), + * and when source Observable completes it emits a single item: the item with the smallest value. + * + * + * + * @example Get the minimal value of a series of numbers + * Rx.Observable.of(5, 4, 7, 2, 8) + * .min() + * .subscribe(x => console.log(x)); // -> 2 + * + * @example Use a comparer function to get the minimal item + * interface Person { + * age: number, + * name: string + * } + * Observable.of({age: 7, name: 'Foo'}, + * {age: 5, name: 'Bar'}, + * {age: 9, name: 'Beer'}) + * .min( (a: Person, b: Person) => a.age < b.age ? -1 : 1) + * .subscribe((x: Person) => console.log(x.name)); // -> 'Bar' + * } + * + * @see {@link max} + * + * @param {Function} [comparer] - Optional comparer function that it will use instead of its default to compare the + * value of two items. + * @return {Observable} An Observable that emits item with the smallest value. + * @method min + * @owner Observable + */ +export function min(comparer?: (x: T, y: T) => number): OperatorFunction { + const min: (x: T, y: T) => T = (typeof comparer === 'function') + ? (x, y) => comparer(x, y) < 0 ? x : y + : (x, y) => x < y ? x : y; + return reduce(min); +} diff --git a/src/operators/reduce.ts b/src/operators/reduce.ts new file mode 100644 index 0000000000..bb05114d6f --- /dev/null +++ b/src/operators/reduce.ts @@ -0,0 +1,74 @@ +import { Observable } from '../Observable'; +import { scan } from './scan'; +import { takeLast } from './takeLast'; +import { defaultIfEmpty } from './defaultIfEmpty'; +import { OperatorFunction } from '../interfaces'; +import { compose } from '../util/compose'; + +/* tslint:disable:max-line-length */ +export function reduce(accumulator: (acc: T, value: T, index: number) => T, seed?: T): OperatorFunction; +export function reduce(accumulator: (acc: T[], value: T, index: number) => T[], seed: T[]): OperatorFunction; +export function reduce(accumulator: (acc: R, value: T, index: number) => R, seed?: R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Applies an accumulator function over the source Observable, and returns the + * accumulated result when the source completes, given an optional seed value. + * + * Combines together all values emitted on the source, + * using an accumulator function that knows how to join a new source value into + * the accumulation from the past. + * + * + * + * Like + * [Array.prototype.reduce()](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce), + * `reduce` applies an `accumulator` function against an accumulation and each + * value of the source Observable (from the past) to reduce it to a single + * value, emitted on the output Observable. Note that `reduce` will only emit + * one value, only when the source Observable completes. It is equivalent to + * applying operator {@link scan} followed by operator {@link last}. + * + * Returns an Observable that applies a specified `accumulator` function to each + * item emitted by the source Observable. If a `seed` value is specified, then + * that value will be used as the initial value for the accumulator. If no seed + * value is specified, the first item of the source is used as the seed. + * + * @example Count the number of click events that happened in 5 seconds + * var clicksInFiveSeconds = Rx.Observable.fromEvent(document, 'click') + * .takeUntil(Rx.Observable.interval(5000)); + * var ones = clicksInFiveSeconds.mapTo(1); + * var seed = 0; + * var count = ones.reduce((acc, one) => acc + one, seed); + * count.subscribe(x => console.log(x)); + * + * @see {@link count} + * @see {@link expand} + * @see {@link mergeScan} + * @see {@link scan} + * + * @param {function(acc: R, value: T, index: number): R} accumulator The accumulator function + * called on each source value. + * @param {R} [seed] The initial accumulation value. + * @return {Observable} An Observable that emits a single value that is the + * result of accumulating the values emitted by the source Observable. + * @method reduce + * @owner Observable + */ +export function reduce(accumulator: (acc: R, value: T, index?: number) => R, seed?: R): OperatorFunction { + // providing a seed of `undefined` *should* be valid and trigger + // hasSeed! so don't use `seed !== undefined` checks! + // For this reason, we have to check it here at the original call site + // otherwise inside Operator/Subscriber we won't know if `undefined` + // means they didn't provide anything or if they literally provided `undefined` + if (arguments.length >= 2) { + return function reduceOperatorFunctionWithSeed(source: Observable): Observable { + return compose(scan(accumulator, seed), takeLast(1), defaultIfEmpty(seed))(source); + }; + } + return function reduceOperatorFunction(source: Observable): Observable { + return compose(scan((acc, value, index) => { + return accumulator(acc, value, index + 1); + }), takeLast(1))(source); + }; +}