Skip to content

Commit

Permalink
feat(TestScheduler): add TestScheduler
Browse files Browse the repository at this point in the history
adds a test scheduler that features a marble parser as well as
cold and hot observable creation methods

closes #270
  • Loading branch information
benlesh committed Sep 6, 2015
1 parent 96f9386 commit b23daf1
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 2 deletions.
67 changes: 67 additions & 0 deletions spec/schedulers/TestScheduler-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var TestScheduler = Rx.TestScheduler;
var Notification = Rx.Notification;

describe('TestScheduler', function() {
it('should exist', function () {
expect(typeof TestScheduler).toBe('function');
});

describe('parseMarbles()', function () {
it('should parse a marble string into a series of notifications and types', function () {
var result = TestScheduler.parseMarbles('-------a---b---|', { a: 'A', b: 'B' });
expect(result).toDeepEqual([
{ frame: 70, notification: Notification.createNext('A') },
{ frame: 110, notification: Notification.createNext('B') },
{ frame: 150, notification: Notification.createComplete() }
]);
});

it('should parse a marble string with a subscription point', function () {
var result = TestScheduler.parseMarbles('---^---a---b---|', { a: 'A', b: 'B' });
expect(result).toDeepEqual([
{ frame: 40, notification: Notification.createNext('A') },
{ frame: 80, notification: Notification.createNext('B') },
{ frame: 120, notification: Notification.createComplete() }
]);
});

it('should parse a marble string with an error', function () {
var result = TestScheduler.parseMarbles('-------a---b---#', { a: 'A', b: 'B' }, 'omg error!');
expect(result).toDeepEqual([
{ frame: 70, notification: Notification.createNext('A') },
{ frame: 110, notification: Notification.createNext('B') },
{ frame: 150, notification: Notification.createError('omg error!') }
]);
});
});

describe('createColdObservable()', function () {
it('should create a cold observable', function () {
var expected = ['A', 'B'];
var scheduler = new TestScheduler();
var source = scheduler.createColdObservable('--a---b--|', { a: 'A', b: 'B' });
expect(source instanceof Rx.Observable).toBe(true);
source.subscribe(function (x) {
expect(x).toBe(expected.shift());
});
scheduler.flush();
expect(expected.length).toBe(0);
});
});

describe('createHotObservable()', function () {
it('should create a cold observable', function () {
var expected = ['A', 'B'];
var scheduler = new TestScheduler();
var source = scheduler.createHotObservable('--a---b--|', { a: 'A', b: 'B' });
expect(source instanceof Rx.Subject).toBe(true);
source.subscribe(function (x) {
expect(x).toBe(expected.shift());
});
scheduler.flush();
expect(expected.length).toBe(0);
});
});
});
4 changes: 3 additions & 1 deletion src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Subject from './Subject';
import ImmediateScheduler from './schedulers/ImmediateScheduler';
import NextTickScheduler from './schedulers/NextTickScheduler';
import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler';
import TestScheduler from './schedulers/TestScheduler';
import immediate from './schedulers/immediate';
import nextTick from './schedulers/nextTick';
import Observable from './Observable';
Expand Down Expand Up @@ -229,5 +230,6 @@ export {
BehaviorSubject,
ConnectableObservable,
Notification,
VirtualTimeScheduler
VirtualTimeScheduler,
TestScheduler
};
66 changes: 66 additions & 0 deletions src/schedulers/TestScheduler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import Observable from '../Observable';
import VirtualTimeScheduler from './VirtualTimeScheduler';
import Notification from '../Notification';
import Subject from '../Subject';

export default class TestScheduler extends VirtualTimeScheduler {
createColdObservable(marbles: string, values?: any, error?: any) {
if (marbles.indexOf('^') !== -1) {
throw new Error('cold observable cannot have subscription offset "^"');
}
let messages = TestScheduler.parseMarbles(marbles, values, error);
return Observable.create(subscriber => {
messages.forEach(({ notification, frame }) => {
this.schedule(() => {
notification.observe(subscriber);
}, frame);
});
});
}

createHotObservable(marbles: string, values?: any, error?: any) {
let messages = TestScheduler.parseMarbles(marbles, values, error);
let subject = new Subject();
messages.forEach(({ notification, frame }) => {
this.schedule(() => {
notification.observe(subject);
}, frame);
});
return subject;
}

static parseMarbles(marbles: string, values?: any, errorValue?: any) : ({ notification: Notification<any>, frame: number })[] {
let len = marbles.length;
let results: ({ notification: Notification<any>, frame: number })[] = [];
let subIndex = marbles.indexOf('^');
let frameOffset = subIndex === -1 ? 0 : (subIndex * -10);

for (let i = 0; i < len; i++) {
let frame = i * 10;
let notification;
let c = marbles[i];
switch (c) {
case '-':
break;
case '|':
notification = Notification.createComplete();
break;
case '^':
break;
case '#':
notification = Notification.createError(errorValue || 'error');
break;
default:
notification = Notification.createNext(values[c]);
break;
}

frame += frameOffset;

if (notification) {
results.push({ notification, frame });
}
}
return results;
}
}
5 changes: 4 additions & 1 deletion src/schedulers/VirtualTimeScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export default class VirtualTimeScheduler implements Scheduler {
scheduled: boolean = false;
index: number = 0;
sorted: boolean = false;
frame: number = -1;

now() {
return 0;
Expand All @@ -24,10 +25,12 @@ export default class VirtualTimeScheduler implements Scheduler {

flush() {
this.sortActions();
this.actions.forEach(action => {
this.actions.forEach((action, frame) => {
this.frame = frame;
action.execute();
});
this.actions.length = 0;
this.frame = -1;
}

schedule<T>(work: (x?: any) => Subscription<T> | void, delay: number = 0, state?: any): Subscription<T> {
Expand Down

0 comments on commit b23daf1

Please sign in to comment.