Skip to content

Commit

Permalink
fix: schedulers will no longer error while rescheduling and unsubscri…
Browse files Browse the repository at this point in the history
…bing during flushes

* chore: use sinon sandbox consistently

* test: add failing flush tests

* fix: don't execute actions scheduled within flush

* test: add failing tests

* fix: avoid calling flush with empty actions queue

Closes #6672

* chore: remove accidental auto-import

* test: call all the dones
  • Loading branch information
cartant authored and benlesh committed Nov 29, 2021
1 parent 5225692 commit 7f9c8d3
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 27 deletions.
72 changes: 71 additions & 1 deletion spec/schedulers/AnimationFrameScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ describe('Scheduler.animationFrame', () => {
const requestSpy = sinon.spy(animationFrameProvider, 'requestAnimationFrame');
const setSpy = sinon.spy(intervalProvider, 'setInterval');
const clearSpy = sinon.spy(intervalProvider, 'clearInterval');

animate(' ----------x--');
const a = cold(' a ');
const ta = time(' ----| ');
Expand Down Expand Up @@ -168,4 +168,74 @@ describe('Scheduler.animationFrame', () => {
}, 0, 0);
scheduledIndices.push(0);
});

it('should execute actions scheduled when flushing in a subsequent flush', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(animationFrameScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
});
b = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
});
});

it('should execute actions scheduled when flushing in a subsequent flush when some actions are unsubscribed', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(animationFrameScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = animationFrameScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
b.unsubscribe();
});
b = animationFrameScheduler.schedule(() => {
done(new Error('Unexpected execution of b'));
});
});

it('should properly cancel an unnecessary flush', (done) => {
const sandbox = sinon.createSandbox();
const cancelAnimationFrameStub = sandbox.stub(animationFrameProvider, 'cancelAnimationFrame').callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = animationFrameScheduler.schedule(() => {
expect(animationFrameScheduler.actions).to.have.length(1);
c = animationFrameScheduler.schedule(() => {
done(new Error('Unexpected execution of c'));
});
expect(animationFrameScheduler.actions).to.have.length(2);
// What we're testing here is that the unsubscription of action c effects
// the cancellation of the animation frame in a scenario in which the
// actions queue is not empty - it contains action b.
c.unsubscribe();
expect(animationFrameScheduler.actions).to.have.length(1);
expect(cancelAnimationFrameStub).to.have.callCount(1);
});
b = animationFrameScheduler.schedule(() => {
sandbox.restore();
done();
});
});
});
87 changes: 77 additions & 10 deletions spec/schedulers/AsapScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ describe('Scheduler.asap', () => {

it('should cancel asap actions when delay > 0', () => {
testScheduler.run(({ cold, expectObservable, flush, time }) => {
const setImmediateSpy = sinon.spy(immediateProvider, 'setImmediate');
const setSpy = sinon.spy(intervalProvider, 'setInterval');
const clearSpy = sinon.spy(intervalProvider, 'clearInterval');
const sandbox = sinon.createSandbox();
const setImmediateSpy = sandbox.spy(immediateProvider, 'setImmediate');
const setSpy = sandbox.spy(intervalProvider, 'setInterval');
const clearSpy = sandbox.spy(intervalProvider, 'clearInterval');

const a = cold(' a ');
const ta = time(' ----| ');
Expand All @@ -57,17 +58,15 @@ describe('Scheduler.asap', () => {
expect(setImmediateSpy).to.have.not.been.called;
expect(setSpy).to.have.been.calledOnce;
expect(clearSpy).to.have.been.calledOnce;
setImmediateSpy.restore();
setSpy.restore();
clearSpy.restore();
sandbox.restore();
});
});

it('should reuse the interval for recursively scheduled actions with the same delay', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
// callThrough is missing from the declarations installed by the typings tool in stable
const stubSetInterval = (<any> sinon.stub(global, 'setInterval')).callThrough();
const stubSetInterval = (<any> sandbox.stub(global, 'setInterval')).callThrough();
const period = 50;
const state = { index: 0, period };
type State = typeof state;
Expand All @@ -86,15 +85,14 @@ describe('Scheduler.asap', () => {
fakeTimer.tick(period);
expect(state).to.have.property('index', 2);
expect(stubSetInterval).to.have.property('callCount', 1);
stubSetInterval.restore();
sandbox.restore();
});

it('should not reuse the interval for recursively scheduled actions with a different delay', () => {
const sandbox = sinon.createSandbox();
const fakeTimer = sandbox.useFakeTimers();
// callThrough is missing from the declarations installed by the typings tool in stable
const stubSetInterval = (<any> sinon.stub(global, 'setInterval')).callThrough();
const stubSetInterval = (<any> sandbox.stub(global, 'setInterval')).callThrough();
const period = 50;
const state = { index: 0, period };
type State = typeof state;
Expand All @@ -114,7 +112,6 @@ describe('Scheduler.asap', () => {
fakeTimer.tick(period);
expect(state).to.have.property('index', 2);
expect(stubSetInterval).to.have.property('callCount', 3);
stubSetInterval.restore();
sandbox.restore();
});

Expand Down Expand Up @@ -221,4 +218,74 @@ describe('Scheduler.asap', () => {
}, 0, 0);
scheduledIndices.push(0);
});

it('should execute actions scheduled when flushing in a subsequent flush', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(asapScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
});
b = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
});
});

it('should execute actions scheduled when flushing in a subsequent flush when some actions are unsubscribed', (done) => {
const sandbox = sinon.createSandbox();
const stubFlush = (sandbox.stub(asapScheduler, 'flush')).callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(1);
c = asapScheduler.schedule(() => {
expect(stubFlush).to.have.callCount(2);
sandbox.restore();
done();
});
b.unsubscribe();
});
b = asapScheduler.schedule(() => {
done(new Error('Unexpected execution of b'));
});
});

it('should properly cancel an unnecessary flush', (done) => {
const sandbox = sinon.createSandbox();
const clearImmediateStub = sandbox.stub(immediateProvider, 'clearImmediate').callThrough();

let a: Subscription;
let b: Subscription;
let c: Subscription;

a = asapScheduler.schedule(() => {
expect(asapScheduler.actions).to.have.length(1);
c = asapScheduler.schedule(() => {
done(new Error('Unexpected execution of c'));
});
expect(asapScheduler.actions).to.have.length(2);
// What we're testing here is that the unsubscription of action c effects
// the cancellation of the microtask in a scenario in which the actions
// queue is not empty - it contains action b.
c.unsubscribe();
expect(asapScheduler.actions).to.have.length(1);
expect(clearImmediateStub).to.have.callCount(1);
});
b = asapScheduler.schedule(() => {
sandbox.restore();
done();
});
});
});
8 changes: 4 additions & 4 deletions src/internal/scheduler/AnimationFrameAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ export class AnimationFrameAction<T> extends AsyncAction<T> {
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested animation frame and
// set the scheduled flag to undefined so the next AnimationFrameAction will
// request its own.
if (scheduler.actions.length === 0) {
// If the scheduler queue has no remaining actions with the same async id,
// cancel the requested animation frame and set the scheduled flag to
// undefined so the next AnimationFrameAction will request its own.
if (!scheduler.actions.some((action) => action.id === id)) {
animationFrameProvider.cancelAnimationFrame(id);
scheduler._scheduled = undefined;
}
Expand Down
16 changes: 12 additions & 4 deletions src/internal/scheduler/AnimationFrameScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@ import { AsyncScheduler } from './AsyncScheduler';
export class AnimationFrameScheduler extends AsyncScheduler {
public flush(action?: AsyncAction<any>): void {
this._active = true;
// The async id that effects a call to flush is stored in _scheduled.
// Before executing an action, it's necessary to check the action's async
// id to determine whether it's supposed to be executed in the current
// flush.
// Previous implementations of this method used a count to determine this,
// but that was unsound, as actions that are unsubscribed - i.e. cancelled -
// are removed from the actions array and that can shift actions that are
// scheduled to be executed in a subsequent flush into positions at which
// they are executed within the current flush.
const flushId = this._scheduled;
this._scheduled = undefined;

const { actions } = this;
let error: any;
let index = -1;
action = action || actions.shift()!;
const count = actions.length;

do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while (++index < count && (action = actions.shift()));
} while ((action = actions[0]) && action.id === flushId && actions.shift());

this._active = false;

if (error) {
while (++index < count && (action = actions.shift())) {
while ((action = actions[0]) && action.id === flushId && actions.shift()) {
action.unsubscribe();
}
throw error;
Expand Down
8 changes: 4 additions & 4 deletions src/internal/scheduler/AsapAction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ export class AsapAction<T> extends AsyncAction<T> {
if ((delay != null && delay > 0) || (delay == null && this.delay > 0)) {
return super.recycleAsyncId(scheduler, id, delay);
}
// If the scheduler queue is empty, cancel the requested microtask and
// set the scheduled flag to undefined so the next AsapAction will schedule
// its own.
if (scheduler.actions.length === 0) {
// If the scheduler queue has no remaining actions with the same async id,
// cancel the requested microtask and set the scheduled flag to undefined
// so the next AsapAction will request its own.
if (!scheduler.actions.some((action) => action.id === id)) {
immediateProvider.clearImmediate(id);
scheduler._scheduled = undefined;
}
Expand Down
16 changes: 12 additions & 4 deletions src/internal/scheduler/AsapScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,32 @@ import { AsyncScheduler } from './AsyncScheduler';
export class AsapScheduler extends AsyncScheduler {
public flush(action?: AsyncAction<any>): void {
this._active = true;
// The async id that effects a call to flush is stored in _scheduled.
// Before executing an action, it's necessary to check the action's async
// id to determine whether it's supposed to be executed in the current
// flush.
// Previous implementations of this method used a count to determine this,
// but that was unsound, as actions that are unsubscribed - i.e. cancelled -
// are removed from the actions array and that can shift actions that are
// scheduled to be executed in a subsequent flush into positions at which
// they are executed within the current flush.
const flushId = this._scheduled;
this._scheduled = undefined;

const { actions } = this;
let error: any;
let index = -1;
action = action || actions.shift()!;
const count = actions.length;

do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while (++index < count && (action = actions.shift()));
} while ((action = actions[0]) && action.id === flushId && actions.shift());

this._active = false;

if (error) {
while (++index < count && (action = actions.shift())) {
while ((action = actions[0]) && action.id === flushId && actions.shift()) {
action.unsubscribe();
}
throw error;
Expand Down

0 comments on commit 7f9c8d3

Please sign in to comment.