Skip to content

Commit

Permalink
fix(multicast): Fixes multicast with selector to create a new source …
Browse files Browse the repository at this point in the history
…connection per subscriber.

Multicast with a selector function should create a new ConnectableObservable for each subscriber to
the MulticastObservable. This ensures each subscriber creates a new connection to the source
Observable, and don't share subscription side-effects.
  • Loading branch information
trxcllnt committed Jun 16, 2016
1 parent de39b5e commit 4aa9041
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 9 deletions.
4 changes: 3 additions & 1 deletion spec/operators/multicast-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ describe('Observable.prototype.multicast', () => {

it('should accept selectors to factory functions', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const multicasted = source.multicast(() => new Subject(),
x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(multicasted);
Expand Down
6 changes: 4 additions & 2 deletions spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ describe('Observable.prototype.publish', () => {

it('should accept selectors', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const sourceSubs = ['^ !',
' ^ !',
' ^ !'];
const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(published);
const expected1 = '-2-4-6----8-|';
Expand Down Expand Up @@ -321,4 +323,4 @@ describe('Observable.prototype.publish', () => {
expect(subscriptions).to.equal(1);
done();
});
});
});
14 changes: 14 additions & 0 deletions spec/support/debug.opts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
--require source-map-support/register
--require spec/support/mocha-setup-node.js
--require spec-js/helpers/test-helper.js
--require spec-js/helpers/ajax-helper.js
--ui spec-js/helpers/testScheduler-ui.js

--reporter dot
--bail
--full-trace
--check-leaks
--globals WebSocket,FormData

--recursive
--timeout 100000
9 changes: 5 additions & 4 deletions src/observable/MulticastObservable.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {ConnectableObservable} from '../observable/ConnectableObservable';

export class MulticastObservable<T> extends Observable<T> {
constructor(protected source: Observable<T>,
private connectable: ConnectableObservable<T>,
private subjectFactory: () => Subject<T>,
private selector: (source: Observable<T>) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription {
const {selector, connectable} = this;

const { selector, source } = this;
const connectable = new ConnectableObservable(source, this.subjectFactory);
const subscription = selector(connectable).subscribe(subscriber);
subscription.add(connectable.connect());
return subscription;
}
}
}
5 changes: 3 additions & 2 deletions src/operator/multicast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subjec
};
}

const connectable = new ConnectableObservable(this, subjectFactory);
return selector ? new MulticastObservable(this, connectable, selector) : connectable;
return !selector ?
new ConnectableObservable(this, subjectFactory) :
new MulticastObservable(this, subjectFactory, selector);
}

export type factoryOrValue<T> = T | (() => T);
Expand Down

0 comments on commit 4aa9041

Please sign in to comment.