From a12087154f3f17759d092740ec10aae4de89dee4 Mon Sep 17 00:00:00 2001 From: Jay Phelps Date: Tue, 15 Nov 2016 16:42:21 -0800 Subject: [PATCH 1/5] fix(timeout): Cancels scheduled timeout, if no longer needed fixes #2134 --- src/operator/timeout.ts | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index e69c786905..3df4bf1bee 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -5,6 +5,7 @@ import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { Observable } from '../Observable'; import { TeardownLogic } from '../Subscription'; +import { Subscription } from '../Subscription'; import { TimeoutError } from '../util/TimeoutError'; /** @@ -46,6 +47,8 @@ class TimeoutOperator implements Operator { class TimeoutSubscriber extends Subscriber { private index: number = 0; private _previousIndex: number = 0; + private action: Subscription = null; + get previousIndex(): number { return this._previousIndex; } @@ -66,18 +69,34 @@ class TimeoutSubscriber extends Subscriber { private static dispatchTimeout(state: any): void { const source = state.subscriber; const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { + if (source.previousIndex === currentIndex) { source.notifyTimeout(); } } private scheduleTimeout(): void { - let currentIndex = this.index; - this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex }); + const currentIndex = this.index; + const timeoutState = { subscriber: this, index: currentIndex }; + + this.cancelTimeout(); + this.action = this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState + ); + this.add(this.action); + this.index++; this._previousIndex = currentIndex; } + private cancelTimeout(): void { + const { action } = this; + if (action !== null) { + this.remove(action); + action.unsubscribe(); + this.action = null; + } + } + protected _next(value: T): void { this.destination.next(value); From 109f9bb3a7e00a0fe46f150cf595e2d16275b4a0 Mon Sep 17 00:00:00 2001 From: Jay Phelps Date: Tue, 15 Nov 2016 16:42:56 -0800 Subject: [PATCH 2/5] fix(timeoutWith): Cancels scheduled timeout, if no longer needed --- src/operator/timeoutWith.ts | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index 0ab488059a..bb1a126492 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -50,6 +50,7 @@ class TimeoutWithOperator implements Operator { */ class TimeoutWithSubscriber extends OuterSubscriber { private timeoutSubscription: Subscription = undefined; + private action: Subscription = null; private index: number = 0; private _previousIndex: number = 0; get previousIndex(): number { @@ -79,13 +80,28 @@ class TimeoutWithSubscriber extends OuterSubscriber { } private scheduleTimeout(): void { - let currentIndex = this.index; + const currentIndex = this.index; const timeoutState = { subscriber: this, index: currentIndex }; - this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState); + + this.cancelTimeout(); + this.action = this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState + ); + this.add(this.action); + this.index++; this._previousIndex = currentIndex; } + private cancelTimeout(): void { + const { action } = this; + if (action !== null) { + this.remove(action); + action.unsubscribe(); + this.action = null; + } + } + protected _next(value: T) { this.destination.next(value); if (!this.absoluteTimeout) { From 672af1729ff2696f86852dce9be4dc93fb2acead Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Tue, 15 Nov 2016 23:44:24 -0800 Subject: [PATCH 3/5] build(npm-scripts): update debug_mocha npm script for node 6 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 7279fe9ea8..3508353fec 100644 --- a/package.json +++ b/package.json @@ -89,7 +89,7 @@ "prepublish": "shx rm -rf ./typings && typings install && npm run build_all", "publish_docs": "./publish_docs.sh", "test_mocha": "mocha --opts spec/support/default.opts spec-js", - "debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js", + "debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js", "test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html", "test": "npm-run-all clean_spec build_spec test_mocha clean_spec", "tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js", From 2ce6ab50c648b942b8d043907aeabb12e5deb2b2 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Tue, 15 Nov 2016 23:49:17 -0800 Subject: [PATCH 4/5] fix(VirtualAction): Block rescheduled VirtualActions from executing their scheduled work. VirtualActions are immutable so they can be inspected by the TestScheduler. In order to mirror rescheduled stateful Actions, rescheduled VirtualActions shouldn't execute if they've been rescheduled before execution. --- src/scheduler/VirtualTimeScheduler.ts | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/scheduler/VirtualTimeScheduler.ts b/src/scheduler/VirtualTimeScheduler.ts index 21279b9696..acc86214ea 100644 --- a/src/scheduler/VirtualTimeScheduler.ts +++ b/src/scheduler/VirtualTimeScheduler.ts @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler { */ export class VirtualAction extends AsyncAction { + protected active: boolean = true; + constructor(protected scheduler: VirtualTimeScheduler, protected work: (this: VirtualAction, state?: T) => void, protected index: number = scheduler.index += 1) { @@ -54,8 +56,11 @@ export class VirtualAction extends AsyncAction { } public schedule(state?: T, delay: number = 0): Subscription { - return !this.id ? - super.schedule(state, delay) : ( + if (!this.id) { + return super.schedule(state, delay); + } + this.active = false; + return ( // If an action is rescheduled, we save allocations by mutating its state, // pushing it to the end of the scheduler queue, and recycling the action. // But since the VirtualTimeScheduler is used for testing, VirtualActions @@ -77,6 +82,12 @@ export class VirtualAction extends AsyncAction { return undefined; } + protected _execute(state: T, delay: number): any { + if (this.active === true) { + return super._execute(state, delay); + } + } + public static sortActions(a: VirtualAction, b: VirtualAction) { if (a.delay === b.delay) { if (a.index === b.index) { From 1c00d45f2166d6c474672eced0453117ab569951 Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Sat, 19 Nov 2016 19:21:35 -0800 Subject: [PATCH 5/5] fix(timeout): Update timeout and timeoutWith to recycle their scheduled timeout actions. The timeout and timeoutWith operators should dispose their scheduled timeout actions on unsubscription. Also, given the new scheduling architecture, they can recycle their scheduled actions so just one action is allocated per subscription. --- spec/operators/timeout-spec.ts | 23 ++++++ spec/operators/timeoutWith-spec.ts | 26 ++++++ spec/schedulers/VirtualTimeScheduler-spec.ts | 13 ++- src/operator/timeout.ts | 69 +++++----------- src/operator/timeoutWith.ts | 86 +++++++------------- 5 files changed, 110 insertions(+), 107 deletions(-) diff --git a/spec/operators/timeout-spec.ts b/spec/operators/timeout-spec.ts index 95e66bde9b..abea44e91a 100644 --- a/spec/operators/timeout-spec.ts +++ b/spec/operators/timeout-spec.ts @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => { expectObservable(result).toBe(expected, values, value); expectSubscriptions(e1.subscriptions).toBe(e1subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('--a--b--c---d--e--|'); + const e1subs = '^ ! '; + const expected = '--a--b--c-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeout(50, null, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); }); diff --git a/spec/operators/timeoutWith-spec.ts b/spec/operators/timeoutWith-spec.ts index fad796032f..ffe082a03c 100644 --- a/spec/operators/timeoutWith-spec.ts +++ b/spec/operators/timeoutWith-spec.ts @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); expectSubscriptions(e2.subscriptions).toBe(e2subs); }); + + it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => { + const e1 = hot('---a---b-----c----|'); + const e1subs = '^ ! '; + const e2 = cold( '-x---y| '); + const e2subs = ' ^ ! '; + const expected = '---a---b----x-- '; + const unsub = ' ! '; + + const result = e1 + .lift(function(source) { + const timeoutSubscriber = this; + const { action } = timeoutSubscriber; // get a ref to the action here + timeoutSubscriber.add(() => { // because it'll be null by the + if (!action.closed) { // time we get into this function. + throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled'); + } + }); + return source._subscribe(timeoutSubscriber); + }) + .timeoutWith(40, e2, rxTestScheduler); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + expectSubscriptions(e2.subscriptions).toBe(e2subs); + }); }); diff --git a/spec/schedulers/VirtualTimeScheduler-spec.ts b/spec/schedulers/VirtualTimeScheduler-spec.ts index 6af1fe5e96..e0e113d6cf 100644 --- a/spec/schedulers/VirtualTimeScheduler-spec.ts +++ b/spec/schedulers/VirtualTimeScheduler-spec.ts @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => { v.flush(); expect(count).to.equal(3); }); -}); \ No newline at end of file + + it('should not execute virtual actions that have been rescheduled before flush', () => { + const v = new VirtualTimeScheduler(); + let messages = []; + let action: VirtualAction = > v.schedule(function(state: string) { + messages.push(state); + }, 10, 'first message'); + action = > action.schedule('second message' , 10); + v.flush(); + expect(messages).to.deep.equal(['second message']); + }); +}); diff --git a/src/operator/timeout.ts b/src/operator/timeout.ts index 3df4bf1bee..1b7af0c627 100644 --- a/src/operator/timeout.ts +++ b/src/operator/timeout.ts @@ -1,3 +1,4 @@ +import { Action } from '../scheduler/Action'; import { async } from '../scheduler/async'; import { isDate } from '../util/isDate'; import { Operator } from '../Operator'; @@ -5,7 +6,6 @@ import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { Observable } from '../Observable'; import { TeardownLogic } from '../Subscription'; -import { Subscription } from '../Subscription'; import { TimeoutError } from '../util/TimeoutError'; /** @@ -45,17 +45,8 @@ class TimeoutOperator implements Operator { * @extends {Ignored} */ class TimeoutSubscriber extends Subscriber { - private index: number = 0; - private _previousIndex: number = 0; - private action: Subscription = null; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } + private action: Action> = null; constructor(destination: Subscriber, private absoluteTimeout: boolean, @@ -66,56 +57,36 @@ class TimeoutSubscriber extends Subscriber { this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (source.previousIndex === currentIndex) { - source.notifyTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutSubscriber): void { + subscriber.error(subscriber.errorToSend); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutSubscriber.dispatchTimeout, this.waitFor, this + ))); } } protected _next(value: T): void { - this.destination.next(value); - if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any): void { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete(): void { - this.destination.complete(); - this._hasCompleted = true; - } - - notifyTimeout(): void { - this.error(this.errorToSend); + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.errorToSend = null; } } diff --git a/src/operator/timeoutWith.ts b/src/operator/timeoutWith.ts index bb1a126492..d57e87820d 100644 --- a/src/operator/timeoutWith.ts +++ b/src/operator/timeoutWith.ts @@ -1,8 +1,9 @@ +import { Action } from '../scheduler/Action'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Scheduler } from '../Scheduler'; import { async } from '../scheduler/async'; -import { Subscription, TeardownLogic } from '../Subscription'; +import { TeardownLogic } from '../Subscription'; import { Observable, ObservableInput } from '../Observable'; import { isDate } from '../util/isDate'; import { OuterSubscriber } from '../OuterSubscriber'; @@ -49,81 +50,52 @@ class TimeoutWithOperator implements Operator { * @extends {Ignored} */ class TimeoutWithSubscriber extends OuterSubscriber { - private timeoutSubscription: Subscription = undefined; - private action: Subscription = null; - private index: number = 0; - private _previousIndex: number = 0; - get previousIndex(): number { - return this._previousIndex; - } - private _hasCompleted: boolean = false; - get hasCompleted(): boolean { - return this._hasCompleted; - } - constructor(public destination: Subscriber, + private action: Action> = null; + + constructor(destination: Subscriber, private absoluteTimeout: boolean, private waitFor: number, private withObservable: ObservableInput, private scheduler: Scheduler) { - super(); - destination.add(this); + super(destination); this.scheduleTimeout(); } - private static dispatchTimeout(state: any): void { - const source = state.subscriber; - const currentIndex = state.index; - if (!source.hasCompleted && source.previousIndex === currentIndex) { - source.handleTimeout(); - } + private static dispatchTimeout(subscriber: TimeoutWithSubscriber): void { + const { withObservable } = subscriber; + subscriber.unsubscribe(); + subscriber.closed = false; + subscriber.isStopped = false; + subscriber.add(subscribeToResult(subscriber, withObservable)); } private scheduleTimeout(): void { - const currentIndex = this.index; - const timeoutState = { subscriber: this, index: currentIndex }; - - this.cancelTimeout(); - this.action = this.scheduler.schedule( - TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState - ); - this.add(this.action); - - this.index++; - this._previousIndex = currentIndex; - } - - private cancelTimeout(): void { const { action } = this; - if (action !== null) { - this.remove(action); - action.unsubscribe(); - this.action = null; + if (action) { + // Recycle the action if we've already scheduled one. All the production + // Scheduler Actions mutate their state/delay time and return themeselves. + // VirtualActions are immutable, so they create and return a clone. In this + // case, we need to set the action reference to the most recent VirtualAction, + // to ensure that's the one we clone from next time. + this.action = (>> action.schedule(this, this.waitFor)); + } else { + this.add(this.action = (>> this.scheduler.schedule( + TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this + ))); } } - protected _next(value: T) { - this.destination.next(value); + protected _next(value: T): void { if (!this.absoluteTimeout) { this.scheduleTimeout(); } + super._next(value); } - protected _error(err: any) { - this.destination.error(err); - this._hasCompleted = true; - } - - protected _complete() { - this.destination.complete(); - this._hasCompleted = true; - } - - handleTimeout(): void { - if (!this.closed) { - const withObservable = this.withObservable; - this.unsubscribe(); - this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable)); - } + protected _unsubscribe() { + this.action = null; + this.scheduler = null; + this.withObservable = null; } }