Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pr 2135 #2

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"debug_mocha": "node --inspect --debug-brk ./node_modules/.bin/_mocha --opts spec/support/debug.opts spec-js",
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
23 changes: 23 additions & 0 deletions spec/operators/timeout-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,27 @@ describe('Observable.prototype.timeout', () => {
expectObservable(result).toBe(expected, values, value);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('--a--b--c---d--e--|');
const e1subs = '^ ! ';
const expected = '--a--b--c-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeout(50, null, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
26 changes: 26 additions & 0 deletions spec/operators/timeoutWith-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,30 @@ describe('Observable.prototype.timeoutWith', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});

it('should unsubscribe from the scheduled timeout action when timeout is unsubscribed early', () => {
const e1 = hot('---a---b-----c----|');
const e1subs = '^ ! ';
const e2 = cold( '-x---y| ');
const e2subs = ' ^ ! ';
const expected = '---a---b----x-- ';
const unsub = ' ! ';

const result = e1
.lift(function(source) {
const timeoutSubscriber = this;
const { action } = timeoutSubscriber; // get a ref to the action here
timeoutSubscriber.add(() => { // because it'll be null by the
if (!action.closed) { // time we get into this function.
throw new Error('TimeoutSubscriber scheduled action wasn\'t canceled');
}
});
return source._subscribe(timeoutSubscriber);
})
.timeoutWith(40, e2, rxTestScheduler);

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
13 changes: 12 additions & 1 deletion spec/schedulers/VirtualTimeScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ describe('VirtualTimeScheduler', () => {
v.flush();
expect(count).to.equal(3);
});
});

it('should not execute virtual actions that have been rescheduled before flush', () => {
const v = new VirtualTimeScheduler();
let messages = [];
let action: VirtualAction<string> = <VirtualAction<string>> v.schedule(function(state: string) {
messages.push(state);
}, 10, 'first message');
action = <VirtualAction<string>> action.schedule('second message' , 10);
v.flush();
expect(messages).to.deep.equal(['second message']);
});
});
56 changes: 23 additions & 33 deletions src/operator/timeout.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Action } from '../scheduler/Action';
import { async } from '../scheduler/async';
import { isDate } from '../util/isDate';
import { Operator } from '../Operator';
Expand Down Expand Up @@ -44,15 +45,8 @@ class TimeoutOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutSubscriber<T> extends Subscriber<T> {
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

private action: Action<TimeoutSubscriber<T>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
Expand All @@ -63,40 +57,36 @@ class TimeoutSubscriber<T> extends Subscriber<T> {
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.notifyTimeout();
}
private static dispatchTimeout<T>(subscriber: TimeoutSubscriber<T>): void {
subscriber.error(subscriber.errorToSend);
}

private scheduleTimeout(): void {
let currentIndex = this.index;
this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutSubscriber<T>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutSubscriber<T>>> this.scheduler.schedule(
TimeoutSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T): void {
this.destination.next(value);

if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any): void {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete(): void {
this.destination.complete();
this._hasCompleted = true;
}

notifyTimeout(): void {
this.error(this.errorToSend);
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.errorToSend = null;
}
}
74 changes: 31 additions & 43 deletions src/operator/timeoutWith.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { Action } from '../scheduler/Action';
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { Scheduler } from '../Scheduler';
import { async } from '../scheduler/async';
import { Subscription, TeardownLogic } from '../Subscription';
import { TeardownLogic } from '../Subscription';
import { Observable, ObservableInput } from '../Observable';
import { isDate } from '../util/isDate';
import { OuterSubscriber } from '../OuterSubscriber';
Expand Down Expand Up @@ -49,65 +50,52 @@ class TimeoutWithOperator<T> implements Operator<T, T> {
* @extends {Ignored}
*/
class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
private timeoutSubscription: Subscription = undefined;
private index: number = 0;
private _previousIndex: number = 0;
get previousIndex(): number {
return this._previousIndex;
}
private _hasCompleted: boolean = false;
get hasCompleted(): boolean {
return this._hasCompleted;
}

constructor(public destination: Subscriber<T>,
private action: Action<TimeoutWithSubscriber<T, R>> = null;

constructor(destination: Subscriber<T>,
private absoluteTimeout: boolean,
private waitFor: number,
private withObservable: ObservableInput<any>,
private scheduler: Scheduler) {
super();
destination.add(this);
super(destination);
this.scheduleTimeout();
}

private static dispatchTimeout(state: any): void {
const source = state.subscriber;
const currentIndex = state.index;
if (!source.hasCompleted && source.previousIndex === currentIndex) {
source.handleTimeout();
}
private static dispatchTimeout<T, R>(subscriber: TimeoutWithSubscriber<T, R>): void {
const { withObservable } = subscriber;
subscriber.unsubscribe();
subscriber.closed = false;
subscriber.isStopped = false;
subscriber.add(subscribeToResult(subscriber, withObservable));
}

private scheduleTimeout(): void {
let currentIndex = this.index;
const timeoutState = { subscriber: this, index: currentIndex };
this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, timeoutState);
this.index++;
this._previousIndex = currentIndex;
const { action } = this;
if (action) {
// Recycle the action if we've already scheduled one. All the production
// Scheduler Actions mutate their state/delay time and return themeselves.
// VirtualActions are immutable, so they create and return a clone. In this
// case, we need to set the action reference to the most recent VirtualAction,
// to ensure that's the one we clone from next time.
this.action = (<Action<TimeoutWithSubscriber<T, R>>> action.schedule(this, this.waitFor));
} else {
this.add(this.action = (<Action<TimeoutWithSubscriber<T, R>>> this.scheduler.schedule(
TimeoutWithSubscriber.dispatchTimeout, this.waitFor, this
)));
}
}

protected _next(value: T) {
this.destination.next(value);
protected _next(value: T): void {
if (!this.absoluteTimeout) {
this.scheduleTimeout();
}
super._next(value);
}

protected _error(err: any) {
this.destination.error(err);
this._hasCompleted = true;
}

protected _complete() {
this.destination.complete();
this._hasCompleted = true;
}

handleTimeout(): void {
if (!this.closed) {
const withObservable = this.withObservable;
this.unsubscribe();
this.destination.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
}
protected _unsubscribe() {
this.action = null;
this.scheduler = null;
this.withObservable = null;
}
}
15 changes: 13 additions & 2 deletions src/scheduler/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export class VirtualTimeScheduler extends AsyncScheduler {
*/
export class VirtualAction<T> extends AsyncAction<T> {

protected active: boolean = true;

constructor(protected scheduler: VirtualTimeScheduler,
protected work: (this: VirtualAction<T>, state?: T) => void,
protected index: number = scheduler.index += 1) {
Expand All @@ -54,8 +56,11 @@ export class VirtualAction<T> extends AsyncAction<T> {
}

public schedule(state?: T, delay: number = 0): Subscription {
return !this.id ?
super.schedule(state, delay) : (
if (!this.id) {
return super.schedule(state, delay);
}
this.active = false;
return (
// If an action is rescheduled, we save allocations by mutating its state,
// pushing it to the end of the scheduler queue, and recycling the action.
// But since the VirtualTimeScheduler is used for testing, VirtualActions
Expand All @@ -77,6 +82,12 @@ export class VirtualAction<T> extends AsyncAction<T> {
return undefined;
}

protected _execute(state: T, delay: number): any {
if (this.active === true) {
return super._execute(state, delay);
}
}

public static sortActions<T>(a: VirtualAction<T>, b: VirtualAction<T>) {
if (a.delay === b.delay) {
if (a.index === b.index) {
Expand Down