From 2280e83f3ca9442c677c329022b638d951f5d42b Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:11:13 -0800 Subject: [PATCH 1/6] refactor(forkJoin): eliminate ForkJoinObservable - deletes ForkJoinObservable - implements forkJoin as just a function - Still uses the same ForkJoinSubscriber as an optimization - Improves some of the internal typings for (slightly) better type safety --- spec/observables/forkJoin-spec.ts | 136 +++++----- src/internal/InnerSubscriber.ts | 5 +- src/internal/observable/ForkJoinObservable.ts | 239 ------------------ src/internal/observable/forkJoin.ts | 223 +++++++++++++++- src/internal/observable/fromArray.ts | 2 +- 5 files changed, 293 insertions(+), 312 deletions(-) delete mode 100644 src/internal/observable/ForkJoinObservable.ts diff --git a/spec/observables/forkJoin-spec.ts b/spec/observables/forkJoin-spec.ts index e8f6647a74..304e5a1edd 100644 --- a/spec/observables/forkJoin-spec.ts +++ b/spec/observables/forkJoin-spec.ts @@ -1,20 +1,20 @@ import { expect } from 'chai'; -import * as Rx from '../../src/Rx'; +import { Observable } from '../../src'; +import { forkJoin, of } from '../../src/create'; import { lowerCaseO } from '../helpers/test-helper'; import marbleTestingSignature = require('../helpers/marble-testing'); // tslint:disable-line:no-require-imports -declare const {type, asDiagram}; +declare const type: any; +declare const asDiagram: any; declare const hot: typeof marbleTestingSignature.hot; declare const expectObservable: typeof marbleTestingSignature.expectObservable; declare const expectSubscriptions: typeof marbleTestingSignature.expectSubscriptions; -const Observable = Rx.Observable; - /** @test {forkJoin} */ -describe('Observable.forkJoin', () => { +describe('forkJoin', () => { asDiagram('forkJoin') ('should join the last values of the provided observables into an array', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('---a---b---c---d---|'), hot('-1---2---3---|') ); @@ -24,7 +24,7 @@ describe('Observable.forkJoin', () => { }); it('should join the last values of the provided observables into an array', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), hot('--1--2--3--|') @@ -35,7 +35,7 @@ describe('Observable.forkJoin', () => { }); it('should allow emit null or undefined', () => { - const e2 = Observable.forkJoin( + const e2 = forkJoin( hot('--a--b--c--d--|', { d: null }), hot('(b|)'), hot('--1--2--3--|'), @@ -47,11 +47,11 @@ describe('Observable.forkJoin', () => { }); it('should join the last values of the provided observables with selector', () => { - function selector(x, y, z) { + function selector(x: string, y: string, z: string) { return x + y + z; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), hot('--1--2--3--|'), @@ -63,7 +63,7 @@ describe('Observable.forkJoin', () => { }); it('should accept single observable', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|') ); const expected = '--------------(x|)'; @@ -72,7 +72,7 @@ describe('Observable.forkJoin', () => { }); it('should accept array of observable contains single', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( [hot('--a--b--c--d--|')] ); const expected = '--------------(x|)'; @@ -81,11 +81,11 @@ describe('Observable.forkJoin', () => { }); it('should accept single observable with selector', () => { - function selector(x) { + function selector(x: string) { return x + x; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), selector ); @@ -95,11 +95,11 @@ describe('Observable.forkJoin', () => { }); it('should accept array of observable contains single with selector', () => { - function selector(x) { + function selector(x: string) { return x + x; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( [hot('--a--b--c--d--|')], selector ); @@ -109,7 +109,7 @@ describe('Observable.forkJoin', () => { }); it('should accept lowercase-o observables', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), lowerCaseO('1', '2', '3') @@ -120,7 +120,7 @@ describe('Observable.forkJoin', () => { }); it('should accept empty lowercase-o observables', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), lowerCaseO() @@ -131,12 +131,12 @@ describe('Observable.forkJoin', () => { }); it('should accept promise', (done: MochaDone) => { - const e1 = Observable.forkJoin( - Observable.of(1), + const e1 = forkJoin( + of(1), Promise.resolve(2) ); - e1.subscribe((x: Array) => { + e1.subscribe((x: number[]) => { expect(x).to.deep.equal([1, 2]); }, (err: any) => { @@ -147,7 +147,7 @@ describe('Observable.forkJoin', () => { }); it('should accept array of observables', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( [hot('--a--b--c--d--|'), hot('(b|)'), hot('--1--2--3--|')] @@ -158,11 +158,11 @@ describe('Observable.forkJoin', () => { }); it('should accept array of observables with selector', () => { - function selector(x, y, z) { + function selector(x: string, y: string, z: string) { return x + y + z; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( [hot('--a--b--c--d--|'), hot('(b|)'), hot('--1--2--3--|')], @@ -174,7 +174,7 @@ describe('Observable.forkJoin', () => { }); it('should not emit if any of source observable is empty', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), hot('------------------|') @@ -185,7 +185,7 @@ describe('Observable.forkJoin', () => { }); it('should complete early if any of source is empty and completes before than others', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('(b|)'), hot('---------|') @@ -196,7 +196,7 @@ describe('Observable.forkJoin', () => { }); it('should complete when all sources are empty', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--------------|'), hot('---------|') ); @@ -206,7 +206,7 @@ describe('Observable.forkJoin', () => { }); it('should not complete when only source never completes', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--------------') ); const expected = '-'; @@ -215,7 +215,7 @@ describe('Observable.forkJoin', () => { }); it('should not complete when one of the sources never completes', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--------------'), hot('-a---b--c--|') ); @@ -225,7 +225,7 @@ describe('Observable.forkJoin', () => { }); it('should complete when one of the sources never completes but other completes without values', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--------------'), hot('------|') ); @@ -235,25 +235,25 @@ describe('Observable.forkJoin', () => { }); it('should complete if source is not provided', () => { - const e1 = Observable.forkJoin(); + const e1 = forkJoin(); const expected = '|'; expectObservable(e1).toBe(expected); }); it('should complete if sources list is empty', () => { - const e1 = Observable.forkJoin([]); + const e1 = forkJoin([]); const expected = '|'; expectObservable(e1).toBe(expected); }); it('should complete when any of source is empty with selector', () => { - function selector(x, y) { + function selector(x: string, y: string) { return x + y; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('---------|'), selector); @@ -263,11 +263,11 @@ describe('Observable.forkJoin', () => { }); it('should emit results by resultselector', () => { - function selector(x, y) { + function selector(x: string, y: string) { return x + y; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a--b--c--d--|'), hot('---2-----|'), selector); @@ -277,7 +277,7 @@ describe('Observable.forkJoin', () => { }); it('should raise error when any of source raises error with empty observable', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('------#'), hot('---------|')); const expected = '------#'; @@ -286,7 +286,7 @@ describe('Observable.forkJoin', () => { }); it('should raise error when any of source raises error with source that never completes', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('------#'), hot('----------')); const expected = '------#'; @@ -295,11 +295,11 @@ describe('Observable.forkJoin', () => { }); it('should raise error when any of source raises error with selector with empty observable', () => { - function selector(x, y) { + function selector(x: string, y: string) { return x + y; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('------#'), hot('---------|'), selector); @@ -309,7 +309,7 @@ describe('Observable.forkJoin', () => { }); it('should raise error when source raises error', () => { - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('------#'), hot('---a-----|')); const expected = '------#'; @@ -318,11 +318,11 @@ describe('Observable.forkJoin', () => { }); it('should raise error when source raises error with selector', () => { - function selector(x, y) { + function selector(x: string, y: string) { return x + y; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('------#'), hot('-------b-|'), selector); @@ -332,11 +332,11 @@ describe('Observable.forkJoin', () => { }); it('should raise error when the selector throws', () => { - function selector(x, y) { + function selector(x: string, y: string) { throw 'error'; } - const e1 = Observable.forkJoin( + const e1 = forkJoin( hot('--a-|'), hot('---b-|'), selector); @@ -353,7 +353,7 @@ describe('Observable.forkJoin', () => { const expected = '---------- '; const unsub = ' ! '; - const result = Observable.forkJoin(e1, e2); + const result = forkJoin(e1, e2); expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -367,7 +367,7 @@ describe('Observable.forkJoin', () => { const e2subs = '^ ! '; const expected = '---------# '; - const result = Observable.forkJoin(e1, e2); + const result = forkJoin(e1, e2); expectObservable(result).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(e1subs); @@ -379,61 +379,61 @@ describe('Observable.forkJoin', () => { let a: Promise; let b: Promise; let c: Promise; - let o1: Rx.Observable<[number, string, boolean]> = Observable.forkJoin(a, b, c); - let o2: Rx.Observable = Observable.forkJoin(a, b, c, (aa, bb, cc) => !!aa && !!bb && cc); + let o1: Observable<[number, string, boolean]> = forkJoin(a, b, c); + let o2: Observable = forkJoin(a, b, c, (aa, bb, cc) => !!aa && !!bb && cc); /* tslint:enable:no-unused-variable */ }); type('should support observables', () => { /* tslint:disable:no-unused-variable */ - let a: Rx.Observable; - let b: Rx.Observable; - let c: Rx.Observable; - let o1: Rx.Observable<[number, string, boolean]> = Observable.forkJoin(a, b, c); - let o2: Rx.Observable = Observable.forkJoin(a, b, c, (aa, bb, cc) => !!aa && !!bb && cc); + let a: Observable; + let b: Observable; + let c: Observable; + let o1: Observable<[number, string, boolean]> = forkJoin(a, b, c); + let o2: Observable = forkJoin(a, b, c, (aa, bb, cc) => !!aa && !!bb && cc); /* tslint:enable:no-unused-variable */ }); type('should support mixed observables and promises', () => { /* tslint:disable:no-unused-variable */ let a: Promise; - let b: Rx.Observable; + let b: Observable; let c: Promise; - let d: Rx.Observable; - let o1: Rx.Observable<[number, string, boolean, string[]]> = Observable.forkJoin(a, b, c, d); - let o2: Rx.Observable = Observable.forkJoin(a, b, c, d, (aa, bb, cc, dd) => !!aa && !!bb && cc && !!dd.length); + let d: Observable; + let o1: Observable<[number, string, boolean, string[]]> = forkJoin(a, b, c, d); + let o2: Observable = forkJoin(a, b, c, d, (aa, bb, cc, dd) => !!aa && !!bb && cc && !!dd.length); /* tslint:enable:no-unused-variable */ }); type('should support arrays of promises', () => { /* tslint:disable:no-unused-variable */ let a: Promise[]; - let o1: Rx.Observable = Observable.forkJoin(a); - let o2: Rx.Observable = Observable.forkJoin(...a); - let o3: Rx.Observable = Observable.forkJoin(a, (...x) => x.length); + let o1: Observable = forkJoin(a); + let o2: Observable = forkJoin(...a); + let o3: Observable = forkJoin(a, (...x) => x.length); /* tslint:enable:no-unused-variable */ }); type('should support arrays of observables', () => { /* tslint:disable:no-unused-variable */ - let a: Rx.Observable[]; - let o1: Rx.Observable = Observable.forkJoin(a); - let o2: Rx.Observable = Observable.forkJoin(...a); - let o3: Rx.Observable = Observable.forkJoin(a, (...x) => x.length); + let a: Observable[]; + let o1: Observable = forkJoin(a); + let o2: Observable = forkJoin(...a); + let o3: Observable = forkJoin(a, (...x) => x.length); /* tslint:enable:no-unused-variable */ }); type('should return Array when given a single promise', () => { /* tslint:disable:no-unused-variable */ let a: Promise; - let o1: Rx.Observable = Observable.forkJoin(a); + let o1: Observable = forkJoin(a); /* tslint:enable:no-unused-variable */ }); type('should return Array when given a single observable', () => { /* tslint:disable:no-unused-variable */ - let a: Rx.Observable; - let o1: Rx.Observable = Observable.forkJoin(a); + let a: Observable; + let o1: Observable = forkJoin(a); /* tslint:enable:no-unused-variable */ }); }); diff --git a/src/internal/InnerSubscriber.ts b/src/internal/InnerSubscriber.ts index b460f96996..cb16178057 100644 --- a/src/internal/InnerSubscriber.ts +++ b/src/internal/InnerSubscriber.ts @@ -3,13 +3,14 @@ import { OuterSubscriber } from './OuterSubscriber'; /** * We need this JSDoc comment for affecting ESDoc. + * @internal * @ignore * @extends {Ignored} */ export class InnerSubscriber extends Subscriber { - private index: number = 0; + private index = 0; - constructor(private parent: OuterSubscriber, private outerValue: T, private outerIndex: number) { + constructor(private parent: OuterSubscriber, public outerValue: T, public outerIndex: number) { super(); } diff --git a/src/internal/observable/ForkJoinObservable.ts b/src/internal/observable/ForkJoinObservable.ts deleted file mode 100644 index 69e2e0fd35..0000000000 --- a/src/internal/observable/ForkJoinObservable.ts +++ /dev/null @@ -1,239 +0,0 @@ -import { Observable, SubscribableOrPromise } from '../Observable'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { empty } from './empty'; -import { isArray } from '..//util/isArray'; - -import { subscribeToResult } from '..//util/subscribeToResult'; -import { OuterSubscriber } from '../OuterSubscriber'; -import { InnerSubscriber } from '../InnerSubscriber'; - -/** - * We need this JSDoc comment for affecting ESDoc. - * @extends {Ignored} - * @hide true - */ -export class ForkJoinObservable extends Observable { - constructor(private sources: Array>, - private resultSelector?: (...values: Array) => T) { - super(); - } - - /* tslint:disable:max-line-length */ - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise): Observable<[T, T2]>; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise): Observable<[T, T2, T3]>; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise): Observable<[T, T2, T3, T4]>; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise): Observable<[T, T2, T3, T4, T5]>; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise, v6: SubscribableOrPromise): Observable<[T, T2, T3, T4, T5, T6]>; - static create(v1: SubscribableOrPromise, project: (v1: T) => R): Observable; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, project: (v1: T, v2: T2) => R): Observable; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3) => R): Observable; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): Observable; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): Observable; - static create(v1: SubscribableOrPromise, v2: SubscribableOrPromise, v3: SubscribableOrPromise, v4: SubscribableOrPromise, v5: SubscribableOrPromise, v6: SubscribableOrPromise, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): Observable; - static create(sources: SubscribableOrPromise[]): Observable; - static create(sources: SubscribableOrPromise[]): Observable; - static create(sources: SubscribableOrPromise[], project: (...values: Array) => R): Observable; - static create(sources: SubscribableOrPromise[], project: (...values: Array) => R): Observable; - static create(...sources: SubscribableOrPromise[]): Observable; - static create(...sources: SubscribableOrPromise[]): Observable; - /* tslint:enable:max-line-length */ - - /** - * Joins last values emitted by passed Observables. - * - * Wait for Observables to complete and then combine last values they emitted. - * - * - * - * `forkJoin` is an operator that takes any number of Observables which can be passed either as an array - * or directly as arguments. If no input Observables are provided, resulting stream will complete - * immediately. - * - * `forkJoin` will wait for all passed Observables to complete and then it will emit an array with last - * values from corresponding Observables. So if you pass `n` Observables to the operator, resulting - * array will have `n` values, where first value is the last thing emitted by the first Observable, - * second value is the last thing emitted by the second Observable and so on. That means `forkJoin` will - * not emit more than once and it will complete after that. If you need to emit combined values not only - * at the end of lifecycle of passed Observables, but also throughout it, try out {@link combineLatest} - * or {@link zip} instead. - * - * In order for resulting array to have the same length as the number of input Observables, whenever any of - * that Observables completes without emitting any value, `forkJoin` will complete at that moment as well - * and it will not emit anything either, even if it already has some last values from other Observables. - * Conversely, if there is an Observable that never completes, `forkJoin` will never complete as well, - * unless at any point some other Observable completes without emitting value, which brings us back to - * the previous case. Overall, in order for `forkJoin` to emit a value, all Observables passed as arguments - * have to emit something at least once and complete. - * - * If any input Observable errors at some point, `forkJoin` will error as well and all other Observables - * will be immediately unsubscribed. - * - * Optionally `forkJoin` accepts project function, that will be called with values which normally - * would land in emitted array. Whatever is returned by project function, will appear in output - * Observable instead. This means that default project can be thought of as a function that takes - * all its arguments and puts them into an array. Note that project function will be called only - * when output Observable is supposed to emit a result. - * - * @example Use forkJoin with operator emitting immediately - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.of(1, 2, 3, 4), - * Rx.Observable.of(5, 6, 7, 8) - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!') - * ); - * - * // Logs: - * // [4, 8] - * // "This is how it ends!" - * - * - * @example Use forkJoin with operator emitting after some time - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete - * Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!') - * ); - * - * // Logs: - * // [2, 3] after 3 seconds - * // "This is how it ends!" immediately after - * - * - * @example Use forkJoin with project function - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete - * Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete - * (n, m) => n + m - * ); - * observable.subscribe( - * value => console.log(value), - * err => {}, - * () => console.log('This is how it ends!') - * ); - * - * // Logs: - * // 5 after 3 seconds - * // "This is how it ends!" immediately after - * - * @see {@link combineLatest} - * @see {@link zip} - * - * @param {...SubscribableOrPromise} sources Any number of Observables provided either as an array or as an arguments - * passed directly to the operator. - * @param {function} [project] Function that takes values emitted by input Observables and returns value - * that will appear in resulting Observable instead of default array. - * @return {Observable} Observable emitting either an array of last values emitted by passed Observables - * or value from project function. - * @static true - * @name forkJoin - * @owner Observable - */ - static create(...sources: Array | - Array> | - ((...values: Array) => any)>): Observable { - if (sources === null || arguments.length === 0) { - return empty(); - } - - let resultSelector: (...values: Array) => any = null; - if (typeof sources[sources.length - 1] === 'function') { - resultSelector = <(...values: Array) => any>sources.pop(); - } - - // if the first and only other argument besides the resultSelector is an array - // assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)` - if (sources.length === 1 && isArray(sources[0])) { - sources = >>sources[0]; - } - - if (sources.length === 0) { - return empty(); - } - - return new ForkJoinObservable(>>sources, resultSelector); - } - - protected _subscribe(subscriber: Subscriber): Subscription { - return new ForkJoinSubscriber(subscriber, this.sources, this.resultSelector); - } -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @ignore - * @extends {Ignored} - */ -class ForkJoinSubscriber extends OuterSubscriber { - private completed = 0; - private total: number; - private values: any[]; - private haveValues = 0; - - constructor(destination: Subscriber, - private sources: Array>, - private resultSelector?: (...values: Array) => T) { - super(destination); - - const len = sources.length; - this.total = len; - this.values = new Array(len); - - for (let i = 0; i < len; i++) { - const source = sources[i]; - const innerSubscription = subscribeToResult(this, source, null, i); - - if (innerSubscription) { - ( innerSubscription).outerIndex = i; - this.add(innerSubscription); - } - } - } - - notifyNext(outerValue: any, innerValue: T, - outerIndex: number, innerIndex: number, - innerSub: InnerSubscriber): void { - this.values[outerIndex] = innerValue; - if (!(innerSub)._hasValue) { - (innerSub)._hasValue = true; - this.haveValues++; - } - } - - notifyComplete(innerSub: InnerSubscriber): void { - const destination = this.destination; - const { haveValues, resultSelector, values } = this; - const len = values.length; - - if (!(innerSub)._hasValue) { - destination.complete(); - return; - } - - this.completed++; - - if (this.completed !== len) { - return; - } - - if (haveValues === len) { - let value: any; - try { - value = resultSelector ? resultSelector.apply(this, values) : values; - } catch (err) { - destination.error(err); - return; - } - destination.next(value); - } - - destination.complete(); - } -} diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index 4906992f19..3439e439d8 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -1,3 +1,222 @@ -import { ForkJoinObservable } from './ForkJoinObservable'; +import { Observable, ObservableInput } from '../Observable'; +import { isArray } from '../util/isArray'; +import { EMPTY } from './empty'; +import { subscribeToResult } from '../util/subscribeToResult'; +import { OuterSubscriber } from '../OuterSubscriber'; +import { InnerSubscriber } from '../InnerSubscriber'; +import { Subscriber } from '../Subscriber'; -export const forkJoin = ForkJoinObservable.create; \ No newline at end of file +/* tslint:disable:max-line-length */ +export function forkJoin(v1: ObservableInput, v2: ObservableInput): Observable<[T, T2]>; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput): Observable<[T, T2, T3]>; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): Observable<[T, T2, T3, T4]>; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): Observable<[T, T2, T3, T4, T5]>; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable<[T, T2, T3, T4, T5, T6]>; +export function forkJoin(v1: ObservableInput, project: (v1: T) => R): Observable; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, project: (v1: T, v2: T2) => R): Observable; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, project: (v1: T, v2: T2, v3: T3) => R): Observable; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4) => R): Observable; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5) => R): Observable; +export function forkJoin(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, project: (v1: T, v2: T2, v3: T3, v4: T4, v5: T5, v6: T6) => R): Observable; +export function forkJoin(sources: Array>): Observable; +export function forkJoin(sources: Array>): Observable; +export function forkJoin(sources: Array>, project: (...values: T[]) => R): Observable; +export function forkJoin(sources: Array>, project: (...values: any[]) => R): Observable; +export function forkJoin(...sources: Array>): Observable; +export function forkJoin(...sources: Array>): Observable; +/* tslint:enable:max-line-length */ + +/** + * Joins last values emitted by passed Observables. + * + * Wait for Observables to complete and then combine last values they emitted. + * + * + * + * `forkJoin` is an operator that takes any number of Observables which can be passed either as an array + * or directly as arguments. If no input Observables are provided, resulting stream will complete + * immediately. + * + * `forkJoin` will wait for all passed Observables to complete and then it will emit an array with last + * values from corresponding Observables. So if you pass `n` Observables to the operator, resulting + * array will have `n` values, where first value is the last thing emitted by the first Observable, + * second value is the last thing emitted by the second Observable and so on. That means `forkJoin` will + * not emit more than once and it will complete after that. If you need to emit combined values not only + * at the end of lifecycle of passed Observables, but also throughout it, try out {@link combineLatest} + * or {@link zip} instead. + * + * In order for resulting array to have the same length as the number of input Observables, whenever any of + * that Observables completes without emitting any value, `forkJoin` will complete at that moment as well + * and it will not emit anything either, even if it already has some last values from other Observables. + * Conversely, if there is an Observable that never completes, `forkJoin` will never complete as well, + * unless at any point some other Observable completes without emitting value, which brings us back to + * the previous case. Overall, in order for `forkJoin` to emit a value, all Observables passed as arguments + * have to emit something at least once and complete. + * + * If any input Observable errors at some point, `forkJoin` will error as well and all other Observables + * will be immediately unsubscribed. + * + * Optionally `forkJoin` accepts project function, that will be called with values which normally + * would land in emitted array. Whatever is returned by project function, will appear in output + * Observable instead. This means that default project can be thought of as a function that takes + * all its arguments and puts them into an array. Note that project function will be called only + * when output Observable is supposed to emit a result. + * + * @example Use forkJoin with operator emitting immediately + * const observable = Rx.Observable.forkJoin( + * Rx.Observable.of(1, 2, 3, 4), + * Rx.Observable.of(5, 6, 7, 8) + * ); + * observable.subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('This is how it ends!') + * ); + * + * // Logs: + * // [4, 8] + * // "This is how it ends!" + * + * + * @example Use forkJoin with operator emitting after some time + * const observable = Rx.Observable.forkJoin( + * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete + * Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete + * ); + * observable.subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('This is how it ends!') + * ); + * + * // Logs: + * // [2, 3] after 3 seconds + * // "This is how it ends!" immediately after + * + * + * @example Use forkJoin with project function + * const observable = Rx.Observable.forkJoin( + * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete + * Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete + * (n, m) => n + m + * ); + * observable.subscribe( + * value => console.log(value), + * err => {}, + * () => console.log('This is how it ends!') + * ); + * + * // Logs: + * // 5 after 3 seconds + * // "This is how it ends!" immediately after + * + * @see {@link combineLatest} + * @see {@link zip} + * + * @param {...ObservableInput} sources Any number of Observables provided either as an array or as an arguments + * passed directly to the operator. + * @param {function} [project] Function that takes values emitted by input Observables and returns value + * that will appear in resulting Observable instead of default array. + * @return {Observable} Observable emitting either an array of last values emitted by passed Observables + * or value from project function. + * @static true + * @name forkJoin + * @owner Observable + */ +export function forkJoin(...sources: Array | + Array> | + ((...values: T[]) => R)>): Observable { + if (sources === null || arguments.length === 0) { + return EMPTY; + } + + let resultSelector: (...values: T[]) => R = null; + if (typeof sources[sources.length - 1] === 'function') { + resultSelector = sources.pop() as ((...values: T[]) => R); + } + + // if the first and only other argument besides the resultSelector is an array + // assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)` + if (sources.length === 1 && isArray(sources[0])) { + sources = sources[0] as Array>; + } + + if (sources.length === 0) { + return EMPTY; + } + + return new Observable(subscriber => { + return new ForkJoinSubscriber(subscriber, sources as Array>, resultSelector); + }); +} +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class ForkJoinSubscriber extends OuterSubscriber { + private completed = 0; + private total: number; + private values: T[]; + private haveValues = 0; + + constructor(destination: Subscriber, + private sources: Array>, + private resultSelector?: (...values: T[]) => R) { + super(destination); + + const len = sources.length; + this.total = len; + this.values = new Array(len); + + for (let i = 0; i < len; i++) { + const source = sources[i]; + const innerSubscription = subscribeToResult(this, source, null, i); + + if (innerSubscription) { + (innerSubscription as any).outerIndex = i; + this.add(innerSubscription); + } + } + } + + notifyNext(outerValue: any, innerValue: T, + outerIndex: number, innerIndex: number, + innerSub: InnerSubscriber): void { + this.values[outerIndex] = innerValue; + if (!(innerSub as any)._hasValue) { + (innerSub as any)._hasValue = true; + this.haveValues++; + } + } + + notifyComplete(innerSub: InnerSubscriber): void { + const destination = this.destination; + const { haveValues, resultSelector, values } = this; + const len = values.length; + + if (!(innerSub as any)._hasValue) { + destination.complete(); + return; + } + + this.completed++; + + if (this.completed !== len) { + return; + } + + if (haveValues === len) { + let value: any; + try { + value = resultSelector ? resultSelector.apply(this, values) : values; + } catch (err) { + destination.error(err); + return; + } + destination.next(value); + } + + destination.complete(); + } +} diff --git a/src/internal/observable/fromArray.ts b/src/internal/observable/fromArray.ts index 2b49715888..1185e91577 100644 --- a/src/internal/observable/fromArray.ts +++ b/src/internal/observable/fromArray.ts @@ -3,7 +3,7 @@ import { IScheduler } from '../Scheduler'; import { Subscription } from '../Subscription'; import { subscribeToArray } from '../util/subscribeToArray'; -export function fromArray(input: ArrayLike, scheduler: IScheduler) { +export function fromArray(input: ArrayLike, scheduler?: IScheduler) { if (!scheduler) { return new Observable(subscribeToArray(input)); } else { From 78c057f538595dfc0945a6289f2b0e1aa1615f94 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:11:55 -0800 Subject: [PATCH 2/6] refactor(forkJoin): remove unnecessary property --- src/internal/observable/forkJoin.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index 3439e439d8..c5ea51226d 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -156,7 +156,6 @@ export function forkJoin(...sources: Array | */ class ForkJoinSubscriber extends OuterSubscriber { private completed = 0; - private total: number; private values: T[]; private haveValues = 0; @@ -166,7 +165,6 @@ class ForkJoinSubscriber extends OuterSubscriber { super(destination); const len = sources.length; - this.total = len; this.values = new Array(len); for (let i = 0; i < len; i++) { From 2c6d3d9c4c1c6ce7af84cdfbebaf08edf80d70bf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:12:37 -0800 Subject: [PATCH 3/6] refactor(forkJoin): remove unnecessary property assignment --- src/internal/observable/forkJoin.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index c5ea51226d..41812b2373 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -172,7 +172,6 @@ class ForkJoinSubscriber extends OuterSubscriber { const innerSubscription = subscribeToResult(this, source, null, i); if (innerSubscription) { - (innerSubscription as any).outerIndex = i; this.add(innerSubscription); } } From 6d81b58862e9c98fb0a8e7c80c70065dab80c4e6 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:14:05 -0800 Subject: [PATCH 4/6] refactor(forkJoin): improve type safety of resultSelector call --- src/internal/observable/forkJoin.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index 41812b2373..e3c6ab4ac2 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -204,14 +204,14 @@ class ForkJoinSubscriber extends OuterSubscriber { } if (haveValues === len) { - let value: any; + let result: R | T[]; try { - value = resultSelector ? resultSelector.apply(this, values) : values; + result = resultSelector ? resultSelector(...values) : values; } catch (err) { destination.error(err); return; } - destination.next(value); + destination.next(result); } destination.complete(); From af8fac64c9540bd845650db01fb40b807957ec49 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:15:04 -0800 Subject: [PATCH 5/6] style(forkJoin): destructure destination with others --- src/internal/observable/forkJoin.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index e3c6ab4ac2..8787229e0d 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -188,8 +188,7 @@ class ForkJoinSubscriber extends OuterSubscriber { } notifyComplete(innerSub: InnerSubscriber): void { - const destination = this.destination; - const { haveValues, resultSelector, values } = this; + const { destination, haveValues, resultSelector, values } = this; const len = values.length; if (!(innerSub as any)._hasValue) { From b2ad7ad5f08dd7c3ff7bb6c67634b8344dafa47e Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 1 Feb 2018 21:18:30 -0800 Subject: [PATCH 6/6] docs(forkJoin): update examples to v6 imports --- src/internal/observable/forkJoin.ts | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/src/internal/observable/forkJoin.ts b/src/internal/observable/forkJoin.ts index 8787229e0d..c9e7767cb6 100644 --- a/src/internal/observable/forkJoin.ts +++ b/src/internal/observable/forkJoin.ts @@ -63,9 +63,11 @@ export function forkJoin(...sources: Array>): Observable * when output Observable is supposed to emit a result. * * @example Use forkJoin with operator emitting immediately - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.of(1, 2, 3, 4), - * Rx.Observable.of(5, 6, 7, 8) + * import { forkJoin, of } from 'rxjs/create'; + * + * const observable = forkJoin( + * of(1, 2, 3, 4), + * of(5, 6, 7, 8) * ); * observable.subscribe( * value => console.log(value), @@ -79,9 +81,12 @@ export function forkJoin(...sources: Array>): Observable * * * @example Use forkJoin with operator emitting after some time - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete - * Rx.Observable.interval(500).take(4) // emit 0, 1, 2, 3 every half a second and complete + * import { forkJoin, interval } from 'rxjs/create'; + * import { take } from 'rxjs/operators'; + * + * const observable = forkJoin( + * interval(1000).pipe(take(3)), // emit 0, 1, 2 every second and complete + * interval(500).pipe(take(4)) // emit 0, 1, 2, 3 every half a second and complete * ); * observable.subscribe( * value => console.log(value), @@ -95,9 +100,12 @@ export function forkJoin(...sources: Array>): Observable * * * @example Use forkJoin with project function - * const observable = Rx.Observable.forkJoin( - * Rx.Observable.interval(1000).take(3), // emit 0, 1, 2 every second and complete - * Rx.Observable.interval(500).take(4), // emit 0, 1, 2, 3 every half a second and complete + * import { jorkJoin, interval } from 'rxjs/create'; + * import { take } from 'rxjs/operators'; + * + * const observable = forkJoin( + * interval(1000).pipe(take(3)), // emit 0, 1, 2 every second and complete + * interval(500).pipe(take(4)), // emit 0, 1, 2, 3 every half a second and complete * (n, m) => n + m * ); * observable.subscribe(