-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(merge): add higher-order lettable version of merge (#2809)
- Loading branch information
Showing
3 changed files
with
128 additions
and
100 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import { Observable, ObservableInput } from '../Observable'; | ||
import { IScheduler } from '../Scheduler'; | ||
import { ArrayObservable } from '../observable/ArrayObservable'; | ||
import { mergeAll } from './mergeAll'; | ||
import { isScheduler } from '../util/isScheduler'; | ||
import { OperatorFunction, MonoTypeOperatorFunction } from '../interfaces'; | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function merge<T>(scheduler?: IScheduler): MonoTypeOperatorFunction<T>; | ||
export function merge<T>(concurrent?: number, scheduler?: IScheduler): MonoTypeOperatorFunction<T>; | ||
export function merge<T, T2>(v2: ObservableInput<T2>, scheduler?: IScheduler): OperatorFunction<T, T | T2>; | ||
export function merge<T, T2>(v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2>; | ||
export function merge<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3>; | ||
export function merge<T, T2, T3>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3>; | ||
export function merge<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4>; | ||
export function merge<T, T2, T3, T4>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4>; | ||
export function merge<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5>; | ||
export function merge<T, T2, T3, T4, T5>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5>; | ||
export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>; | ||
export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): OperatorFunction<T, T | T2 | T3 | T4 | T5 | T6>; | ||
export function merge<T>(...observables: Array<ObservableInput<T> | IScheduler | number>): MonoTypeOperatorFunction<T>; | ||
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R>; | ||
/* tslint:enable:max-line-length */ | ||
|
||
export function merge<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): OperatorFunction<T, R> { | ||
return (source: Observable<T>) => source.lift.call(mergeStatic(source, ...observables)); | ||
} | ||
|
||
/* tslint:disable:max-line-length */ | ||
export function mergeStatic<T>(v1: ObservableInput<T>, scheduler?: IScheduler): Observable<T>; | ||
export function mergeStatic<T>(v1: ObservableInput<T>, concurrent?: number, scheduler?: IScheduler): Observable<T>; | ||
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, scheduler?: IScheduler): Observable<T | T2>; | ||
export function mergeStatic<T, T2>(v1: ObservableInput<T>, v2: ObservableInput<T2>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2>; | ||
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, scheduler?: IScheduler): Observable<T | T2 | T3>; | ||
export function mergeStatic<T, T2, T3>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3>; | ||
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>; | ||
export function mergeStatic<T, T2, T3, T4>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4>; | ||
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>; | ||
export function mergeStatic<T, T2, T3, T4, T5>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5>; | ||
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>; | ||
export function mergeStatic<T, T2, T3, T4, T5, T6>(v1: ObservableInput<T>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>, concurrent?: number, scheduler?: IScheduler): Observable<T | T2 | T3 | T4 | T5 | T6>; | ||
export function mergeStatic<T>(...observables: (ObservableInput<T> | IScheduler | number)[]): Observable<T>; | ||
export function mergeStatic<T, R>(...observables: (ObservableInput<any> | IScheduler | number)[]): Observable<R>; | ||
/* tslint:enable:max-line-length */ | ||
/** | ||
* Creates an output Observable which concurrently emits all values from every | ||
* given input Observable. | ||
* | ||
* <span class="informal">Flattens multiple Observables together by blending | ||
* their values into one Observable.</span> | ||
* | ||
* <img src="./img/merge.png" width="100%"> | ||
* | ||
* `merge` subscribes to each given input Observable (as arguments), and simply | ||
* forwards (without doing any transformation) all the values from all the input | ||
* Observables to the output Observable. The output Observable only completes | ||
* once all input Observables have completed. Any error delivered by an input | ||
* Observable will be immediately emitted on the output Observable. | ||
* | ||
* @example <caption>Merge together two Observables: 1s interval and clicks</caption> | ||
* var clicks = Rx.Observable.fromEvent(document, 'click'); | ||
* var timer = Rx.Observable.interval(1000); | ||
* var clicksOrTimer = Rx.Observable.merge(clicks, timer); | ||
* clicksOrTimer.subscribe(x => console.log(x)); | ||
* | ||
* // Results in the following: | ||
* // timer will emit ascending values, one every second(1000ms) to console | ||
* // clicks logs MouseEvents to console everytime the "document" is clicked | ||
* // Since the two streams are merged you see these happening | ||
* // as they occur. | ||
* | ||
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption> | ||
* var timer1 = Rx.Observable.interval(1000).take(10); | ||
* var timer2 = Rx.Observable.interval(2000).take(6); | ||
* var timer3 = Rx.Observable.interval(500).take(10); | ||
* var concurrent = 2; // the argument | ||
* var merged = Rx.Observable.merge(timer1, timer2, timer3, concurrent); | ||
* merged.subscribe(x => console.log(x)); | ||
* | ||
* // Results in the following: | ||
* // - First timer1 and timer2 will run concurrently | ||
* // - timer1 will emit a value every 1000ms for 10 iterations | ||
* // - timer2 will emit a value every 2000ms for 6 iterations | ||
* // - after timer1 hits it's max iteration, timer2 will | ||
* // continue, and timer3 will start to run concurrently with timer2 | ||
* // - when timer2 hits it's max iteration it terminates, and | ||
* // timer3 will continue to emit a value every 500ms until it is complete | ||
* | ||
* @see {@link mergeAll} | ||
* @see {@link mergeMap} | ||
* @see {@link mergeMapTo} | ||
* @see {@link mergeScan} | ||
* | ||
* @param {...ObservableInput} observables Input Observables to merge together. | ||
* @param {number} [concurrent=Number.POSITIVE_INFINITY] Maximum number of input | ||
* Observables being subscribed to concurrently. | ||
* @param {Scheduler} [scheduler=null] The IScheduler to use for managing | ||
* concurrency of input Observables. | ||
* @return {Observable} an Observable that emits items that are the result of | ||
* every input Observable. | ||
* @static true | ||
* @name merge | ||
* @owner Observable | ||
*/ | ||
export function mergeStatic<T, R>(...observables: Array<ObservableInput<any> | IScheduler | number>): Observable<R> { | ||
let concurrent = Number.POSITIVE_INFINITY; | ||
let scheduler: IScheduler = null; | ||
let last: any = observables[observables.length - 1]; | ||
if (isScheduler(last)) { | ||
scheduler = <IScheduler>observables.pop(); | ||
if (observables.length > 1 && typeof observables[observables.length - 1] === 'number') { | ||
concurrent = <number>observables.pop(); | ||
} | ||
} else if (typeof last === 'number') { | ||
concurrent = <number>observables.pop(); | ||
} | ||
|
||
if (scheduler === null && observables.length === 1 && observables[0] instanceof Observable) { | ||
return <Observable<R>>observables[0]; | ||
} | ||
|
||
return mergeAll(concurrent)(new ArrayObservable(<any>observables, scheduler)); | ||
} |