From b7154f23ed89ce46fcde86378933111d186e0676 Mon Sep 17 00:00:00 2001 From: benlesh Date: Mon, 4 Sep 2017 14:05:18 -0700 Subject: [PATCH] feat(combineLatest): add higher-order lettable version of combineLatest --- src/observable/combineLatest.ts | 2 +- src/operator/combineAll.ts | 2 +- src/operator/combineLatest.ts | 107 +------------------- src/operators/combineAll.ts | 0 src/operators/combineLatest.ts | 169 ++++++++++++++++++++++++++++++++ src/operators/index.ts | 1 + 6 files changed, 175 insertions(+), 106 deletions(-) create mode 100644 src/operators/combineAll.ts create mode 100644 src/operators/combineLatest.ts diff --git a/src/observable/combineLatest.ts b/src/observable/combineLatest.ts index 095510c047..217644768e 100644 --- a/src/observable/combineLatest.ts +++ b/src/observable/combineLatest.ts @@ -3,7 +3,7 @@ import { IScheduler } from '../Scheduler'; import { isScheduler } from '../util/isScheduler'; import { isArray } from '../util/isArray'; import { ArrayObservable } from './ArrayObservable'; -import { CombineLatestOperator } from '../operator/combineLatest'; +import { CombineLatestOperator } from '../operators/combineLatest'; /* tslint:disable:max-line-length */ export function combineLatest(v1: ObservableInput, v2: ObservableInput, scheduler?: IScheduler): Observable<[T, T2]>; diff --git a/src/operator/combineAll.ts b/src/operator/combineAll.ts index 353289e806..28835bb147 100644 --- a/src/operator/combineAll.ts +++ b/src/operator/combineAll.ts @@ -1,4 +1,4 @@ -import { CombineLatestOperator } from './combineLatest'; +import { CombineLatestOperator } from '../operators/combineLatest'; import { Observable } from '../Observable'; /** diff --git a/src/operator/combineLatest.ts b/src/operator/combineLatest.ts index 7025c54791..4fd5401397 100644 --- a/src/operator/combineLatest.ts +++ b/src/operator/combineLatest.ts @@ -1,12 +1,5 @@ import { Observable, ObservableInput } from '../Observable'; -import { ArrayObservable } from '../observable/ArrayObservable'; -import { isArray } from '../util/isArray'; -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; -const none = {}; +import { combineLatest as higherOrder } from '../operators'; /* tslint:disable:max-line-length */ export function combineLatest(this: Observable, project: (v1: T) => R): Observable; @@ -71,99 +64,5 @@ export function combineLatest(this: Observable, array: Observab export function combineLatest(this: Observable, ...observables: Array | Array> | ((...values: Array) => R)>): Observable { - let project: (...values: Array) => R = null; - if (typeof observables[observables.length - 1] === 'function') { - project = <(...values: Array) => R>observables.pop(); - } - - // if the first and only other argument besides the resultSelector is an array - // assume it's been called with `combineLatest([obs1, obs2, obs3], project)` - if (observables.length === 1 && isArray(observables[0])) { - observables = (observables[0]).slice(); - } - - observables.unshift(this); - - return this.lift.call(new ArrayObservable(observables), new CombineLatestOperator(project)); -} - -export class CombineLatestOperator implements Operator { - constructor(private project?: (...values: Array) => R) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new CombineLatestSubscriber(subscriber, this.project)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -export class CombineLatestSubscriber extends OuterSubscriber { - private active: number = 0; - private values: any[] = []; - private observables: any[] = []; - private toRespond: number; - - constructor(destination: Subscriber, private project?: (...values: Array) => R) { - super(destination); - } - - protected _next(observable: any) { - this.values.push(none); - this.observables.push(observable); - } - - protected _complete() { - const observables = this.observables; - const len = observables.length; - if (len === 0) { - this.destination.complete(); - } else { - this.active = len; - this.toRespond = len; - for (let i = 0; i < len; i++) { - const observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); - } - } - } - - notifyComplete(unused: Subscriber): void { - if ((this.active -= 1) === 0) { - this.destination.complete(); - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - const values = this.values; - const oldVal = values[outerIndex]; - const toRespond = !this.toRespond - ? 0 - : oldVal === none ? --this.toRespond : this.toRespond; - values[outerIndex] = innerValue; - - if (toRespond === 0) { - if (this.project) { - this._tryProject(values); - } else { - this.destination.next(values.slice()); - } - } - } - - private _tryProject(values: any[]) { - let result: any; - try { - result = this.project.apply(this, values); - } catch (err) { - this.destination.error(err); - return; - } - this.destination.next(result); - } -} + return higherOrder(...observables)(this); +} \ No newline at end of file diff --git a/src/operators/combineAll.ts b/src/operators/combineAll.ts new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/operators/combineLatest.ts b/src/operators/combineLatest.ts new file mode 100644 index 0000000000..823bd35d9f --- /dev/null +++ b/src/operators/combineLatest.ts @@ -0,0 +1,169 @@ +import { Observable, ObservableInput } from '../Observable'; +import { ArrayObservable } from '../observable/ArrayObservable'; +import { isArray } from '../util/isArray'; +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OperatorFunction } from '../interfaces'; + +const none = {}; + +/* tslint:disable:max-line-length */ +export function combineLatest(project: (v1: T) => R): OperatorFunction; +export function combineLatest(v2: ObservableInput, project: (v1: T, v2: T2) => R): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, project: (v1: T, v2: T2, v3: T3) => R): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): OperatorFunction ; +export function combineLatest(v2: ObservableInput): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): OperatorFunction; +export function combineLatest(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): OperatorFunction ; +export function combineLatest(...observables: Array | ((...values: Array) => R)>): OperatorFunction; +export function combineLatest(array: ObservableInput[]): OperatorFunction>; +export function combineLatest(array: ObservableInput[], project: (v1: T, ...values: Array) => R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Combines multiple Observables to create an Observable whose values are + * calculated from the latest values of each of its input Observables. + * + * Whenever any input Observable emits a value, it + * computes a formula using the latest values from all the inputs, then emits + * the output of that formula. + * + * + * + * `combineLatest` combines the values from this Observable with values from + * Observables passed as arguments. This is done by subscribing to each + * Observable, in order, and collecting an array of each of the most recent + * values any time any of the input Observables emits, then either taking that + * array and passing it as arguments to an optional `project` function and + * emitting the return value of that, or just emitting the array of recent + * values directly if there is no `project` function. + * + * @example Dynamically calculate the Body-Mass Index from an Observable of weight and one for height + * var weight = Rx.Observable.of(70, 72, 76, 79, 75); + * var height = Rx.Observable.of(1.76, 1.77, 1.78); + * var bmi = weight.combineLatest(height, (w, h) => w / (h * h)); + * bmi.subscribe(x => console.log('BMI is ' + x)); + * + * // With output to console: + * // BMI is 24.212293388429753 + * // BMI is 23.93948099205209 + * // BMI is 23.671253629592222 + * + * @see {@link combineAll} + * @see {@link merge} + * @see {@link withLatestFrom} + * + * @param {ObservableInput} other An input Observable to combine with the source + * Observable. More than one input Observables may be given as argument. + * @param {function} [project] An optional function to project the values from + * the combined latest values into a new value on the output Observable. + * @return {Observable} An Observable of projected values from the most recent + * values from each input Observable, or an array of the most recent values from + * each input Observable. + * @method combineLatest + * @owner Observable + */ +export function combineLatest(...observables: Array | + Array> | + ((...values: Array) => R)>): OperatorFunction { + let project: (...values: Array) => R = null; + if (typeof observables[observables.length - 1] === 'function') { + project = <(...values: Array) => R>observables.pop(); + } + + // if the first and only other argument besides the resultSelector is an array + // assume it's been called with `combineLatest([obs1, obs2, obs3], project)` + if (observables.length === 1 && isArray(observables[0])) { + observables = (observables[0]).slice(); + } + + return (source: Observable) => source.lift.call(new ArrayObservable([source, ...observables]), new CombineLatestOperator(project)); +} + +export class CombineLatestOperator implements Operator { + constructor(private project?: (...values: Array) => R) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new CombineLatestSubscriber(subscriber, this.project)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +export class CombineLatestSubscriber extends OuterSubscriber { + private active: number = 0; + private values: any[] = []; + private observables: any[] = []; + private toRespond: number; + + constructor(destination: Subscriber, private project?: (...values: Array) => R) { + super(destination); + } + + protected _next(observable: any) { + this.values.push(none); + this.observables.push(observable); + } + + protected _complete() { + const observables = this.observables; + const len = observables.length; + if (len === 0) { + this.destination.complete(); + } else { + this.active = len; + this.toRespond = len; + for (let i = 0; i < len; i++) { + const observable = observables[i]; + this.add(subscribeToResult(this, observable, observable, i)); + } + } + } + + notifyComplete(unused: Subscriber): void { + if ((this.active -= 1) === 0) { + this.destination.complete(); + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + const values = this.values; + const oldVal = values[outerIndex]; + const toRespond = !this.toRespond + ? 0 + : oldVal === none ? --this.toRespond : this.toRespond; + values[outerIndex] = innerValue; + + if (toRespond === 0) { + if (this.project) { + this._tryProject(values); + } else { + this.destination.next(values.slice()); + } + } + } + + private _tryProject(values: any[]) { + let result: any; + try { + result = this.project.apply(this, values); + } catch (err) { + this.destination.error(err); + return; + } + this.destination.next(result); + } +} diff --git a/src/operators/index.ts b/src/operators/index.ts index 1d9ef31314..750a37c029 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -6,6 +6,7 @@ export { bufferTime } from './bufferTime'; export { bufferToggle } from './bufferToggle'; export { bufferWhen } from './bufferWhen'; export { catchError } from './catchError'; +export { combineLatest } from './combineLatest'; export { concat } from './concat'; export { concatAll } from './concatAll'; export { concatMap } from './concatMap';