diff --git a/src/operator/concatMapTo.ts b/src/operator/concatMapTo.ts index 649aaf188e..7a904740b2 100644 --- a/src/operator/concatMapTo.ts +++ b/src/operator/concatMapTo.ts @@ -1,5 +1,5 @@ import { Observable, ObservableInput } from '../Observable'; -import { MergeMapToOperator } from './mergeMapTo'; +import { concatMapTo as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function concatMapTo(this: Observable, observable: ObservableInput): Observable; @@ -64,5 +64,5 @@ export function concatMapTo(this: Observable, observable: Observable */ export function concatMapTo(this: Observable, innerObservable: Observable, resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable { - return this.lift(new MergeMapToOperator(innerObservable, resultSelector, 1)); + return higherOrder(innerObservable, resultSelector)(this); } diff --git a/src/operators/concatMapTo.ts b/src/operators/concatMapTo.ts new file mode 100644 index 0000000000..e92af2495e --- /dev/null +++ b/src/operators/concatMapTo.ts @@ -0,0 +1,71 @@ +import { Observable, ObservableInput } from '../Observable'; +import { concatMap } from './concatMap'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function concatMapTo(observable: ObservableInput): OperatorFunction; +export function concatMapTo(observable: ObservableInput, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Projects each source value to the same Observable which is merged multiple + * times in a serialized fashion on the output Observable. + * + * It's like {@link concatMap}, but maps each value + * always to the same inner Observable. + * + * + * + * Maps each source value to the given Observable `innerObservable` regardless + * of the source value, and then flattens those resulting Observables into one + * single Observable, which is the output Observable. Each new `innerObservable` + * instance emitted on the output Observable is concatenated with the previous + * `innerObservable` instance. + * + * __Warning:__ if source values arrive endlessly and faster than their + * corresponding inner Observables can complete, it will result in memory issues + * as inner Observables amass in an unbounded buffer waiting for their turn to + * be subscribed to. + * + * Note: `concatMapTo` is equivalent to `mergeMapTo` with concurrency parameter + * set to `1`. + * + * @example For each click event, tick every second from 0 to 3, with no concurrency + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4)); + * result.subscribe(x => console.log(x)); + * + * // Results in the following: + * // (results are not concurrent) + * // For every click on the "document" it will emit values 0 to 3 spaced + * // on a 1000ms interval + * // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3 + * + * @see {@link concat} + * @see {@link concatAll} + * @see {@link concatMap} + * @see {@link mergeMapTo} + * @see {@link switchMapTo} + * + * @param {ObservableInput} innerObservable An Observable to replace each value from + * the source Observable. + * @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector] + * A function to produce the value on the output Observable based on the values + * and the indices of the source (outer) emission and the inner Observable + * emission. The arguments passed to this function are: + * - `outerValue`: the value that came from the source + * - `innerValue`: the value that came from the projected Observable + * - `outerIndex`: the "index" of the value that came from the source + * - `innerIndex`: the "index" of the value from the projected Observable + * @return {Observable} An observable of values merged together by joining the + * passed observable with itself, one after the other, for each value emitted + * from the source. + * @method concatMapTo + * @owner Observable + */ +export function concatMapTo( + innerObservable: Observable, + resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R +): OperatorFunction { + return concatMap(() => innerObservable, resultSelector); +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 63299418bc..9135e84224 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -9,6 +9,7 @@ export { catchError } from './catchError'; export { concat } from './concat'; export { concatAll } from './concatAll'; export { concatMap } from './concatMap'; +export { concatMapTo } from './concatMapTo'; export { defaultIfEmpty } from './defaultIfEmpty'; export { dematerialize } from './dematerialize'; export { filter } from './filter';