diff --git a/src/operator/withLatestFrom.ts b/src/operator/withLatestFrom.ts index 718b831e9b..49bffc9732 100644 --- a/src/operator/withLatestFrom.ts +++ b/src/operator/withLatestFrom.ts @@ -1,9 +1,5 @@ -import { Operator } from '../Operator'; -import { Subscriber } from '../Subscriber'; import { Observable, ObservableInput } from '../Observable'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; -import { subscribeToResult } from '../util/subscribeToResult'; +import { withLatestFrom as higherOrder } from '../operators/withLatestFrom'; /* tslint:disable:max-line-length */ export function withLatestFrom(this: Observable, project: (v1: T) => R): Observable; @@ -61,86 +57,5 @@ export function withLatestFrom(this: Observable, array: ObservableInput * @owner Observable */ export function withLatestFrom(this: Observable, ...args: Array | ((...values: Array) => R)>): Observable { - let project: any; - if (typeof args[args.length - 1] === 'function') { - project = args.pop(); - } - const observables = []>args; - return this.lift(new WithLatestFromOperator(observables, project)); -} - -class WithLatestFromOperator implements Operator { - constructor(private observables: Observable[], - private project?: (...values: any[]) => Observable) { - } - - call(subscriber: Subscriber, source: any): any { - return source.subscribe(new WithLatestFromSubscriber(subscriber, this.observables, this.project)); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class WithLatestFromSubscriber extends OuterSubscriber { - private values: any[]; - private toRespond: number[] = []; - - constructor(destination: Subscriber, - private observables: Observable[], - private project?: (...values: any[]) => Observable) { - super(destination); - const len = observables.length; - this.values = new Array(len); - - for (let i = 0; i < len; i++) { - this.toRespond.push(i); - } - - for (let i = 0; i < len; i++) { - let observable = observables[i]; - this.add(subscribeToResult(this, observable, observable, i)); - } - } - - notifyNext(outerValue: T, innerValue: R, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.values[outerIndex] = innerValue; - const toRespond = this.toRespond; - if (toRespond.length > 0) { - const found = toRespond.indexOf(outerIndex); - if (found !== -1) { - toRespond.splice(found, 1); - } - } - } - - notifyComplete() { - // noop - } - - protected _next(value: T) { - if (this.toRespond.length === 0) { - const args = [value, ...this.values]; - if (this.project) { - this._tryProject(args); - } else { - this.destination.next(args); - } - } - } - - private _tryProject(args: any[]) { - let result: any; - try { - result = this.project.apply(this, args); - } catch (err) { - this.destination.error(err); - return; - } - this.destination.next(result); - } + return higherOrder(...args)(this); } diff --git a/src/operators/index.ts b/src/operators/index.ts index 4f7ab53876..5b8f1b1a23 100644 --- a/src/operators/index.ts +++ b/src/operators/index.ts @@ -78,4 +78,5 @@ export { windowCount } from './windowCount'; export { windowTime } from './windowTime'; export { windowToggle } from './windowToggle'; export { windowWhen } from './windowWhen'; +export { withLatestFrom } from './withLatestFrom'; export { zip } from './zip'; diff --git a/src/operators/withLatestFrom.ts b/src/operators/withLatestFrom.ts new file mode 100644 index 0000000000..0c387515e7 --- /dev/null +++ b/src/operators/withLatestFrom.ts @@ -0,0 +1,149 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { Observable, ObservableInput } from '../Observable'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OperatorFunction } from '../interfaces'; + +/* tslint:disable:max-line-length */ +export function withLatestFrom(project: (v1: T) => R): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, project: (v1: T, v2: T2) => R): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, project: (v1: T, v2: T2, v3: T3) => R): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): OperatorFunction ; +export function withLatestFrom(v2: ObservableInput): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): OperatorFunction; +export function withLatestFrom(v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): OperatorFunction ; +export function withLatestFrom(...observables: Array | ((...values: Array) => R)>): OperatorFunction; +export function withLatestFrom(array: ObservableInput[]): OperatorFunction; +export function withLatestFrom(array: ObservableInput[], project: (...values: Array) => R): OperatorFunction; +/* tslint:enable:max-line-length */ + +/** + * Combines the source Observable with other Observables to create an Observable + * whose values are calculated from the latest values of each, only when the + * source emits. + * + * Whenever the source Observable emits a value, it + * computes a formula using that value plus the latest values from other input + * Observables, then emits the output of that formula. + * + * + * + * `withLatestFrom` combines each value from the source Observable (the + * instance) with the latest values from the other input Observables only when + * the source emits a value, optionally using a `project` function to determine + * the value to be emitted on the output Observable. All input Observables must + * emit at least one value before the output Observable will emit a value. + * + * @example On every click event, emit an array with the latest timer event plus the click event + * var clicks = Rx.Observable.fromEvent(document, 'click'); + * var timer = Rx.Observable.interval(1000); + * var result = clicks.withLatestFrom(timer); + * result.subscribe(x => console.log(x)); + * + * @see {@link combineLatest} + * + * @param {ObservableInput} other An input Observable to combine with the source + * Observable. More than one input Observables may be given as argument. + * @param {Function} [project] Projection function for combining values + * together. Receives all values in order of the Observables passed, where the + * first parameter is a value from the source Observable. (e.g. + * `a.withLatestFrom(b, c, (a1, b1, c1) => a1 + b1 + c1)`). If this is not + * passed, arrays will be emitted on the output Observable. + * @return {Observable} An Observable of projected values from the most recent + * values from each input Observable, or an array of the most recent values from + * each input Observable. + * @method withLatestFrom + * @owner Observable + */ +export function withLatestFrom(...args: Array | ((...values: Array) => R)>): OperatorFunction { + return (source: Observable) => { + let project: any; + if (typeof args[args.length - 1] === 'function') { + project = args.pop(); + } + const observables = []>args; + return source.lift(new WithLatestFromOperator(observables, project)); + }; +} + +class WithLatestFromOperator implements Operator { + constructor(private observables: Observable[], + private project?: (...values: any[]) => Observable) { + } + + call(subscriber: Subscriber, source: any): any { + return source.subscribe(new WithLatestFromSubscriber(subscriber, this.observables, this.project)); + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class WithLatestFromSubscriber extends OuterSubscriber { + private values: any[]; + private toRespond: number[] = []; + + constructor(destination: Subscriber, + private observables: Observable[], + private project?: (...values: any[]) => Observable) { + super(destination); + const len = observables.length; + this.values = new Array(len); + + for (let i = 0; i < len; i++) { + this.toRespond.push(i); + } + + for (let i = 0; i < len; i++) { + let observable = observables[i]; + this.add(subscribeToResult(this, observable, observable, i)); + } + } + + notifyNext(outerValue: T, innerValue: R, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.values[outerIndex] = innerValue; + const toRespond = this.toRespond; + if (toRespond.length > 0) { + const found = toRespond.indexOf(outerIndex); + if (found !== -1) { + toRespond.splice(found, 1); + } + } + } + + notifyComplete() { + // noop + } + + protected _next(value: T) { + if (this.toRespond.length === 0) { + const args = [value, ...this.values]; + if (this.project) { + this._tryProject(args); + } else { + this.destination.next(args); + } + } + } + + private _tryProject(args: any[]) { + let result: any; + try { + result = this.project.apply(this, args); + } catch (err) { + this.destination.error(err); + return; + } + this.destination.next(result); + } +}