diff --git a/spec/operators/merge-map-spec.js b/spec/operators/merge-map-spec.js index a70656a890..7e95c07493 100644 --- a/spec/operators/merge-map-spec.js +++ b/spec/operators/merge-map-spec.js @@ -1,8 +1,25 @@ +/* globals expectObservable, cold, hot, describe, it, expect */ var Rx = require('../../dist/cjs/Rx'); var Observable = Rx.Observable; var Promise = require('promise'); describe('Observable.prototype.mergeMap()', function () { + it('should mergeMap many regular interval inners', function () { + var a = cold('----a---a---a---(a|)' ); + var b = cold( '----b---b---(b|)' ); + var c = cold( '----c---c---c---c---(c|)'); + var d = cold( '----(d|)' ); + var e1 = hot('a---b-----------c-------d-------|' ); + var expected = '----a---(ab)(ab)(ab)c---c---(cd)c---(c|)'; + + var observableLookup = { a: a, b: b, c: c, d: d }; + var source = e1.mergeMap(function (value) { + return observableLookup[value]; + }); + + expectObservable(source).toBe(expected); + }); + it('should map values to constant resolved promises and merge', function (done) { var source = Rx.Observable.from([4,3,2,1]); var project = function (value) { diff --git a/src/schedulers/VirtualTimeScheduler.ts b/src/schedulers/VirtualTimeScheduler.ts index 3b7b5cabfe..1e838662c8 100644 --- a/src/schedulers/VirtualTimeScheduler.ts +++ b/src/schedulers/VirtualTimeScheduler.ts @@ -67,6 +67,7 @@ export default class VirtualTimeScheduler implements Scheduler { class VirtualAction extends Subscription implements Action { state: any; delay: number; + calls = 0; constructor(public scheduler: VirtualTimeScheduler, public work: (x?: any) => Subscription | void, @@ -79,8 +80,16 @@ class VirtualAction extends Subscription implements Action { return this; } const scheduler = this.scheduler; - let action = scheduler.frame === this.delay ? this : - new VirtualAction(scheduler, this.work, scheduler.index += 1); + let action; + if (this.calls++ === 0) { + // the action is not being rescheduled. + action = this; + } else { + // the action is being rescheduled, and we can't mutate the one in the actions list + // in the scheduler, so we'll create a new one. + action = new VirtualAction(scheduler, this.work, scheduler.index += 1); + this.add(action); + } action.state = state; action.delay = scheduler.frame + delay; scheduler.addAction(action); diff --git a/src/testing/ColdObservable.ts b/src/testing/ColdObservable.ts index ca1d4abff2..5cfbde97ce 100644 --- a/src/testing/ColdObservable.ts +++ b/src/testing/ColdObservable.ts @@ -29,12 +29,11 @@ export default class ColdObservable extends Observable implements Subscrip scheduleMessages(subscriber) { const messagesLength = this.messages.length; for (let i = 0; i < messagesLength; i++) { - const message = this.messages[i]; + let message = this.messages[i]; subscriber.add( - this.scheduler.schedule( - () => { message.notification.observe(subscriber); }, - message.frame - ) + this.scheduler.schedule(({message, subscriber}) => { message.notification.observe(subscriber); }, + message.frame, + {message, subscriber}) ); } } diff --git a/src/testing/TestScheduler.ts b/src/testing/TestScheduler.ts index ef5a9cb5e2..800b9e736c 100644 --- a/src/testing/TestScheduler.ts +++ b/src/testing/TestScheduler.ts @@ -100,7 +100,6 @@ export class TestScheduler extends VirtualTimeScheduler { const readyFlushTests = this.flushTests.filter(test => test.ready); while (readyFlushTests.length > 0) { let test = readyFlushTests.shift(); - test.actual.sort((a, b) => a.frame === b.frame ? 0 : (a.frame > b.frame ? 1 : -1)); this.assertDeepEqual(test.actual, test.expected); } }