Skip to content

Commit

Permalink
feat(publish): support optional selectors
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Apr 27, 2016
1 parent 05d47de commit b3a27e9
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 3 deletions.
17 changes: 17 additions & 0 deletions spec/operators/publish-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ describe('Observable.prototype.publish', () => {
published.connect();
});

it('should accept selectors', () => {
const source = hot('-1-2-3----4-|');
const sourceSubs = ['^ !'];
const published = source.publish(x => x.zip(x, (a, b) => (parseInt(a) + parseInt(b)).toString()));
const subscriber1 = hot('a| ').mergeMapTo(published);
const expected1 = '-2-4-6----8-|';
const subscriber2 = hot(' b| ').mergeMapTo(published);
const expected2 = ' -6----8-|';
const subscriber3 = hot(' c| ').mergeMapTo(published);
const expected3 = ' --8-|';

expectObservable(subscriber1).toBe(expected1);
expectObservable(subscriber2).toBe(expected2);
expectObservable(subscriber3).toBe(expected3);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

it('should multicast an error from the source to multiple observers', () => {
const source = cold('-1-2-3----4-#');
const sourceSubs = '^ !';
Expand Down
2 changes: 1 addition & 1 deletion src/add/operator/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import {Observable} from '../../Observable';
import {publish, PublishSignature} from '../../operator/publish';

Observable.prototype.publish = publish;
Observable.prototype.publish = <any>publish;

declare module '../../Observable' {
interface Observable<T> {
Expand Down
12 changes: 10 additions & 2 deletions src/operator/publish.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {Subject} from '../Subject';
import {Observable} from '../Observable';
import {multicast} from './multicast';
import {ConnectableObservable} from '../observable/ConnectableObservable';

Expand All @@ -8,14 +9,21 @@ import {ConnectableObservable} from '../observable/ConnectableObservable';
*
* <img src="./img/publish.png" width="100%">
*
* @param {Function} Optional selector function which can use the multicasted source sequence as many times as needed,
* without causing multiple subscriptions to the source sequence.
* Subscribers to the given source will receive all notifications of the source from the time of the subscription on.
* @return a ConnectableObservable that upon connection causes the source Observable to emit items to its Observers.
* @method publish
* @owner Observable
*/
export function publish<T>(): ConnectableObservable<T> {
return multicast.call(this, new Subject<T>());
export function publish<T>(selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
return selector ? multicast.call(this, () => new Subject<T>(), selector) :
multicast.call(this, new Subject<T>());
}

export type selector<T> = (source: Observable<T>) => Observable<T>;

export interface PublishSignature<T> {
(): ConnectableObservable<T>;
(selector: selector<T>): Observable<T>;
}

0 comments on commit b3a27e9

Please sign in to comment.