From a114c54a95d12af959133124f441e8fc77908427 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Mon, 15 Jun 2020 19:04:18 -0500 Subject: [PATCH 1/2] fix(subscribeOn): allow Infinity as valid delay Removes use of `isNumeric` that excludes `Infinity` as a valid number - Moves SubscribeOnObservable to the only location it is used. - Removes redundant tests of internals --- .../observables/SubscribeOnObservable-spec.ts | 56 ------------------ spec/operators/subscribeOn-spec.ts | 10 ++++ .../observable/SubscribeOnObservable.ts | 52 ---------------- src/internal/operators/subscribeOn.ts | 59 +++++++++++++++---- src/internal/util/isScheduler.ts | 2 +- 5 files changed, 59 insertions(+), 120 deletions(-) delete mode 100644 spec/observables/SubscribeOnObservable-spec.ts delete mode 100644 src/internal/observable/SubscribeOnObservable.ts diff --git a/spec/observables/SubscribeOnObservable-spec.ts b/spec/observables/SubscribeOnObservable-spec.ts deleted file mode 100644 index 6de612c279..0000000000 --- a/spec/observables/SubscribeOnObservable-spec.ts +++ /dev/null @@ -1,56 +0,0 @@ -import { expect } from 'chai'; -import * as sinon from 'sinon'; -import { SubscribeOnObservable } from 'rxjs/internal/observable/SubscribeOnObservable'; -import { hot, expectObservable, expectSubscriptions } from '../helpers/marble-testing'; -import { TestScheduler } from 'rxjs/testing'; -import { asapScheduler } from 'rxjs'; - -declare const rxTestScheduler: TestScheduler; - -describe('SubscribeOnObservable', () => { - it('should create Observable to be subscribed on specified scheduler', () => { - const e1 = hot('--a--b--|'); - const expected = '--a--b--|'; - const sub = '^ !'; - const subscribe = new SubscribeOnObservable(e1, 0, rxTestScheduler); - - expectObservable(subscribe).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(sub); - }); - - it('should specify default scheduler if incorrect scheduler specified', () => { - const e1 = hot('--a--b--|'); - const obj: any = sinon.spy(); - - const scheduler = (new SubscribeOnObservable(e1, 0, obj)).scheduler; - - expect(scheduler).to.deep.equal(asapScheduler); - }); - - it('should create observable via staic create function', () => { - const s = new SubscribeOnObservable(null as any, null as any, rxTestScheduler); - const r = SubscribeOnObservable.create(null as any, null as any, rxTestScheduler); - - expect(s).to.deep.equal(r); - }); - - it('should subscribe after specified delay', () => { - const e1 = hot('--a--b--|'); - const expected = '-----b--|'; - const sub = ' ^ !'; - const subscribe = new SubscribeOnObservable(e1, 30, rxTestScheduler); - - expectObservable(subscribe).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(sub); - }); - - it('should consider negative delay as zero', () => { - const e1 = hot('--a--b--|'); - const expected = '--a--b--|'; - const sub = '^ !'; - const subscribe = new SubscribeOnObservable(e1, -10, rxTestScheduler); - - expectObservable(subscribe).toBe(expected); - expectSubscriptions(e1.subscriptions).toBe(sub); - }); -}); diff --git a/spec/operators/subscribeOn-spec.ts b/spec/operators/subscribeOn-spec.ts index f1cc7fba98..51c4516647 100644 --- a/spec/operators/subscribeOn-spec.ts +++ b/spec/operators/subscribeOn-spec.ts @@ -79,4 +79,14 @@ describe('subscribeOn operator', () => { expectObservable(result, unsub).toBe(expected); expectSubscriptions(e1.subscriptions).toBe(sub); }); + + it('should properly support a delayTime of Infinity', () => { + const e1 = hot('--a--b--|'); + const expected = '---------'; + // TODO: Support lack of subscription in `expectSubscriptions` + // const sub = ' '; + + expectObservable(e1.pipe(subscribeOn(rxTestScheduler, Infinity))).toBe(expected); + // expectSubscriptions(e1.subscriptions).toBe(sub); + }); }); diff --git a/src/internal/observable/SubscribeOnObservable.ts b/src/internal/observable/SubscribeOnObservable.ts deleted file mode 100644 index f84c8e9708..0000000000 --- a/src/internal/observable/SubscribeOnObservable.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { SchedulerLike, SchedulerAction } from '../types'; -import { Subscriber } from '../Subscriber'; -import { Subscription } from '../Subscription'; -import { Observable } from '../Observable'; -import { asap } from '../scheduler/asap'; -import { isNumeric } from '../util/isNumeric'; - -export interface DispatchArg { - source: Observable; - subscriber: Subscriber; -} - -/** - * We need this JSDoc comment for affecting ESDoc. - * @extends {Ignored} - * @hide true - */ -export class SubscribeOnObservable extends Observable { - /** @nocollapse */ - static create(source: Observable, delay: number = 0, scheduler: SchedulerLike = asap): Observable { - return new SubscribeOnObservable(source, delay, scheduler); - } - - /** @nocollapse */ - static dispatch(this: SchedulerAction, arg: DispatchArg): Subscription { - const { source, subscriber } = arg; - return this.add(source.subscribe(subscriber)); - } - - constructor(public source: Observable, - private delayTime: number = 0, - private scheduler: SchedulerLike = asap) { - super(); - if (!isNumeric(delayTime) || delayTime < 0) { - this.delayTime = 0; - } - if (!scheduler || typeof scheduler.schedule !== 'function') { - this.scheduler = asap; - } - } - - /** @deprecated This is an internal implementation detail, do not use. */ - _subscribe(subscriber: Subscriber) { - const delay = this.delayTime; - const source = this.source; - const scheduler = this.scheduler; - - return scheduler.schedule>(SubscribeOnObservable.dispatch as any, delay, { - source, subscriber - }); - } -} diff --git a/src/internal/operators/subscribeOn.ts b/src/internal/operators/subscribeOn.ts index 03173ea5bc..ad5e0e7d20 100644 --- a/src/internal/operators/subscribeOn.ts +++ b/src/internal/operators/subscribeOn.ts @@ -1,8 +1,49 @@ import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { SubscribeOnObservable } from '../observable/SubscribeOnObservable'; -import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types'; +import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic, SchedulerAction } from '../types'; +import { asap as asapScheduler } from '../scheduler/asap'; +import { Subscription } from '../Subscription'; +import { isScheduler } from '../util/isScheduler'; + +export interface DispatchArg { + source: Observable; + subscriber: Subscriber; +} + +class SubscribeOnObservable extends Observable { + /** @nocollapse */ + static dispatch(this: SchedulerAction, arg: DispatchArg): Subscription { + const { source, subscriber } = arg; + return this.add(source.subscribe(subscriber)); + } + + constructor( + public source: Observable, + private delayTime: number = 0, + private scheduler: SchedulerLike = asapScheduler + ) { + super(); + if (delayTime < 0) { + this.delayTime = 0; + } + if (!isScheduler(scheduler)) { + this.scheduler = asapScheduler; + } + } + + /** @deprecated This is an internal implementation detail, do not use. */ + _subscribe(subscriber: Subscriber) { + const delay = this.delayTime; + const source = this.source; + const scheduler = this.scheduler; + + return scheduler.schedule>(SubscribeOnObservable.dispatch as any, delay, { + source, + subscriber, + }); + } +} /** * Asynchronously subscribes Observers to this Observable on the specified {@link SchedulerLike}. @@ -59,9 +100,9 @@ import { MonoTypeOperatorFunction, SchedulerLike, TeardownLogic } from '../types * The reason for this is that Observable `b` emits its values directly and synchronously like before * but the emissions from `a` are scheduled on the event loop because we are now using the {@link asyncScheduler} for that specific Observable. * - * @param {SchedulerLike} scheduler - The {@link SchedulerLike} to perform subscription actions on. - * @return {Observable} The source Observable modified so that its subscriptions happen on the specified {@link SchedulerLike}. - * @name subscribeOn + * @param scheduler The {@link SchedulerLike} to perform subscription actions on. + * @param delay A delay to pass to the scheduler to delay subscriptions + * @return The source Observable modified so that its subscriptions happen on the specified {@link SchedulerLike}. */ export function subscribeOn(scheduler: SchedulerLike, delay: number = 0): MonoTypeOperatorFunction { return function subscribeOnOperatorFunction(source: Observable): Observable { @@ -70,12 +111,8 @@ export function subscribeOn(scheduler: SchedulerLike, delay: number = 0): Mon } class SubscribeOnOperator implements Operator { - constructor(private scheduler: SchedulerLike, - private delay: number) { - } + constructor(private scheduler: SchedulerLike, private delay: number) {} call(subscriber: Subscriber, source: any): TeardownLogic { - return new SubscribeOnObservable( - source, this.delay, this.scheduler - ).subscribe(subscriber); + return new SubscribeOnObservable(source, this.delay, this.scheduler).subscribe(subscriber); } } diff --git a/src/internal/util/isScheduler.ts b/src/internal/util/isScheduler.ts index 594f195fbf..616285bead 100644 --- a/src/internal/util/isScheduler.ts +++ b/src/internal/util/isScheduler.ts @@ -1,5 +1,5 @@ import { SchedulerLike } from '../types'; export function isScheduler(value: any): value is SchedulerLike { - return value && typeof (value).schedule === 'function'; + return value && typeof value.schedule === 'function'; } From 0f80e2ccbf315890264406c0a5eb9b6086ec2d33 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Tue, 16 Jun 2020 09:30:19 -0500 Subject: [PATCH 2/2] chore: update test with proper subscription assertion --- spec/operators/subscribeOn-spec.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spec/operators/subscribeOn-spec.ts b/spec/operators/subscribeOn-spec.ts index 51c4516647..06dd8ee5f5 100644 --- a/spec/operators/subscribeOn-spec.ts +++ b/spec/operators/subscribeOn-spec.ts @@ -83,10 +83,8 @@ describe('subscribeOn operator', () => { it('should properly support a delayTime of Infinity', () => { const e1 = hot('--a--b--|'); const expected = '---------'; - // TODO: Support lack of subscription in `expectSubscriptions` - // const sub = ' '; expectObservable(e1.pipe(subscribeOn(rxTestScheduler, Infinity))).toBe(expected); - // expectSubscriptions(e1.subscriptions).toBe(sub); + expectSubscriptions(e1.subscriptions).toBe([]); }); });