Skip to content

Commit fb6014d

Browse files
committed
feat(multicast): add higher-order lettable variant of multicast
Also adds MonoTypeOperatorFunction
1 parent 9974fc2 commit fb6014d

File tree

4 files changed

+78
-39
lines changed

4 files changed

+78
-39
lines changed

src/interfaces.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,7 @@ import { Observable } from './Observable';
33
export type UnaryFunction<T, R> = (source: T) => R;
44

55
export type OperatorFunction<T, R> = UnaryFunction<Observable<T>, Observable<R>>;
6+
7+
export type FactoryOrValue<T> = T | (() => T);
8+
9+
export type MonoTypeOperatorFunction<T> = OperatorFunction<T, T>;

src/operator/multicast.ts

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { Subject } from '../Subject';
2-
import { Operator } from '../Operator';
3-
import { Subscriber } from '../Subscriber';
42
import { Observable } from '../Observable';
5-
import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
3+
import { ConnectableObservable } from '../observable/ConnectableObservable';
4+
import { multicast as higherOrder } from '../operators';
5+
import { FactoryOrValue, MonoTypeOperatorFunction } from '../interfaces';
66

77
/* tslint:disable:max-line-length */
8-
export function multicast<T>(this: Observable<T>, subjectOrSubjectFactory: factoryOrValue<Subject<T>>): ConnectableObservable<T>;
9-
export function multicast<T>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector?: selector<T>): Observable<T>;
8+
export function multicast<T>(this: Observable<T>, subjectOrSubjectFactory: FactoryOrValue<Subject<T>>): ConnectableObservable<T>;
9+
export function multicast<T>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector?: MonoTypeOperatorFunction<T>): Observable<T>;
1010
/* tslint:enable:max-line-length */
1111

1212
/**
@@ -30,38 +30,5 @@ export function multicast<T>(SubjectFactory: (this: Observable<T>) => Subject<T>
3030
*/
3131
export function multicast<T>(this: Observable<T>, subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
3232
selector?: (source: Observable<T>) => Observable<T>): Observable<T> | ConnectableObservable<T> {
33-
let subjectFactory: () => Subject<T>;
34-
if (typeof subjectOrSubjectFactory === 'function') {
35-
subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
36-
} else {
37-
subjectFactory = function subjectFactory() {
38-
return <Subject<T>>subjectOrSubjectFactory;
39-
};
40-
}
41-
42-
if (typeof selector === 'function') {
43-
return this.lift(new MulticastOperator(subjectFactory, selector));
44-
}
45-
46-
const connectable: any = Object.create(this, connectableObservableDescriptor);
47-
connectable.source = this;
48-
connectable.subjectFactory = subjectFactory;
49-
50-
return <ConnectableObservable<T>> connectable;
51-
}
52-
53-
export type factoryOrValue<T> = T | (() => T);
54-
export type selector<T> = (source: Observable<T>) => Observable<T>;
55-
56-
export class MulticastOperator<T> implements Operator<T, T> {
57-
constructor(private subjectFactory: () => Subject<T>,
58-
private selector: (source: Observable<T>) => Observable<T>) {
59-
}
60-
call(subscriber: Subscriber<T>, source: any): any {
61-
const { selector } = this;
62-
const subject = this.subjectFactory();
63-
const subscription = selector(subject).subscribe(subscriber);
64-
subscription.add(source.subscribe(subject));
65-
return subscription;
66-
}
33+
return higherOrder(<any>subjectOrSubjectFactory, selector)(this);
6734
}

src/operators/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export { map } from './map';
55
export { max } from './max';
66
export { mergeMap } from './mergeMap';
77
export { min } from './min';
8+
export { multicast } from './multicast';
89
export { reduce } from './reduce';
910
export { scan } from './scan';
1011
export { switchMap } from './switchMap';

src/operators/multicast.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { Subject } from '../Subject';
2+
import { Operator } from '../Operator';
3+
import { Subscriber } from '../Subscriber';
4+
import { Observable } from '../Observable';
5+
import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable';
6+
import { FactoryOrValue, MonoTypeOperatorFunction } from '../interfaces';
7+
8+
/* tslint:disable:max-line-length */
9+
export function multicast<T>(subjectOrSubjectFactory: FactoryOrValue<Subject<T>>): MonoTypeOperatorFunction<T>;
10+
export function multicast<T>(SubjectFactory: (this: Observable<T>) => Subject<T>, selector?: MonoTypeOperatorFunction<T>): MonoTypeOperatorFunction<T>;
11+
/* tslint:enable:max-line-length */
12+
13+
/**
14+
* Returns an Observable that emits the results of invoking a specified selector on items
15+
* emitted by a ConnectableObservable that shares a single subscription to the underlying stream.
16+
*
17+
* <img src="./img/multicast.png" width="100%">
18+
*
19+
* @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through
20+
* which the source sequence's elements will be multicast to the selector function
21+
* or Subject to push source elements into.
22+
* @param {Function} [selector] - Optional selector function that can use the multicasted source stream
23+
* as many times as needed, without causing multiple subscriptions to the source stream.
24+
* Subscribers to the given source will receive all notifications of the source from the
25+
* time of the subscription forward.
26+
* @return {Observable} An Observable that emits the results of invoking the selector
27+
* on the items emitted by a `ConnectableObservable` that shares a single subscription to
28+
* the underlying stream.
29+
* @method multicast
30+
* @owner Observable
31+
*/
32+
export function multicast<T>(subjectOrSubjectFactory: Subject<T> | (() => Subject<T>),
33+
selector?: (source: Observable<T>) => Observable<T>): MonoTypeOperatorFunction<T> {
34+
return function multicastOperatorFunction(source: Observable<T>): Observable<T> {
35+
let subjectFactory: () => Subject<T>;
36+
if (typeof subjectOrSubjectFactory === 'function') {
37+
subjectFactory = <() => Subject<T>>subjectOrSubjectFactory;
38+
} else {
39+
subjectFactory = function subjectFactory() {
40+
return <Subject<T>>subjectOrSubjectFactory;
41+
};
42+
}
43+
44+
if (typeof selector === 'function') {
45+
return source.lift(new MulticastOperator(subjectFactory, selector));
46+
}
47+
48+
const connectable: any = Object.create(source, connectableObservableDescriptor);
49+
connectable.source = source;
50+
connectable.subjectFactory = subjectFactory;
51+
52+
return <ConnectableObservable<T>> connectable;
53+
};
54+
}
55+
56+
export class MulticastOperator<T> implements Operator<T, T> {
57+
constructor(private subjectFactory: () => Subject<T>,
58+
private selector: (source: Observable<T>) => Observable<T>) {
59+
}
60+
call(subscriber: Subscriber<T>, source: any): any {
61+
const { selector } = this;
62+
const subject = this.subjectFactory();
63+
const subscription = selector(subject).subscribe(subscriber);
64+
subscription.add(source.subscribe(subject));
65+
return subscription;
66+
}
67+
}

0 commit comments

Comments
 (0)