From b4733d7aa8c4ee1e00332be44003be7d48daa92a Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 22 Apr 2019 21:29:38 -0500 Subject: [PATCH] feat(scheduled): Add scheduled creation method (#4595) - Simplifies code around common observable creation and subscription - Removes `scalar` internal impl - Deprecates a number of APIs that accept schedulers where we would rather people use `scheduled`. --- spec/helpers/observableMatcher.ts | 48 +++++++++++++++ spec/observables/ScalarObservable-spec.ts | 17 ------ spec/observables/of-spec.ts | 9 +-- spec/scheduled/scheduled-spec.ts | 63 ++++++++++++++++++++ spec/util/subscribeToResult-spec.ts | 4 +- src/index.ts | 1 + src/internal/Observable.ts | 2 +- src/internal/observable/concat.ts | 29 +++++++-- src/internal/observable/empty.ts | 11 ++-- src/internal/observable/from.ts | 30 ++-------- src/internal/observable/fromArray.ts | 18 +----- src/internal/observable/fromIterable.ts | 41 +------------ src/internal/observable/fromObservable.ts | 20 ++----- src/internal/observable/fromPromise.ts | 18 +----- src/internal/observable/merge.ts | 53 ++++++++++++---- src/internal/observable/of.ts | 59 +++++++++++------- src/internal/observable/scalar.ts | 11 ---- src/internal/operators/endWith.ts | 54 ++++++++--------- src/internal/operators/startWith.ts | 60 ++++++++++--------- src/internal/scheduled/scheduleArray.ts | 21 +++++++ src/internal/scheduled/scheduleIterable.ts | 45 ++++++++++++++ src/internal/scheduled/scheduleObservable.ts | 19 ++++++ src/internal/scheduled/schedulePromise.ts | 21 +++++++ src/internal/scheduled/scheduled.ts | 36 +++++++++++ src/internal/util/subscribeTo.ts | 16 +---- src/internal/util/subscribeToArray.ts | 4 +- src/internal/util/subscribeToResult.ts | 7 ++- 27 files changed, 448 insertions(+), 269 deletions(-) create mode 100644 spec/helpers/observableMatcher.ts delete mode 100644 spec/observables/ScalarObservable-spec.ts create mode 100644 spec/scheduled/scheduled-spec.ts delete mode 100644 src/internal/observable/scalar.ts create mode 100644 src/internal/scheduled/scheduleArray.ts create mode 100644 src/internal/scheduled/scheduleIterable.ts create mode 100644 src/internal/scheduled/scheduleObservable.ts create mode 100644 src/internal/scheduled/schedulePromise.ts create mode 100644 src/internal/scheduled/scheduled.ts diff --git a/spec/helpers/observableMatcher.ts b/spec/helpers/observableMatcher.ts new file mode 100644 index 0000000000..b2965117d2 --- /dev/null +++ b/spec/helpers/observableMatcher.ts @@ -0,0 +1,48 @@ +import * as _ from 'lodash'; + +function stringify(x: any): string { + return JSON.stringify(x, function (key: string, value: any) { + if (Array.isArray(value)) { + return '[' + value + .map(function (i) { + return '\n\t' + stringify(i); + }) + '\n]'; + } + return value; + }) + .replace(/\\"/g, '"') + .replace(/\\t/g, '\t') + .replace(/\\n/g, '\n'); +} + +function deleteErrorNotificationStack(marble: any) { + const { notification } = marble; + if (notification) { + const { kind, error } = notification; + if (kind === 'E' && error instanceof Error) { + notification.error = { name: error.name, message: error.message }; + } + } + return marble; +} + +export function observableMatcher(actual: any, expected: any) { + if (Array.isArray(actual) && Array.isArray(expected)) { + actual = actual.map(deleteErrorNotificationStack); + expected = expected.map(deleteErrorNotificationStack); + const passed = _.isEqual(actual, expected); + if (passed) { + return; + } + + let message = '\nExpected \n'; + actual.forEach((x: any) => message += `\t${stringify(x)}\n`); + + message += '\t\nto deep equal \n'; + expected.forEach((x: any) => message += `\t${stringify(x)}\n`); + + chai.assert(passed, message); + } else { + chai.assert.deepEqual(actual, expected); + } +} diff --git a/spec/observables/ScalarObservable-spec.ts b/spec/observables/ScalarObservable-spec.ts deleted file mode 100644 index 1e3d785b0d..0000000000 --- a/spec/observables/ScalarObservable-spec.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { expect } from 'chai'; -import { of as scalar } from 'rxjs'; -import { TestScheduler } from 'rxjs/testing'; - -declare const rxTestScheduler: TestScheduler; - -describe('scalar', () => { - it('should create expose a value property', () => { - const s = scalar(1); - expect((s as any).value).to.equal(1); - }); - - it('should set `_isScalar` to true when NOT called with a Scheduler', () => { - const s = scalar(1); - expect(s._isScalar).to.be.true; - }); -}); diff --git a/spec/observables/of-spec.ts b/spec/observables/of-spec.ts index 740b74ec7b..d62967b4ad 100644 --- a/spec/observables/of-spec.ts +++ b/spec/observables/of-spec.ts @@ -1,5 +1,5 @@ import { expect } from 'chai'; -import { empty, of, Observable } from 'rxjs'; +import { of, Observable } from 'rxjs'; import { expectObservable } from '../helpers/marble-testing'; import { TestScheduler } from 'rxjs/testing'; import { concatMap, delay, concatAll } from 'rxjs/operators'; @@ -33,18 +33,13 @@ describe('of', () => { }); }); - it('should return an empty observable if passed no values', () => { - const obs = of(); - expect(obs).to.equal(empty()); - }); - it('should emit one value', (done: MochaDone) => { let calls = 0; of(42).subscribe((x: number) => { expect(++calls).to.equal(1); expect(x).to.equal(42); - }, (err: any) => { + }, (err: any) => { done(new Error('should not be called')); }, () => { done(); diff --git a/spec/scheduled/scheduled-spec.ts b/spec/scheduled/scheduled-spec.ts new file mode 100644 index 0000000000..89cfd859fe --- /dev/null +++ b/spec/scheduled/scheduled-spec.ts @@ -0,0 +1,63 @@ +import { scheduled, of } from 'rxjs'; +import { TestScheduler } from 'rxjs/testing'; +import { lowerCaseO } from '../helpers/test-helper'; +import { observableMatcher } from '../helpers/observableMatcher'; +import { expect } from 'chai'; + +describe('scheduled', () => { + let testScheduler: TestScheduler; + + beforeEach(() => { + testScheduler = new TestScheduler(observableMatcher); + }); + + it('should schedule a sync observable', () => { + const input = of('a', 'b', 'c'); + testScheduler.run(({ expectObservable }) => { + expectObservable(scheduled(input, testScheduler)).toBe('(abc|)'); + }); + }); + + it('should schedule an array', () => { + const input = ['a', 'b', 'c']; + testScheduler.run(({ expectObservable }) => { + expectObservable(scheduled(input, testScheduler)).toBe('(abc|)'); + }); + }); + + it('should schedule an iterable', () => { + const input = 'abc'; // strings are iterables + testScheduler.run(({ expectObservable }) => { + expectObservable(scheduled(input, testScheduler)).toBe('(abc|)'); + }); + }); + + it('should schedule an observable-like', () => { + const input = lowerCaseO('a', 'b', 'c'); // strings are iterables + testScheduler.run(({ expectObservable }) => { + expectObservable(scheduled(input, testScheduler)).toBe('(abc|)'); + }); + }); + + it('should schedule a promise', done => { + const results: any[] = []; + const input = Promise.resolve('x'); // strings are iterables + scheduled(input, testScheduler).subscribe({ + next(value) { results.push(value); }, + complete() { results.push('done'); }, + }); + + expect(results).to.deep.equal([]); + + // Promises force async, so we can't schedule synchronously, no matter what. + testScheduler.flush(); + expect(results).to.deep.equal([]); + + Promise.resolve().then(() => { + // NOW it should work, as the other promise should have resolved. + testScheduler.flush(); + expect(results).to.deep.equal(['x', 'done']); + done(); + }); + }); +}); diff --git a/spec/util/subscribeToResult-spec.ts b/spec/util/subscribeToResult-spec.ts index fd4e153c5f..b3b0bfd929 100644 --- a/spec/util/subscribeToResult-spec.ts +++ b/spec/util/subscribeToResult-spec.ts @@ -6,7 +6,7 @@ import $$symbolObservable from 'symbol-observable'; import { of, range, throwError } from 'rxjs'; describe('subscribeToResult', () => { - it('should synchronously complete when subscribe to scalarObservable', () => { + it('should synchronously complete when subscribed to scalarObservable', () => { const result = of(42); let expected: number; const subscriber = new OuterSubscriber((x) => expected = x); @@ -14,7 +14,7 @@ describe('subscribeToResult', () => { const subscription = subscribeToResult(subscriber, result); expect(expected).to.be.equal(42); - expect(subscription).to.not.exist; + expect(subscription.closed).to.be.true; }); it('should subscribe to observables that are an instanceof Observable', (done) => { diff --git a/src/index.ts b/src/index.ts index c47cc5de8a..35b831c9b3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -64,6 +64,7 @@ export { throwError } from './internal/observable/throwError'; export { timer } from './internal/observable/timer'; export { using } from './internal/observable/using'; export { zip } from './internal/observable/zip'; +export { scheduled } from './internal/scheduled/scheduled'; /* Constants */ export { EMPTY } from './internal/observable/empty'; diff --git a/src/internal/Observable.ts b/src/internal/Observable.ts index 837ccdcf95..c677428012 100644 --- a/src/internal/Observable.ts +++ b/src/internal/Observable.ts @@ -6,7 +6,7 @@ import { canReportError } from './util/canReportError'; import { toSubscriber } from './util/toSubscriber'; import { iif } from './observable/iif'; import { throwError } from './observable/throwError'; -import { observable as Symbol_observable } from '../internal/symbol/observable'; +import { observable as Symbol_observable } from './symbol/observable'; import { pipeFromArray } from './util/pipe'; import { config } from './config'; diff --git a/src/internal/observable/concat.ts b/src/internal/observable/concat.ts index 1fd95e4cb7..1a93376481 100644 --- a/src/internal/observable/concat.ts +++ b/src/internal/observable/concat.ts @@ -6,13 +6,30 @@ import { from } from './from'; import { concatAll } from '../operators/concatAll'; /* tslint:disable:max-line-length */ -export function concat>(v1: O1, scheduler?: SchedulerLike): Observable>; -export function concat, O2 extends ObservableInput>(v1: O1, v2: O2, scheduler?: SchedulerLike): Observable | ObservedValueOf>; -export function concat, O2 extends ObservableInput, O3 extends ObservableInput>(v1: O1, v2: O2, v3: O3, scheduler?: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf>; -export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler?: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf>; -export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler?: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; -export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler?: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat>(v1: O1, scheduler: SchedulerLike): Observable>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat, O2 extends ObservableInput>(v1: O1, v2: O2, scheduler: SchedulerLike): Observable | ObservedValueOf>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat, O2 extends ObservableInput, O3 extends ObservableInput>(v1: O1, v2: O2, v3: O3, scheduler: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, scheduler: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, scheduler: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6, scheduler: SchedulerLike): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; + +export function concat>(v1: O1): Observable>; +export function concat, O2 extends ObservableInput>(v1: O1, v2: O2): Observable | ObservedValueOf>; +export function concat, O2 extends ObservableInput, O3 extends ObservableInput>(v1: O1, v2: O2, v3: O3): Observable | ObservedValueOf | ObservedValueOf>; +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +export function concat, O2 extends ObservableInput, O3 extends ObservableInput, O4 extends ObservableInput, O5 extends ObservableInput, O6 extends ObservableInput>(v1: O1, v2: O2, v3: O3, v4: O4, v5: O5, v6: O6): Observable | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf | ObservedValueOf>; +export function concat>(...observables: O[]): Observable>; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ export function concat>(...observables: (O | SchedulerLike)[]): Observable>; +export function concat(...observables: ObservableInput[]): Observable; +/** @deprecated Use {@link scheduled} and {@link concatAll} (e.g. `scheduled([o1, o2, o3], scheduler).pipe(concatAll())`) */ export function concat(...observables: (ObservableInput | SchedulerLike)[]): Observable; /* tslint:enable:max-line-length */ /** diff --git a/src/internal/observable/empty.ts b/src/internal/observable/empty.ts index 630bb13677..299d0c1f12 100644 --- a/src/internal/observable/empty.ts +++ b/src/internal/observable/empty.ts @@ -53,19 +53,16 @@ export const EMPTY = new Observable(subscriber => subscriber.complete()); * @see {@link of} * @see {@link throwError} * - * @param {SchedulerLike} [scheduler] A {@link SchedulerLike} to use for scheduling + * @param scheduler A {@link SchedulerLike} to use for scheduling * the emission of the complete notification. - * @return {Observable} An "empty" Observable: emits only the complete + * @return An "empty" Observable: emits only the complete * notification. - * @static true - * @name empty - * @owner Observable - * @deprecated Deprecated in favor of using {@link index/EMPTY} constant. + * @deprecated Deprecated in favor of using {@link EMPTY} constant, or {@link scheduled} (e.g. `scheduled([], scheduler)`) */ export function empty(scheduler?: SchedulerLike) { return scheduler ? emptyScheduled(scheduler) : EMPTY; } -export function emptyScheduled(scheduler: SchedulerLike) { +function emptyScheduled(scheduler: SchedulerLike) { return new Observable(subscriber => scheduler.schedule(() => subscriber.complete())); } diff --git a/src/internal/observable/from.ts b/src/internal/observable/from.ts index 1f851fee21..697e92860b 100644 --- a/src/internal/observable/from.ts +++ b/src/internal/observable/from.ts @@ -1,16 +1,11 @@ import { Observable } from '../Observable'; -import { isPromise } from '../util/isPromise'; -import { isArrayLike } from '../util/isArrayLike'; -import { isInteropObservable } from '../util/isInteropObservable'; -import { isIterable } from '../util/isIterable'; -import { fromArray } from './fromArray'; -import { fromPromise } from './fromPromise'; -import { fromIterable } from './fromIterable'; -import { fromObservable } from './fromObservable'; import { subscribeTo } from '../util/subscribeTo'; import { ObservableInput, SchedulerLike, ObservedValueOf } from '../types'; +import { scheduled } from '../scheduled/scheduled'; -export function from>(input: O, scheduler?: SchedulerLike): Observable>; +export function from>(input: O): Observable>; +/** @deprecated use {@link scheduled} instead. */ +export function from>(input: O, scheduler: SchedulerLike): Observable>; /** * Creates an Observable from an Array, an array-like object, a Promise, an iterable object, or an Observable-like object. @@ -111,26 +106,13 @@ export function from>(input: O, scheduler?: Sched * @name from * @owner Observable */ - export function from(input: ObservableInput, scheduler?: SchedulerLike): Observable { if (!scheduler) { if (input instanceof Observable) { return input; } return new Observable(subscribeTo(input)); + } else { + return scheduled(input, scheduler); } - - if (input != null) { - if (isInteropObservable(input)) { - return fromObservable(input, scheduler); - } else if (isPromise(input)) { - return fromPromise(input, scheduler); - } else if (isArrayLike(input)) { - return fromArray(input, scheduler); - } else if (isIterable(input) || typeof input === 'string') { - return fromIterable(input, scheduler); - } - } - - throw new TypeError((input !== null && typeof input || input) + ' is not observable'); } diff --git a/src/internal/observable/fromArray.ts b/src/internal/observable/fromArray.ts index 53ead5bc2a..b5953bebef 100644 --- a/src/internal/observable/fromArray.ts +++ b/src/internal/observable/fromArray.ts @@ -1,26 +1,12 @@ import { Observable } from '../Observable'; import { SchedulerLike } from '../types'; -import { Subscription } from '../Subscription'; import { subscribeToArray } from '../util/subscribeToArray'; +import { scheduleArray } from '../scheduled/scheduleArray'; export function fromArray(input: ArrayLike, scheduler?: SchedulerLike) { if (!scheduler) { return new Observable(subscribeToArray(input)); } else { - return new Observable(subscriber => { - const sub = new Subscription(); - let i = 0; - sub.add(scheduler.schedule(function () { - if (i === input.length) { - subscriber.complete(); - return; - } - subscriber.next(input[i++]); - if (!subscriber.closed) { - sub.add(this.schedule()); - } - })); - return sub; - }); + return scheduleArray(input, scheduler); } } diff --git a/src/internal/observable/fromIterable.ts b/src/internal/observable/fromIterable.ts index 94e3719ba0..e7ffd2b2aa 100644 --- a/src/internal/observable/fromIterable.ts +++ b/src/internal/observable/fromIterable.ts @@ -1,50 +1,15 @@ import { Observable } from '../Observable'; import { SchedulerLike } from '../types'; -import { Subscription } from '../Subscription'; -import { iterator as Symbol_iterator } from '../symbol/iterator'; import { subscribeToIterable } from '../util/subscribeToIterable'; +import { scheduleIterable } from '../scheduled/scheduleIterable'; -export function fromIterable(input: Iterable, scheduler: SchedulerLike) { +export function fromIterable(input: Iterable, scheduler?: SchedulerLike) { if (!input) { throw new Error('Iterable cannot be null'); } if (!scheduler) { return new Observable(subscribeToIterable(input)); } else { - return new Observable(subscriber => { - const sub = new Subscription(); - let iterator: Iterator; - sub.add(() => { - // Finalize generators - if (iterator && typeof iterator.return === 'function') { - iterator.return(); - } - }); - sub.add(scheduler.schedule(() => { - iterator = input[Symbol_iterator](); - sub.add(scheduler.schedule(function () { - if (subscriber.closed) { - return; - } - let value: T; - let done: boolean; - try { - const result = iterator.next(); - value = result.value; - done = result.done; - } catch (err) { - subscriber.error(err); - return; - } - if (done) { - subscriber.complete(); - } else { - subscriber.next(value); - this.schedule(); - } - })); - })); - return sub; - }); + return scheduleIterable(input, scheduler); } } diff --git a/src/internal/observable/fromObservable.ts b/src/internal/observable/fromObservable.ts index c3f3e06567..6a297b4469 100644 --- a/src/internal/observable/fromObservable.ts +++ b/src/internal/observable/fromObservable.ts @@ -1,24 +1,12 @@ import { Observable } from '../Observable'; -import { Subscription } from '../Subscription'; -import { observable as Symbol_observable } from '../symbol/observable'; import { subscribeToObservable } from '../util/subscribeToObservable'; -import { InteropObservable, SchedulerLike, Subscribable } from '../types'; +import { InteropObservable, SchedulerLike } from '../types'; +import { scheduleObservable } from '../scheduled/scheduleObservable'; -export function fromObservable(input: InteropObservable, scheduler: SchedulerLike) { +export function fromObservable(input: InteropObservable, scheduler?: SchedulerLike) { if (!scheduler) { return new Observable(subscribeToObservable(input)); } else { - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add(scheduler.schedule(() => { - const observable: Subscribable = input[Symbol_observable](); - sub.add(observable.subscribe({ - next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); }, - error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); }, - complete() { sub.add(scheduler.schedule(() => subscriber.complete())); }, - })); - })); - return sub; - }); + return scheduleObservable(input, scheduler); } } diff --git a/src/internal/observable/fromPromise.ts b/src/internal/observable/fromPromise.ts index 214e2cede4..28ebef65ea 100644 --- a/src/internal/observable/fromPromise.ts +++ b/src/internal/observable/fromPromise.ts @@ -1,26 +1,12 @@ import { Observable } from '../Observable'; import { SchedulerLike } from '../types'; -import { Subscription } from '../Subscription'; import { subscribeToPromise } from '../util/subscribeToPromise'; +import { schedulePromise } from '../scheduled/schedulePromise'; export function fromPromise(input: PromiseLike, scheduler?: SchedulerLike) { if (!scheduler) { return new Observable(subscribeToPromise(input)); } else { - return new Observable(subscriber => { - const sub = new Subscription(); - sub.add(scheduler.schedule(() => input.then( - value => { - sub.add(scheduler.schedule(() => { - subscriber.next(value); - sub.add(scheduler.schedule(() => subscriber.complete())); - })); - }, - err => { - sub.add(scheduler.schedule(() => subscriber.error(err))); - } - ))); - return sub; - }); + return schedulePromise(input, scheduler); } } diff --git a/src/internal/observable/merge.ts b/src/internal/observable/merge.ts index ef2a493f12..f9f1faa77a 100644 --- a/src/internal/observable/merge.ts +++ b/src/internal/observable/merge.ts @@ -5,19 +5,48 @@ import { mergeAll } from '../operators/mergeAll'; import { fromArray } from './fromArray'; /* tslint:disable:max-line-length */ -export function merge(v1: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, scheduler?: SchedulerLike): Observable; -export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent?: number, scheduler?: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent: number, scheduler: SchedulerLike): Observable; + +export function merge(v1: ObservableInput): Observable; +export function merge(v1: ObservableInput, concurrent?: number): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, concurrent?: number): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, concurrent?: number): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, concurrent?: number): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, concurrent?: number): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput): Observable; +export function merge(v1: ObservableInput, v2: ObservableInput, v3: ObservableInput, v4: ObservableInput, v5: ObservableInput, v6: ObservableInput, concurrent?: number): Observable; +export function merge(...observables: (ObservableInput | number)[]): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ export function merge(...observables: (ObservableInput | SchedulerLike | number)[]): Observable; +export function merge(...observables: (ObservableInput | number)[]): Observable; +/** @deprecated use {@link scheduled} and {@link mergeAll} (e.g. `scheduled([ob1, ob2, ob3], scheduled).pipe(mergeAll())*/ export function merge(...observables: (ObservableInput | SchedulerLike | number)[]): Observable; /* tslint:enable:max-line-length */ /** diff --git a/src/internal/observable/of.ts b/src/internal/observable/of.ts index ed79df7989..0d1411b5d0 100644 --- a/src/internal/observable/of.ts +++ b/src/internal/observable/of.ts @@ -1,24 +1,48 @@ import { SchedulerLike } from '../types'; import { isScheduler } from '../util/isScheduler'; import { fromArray } from './fromArray'; -import { empty } from './empty'; -import { scalar } from './scalar'; import { Observable } from '../Observable'; +import { scheduleArray } from '../scheduled/scheduleArray'; /* tslint:disable:max-line-length */ -export function of(a: T, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, e: T5, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, scheduler?: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, scheduler?: SchedulerLike): +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, e: T5, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, scheduler: SchedulerLike): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, scheduler: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8, scheduler?: SchedulerLike): +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8, scheduler: SchedulerLike): Observable; -export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8, i: T9, scheduler?: SchedulerLike): +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8, i: T9, scheduler: SchedulerLike): Observable; -export function of(...args: Array): Observable; +/** @deprecated use {@link scheduled} instead `scheduled([a, b, c], scheduler)` */ +export function of(...args: (T | SchedulerLike)[]): Observable; + +// TODO(benlesh): Update the typings for this when we can switch to TS 3.x +export function of(a: T): Observable; +export function of(a: T, b: T2): Observable; +export function of(a: T, b: T2, c: T3): Observable; +export function of(a: T, b: T2, c: T3, d: T4): Observable; +export function of(a: T, b: T2, c: T3, d: T4, e: T5): Observable; +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6): Observable; +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7): + Observable; +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8): + Observable; +export function of(a: T, b: T2, c: T3, d: T4, e: T5, f: T6, g: T7, h: T8, i: T9): + Observable; +export function of(...args: T[]): Observable; /* tslint:enable:max-line-length */ /** @@ -80,15 +104,8 @@ export function of(...args: Array): Observable { let scheduler = args[args.length - 1] as SchedulerLike; if (isScheduler(scheduler)) { args.pop(); + return scheduleArray(args as T[], scheduler); } else { - scheduler = undefined; - } - switch (args.length) { - case 0: - return empty(scheduler); - case 1: - return scheduler ? fromArray(args as T[], scheduler) : scalar(args[0] as T); - default: - return fromArray(args as T[], scheduler); + return fromArray(args as T[]); } } diff --git a/src/internal/observable/scalar.ts b/src/internal/observable/scalar.ts deleted file mode 100644 index 6d66b7ea07..0000000000 --- a/src/internal/observable/scalar.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Observable } from '../Observable'; - -export function scalar(value: T) { - const result = new Observable(subscriber => { - subscriber.next(value); - subscriber.complete(); - }); - result._isScalar = true; - (result as any).value = value; - return result; -} diff --git a/src/internal/operators/endWith.ts b/src/internal/operators/endWith.ts index 09f928d085..2619192067 100644 --- a/src/internal/operators/endWith.ts +++ b/src/internal/operators/endWith.ts @@ -1,19 +1,31 @@ import { Observable } from '../Observable'; -import { fromArray } from '../observable/fromArray'; -import { scalar } from '../observable/scalar'; -import { empty } from '../observable/empty'; -import { concat as concatStatic } from '../observable/concat'; -import { isScheduler } from '../util/isScheduler'; +import { concat } from '../observable/concat'; import { MonoTypeOperatorFunction, SchedulerLike, OperatorFunction } from '../types'; /* tslint:disable:max-line-length */ -export function endWith(scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export function endWith(v1: A, scheduler?: SchedulerLike): OperatorFunction; -export function endWith(v1: A, v2: B, scheduler?: SchedulerLike): OperatorFunction; -export function endWith(v1: A, v2: B, v3: C, scheduler?: SchedulerLike): OperatorFunction; -export function endWith(v1: A, v2: B, v3: C, v4: D, scheduler?: SchedulerLike): OperatorFunction; -export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E, scheduler?: SchedulerLike): OperatorFunction; -export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E, v6: F, scheduler?: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(scheduler: SchedulerLike): MonoTypeOperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, v2: B, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, v2: B, v3: C, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, v2: B, v3: C, v4: D, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ +export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E, v6: F, scheduler: SchedulerLike): OperatorFunction; + +export function endWith(v1: A): OperatorFunction; +export function endWith(v1: A, v2: B): OperatorFunction; +export function endWith(v1: A, v2: B, v3: C): OperatorFunction; +export function endWith(v1: A, v2: B, v3: C, v4: D): OperatorFunction; +export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E): OperatorFunction; +export function endWith(v1: A, v2: B, v3: C, v4: D, v5: E, v6: F): OperatorFunction; +export function endWith(...array: Z[]): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([source, [a, b, c]], scheduler).pipe(concatAll())`) */ export function endWith(...array: Array): OperatorFunction; /* tslint:enable:max-line-length */ @@ -50,21 +62,5 @@ export function endWith(...array: Array): OperatorF * @owner Observable */ export function endWith(...array: Array): MonoTypeOperatorFunction { - return (source: Observable) => { - let scheduler = array[array.length - 1]; - if (isScheduler(scheduler)) { - array.pop(); - } else { - scheduler = null; - } - - const len = array.length; - if (len === 1 && !scheduler) { - return concatStatic(source, scalar(array[0] as T)); - } else if (len > 0) { - return concatStatic(source, fromArray(array as T[], scheduler)); - } else { - return concatStatic(source, empty(scheduler)); - } - }; + return (source: Observable) => concat(source, ...(array as any[])) as Observable; } diff --git a/src/internal/operators/startWith.ts b/src/internal/operators/startWith.ts index 162003881b..81ce063213 100644 --- a/src/internal/operators/startWith.ts +++ b/src/internal/operators/startWith.ts @@ -1,19 +1,32 @@ import { Observable } from '../Observable'; -import { fromArray } from '../observable/fromArray'; -import { scalar } from '../observable/scalar'; -import { empty } from '../observable/empty'; -import { concat as concatStatic } from '../observable/concat'; +import { concat } from '../observable/concat'; import { isScheduler } from '../util/isScheduler'; import { MonoTypeOperatorFunction, OperatorFunction, SchedulerLike } from '../types'; /* tslint:disable:max-line-length */ -export function startWith(scheduler?: SchedulerLike): MonoTypeOperatorFunction; -export function startWith(v1: D, scheduler?: SchedulerLike): OperatorFunction; -export function startWith(v1: D, v2: E, scheduler?: SchedulerLike): OperatorFunction; -export function startWith(v1: D, v2: E, v3: F, scheduler?: SchedulerLike): OperatorFunction; -export function startWith(v1: D, v2: E, v3: F, v4: G, scheduler?: SchedulerLike): OperatorFunction; -export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H, scheduler?: SchedulerLike): OperatorFunction; -export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H, v6: I, scheduler?: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(scheduler: SchedulerLike): MonoTypeOperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, v2: E, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, v2: E, v3: F, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, v2: E, v3: F, v4: G, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H, scheduler: SchedulerLike): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ +export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H, v6: I, scheduler: SchedulerLike): OperatorFunction; + +export function startWith(v1: D): OperatorFunction; +export function startWith(v1: D, v2: E): OperatorFunction; +export function startWith(v1: D, v2: E, v3: F): OperatorFunction; +export function startWith(v1: D, v2: E, v3: F, v4: G): OperatorFunction; +export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H): OperatorFunction; +export function startWith(v1: D, v2: E, v3: F, v4: G, v5: H, v6: I): OperatorFunction; +export function startWith(...array: D[]): OperatorFunction; +/** @deprecated use {@link scheduled} and {@link concatAll} (e.g. `scheduled([[a, b, c], source], scheduler).pipe(concatAll())`) */ export function startWith(...array: Array): OperatorFunction; /* tslint:enable:max-line-length */ @@ -53,21 +66,12 @@ export function startWith(...array: Array): Operato * @owner Observable */ export function startWith(...array: Array): OperatorFunction { - return (source: Observable) => { - let scheduler = array[array.length - 1]; - if (isScheduler(scheduler)) { - array.pop(); - } else { - scheduler = null; - } - - const len = array.length; - if (len === 1 && !scheduler) { - return concatStatic(scalar(array[0] as T), source); - } else if (len > 0) { - return concatStatic(fromArray(array as T[], scheduler), source); - } else { - return concatStatic(empty(scheduler), source); - } - }; + const scheduler = array[array.length - 1] as SchedulerLike; + if (isScheduler(scheduler)) { + // deprecated path + array.pop(); + return (source: Observable) => concat(array as T[], source, scheduler); + } else { + return (source: Observable) => concat(array as T[], source); + } } diff --git a/src/internal/scheduled/scheduleArray.ts b/src/internal/scheduled/scheduleArray.ts new file mode 100644 index 0000000000..1b5686064e --- /dev/null +++ b/src/internal/scheduled/scheduleArray.ts @@ -0,0 +1,21 @@ +import { Observable } from '../Observable'; +import { SchedulerLike } from '../types'; +import { Subscription } from '../Subscription'; + +export function scheduleArray(input: ArrayLike, scheduler: SchedulerLike) { + return new Observable(subscriber => { + const sub = new Subscription(); + let i = 0; + sub.add(scheduler.schedule(function () { + if (i === input.length) { + subscriber.complete(); + return; + } + subscriber.next(input[i++]); + if (!subscriber.closed) { + sub.add(this.schedule()); + } + })); + return sub; + }); +} diff --git a/src/internal/scheduled/scheduleIterable.ts b/src/internal/scheduled/scheduleIterable.ts new file mode 100644 index 0000000000..dc9043825e --- /dev/null +++ b/src/internal/scheduled/scheduleIterable.ts @@ -0,0 +1,45 @@ +import { Observable } from '../Observable'; +import { SchedulerLike } from '../types'; +import { Subscription } from '../Subscription'; +import { iterator as Symbol_iterator } from '../symbol/iterator'; + +export function scheduleIterable(input: Iterable, scheduler: SchedulerLike) { + if (!input) { + throw new Error('Iterable cannot be null'); + } + return new Observable(subscriber => { + const sub = new Subscription(); + let iterator: Iterator; + sub.add(() => { + // Finalize generators + if (iterator && typeof iterator.return === 'function') { + iterator.return(); + } + }); + sub.add(scheduler.schedule(() => { + iterator = input[Symbol_iterator](); + sub.add(scheduler.schedule(function () { + if (subscriber.closed) { + return; + } + let value: T; + let done: boolean; + try { + const result = iterator.next(); + value = result.value; + done = result.done; + } catch (err) { + subscriber.error(err); + return; + } + if (done) { + subscriber.complete(); + } else { + subscriber.next(value); + this.schedule(); + } + })); + })); + return sub; + }); +} diff --git a/src/internal/scheduled/scheduleObservable.ts b/src/internal/scheduled/scheduleObservable.ts new file mode 100644 index 0000000000..9e970c090d --- /dev/null +++ b/src/internal/scheduled/scheduleObservable.ts @@ -0,0 +1,19 @@ +import { Observable } from '../Observable'; +import { Subscription } from '../Subscription'; +import { observable as Symbol_observable } from '../symbol/observable'; +import { InteropObservable, SchedulerLike, Subscribable } from '../types'; + +export function scheduleObservable(input: InteropObservable, scheduler: SchedulerLike) { + return new Observable(subscriber => { + const sub = new Subscription(); + sub.add(scheduler.schedule(() => { + const observable: Subscribable = input[Symbol_observable](); + sub.add(observable.subscribe({ + next(value) { sub.add(scheduler.schedule(() => subscriber.next(value))); }, + error(err) { sub.add(scheduler.schedule(() => subscriber.error(err))); }, + complete() { sub.add(scheduler.schedule(() => subscriber.complete())); }, + })); + })); + return sub; + }); +} diff --git a/src/internal/scheduled/schedulePromise.ts b/src/internal/scheduled/schedulePromise.ts new file mode 100644 index 0000000000..ec1bfafc94 --- /dev/null +++ b/src/internal/scheduled/schedulePromise.ts @@ -0,0 +1,21 @@ +import { Observable } from '../Observable'; +import { SchedulerLike } from '../types'; +import { Subscription } from '../Subscription'; + +export function schedulePromise(input: PromiseLike, scheduler: SchedulerLike) { + return new Observable(subscriber => { + const sub = new Subscription(); + sub.add(scheduler.schedule(() => input.then( + value => { + sub.add(scheduler.schedule(() => { + subscriber.next(value); + sub.add(scheduler.schedule(() => subscriber.complete())); + })); + }, + err => { + sub.add(scheduler.schedule(() => subscriber.error(err))); + } + ))); + return sub; + }); +} diff --git a/src/internal/scheduled/scheduled.ts b/src/internal/scheduled/scheduled.ts new file mode 100644 index 0000000000..ec819f7301 --- /dev/null +++ b/src/internal/scheduled/scheduled.ts @@ -0,0 +1,36 @@ +import { scheduleObservable } from './scheduleObservable'; +import { schedulePromise } from './schedulePromise'; +import { scheduleArray } from './scheduleArray'; +import { scheduleIterable } from './scheduleIterable'; +import { ObservableInput, SchedulerLike, Observable } from 'rxjs'; +import { isInteropObservable } from '../util/isInteropObservable'; +import { isPromise } from '../util/isPromise'; +import { isArrayLike } from '../util/isArrayLike'; +import { isIterable } from '../util/isIterable'; + +/** + * Converts from a common {@link ObservableInput} type to an observable where subscription and emissions + * are scheduled on the provided scheduler. + * + * @see from + * @see of + * + * @param input The observable, array, promise, iterable, etc you would like to schedule + * @param scheduler The scheduler to use to schedule the subscription and emissions from + * the returned observable. + */ +export function scheduled(input: ObservableInput, scheduler: SchedulerLike): Observable { + if (input != null) { + if (isInteropObservable(input)) { + return scheduleObservable(input, scheduler); + } else if (isPromise(input)) { + return schedulePromise(input, scheduler); + } else if (isArrayLike(input)) { + return scheduleArray(input, scheduler); + } else if (isIterable(input) || typeof input === 'string') { + return scheduleIterable(input, scheduler); + } + } + + throw new TypeError((input !== null && typeof input || input) + ' is not observable'); +} diff --git a/src/internal/util/subscribeTo.ts b/src/internal/util/subscribeTo.ts index 27ed81abc1..c872f6a74c 100644 --- a/src/internal/util/subscribeTo.ts +++ b/src/internal/util/subscribeTo.ts @@ -1,4 +1,3 @@ -import { Observable } from '../Observable'; import { ObservableInput } from '../types'; import { subscribeToArray } from './subscribeToArray'; import { subscribeToPromise } from './subscribeToPromise'; @@ -9,20 +8,11 @@ import { isPromise } from './isPromise'; import { isObject } from './isObject'; import { iterator as Symbol_iterator } from '../symbol/iterator'; import { observable as Symbol_observable } from '../symbol/observable'; +import { Subscription } from '../Subscription'; import { Subscriber } from '../Subscriber'; -export const subscribeTo = (result: ObservableInput) => { - if (result instanceof Observable) { - return (subscriber: Subscriber) => { - if (result._isScalar) { - subscriber.next((result as any).value); - subscriber.complete(); - return undefined; - } else { - return result.subscribe(subscriber); - } - }; - } else if (!!result && typeof result[Symbol_observable] === 'function') { +export const subscribeTo = (result: ObservableInput): (subscriber: Subscriber) => Subscription | void => { + if (!!result && typeof result[Symbol_observable] === 'function') { return subscribeToObservable(result as any); } else if (isArrayLike(result)) { return subscribeToArray(result); diff --git a/src/internal/util/subscribeToArray.ts b/src/internal/util/subscribeToArray.ts index 54bbbc9443..0ca529428b 100644 --- a/src/internal/util/subscribeToArray.ts +++ b/src/internal/util/subscribeToArray.ts @@ -8,7 +8,5 @@ export const subscribeToArray = (array: ArrayLike) => (subscriber: Subscri for (let i = 0, len = array.length; i < len && !subscriber.closed; i++) { subscriber.next(array[i]); } - if (!subscriber.closed) { - subscriber.complete(); - } + subscriber.complete(); }; diff --git a/src/internal/util/subscribeToResult.ts b/src/internal/util/subscribeToResult.ts index 4a8df42117..f350b85733 100644 --- a/src/internal/util/subscribeToResult.ts +++ b/src/internal/util/subscribeToResult.ts @@ -1,9 +1,9 @@ -import { ObservableInput } from '../types'; import { Subscription } from '../Subscription'; import { InnerSubscriber } from '../InnerSubscriber'; import { OuterSubscriber } from '../OuterSubscriber'; import { Subscriber } from '../Subscriber'; import { subscribeTo } from './subscribeTo'; +import { Observable } from '../Observable'; export function subscribeToResult( outerSubscriber: OuterSubscriber, @@ -20,7 +20,10 @@ export function subscribeToResult( destination: Subscriber = new InnerSubscriber(outerSubscriber, outerValue, outerIndex) ): Subscription | void { if (destination.closed) { - return; + return undefined; + } + if (result instanceof Observable) { + return result.subscribe(destination); } return subscribeTo(result)(destination); }