From 4aa904182e58abef17ca02ff82a73591978e77dc Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Wed, 15 Jun 2016 22:08:29 -0700 Subject: [PATCH] fix(multicast): Fixes multicast with selector to create a new source connection per subscriber. Multicast with a selector function should create a new ConnectableObservable for each subscriber to the MulticastObservable. This ensures each subscriber creates a new connection to the source Observable, and don't share subscription side-effects. --- spec/operators/multicast-spec.ts | 4 +++- spec/operators/publish-spec.ts | 6 ++++-- spec/support/debug.opts | 14 ++++++++++++++ src/observable/MulticastObservable.ts | 9 +++++---- src/operator/multicast.ts | 5 +++-- 5 files changed, 29 insertions(+), 9 deletions(-) create mode 100644 spec/support/debug.opts diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 3f39ed7d7b..e366263ec1 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -51,7 +51,9 @@ describe('Observable.prototype.multicast', () => { it('should accept selectors to factory functions', () => { const source = hot('-1-2-3----4-|'); - const sourceSubs = ['^ !']; + const sourceSubs = ['^ !', + ' ^ !', + ' ^ !']; const multicasted = source.multicast(() => new Subject(), x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString())); const subscriber1 = hot('a| ').mergeMapTo(multicasted); diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index 88cf080d1e..1a49ce4e2a 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -54,7 +54,9 @@ describe('Observable.prototype.publish', () => { it('should accept selectors', () => { const source = hot('-1-2-3----4-|'); - const sourceSubs = ['^ !']; + const sourceSubs = ['^ !', + ' ^ !', + ' ^ !']; const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString())); const subscriber1 = hot('a| ').mergeMapTo(published); const expected1 = '-2-4-6----8-|'; @@ -321,4 +323,4 @@ describe('Observable.prototype.publish', () => { expect(subscriptions).to.equal(1); done(); }); -}); \ No newline at end of file +}); diff --git a/spec/support/debug.opts b/spec/support/debug.opts new file mode 100644 index 0000000000..cd99a064bd --- /dev/null +++ b/spec/support/debug.opts @@ -0,0 +1,14 @@ +--require source-map-support/register +--require spec/support/mocha-setup-node.js +--require spec-js/helpers/test-helper.js +--require spec-js/helpers/ajax-helper.js +--ui spec-js/helpers/testScheduler-ui.js + +--reporter dot +--bail +--full-trace +--check-leaks +--globals WebSocket,FormData + +--recursive +--timeout 100000 diff --git a/src/observable/MulticastObservable.ts b/src/observable/MulticastObservable.ts index f90aea6425..177872ebc4 100644 --- a/src/observable/MulticastObservable.ts +++ b/src/observable/MulticastObservable.ts @@ -1,3 +1,4 @@ +import {Subject} from '../Subject'; import {Observable} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; @@ -5,16 +6,16 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; export class MulticastObservable extends Observable { constructor(protected source: Observable, - private connectable: ConnectableObservable, + private subjectFactory: () => Subject, private selector: (source: Observable) => Observable) { super(); } protected _subscribe(subscriber: Subscriber): Subscription { - const {selector, connectable} = this; - + const { selector, source } = this; + const connectable = new ConnectableObservable(source, this.subjectFactory); const subscription = selector(connectable).subscribe(subscriber); subscription.add(connectable.connect()); return subscription; } -} \ No newline at end of file +} diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index a72213f3d5..0b406c897d 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -33,8 +33,9 @@ export function multicast(subjectOrSubjectFactory: Subject | (() => Subjec }; } - const connectable = new ConnectableObservable(this, subjectFactory); - return selector ? new MulticastObservable(this, connectable, selector) : connectable; + return !selector ? + new ConnectableObservable(this, subjectFactory) : + new MulticastObservable(this, subjectFactory, selector); } export type factoryOrValue = T | (() => T);