From 134bc87c6554f79443cd46e9e6b00d4ed1425eaf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Wed, 13 May 2020 23:52:37 -0500 Subject: [PATCH] Add static multicasters --- spec/Observable-spec.ts | 32 +++ src/index.ts | 8 +- .../observable/ConnectableObservable.ts | 225 ++++++++++-------- src/internal/operators/multicast.ts | 72 ++++-- 4 files changed, 209 insertions(+), 128 deletions(-) diff --git a/spec/Observable-spec.ts b/spec/Observable-spec.ts index f5a3863aba3..330e4b1803e 100644 --- a/spec/Observable-spec.ts +++ b/spec/Observable-spec.ts @@ -759,6 +759,38 @@ describe('Observable.lift', () => { }); }); + // TODO: This test shows a limitation of the library as it is currently implemented + // This test should never pass, and it was probably not a great goal to have the one + // above this pass. + // See issue here: https://github.com/ReactiveX/rxjs/issues/5431 + xit('should compose through more than one multicast and a refCount', (done) => { + const result = new MyCustomObservable((observer) => { + observer.next(1); + observer.next(2); + observer.next(3); + observer.complete(); + }).pipe( + multicast(() => new Subject()), + multicast(() => new Subject()), + refCount(), + map(x => 10 * x), + ); + + // NOTE: This was a bad goal. + expect(result instanceof MyCustomObservable).to.be.true; + + const expected = [10, 20, 30]; + + result.subscribe( + function (x) { + expect(x).to.equal(expected.shift()); + }, (x) => { + done(new Error('should not be called')); + }, () => { + done(); + }); + }); + it('should compose through multicast with selector function', (done) => { const result = new MyCustomObservable((observer) => { observer.next(1); diff --git a/src/index.ts b/src/index.ts index bafa33eeec9..a021dd63f69 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,12 @@ /* Observable */ export { Observable } from './internal/Observable'; -export { ConnectableObservable } from './internal/observable/ConnectableObservable'; +export { + ConnectableObservable, + multicastFrom, + publishFrom, + publishBehaviorFrom, + publishReplayFrom, +} from './internal/observable/ConnectableObservable'; export { GroupedObservable } from './internal/operators/groupBy'; export { Operator } from './internal/Operator'; export { observable } from './internal/symbol/observable'; diff --git a/src/internal/observable/ConnectableObservable.ts b/src/internal/observable/ConnectableObservable.ts index a77a497b938..045c82a04af 100644 --- a/src/internal/observable/ConnectableObservable.ts +++ b/src/internal/observable/ConnectableObservable.ts @@ -1,24 +1,35 @@ import { Subject, SubjectSubscriber } from '../Subject'; -import { Operator } from '../Operator'; import { Observable } from '../Observable'; import { Subscriber } from '../Subscriber'; import { Subscription } from '../Subscription'; -import { TeardownLogic } from '../types'; -import { refCount as higherOrderRefCount } from '../operators/refCount'; +import { ObservableInput, TimestampProvider } from '../types'; +import { refCount } from '../operators/refCount'; +import { from } from './from'; +import { ReplaySubject } from '../ReplaySubject'; +import { BehaviorSubject } from '../BehaviorSubject'; /** - * @class ConnectableObservable + * An observable that connects a single subscription to a source, through a subject, + * to multiple subscribers, when `connect()` is called on it. + * + * Subclassing is not recommended. */ export class ConnectableObservable extends Observable { - protected _subject: Subject | undefined; protected _refCount: number = 0; protected _connection: Subscription | null | undefined; /** @internal */ _isComplete = false; - constructor(public source: Observable, - protected subjectFactory: () => Subject) { + /** + * Creates a new ConnectableObservable. + * Do not use directly. Instead, use {@link multicastFrom}. + * + * @param source The source observable to subcribe to upon connection + * @param subjectFactory A factor function used to create the `Subject` that + * connects the source to all subscribers. + */ + constructor(public source: Observable, protected subjectFactory: () => Subject) { super(); } @@ -27,6 +38,7 @@ export class ConnectableObservable extends Observable { return this.getSubject().subscribe(subscriber); } + /** @deprecated Implementation detail, do not use */ protected getSubject(): Subject { const subject = this._subject; if (!subject || subject.isStopped) { @@ -35,13 +47,16 @@ export class ConnectableObservable extends Observable { return this._subject!; } + /** + * Connects all current and future subscribers to this observable + * to the source observable + */ connect(): Subscription { let connection = this._connection; if (!connection) { this._isComplete = false; connection = this._connection = new Subscription(); - connection.add(this.source - .subscribe(new ConnectableSubscriber(this.getSubject(), this))); + connection.add(this.source.subscribe(new ConnectableSubscriber(this.getSubject(), this))); if (connection.closed) { this._connection = null; connection = Subscription.EMPTY; @@ -50,29 +65,23 @@ export class ConnectableObservable extends Observable { return connection; } + /** + * Returns an Observable that will count the number of active subscriptions to + * the connectable observable, and: + * + * 1. Increments the active subscriptions count for each subscription to the resulting observable + * 2. When the active subscriptions count goes from 0 to 1, will "connect" the `ConnectableObservable` automatically. + * 3. Unsubscribing from the resulting observable will decrement the active subscriptions count. + * 4. If the active subscriptions count returns to zero, the "connection" will be terminated, and the + * subscription to the source observable will be unsubscribed. + */ refCount(): Observable { - return higherOrderRefCount()(this) as Observable; + return refCount()(this); } } -export const connectableObservableDescriptor: PropertyDescriptorMap = (() => { - const connectableProto = ConnectableObservable.prototype; - return { - operator: { value: null as null }, - _refCount: { value: 0, writable: true }, - _subject: { value: null as null, writable: true }, - _connection: { value: null as null, writable: true }, - _subscribe: { value: connectableProto._subscribe }, - _isComplete: { value: connectableProto._isComplete, writable: true }, - getSubject: { value: connectableProto.getSubject }, - connect: { value: connectableProto.connect }, - refCount: { value: connectableProto.refCount } - }; -})(); - class ConnectableSubscriber extends SubjectSubscriber { - constructor(destination: Subject, - private connectable: ConnectableObservable) { + constructor(destination: Subject, private connectable: ConnectableObservable) { super(destination); } protected _error(err: any): void { @@ -99,84 +108,94 @@ class ConnectableSubscriber extends SubjectSubscriber { } } -class RefCountOperator implements Operator { - constructor(private connectable: ConnectableObservable) { - } - call(subscriber: Subscriber, source: any): TeardownLogic { - - const { connectable } = this; - ( connectable)._refCount++; - - const refCounter = new RefCountSubscriber(subscriber, connectable); - const subscription = source.subscribe(refCounter); - - if (!refCounter.closed) { - ( refCounter).connection = connectable.connect(); - } - - return subscription; - } +/** + * Multicasts values from a single subscription to an observable source, through a subject. + * + * Requires "connection". + * + * This returns a {@link ConnectableObservable}, that connects a single observable subscription + * to many subscribers through a created subject. + * + * Subscribers to the returned observable will actually be subscribing to the subject provided by + * the second argument. When `connect()` is called on the returned `ConnectableObservable`, that + * subject is used to create a single subscription to the observable source provided as the `input` + * argument, and that {@link Subscription} is returned. If you `unsubscribe` from the subscription + * returned by `connect()`, all subscribers will be "disconnected" and stop recieving notifications. + * + * When the subscription to the shared source, provided via the `input` argument, is torn down, + * either by completion of the source, an error from the source, or "disconnection" via calling `unsubscribe` + * on the `Subscription` returned by `connect()`, one of two things will happen: + * + * 1. If you provided a factory function that creates a `Subject`, the subject state is "reset", and you + * may reconnect, which will call the subject factor and create a new Subject for use with the new connection. + * 2. If you provided a `Subject` directly, that subject instance will remain the subject new subscriptions + * will attempt to "connect" through, however, that `Subject` will likely be "closed", thus meaning that + * the returned `ConnectableObservable` cannot be retried or repeated. + * + * Note that multicasting in this manner is not generally recommended, but RxJS provides this functionality + * for the following generalized cases: + * + * 1. Multicasting synchronous emissions from an observable source. + * 2. Multicasting through a custom `Subject` in a "connectable" way. + * 3. Both 1 and 2. + * + * In most cases, if you want to share values from a single subscription to an observable to + * multiple subscribers, you really should be using the {@link share} operator. + * + * ### Example + * + * ```ts + * import { range, multicastFrom } from 'rxjs'; + * import { finalize } from 'rxjs/operators'; + * + * const source = range(0, 5).pipe( + * finalize(() => console.log('finalized')) + * ); + * + * const published = multicastFrom(source, () => new Subject()); + * + * published.subscribe(x => console.log('A', x)); + * published.subscribe(x => console.log('B', x)); + * + * // Nothing happens until you connect + * const subcription = publish.connect(); + * + * // subscription.unsubscribe() will disconnect all subscribers. + * ``` + * @param input The observable input to publish + * @param subjectFactoryOrSubject A Subject instance, or a function used to create the subject upon connection + */ +export function multicastFrom( + input: ObservableInput, + subjectFactoryOrSubject: Subject | (() => Subject) +): ConnectableObservable { + const subjectFactory = + typeof subjectFactoryOrSubject === 'function' ? subjectFactoryOrSubject : () => subjectFactoryOrSubject; + return new ConnectableObservable(from(input), subjectFactory); } -class RefCountSubscriber extends Subscriber { - - private connection: Subscription | null | undefined; - - constructor(destination: Subscriber, - private connectable: ConnectableObservable) { - super(destination); - } - - protected _unsubscribe() { - - const { connectable } = this; - if (!connectable) { - this.connection = null; - return; - } - - this.connectable = null!; - const refCount = ( connectable)._refCount; - if (refCount <= 0) { - this.connection = null; - return; - } - - ( connectable)._refCount = refCount - 1; - if (refCount > 1) { - this.connection = null; - return; - } +/** + * Identical to {@link multicastFrom} called as `multicastFrom(input, new Subject())` + */ +export function publishFrom(input: ObservableInput) { + return multicastFrom(input, new Subject()); +} - /// - // Compare the local RefCountSubscriber's connection Subscription to the - // connection Subscription on the shared ConnectableObservable. In cases - // where the ConnectableObservable source synchronously emits values, and - // the RefCountSubscriber's downstream Observers synchronously unsubscribe, - // execution continues to here before the RefCountOperator has a chance to - // supply the RefCountSubscriber with the shared connection Subscription. - // For example: - // ``` - // range(0, 10).pipe( - // publish(), - // refCount(), - // take(5), - // ).subscribe(); - // ``` - // In order to account for this case, RefCountSubscriber should only dispose - // the ConnectableObservable's shared connection Subscription if the - // connection Subscription exists, *and* either: - // a. RefCountSubscriber doesn't have a reference to the shared connection - // Subscription yet, or, - // b. RefCountSubscriber's connection Subscription reference is identical - // to the shared connection Subscription - /// - const { connection } = this; - const sharedConnection = ( connectable)._connection; - this.connection = null; +/** + * Identical to {@link multicastFrom} called as `multicastFrom(input, new ReplaySubject(bufferSize, windowTime, timestampProvider);` + */ +export function publishReplayFrom( + input: ObservableInput, + bufferSize?: number, + windowTime?: number, + timestampProvider?: TimestampProvider +) { + return multicastFrom(input, new ReplaySubject(bufferSize, windowTime, timestampProvider)); +} - if (sharedConnection && (!connection || sharedConnection === connection)) { - sharedConnection.unsubscribe(); - } - } +/** + * Identical to {@link multicastFrom} called as `multicastFrom(input, new BehaviorSubject(initialValue);` + */ +export function publishBehaviorFrom(input: ObservableInput, initialValue: T) { + return multicastFrom(input, new BehaviorSubject(initialValue)); } diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 03d7b63b131..c4f52550dae 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -2,14 +2,40 @@ import { Subject } from '../Subject'; import { Operator } from '../Operator'; import { Subscriber } from '../Subscriber'; import { Observable } from '../Observable'; -import { ConnectableObservable, connectableObservableDescriptor } from '../observable/ConnectableObservable'; +import { ConnectableObservable } from '../observable/ConnectableObservable'; import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types'; +// HACK: Used below to get publish operator variants that are supposed to +// return connectable observables to use `lift`. We can remove this once +// operators that return connectable observables are elimated. +const connectableObservableDescriptor: PropertyDescriptorMap = (() => { + const connectableProto = ConnectableObservable.prototype; + return { + operator: { value: null }, + _refCount: { value: 0, writable: true }, + _subject: { value: null, writable: true }, + _connection: { value: null, writable: true }, + _subscribe: { value: connectableProto._subscribe }, + _isComplete: { value: connectableProto._isComplete, writable: true }, + getSubject: { value: (connectableProto as any).getSubject }, + connect: { value: connectableProto.connect }, + refCount: { value: connectableProto.refCount }, + }; +})(); + /* tslint:disable:max-line-length */ export function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; -export function multicast>(subject: Subject, selector: (shared: Observable) => O): UnaryFunction, ConnectableObservable>>; -export function multicast(subjectFactory: (this: Observable) => Subject): UnaryFunction, ConnectableObservable>; -export function multicast>(SubjectFactory: (this: Observable) => Subject, selector: (shared: Observable) => O): OperatorFunction>; +export function multicast>( + subject: Subject, + selector: (shared: Observable) => O +): UnaryFunction, ConnectableObservable>>; +export function multicast( + subjectFactory: (this: Observable) => Subject +): UnaryFunction, ConnectableObservable>; +export function multicast>( + SubjectFactory: (this: Observable) => Subject, + selector: (shared: Observable) => O +): OperatorFunction>; /* tslint:enable:max-line-length */ /** @@ -18,46 +44,44 @@ export function multicast>(SubjectFactory: (th * * ![](multicast.png) * - * @param {Function|Subject} subjectOrSubjectFactory - Factory function to create an intermediate subject through + * @param subjectOrSubjectFactory Factory function to create an intermediate subject through * which the source sequence's elements will be multicasted to the selector function * or Subject to push source elements into. - * @param {Function} [selector] - Optional selector function that can use the multicasted source stream + * @param selector Optional selector function that can use the multicasted source stream * as many times as needed, without causing multiple subscriptions to the source stream. * Subscribers to the given source will receive all notifications of the source from the * time of the subscription forward. - * @return {Observable} An Observable that emits the results of invoking the selector - * on the items emitted by a `ConnectableObservable` that shares a single subscription to + * @return An observable that emits the results of invoking the selector + * on the items emitted by a {@link ConnectableObservable} that shares a single subscription to * the underlying stream. - * @name multicast */ -export function multicast(subjectOrSubjectFactory: Subject | (() => Subject), - selector?: (source: Observable) => Observable): OperatorFunction { +export function multicast( + subjectOrSubjectFactory: Subject | (() => Subject), + selector?: (source: Observable) => Observable +): OperatorFunction { return function multicastOperatorFunction(source: Observable): Observable { let subjectFactory: () => Subject; if (typeof subjectOrSubjectFactory === 'function') { - subjectFactory = <() => Subject>subjectOrSubjectFactory; + subjectFactory = subjectOrSubjectFactory; } else { - subjectFactory = function subjectFactory() { - return >subjectOrSubjectFactory; - }; + subjectFactory = () => subjectOrSubjectFactory; } if (typeof selector === 'function') { return source.lift(new MulticastOperator(subjectFactory, selector)); + } else { + // const connectable = new ConnectableObservable(source, subjectFactory); + const connectable: any = Object.create(source, connectableObservableDescriptor); + connectable.source = source; + connectable.subjectFactory = subjectFactory; + return connectable; } - - const connectable: any = Object.create(source, connectableObservableDescriptor); - connectable.source = source; - connectable.subjectFactory = subjectFactory; - - return > connectable; }; } export class MulticastOperator implements Operator { - constructor(private subjectFactory: () => Subject, - private selector: (source: Observable) => Observable) { - } + constructor(private subjectFactory: () => Subject, private selector: (source: Observable) => Observable) {} + call(subscriber: Subscriber, source: any): any { const { selector } = this; const subject = this.subjectFactory();