Skip to content

Commit

Permalink
fix(schedulers): fix asap and animationFrame schedulers to execute ac…
Browse files Browse the repository at this point in the history
…ross async boundaries.

The AsapScheduler and AnimationFrameSchedulers were totally busted. My bad. Now they execute their scheduled actions in batches. If actions reschedule while executing a batch, a new frame is requested for the rescheduled action to execute in.

This PR also simplifies the public `Scheduler` and `Action` APIs. Implementation details like the `actions` queue and `active` boolean are now on the concrete implementations, so it's easier for people to implement the Scheduler API. This PR also renames `FutureAction` -> `AsyncAction` to conform to the same naming convention as the rest of the Action types.

Fixes ReactiveX#1814
  • Loading branch information
trxcllnt committed Jul 13, 2016
1 parent 4f65b03 commit 8968293
Show file tree
Hide file tree
Showing 32 changed files with 735 additions and 602 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
Expand Down
17 changes: 16 additions & 1 deletion spec/Scheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ describe('Scheduler.queue', () => {
expect(call2).to.be.true;
});

it('should schedule things recursively via this.schedule', () => {
let call1 = false;
let call2 = false;
Scheduler.queue.active = false;
Scheduler.queue.schedule(function (state) {
call1 = state.call1;
call2 = state.call2;
if (!call2) {
this.schedule({ call1: true, call2: true });
}
}, 0, { call1: true, call2: false });
expect(call1).to.be.true;
expect(call2).to.be.true;
});

it('should schedule things in the future too', (done: MochaDone) => {
let called = false;
Scheduler.queue.schedule(() => {
Expand Down Expand Up @@ -55,4 +70,4 @@ describe('Scheduler.queue', () => {
});
}, 0);
});
});
});
88 changes: 88 additions & 0 deletions spec/schedulers/AnimationFrameScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';

const animationFrame = Rx.Scheduler.animationFrame;

/** @test {Scheduler} */
describe('Scheduler.animationFrame', () => {
it('should exist', () => {
expect(animationFrame).exist;
});

it('should schedule an action to happen later', (done: MochaDone) => {
let actionHappened = false;
animationFrame.schedule(() => {
actionHappened = true;
done();
});
if (actionHappened) {
done(new Error('Scheduled action happened synchronously'));
}
});

it('should execute recursively scheduled actions in separate asynchronous contexts', (done: MochaDone) => {
let syncExec1 = true;
let syncExec2 = true;
animationFrame.schedule(function (index) {
if (index === 0) {
this.schedule(1);
animationFrame.schedule(() => { syncExec1 = false; });
} else if (index === 1) {
this.schedule(2);
animationFrame.schedule(() => { syncExec2 = false; });
} else if (index === 2) {
this.schedule(3);
} else if (index === 3) {
if (!syncExec1 && !syncExec2) {
done();
} else {
done(new Error('Execution happened synchronously.'));
}
}
}, 0, 0);
});

it('should cancel the animation frame if all scheduled actions unsubscribe before it executes', (done: MochaDone) => {
let animationFrameExec1 = false;
let animationFrameExec2 = false;
const action1 = animationFrame.schedule(() => { animationFrameExec1 = true; });
const action2 = animationFrame.schedule(() => { animationFrameExec2 = true; });
expect(animationFrame.scheduled).to.exist;
expect(animationFrame.actions.length).to.equal(2);
action1.unsubscribe();
action2.unsubscribe();
expect(animationFrame.actions.length).to.equal(0);
expect(animationFrame.scheduled).to.equal(undefined);
animationFrame.schedule(() => {
expect(animationFrameExec1).to.equal(false);
expect(animationFrameExec2).to.equal(false);
done();
});
});

it('should execute the rest of the scheduled actions if the first action is canceled', (done: MochaDone) => {
let actionHappened = false;
let firstSubscription = null;
let secondSubscription = null;

firstSubscription = animationFrame.schedule(() => {
actionHappened = true;
if (secondSubscription) {
secondSubscription.unsubscribe();
}
done(new Error('The first action should not have executed.'));
});

secondSubscription = animationFrame.schedule(() => {
if (!actionHappened) {
done();
}
});

if (actionHappened) {
done(new Error('Scheduled action happened synchronously'));
} else {
firstSubscription.unsubscribe();
}
});
});
40 changes: 40 additions & 0 deletions spec/schedulers/AsapScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,46 @@ describe('Scheduler.asap', () => {
}
});

it('should execute recursively scheduled actions in separate asynchronous contexts', (done: MochaDone) => {
let syncExec1 = true;
let syncExec2 = true;
asap.schedule(function (index) {
if (index === 0) {
this.schedule(1);
asap.schedule(() => { syncExec1 = false; });
} else if (index === 1) {
this.schedule(2);
asap.schedule(() => { syncExec2 = false; });
} else if (index === 2) {
this.schedule(3);
} else if (index === 3) {
if (!syncExec1 && !syncExec2) {
done();
} else {
done(new Error('Execution happened synchronously.'));
}
}
}, 0, 0);
});

it('should cancel the setImmediate if all scheduled actions unsubscribe before it executes', (done: MochaDone) => {
let asapExec1 = false;
let asapExec2 = false;
const action1 = asap.schedule(() => { asapExec1 = true; });
const action2 = asap.schedule(() => { asapExec2 = true; });
expect(asap.scheduled).to.exist;
expect(asap.actions.length).to.equal(2);
action1.unsubscribe();
action2.unsubscribe();
expect(asap.actions.length).to.equal(0);
expect(asap.scheduled).to.equal(undefined);
asap.schedule(() => {
expect(asapExec1).to.equal(false);
expect(asapExec2).to.equal(false);
done();
});
});

it('should execute the rest of the scheduled actions if the first action is canceled', (done: MochaDone) => {
let actionHappened = false;
let firstSubscription = null;
Expand Down
56 changes: 56 additions & 0 deletions spec/schedulers/QueueScheduler-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';

const Scheduler = Rx.Scheduler;
const queue = Scheduler.queue;

/** @test {Scheduler} */
describe('Scheduler.queue', () => {
it('should switch from synchronous to asynchronous at will', (done: MochaDone) => {
let lastExecTime = 0;
let asyncExec = false;
queue.schedule(function (index) {
if (index === 0) {
lastExecTime = queue.now();
this.schedule(1, 100);
} else if (index === 1) {
if (queue.now() - lastExecTime < 100) {
done(new Error('Execution happened synchronously.'));
} else {
asyncExec = true;
lastExecTime = queue.now();
this.schedule(2, 0);
}
} else if (index === 2) {
if (asyncExec === false) {
done(new Error('Execution happened synchronously.'));
} else {
done();
}
}
}, 0, 0);
asyncExec = false;
});
it('should unsubscribe the rest of the scheduled actions if an action throws an error', () => {
const actions = [];
let action2Exec = false;
let action3Exec = false;
let errorValue = undefined;
try {
queue.schedule(() => {
actions.push(
queue.schedule(() => { throw new Error('oops'); }),
queue.schedule(() => { action2Exec = true; }),
queue.schedule(() => { action3Exec = true; })
);
});
} catch (e) {
errorValue = e;
}
expect(actions.every((action) => action.isUnsubscribed)).to.be.true;
expect(action2Exec).to.be.false;
expect(action3Exec).to.be.false;
expect(errorValue).exist;
expect(errorValue.message).to.equal('oops');
});
});
2 changes: 1 addition & 1 deletion spec/support/debug.opts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
--bail
--full-trace
--check-leaks
--globals WebSocket,FormData
--globals WebSocket,FormData,XDomainRequest,ActiveXObject

--recursive
--timeout 100000
89 changes: 0 additions & 89 deletions src/MiscJSDoc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import {Subscriber} from './Subscriber';
import {TeardownLogic} from './Subscription';
import {Observable} from './Observable';
import {Subscription} from './Subscription';
import {Action} from './scheduler/Action';
import './scheduler/MiscJSDoc';
import './observable/dom/MiscJSDoc';

Expand Down Expand Up @@ -130,90 +128,3 @@ export class ObserverDoc<T> {
return void 0;
}
}

/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
* `now()` getter method.
*
* Each unit of work in a Scheduler is called an {@link Action}.
*
* ```ts
* interface Scheduler {
* now(): number;
* schedule(work, delay?, state?): Subscription;
* flush(): void;
* active: boolean;
* actions: Action[];
* scheduledId: number;
* }
* ```
*
* @interface
* @name Scheduler
* @noimport true
*/
export class SchedulerDoc {
/**
* A getter method that returns a number representing the current time
* (at the time this function was called) according to the scheduler's own
* internal clock.
* @return {number} A number that represents the current time. May or may not
* have a relation to wall-clock time. May or may not refer to a time unit
* (e.g. milliseconds).
*/
now(): number {
return 0;
}

/**
* Schedules a function, `work`, for execution. May happen at some point in
* the future, according to the `delay` parameter, if specified. May be passed
* some context object, `state`, which will be passed to the `work` function.
*
* The given arguments will be processed an stored as an Action object in a
* queue of actions.
*
* @param {function(state: ?T): ?Subscription} work A function representing a
* task, or some unit of work to be executed by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler itself.
* @param {T} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
schedule<T>(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription {
return void 0;
}

/**
* Prompt the Scheduler to execute all of its queued actions, therefore
* clearing its queue.
* @return {void}
*/
flush(): void {
return void 0;
}

/**
* A flag to indicate whether the Scheduler is currently executing a batch of
* queued actions.
* @type {boolean}
*/
active: boolean = false;

/**
* The queue of scheduled actions as an array.
* @type {Action[]}
*/
actions: Action<any>[] = [];

/**
* An internal ID used to track the latest asynchronous task such as those
* coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and
* others.
* @type {number}
*/
scheduledId: number = 0;
}
2 changes: 2 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ import observable from 'symbol-observable';
* asynchronous conversions.
* @property {Scheduler} async Schedules work with `setInterval`. Use this for
* time-based operations.
* @property {Scheduler} animationFrame Schedules work with `requestAnimationFrame`.
* Use this for synchronizing with the platform's painting
*/
let Scheduler = {
asap,
Expand Down
Loading

0 comments on commit 8968293

Please sign in to comment.