diff --git a/spec/operators/publish-spec.ts b/spec/operators/publish-spec.ts index ba4784bab1..c461bb8442 100644 --- a/spec/operators/publish-spec.ts +++ b/spec/operators/publish-spec.ts @@ -77,6 +77,23 @@ describe('Observable.prototype.publish', () => { published.connect(); }); + it('should accept selectors', () => { + const source = hot('-1-2-3----4-|'); + const sourceSubs = ['^ !']; + const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString())); + const subscriber1 = hot('a| ').mergeMapTo(published); + const expected1 = '-2-4-6----8-|'; + const subscriber2 = hot(' b| ').mergeMapTo(published); + const expected2 = ' -6----8-|'; + const subscriber3 = hot(' c| ').mergeMapTo(published); + const expected3 = ' --8-|'; + + expectObservable(subscriber1).toBe(expected1); + expectObservable(subscriber2).toBe(expected2); + expectObservable(subscriber3).toBe(expected3); + expectSubscriptions(source.subscriptions).toBe(sourceSubs); + }); + it('should multicast an error from the source to multiple observers', () => { const source = cold('-1-2-3----4-#'); const sourceSubs = '^ !'; diff --git a/src/add/operator/publish.ts b/src/add/operator/publish.ts index cf571880e2..6a15ecf834 100644 --- a/src/add/operator/publish.ts +++ b/src/add/operator/publish.ts @@ -2,7 +2,7 @@ import {Observable} from '../../Observable'; import {publish, PublishSignature} from '../../operator/publish'; -Observable.prototype.publish = publish; +Observable.prototype.publish = publish; declare module '../../Observable' { interface Observable { diff --git a/src/operator/publish.ts b/src/operator/publish.ts index 133b9d078f..66c972a365 100644 --- a/src/operator/publish.ts +++ b/src/operator/publish.ts @@ -1,4 +1,5 @@ import {Subject} from '../Subject'; +import {Observable} from '../Observable'; import {multicast} from './multicast'; import {ConnectableObservable} from '../observable/ConnectableObservable'; @@ -8,14 +9,21 @@ import {ConnectableObservable} from '../observable/ConnectableObservable'; * * * + * @param {Function} Optional selector function which can use the multicasted source sequence as many times as needed, + * without causing multiple subscriptions to the source sequence. + * Subscribers to the given source will receive all notifications of the source from the time of the subscription on. * @return a ConnectableObservable that upon connection causes the source Observable to emit items to its Observers. * @method publish * @owner Observable */ -export function publish(): ConnectableObservable { - return multicast.call(this, new Subject()); +export function publish(selector?: (source: Observable) => Observable): Observable | ConnectableObservable { + return selector ? multicast.call(this, () => new Subject(), selector) : + multicast.call(this, new Subject()); } +export type selector = (source: Observable) => Observable; + export interface PublishSignature { (): ConnectableObservable; + (selector: selector): Observable; }