From 05d47de4374d05cafbe619071c995d8a3f13dad9 Mon Sep 17 00:00:00 2001 From: OJ Kwon Date: Tue, 26 Apr 2016 16:13:55 -0700 Subject: [PATCH] feat(multicast): subjectfactory allows selectors --- spec/operators/multicast-spec.ts | 18 ++++++++++++++++++ src/Rx.ts | 1 + src/add/operator/multicast.ts | 2 +- src/observable/MulticastObservable.ts | 20 ++++++++++++++++++++ src/operator/multicast.ts | 16 +++++++++++++--- 5 files changed, 53 insertions(+), 4 deletions(-) create mode 100644 src/observable/MulticastObservable.ts diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 19596fe842..017f4219a9 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -49,6 +49,24 @@ describe('Observable.prototype.multicast', () => { connectable.connect(); }); + it('should accept selectors to factory functions', () => { + const source = hot('-1-2-3----4-|'); + const sourceSubs = ['^ !']; + const multicasted = source.multicast(() => new Subject(), + x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString())); + const subscriber1 = hot('a| ').mergeMapTo(multicasted); + const expected1 = '-2-4-6----8-|'; + const subscriber2 = hot(' b| ').mergeMapTo(multicasted); + const expected2 = ' -6----8-|'; + const subscriber3 = hot(' c| ').mergeMapTo(multicasted); + const expected3 = ' --8-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + it('should do nothing if connect is not called, despite subscriptions', () => { const source = cold('--1-2---3-4--5-|'); const sourceSubs = []; diff --git a/src/Rx.ts b/src/Rx.ts index 1d1c8df6c1..29c40964c2 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -124,6 +124,7 @@ export {Subscriber} from './Subscriber'; export {AsyncSubject} from './AsyncSubject'; export {ReplaySubject} from './ReplaySubject'; export {BehaviorSubject} from './BehaviorSubject'; +export {MulticastObservable} from './observable/MulticastObservable'; export {ConnectableObservable} from './observable/ConnectableObservable'; export {Notification} from './Notification'; export {EmptyError} from './util/EmptyError'; diff --git a/src/add/operator/multicast.ts b/src/add/operator/multicast.ts index 8ff3686284..5917841f32 100644 --- a/src/add/operator/multicast.ts +++ b/src/add/operator/multicast.ts @@ -2,7 +2,7 @@ import {Observable} from '../../Observable'; import {multicast, MulticastSignature} from '../../operator/multicast'; -Observable.prototype.multicast = multicast; +Observable.prototype.multicast = multicast; declare module '../../Observable' { interface Observable { diff --git a/src/observable/MulticastObservable.ts b/src/observable/MulticastObservable.ts new file mode 100644 index 0000000000..f90aea6425 --- /dev/null +++ b/src/observable/MulticastObservable.ts @@ -0,0 +1,20 @@ +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Subscription} from '../Subscription'; +import {ConnectableObservable} from '../observable/ConnectableObservable'; + +export class MulticastObservable extends Observable { + constructor(protected source: Observable, + private connectable: ConnectableObservable, + private selector: (source: Observable) => Observable) { + super(); + } + + protected _subscribe(subscriber: Subscriber): Subscription { + const {selector, connectable} = this; + + const subscription = selector(connectable).subscribe(subscriber); + subscription.add(connectable.connect()); + return subscription; + } +} \ No newline at end of file diff --git a/src/operator/multicast.ts b/src/operator/multicast.ts index 3d7b9cb4fe..a72213f3d5 100644 --- a/src/operator/multicast.ts +++ b/src/operator/multicast.ts @@ -1,4 +1,6 @@ import {Subject} from '../Subject'; +import {Observable} from '../Observable'; +import {MulticastObservable} from '../observable/MulticastObservable'; import {ConnectableObservable} from '../observable/ConnectableObservable'; /** @@ -7,7 +9,10 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; * * * - * @param {Function} selector - a function that can use the multicasted source stream + * @param {Function|Subject} Factory function to create an intermediate subject through + * which the source sequence's elements will be multicast to the selector function + * or Subject to push source elements into. + * @param {Function} Optional selector function that can use the multicasted source stream * as many times as needed, without causing multiple subscriptions to the source stream. * Subscribers to the given source will receive all notifications of the source from the * time of the subscription forward. @@ -17,7 +22,8 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; * @method multicast * @owner Observable */ -export function multicast(subjectOrSubjectFactory: Subject | (() => Subject)): ConnectableObservable { +export function multicast(subjectOrSubjectFactory: Subject | (() => Subject), + selector?: (source: Observable) => Observable): Observable | ConnectableObservable { let subjectFactory: () => Subject; if (typeof subjectOrSubjectFactory === 'function') { subjectFactory = <() => Subject>subjectOrSubjectFactory; @@ -26,11 +32,15 @@ export function multicast(subjectOrSubjectFactory: Subject | (() => Subjec return >subjectOrSubjectFactory; }; } - return new ConnectableObservable(this, subjectFactory); + + const connectable = new ConnectableObservable(this, subjectFactory); + return selector ? new MulticastObservable(this, connectable, selector) : connectable; } export type factoryOrValue = T | (() => T); +export type selector = (source: Observable) => Observable; export interface MulticastSignature { (subjectOrSubjectFactory: factoryOrValue>): ConnectableObservable; + (SubjectFactory: () => Subject, selector?: selector): Observable; }