Skip to content

Commit

Permalink
feat: implement sync-within-subscribe marker
Browse files Browse the repository at this point in the history
  • Loading branch information
cartant committed Sep 28, 2020
1 parent 8c1b76a commit 1743122
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 43 deletions.
74 changes: 43 additions & 31 deletions spec/schedulers/TestScheduler-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { intervalProvider } from 'rxjs/internal/scheduler/intervalProvider';
declare const rxTestScheduler: TestScheduler;

/** @test {TestScheduler} */
describe('TestScheduler', () => {
describe.only('TestScheduler', () => {
it('should exist', () => {
expect(TestScheduler).exist;
expect(TestScheduler).to.be.a('function');
Expand All @@ -25,76 +25,88 @@ describe('TestScheduler', () => {
it('should parse a marble string into a series of notifications and types', () => {
const result = TestScheduler.parseMarbles('-------a---b---|', { a: 'A', b: 'B' });
expect(result).deep.equal([
{ frame: 70, notification: nextNotification('A') },
{ frame: 110, notification: nextNotification('B') },
{ frame: 150, notification: COMPLETE_NOTIFICATION }
{ frame: 70, notification: nextNotification('A'), subscribing: false },
{ frame: 110, notification: nextNotification('B'), subscribing: false },
{ frame: 150, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});

it('should parse a marble string, allowing spaces too', () => {
const result = TestScheduler.parseMarbles('--a--b--| ', { a: 'A', b: 'B' });
expect(result).deep.equal([
{ frame: 20, notification: nextNotification('A') },
{ frame: 50, notification: nextNotification('B') },
{ frame: 80, notification: COMPLETE_NOTIFICATION }
{ frame: 20, notification: nextNotification('A'), subscribing: false },
{ frame: 50, notification: nextNotification('B'), subscribing: false },
{ frame: 80, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});

it('should parse a marble string with a subscription point', () => {
const result = TestScheduler.parseMarbles('---^---a---b---|', { a: 'A', b: 'B' });
expect(result).deep.equal([
{ frame: 40, notification: nextNotification('A') },
{ frame: 80, notification: nextNotification('B') },
{ frame: 120, notification: COMPLETE_NOTIFICATION }
{ frame: 40, notification: nextNotification('A'), subscribing: false },
{ frame: 80, notification: nextNotification('B'), subscribing: false },
{ frame: 120, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});

it('should parse a marble string with an error', () => {
const result = TestScheduler.parseMarbles('-------a---b---#', { a: 'A', b: 'B' }, 'omg error!');
expect(result).deep.equal([
{ frame: 70, notification: nextNotification('A') },
{ frame: 110, notification: nextNotification('B') },
{ frame: 150, notification: errorNotification('omg error!') }
{ frame: 70, notification: nextNotification('A'), subscribing: false },
{ frame: 110, notification: nextNotification('B'), subscribing: false },
{ frame: 150, notification: errorNotification('omg error!'), subscribing: false }
]);
});

it('should default in the letter for the value if no value hash was passed', () => {
const result = TestScheduler.parseMarbles('--a--b--c--');
expect(result).deep.equal([
{ frame: 20, notification: nextNotification('a') },
{ frame: 50, notification: nextNotification('b') },
{ frame: 80, notification: nextNotification('c') },
{ frame: 20, notification: nextNotification('a'), subscribing: false },
{ frame: 50, notification: nextNotification('b'), subscribing: false },
{ frame: 80, notification: nextNotification('c'), subscribing: false },
]);
});

it('should handle grouped values', () => {
const result = TestScheduler.parseMarbles('---(abc)---');
expect(result).deep.equal([
{ frame: 30, notification: nextNotification('a') },
{ frame: 30, notification: nextNotification('b') },
{ frame: 30, notification: nextNotification('c') }
{ frame: 30, notification: nextNotification('a'), subscribing: false },
{ frame: 30, notification: nextNotification('b'), subscribing: false },
{ frame: 30, notification: nextNotification('c'), subscribing: false }
]);
});

it('should ignore whitespace when runMode=true', () => {
const runMode = true;
const result = TestScheduler.parseMarbles(' -a - b - c | ', { a: 'A', b: 'B', c: 'C' }, undefined, undefined, runMode);
expect(result).deep.equal([
{ frame: 10, notification: nextNotification('A') },
{ frame: 30, notification: nextNotification('B') },
{ frame: 50, notification: nextNotification('C') },
{ frame: 60, notification: COMPLETE_NOTIFICATION }
{ frame: 10, notification: nextNotification('A'), subscribing: false },
{ frame: 30, notification: nextNotification('B'), subscribing: false },
{ frame: 50, notification: nextNotification('C'), subscribing: false },
{ frame: 60, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});

it('should suppport time progression syntax when runMode=true', () => {
it('should support time progression syntax when runMode=true', () => {
const runMode = true;
const result = TestScheduler.parseMarbles('10.2ms a 1.2s b 1m c|', { a: 'A', b: 'B', c: 'C' }, undefined, undefined, runMode);
expect(result).deep.equal([
{ frame: 10.2, notification: nextNotification('A') },
{ frame: 10.2 + 10 + (1.2 * 1000), notification: nextNotification('B') },
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60), notification: nextNotification('C') },
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60) + 10, notification: COMPLETE_NOTIFICATION }
{ frame: 10.2, notification: nextNotification('A'), subscribing: false },
{ frame: 10.2 + 10 + (1.2 * 1000), notification: nextNotification('B'), subscribing: false },
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60), notification: nextNotification('C'), subscribing: false },
{ frame: 10.2 + 10 + (1.2 * 1000) + 10 + (1000 * 60) + 10, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});

it('should support a synchronous-within-subscribe marker', () => {
const runMode = true;
const result = TestScheduler.parseMarbles('--(^abc)--(d|)', undefined, undefined, undefined, runMode);
expect(result).deep.equal([
{ frame: 20, notification: nextNotification('a'), subscribing: true },
{ frame: 20, notification: nextNotification('b'), subscribing: true },
{ frame: 20, notification: nextNotification('c'), subscribing: true },
{ frame: 100, notification: nextNotification('d'), subscribing: false },
{ frame: 100, notification: COMPLETE_NOTIFICATION, subscribing: false }
]);
});
});
Expand Down Expand Up @@ -161,10 +173,10 @@ describe('TestScheduler', () => {
expect(expected.length).to.equal(0);
});

it('should emit notifications at frame zero synchronously upon subscription', () => {
it('should emit notifications marked with "^" synchronously upon subscription', () => {
const result: string[] = [];
const scheduler = new TestScheduler(null!);
const source = scheduler.createColdObservable('(ab|)');
const source = scheduler.createColdObservable('(^ab|)');
source.subscribe({
next: (value) => result.push(value),
complete: () => result.push('complete'),
Expand Down Expand Up @@ -252,7 +264,7 @@ describe('TestScheduler', () => {
});

it('should handle empty', () => {
expectObservable(EMPTY).toBe('|', {});
expectObservable(EMPTY).toBe('(^|)', {});
});

it('should handle never', () => {
Expand Down
2 changes: 1 addition & 1 deletion src/internal/testing/ColdObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class ColdObservable<T> extends Observable<T> implements SubscriptionLogg
const messagesLength = this.messages.length;
for (let i = 0; i < messagesLength; i++) {
const message = this.messages[i];
if (message.frame === 0) {
if (message.subscribing) {
observeNotification(message.notification, subscriber);
} else {
subscriber.add(
Expand Down
2 changes: 1 addition & 1 deletion src/internal/testing/TestMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ import { ObservableNotification } from '../types';
export interface TestMessage {
frame: number;
notification: ObservableNotification<any>;
isGhost?: boolean;
subscribing: boolean;
}
41 changes: 31 additions & 10 deletions src/internal/testing/TestScheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class TestScheduler extends VirtualTimeScheduler {
* @param error The error to use for the `#` marble (if present).
*/
createColdObservable<T = string>(marbles: string, values?: { [marble: string]: T }, error?: any): ColdObservable<T> {
if (marbles.indexOf('^') !== -1) {
if (marbles.match(/[^(]\^/)) {
throw new Error('cold observable cannot have subscription offset "^"');
}
if (marbles.indexOf('!') !== -1) {
Expand Down Expand Up @@ -116,13 +116,15 @@ export class TestScheduler extends VirtualTimeScheduler {
private materializeInnerObservable(observable: Observable<any>,
outerFrame: number): TestMessage[] {
const messages: TestMessage[] = [];
let subscribing = true;
observable.subscribe((value) => {
messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value) });
messages.push({ frame: this.frame - outerFrame, notification: nextNotification(value), subscribing });
}, (error) => {
messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error) });
messages.push({ frame: this.frame - outerFrame, notification: errorNotification(error), subscribing });
}, () => {
messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION });
messages.push({ frame: this.frame - outerFrame, notification: COMPLETE_NOTIFICATION, subscribing });
});
subscribing = false;
return messages;
}

Expand All @@ -137,18 +139,20 @@ export class TestScheduler extends VirtualTimeScheduler {
let subscription: Subscription;

this.schedule(() => {
let subscribing = true;
subscription = observable.subscribe(x => {
let value = x;
// Support Observable-of-Observables
if (x instanceof Observable) {
value = this.materializeInnerObservable(value, this.frame);
}
actual.push({ frame: this.frame, notification: nextNotification(value) });
actual.push({ frame: this.frame, notification: nextNotification(value), subscribing });
}, (error) => {
actual.push({ frame: this.frame, notification: errorNotification(error) });
actual.push({ frame: this.frame, notification: errorNotification(error), subscribing });
}, () => {
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION });
actual.push({ frame: this.frame, notification: COMPLETE_NOTIFICATION, subscribing });
});
subscribing = false;
}, subscriptionFrame);

if (unsubscriptionFrame !== Infinity) {
Expand Down Expand Up @@ -308,8 +312,13 @@ export class TestScheduler extends VirtualTimeScheduler {
}
const len = marbles.length;
const testMessages: TestMessage[] = [];
const subIndex = runMode ? marbles.replace(/^[ ]+/, '').indexOf('^') : marbles.indexOf('^');
let frame = subIndex === -1 ? 0 : (subIndex * -this.frameTimeFactor);
const subMarbles = runMode ? marbles.replace(/^[ ]+/, '') : marbles;
const subIndex = subMarbles.indexOf('^');
// If the ^ character is the first character within a () group, it's a
// synchronous-within-subscribe indicator for a cold observable.
let frame = (subIndex === -1) || (subMarbles.charAt(subIndex - 1) === '(')
? 0
: (subIndex * -this.frameTimeFactor);
const getValue = typeof values !== 'object' ?
(x: any) => x :
(x: any) => {
Expand All @@ -320,6 +329,7 @@ export class TestScheduler extends VirtualTimeScheduler {
return values[x];
};
let groupStart = -1;
let subscribing = false;

for (let i = 0; i < len; i++) {
let nextFrame = frame;
Expand All @@ -345,13 +355,20 @@ export class TestScheduler extends VirtualTimeScheduler {
break;
case ')':
groupStart = -1;
subscribing = false;
advanceFrameBy(1);
break;
case '|':
notification = COMPLETE_NOTIFICATION;
advanceFrameBy(1);
break;
case '^':
if (groupStart > -1) {
if (testMessages.length) {
throw new Error('the synchronous-upon-subscription marker "^" must precede all notifications');
}
subscribing = true;
}
advanceFrameBy(1);
break;
case '#':
Expand Down Expand Up @@ -398,7 +415,11 @@ export class TestScheduler extends VirtualTimeScheduler {
}

if (notification) {
testMessages.push({ frame: groupStart > -1 ? groupStart : frame, notification });
testMessages.push({
frame: groupStart > -1 ? groupStart : frame,
notification,
subscribing
});
}

frame = nextFrame;
Expand Down

0 comments on commit 1743122

Please sign in to comment.