Skip to content

Commit

Permalink
feat(concatMapTo): add higher-order lettable version of concatMapTo
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Jul 25, 2017
1 parent 73d30cd commit 0a6672e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
4 changes: 2 additions & 2 deletions src/operator/concatMapTo.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable, ObservableInput } from '../Observable';
import { MergeMapToOperator } from './mergeMapTo';
import { concatMapTo as higherOrder } from '../operators';

/* tslint:disable:max-line-length */
export function concatMapTo<T, R>(this: Observable<T>, observable: ObservableInput<R>): Observable<R>;
Expand Down Expand Up @@ -64,5 +64,5 @@ export function concatMapTo<T, I, R>(this: Observable<T>, observable: Observable
*/
export function concatMapTo<T, I, R>(this: Observable<T>, innerObservable: Observable<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): Observable<R> {
return this.lift(new MergeMapToOperator(innerObservable, resultSelector, 1));
return higherOrder(innerObservable, resultSelector)(this);
}
71 changes: 71 additions & 0 deletions src/operators/concatMapTo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import { Observable, ObservableInput } from '../Observable';
import { concatMap } from './concatMap';
import { OperatorFunction } from '../interfaces';

/* tslint:disable:max-line-length */
export function concatMapTo<T, R>(observable: ObservableInput<R>): OperatorFunction<T, R>;
export function concatMapTo<T, I, R>(observable: ObservableInput<I>, resultSelector: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R): OperatorFunction<T, R>;
/* tslint:enable:max-line-length */

/**
* Projects each source value to the same Observable which is merged multiple
* times in a serialized fashion on the output Observable.
*
* <span class="informal">It's like {@link concatMap}, but maps each value
* always to the same inner Observable.</span>
*
* <img src="./img/concatMapTo.png" width="100%">
*
* Maps each source value to the given Observable `innerObservable` regardless
* of the source value, and then flattens those resulting Observables into one
* single Observable, which is the output Observable. Each new `innerObservable`
* instance emitted on the output Observable is concatenated with the previous
* `innerObservable` instance.
*
* __Warning:__ if source values arrive endlessly and faster than their
* corresponding inner Observables can complete, it will result in memory issues
* as inner Observables amass in an unbounded buffer waiting for their turn to
* be subscribed to.
*
* Note: `concatMapTo` is equivalent to `mergeMapTo` with concurrency parameter
* set to `1`.
*
* @example <caption>For each click event, tick every second from 0 to 3, with no concurrency</caption>
* var clicks = Rx.Observable.fromEvent(document, 'click');
* var result = clicks.concatMapTo(Rx.Observable.interval(1000).take(4));
* result.subscribe(x => console.log(x));
*
* // Results in the following:
* // (results are not concurrent)
* // For every click on the "document" it will emit values 0 to 3 spaced
* // on a 1000ms interval
* // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
*
* @see {@link concat}
* @see {@link concatAll}
* @see {@link concatMap}
* @see {@link mergeMapTo}
* @see {@link switchMapTo}
*
* @param {ObservableInput} innerObservable An Observable to replace each value from
* the source Observable.
* @param {function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any} [resultSelector]
* A function to produce the value on the output Observable based on the values
* and the indices of the source (outer) emission and the inner Observable
* emission. The arguments passed to this function are:
* - `outerValue`: the value that came from the source
* - `innerValue`: the value that came from the projected Observable
* - `outerIndex`: the "index" of the value that came from the source
* - `innerIndex`: the "index" of the value from the projected Observable
* @return {Observable} An observable of values merged together by joining the
* passed observable with itself, one after the other, for each value emitted
* from the source.
* @method concatMapTo
* @owner Observable
*/
export function concatMapTo<T, I, R>(
innerObservable: Observable<I>,
resultSelector?: (outerValue: T, innerValue: I, outerIndex: number, innerIndex: number) => R
): OperatorFunction<T, R> {
return concatMap(() => innerObservable, resultSelector);
}
1 change: 1 addition & 0 deletions src/operators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export { catchError } from './catchError';
export { concat } from './concat';
export { concatAll } from './concatAll';
export { concatMap } from './concatMap';
export { concatMapTo } from './concatMapTo';
export { defaultIfEmpty } from './defaultIfEmpty';
export { dematerialize } from './dematerialize';
export { filter } from './filter';
Expand Down

0 comments on commit 0a6672e

Please sign in to comment.