From 1b7ff2cd8fdc469e3327cb62c3905d221260e47b Mon Sep 17 00:00:00 2001 From: miginmrs Date: Fri, 16 Oct 2020 17:22:04 +0100 Subject: [PATCH 1/2] fix(multicast,share,refCount,shareReplay): enable synchronous firehose - use known subscriber when possible instead of returned subscription - remove unused unsubscribe from refCount Operator - add teardown logic before calling subscribe when possible - enable remaining synchronous firehose tests fixes #5658 --- spec/operators/multicast-spec.ts | 3 +- spec/operators/refCount-spec.ts | 3 +- spec/operators/share-spec.ts | 3 +- spec/operators/shareReplay-spec.ts | 5 +- .../observable/ConnectableObservable.ts | 47 ++++++++++++------- src/internal/operators/multicast.ts | 6 ++- src/internal/operators/refCount.ts | 33 ++----------- src/internal/operators/shareReplay.ts | 43 ++++++++--------- 8 files changed, 66 insertions(+), 77 deletions(-) diff --git a/spec/operators/multicast-spec.ts b/spec/operators/multicast-spec.ts index 78ba78abf3..9ed0270356 100644 --- a/spec/operators/multicast-spec.ts +++ b/spec/operators/multicast-spec.ts @@ -701,7 +701,6 @@ describe('multicast operator', () => { }); }); - // TODO: fix firehose unsubscription // AFAICT, it's not possible for multicast observables to support ASAP // unsubscription from synchronous firehose sources. The problem is that the // chaining of the closed 'signal' is broken by the subject. For example, @@ -718,7 +717,7 @@ describe('multicast operator', () => { // That breaks the chaining of closed - i.e. even if the unsubscribe is // called on the subject, closing it, the SafeSubscriber's closed property // won't reflect that. - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/refCount-spec.ts b/spec/operators/refCount-spec.ts index 20df864ef1..914941d282 100644 --- a/spec/operators/refCount-spec.ts +++ b/spec/operators/refCount-spec.ts @@ -115,8 +115,7 @@ describe('refCount', () => { expect(arr[1]).to.equal('the number two'); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/share-spec.ts b/spec/operators/share-spec.ts index cbb7ace6c1..22dc47b908 100644 --- a/spec/operators/share-spec.ts +++ b/spec/operators/share-spec.ts @@ -309,8 +309,7 @@ describe('share operator', () => { expectObservable(e1.pipe(share())).toBe(expected); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index be308b5784..9520ab6bce 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -299,8 +299,7 @@ describe('shareReplay operator', () => { expectObservable(result).toBe(expected); }); - // TODO: fix firehose unsubscription - it.skip('should stop listening to a synchronous observable when unsubscribed', () => { + it('should stop listening to a synchronous observable when unsubscribed', () => { const sideEffects: number[] = []; const synchronousObservable = new Observable(subscriber => { // This will check to see if the subscriber was closed on each loop @@ -312,7 +311,7 @@ describe('shareReplay operator', () => { }); synchronousObservable.pipe( - shareReplay(), + shareReplay({ refCount: true }), take(3), ).subscribe(() => { /* noop */ }); diff --git a/src/internal/observable/ConnectableObservable.ts b/src/internal/observable/ConnectableObservable.ts index 0650cfcdf5..9403012538 100644 --- a/src/internal/observable/ConnectableObservable.ts +++ b/src/internal/observable/ConnectableObservable.ts @@ -13,6 +13,7 @@ export class ConnectableObservable extends Observable { protected _subject: Subject | null = null; protected _refCount: number = 0; protected _connection: Subscription | null = null; + protected _waiting: boolean = false; constructor(public source: Observable, protected subjectFactory: () => Subject) { super(); @@ -37,28 +38,40 @@ export class ConnectableObservable extends Observable { _connection?.unsubscribe(); } - connect(): Subscription { + /** @deprecated This is an internal implementation detail, do not use. */ + prepare(): Subscription { let connection = this._connection; if (!connection) { connection = this._connection = new Subscription(); + this._waiting = true; + } + return connection; + } + + connect(): Subscription { + let connection = this._connection; + if (!connection || this._waiting) { + if (!connection) { + connection = this._connection = new Subscription(); + } else { + this._waiting = false; + } const subject = this.getSubject(); - connection.add( - this.source.subscribe( - new OperatorSubscriber( - subject as any, - undefined, - (err) => { - this._teardown(); - subject.error(err); - }, - () => { - this._teardown(); - subject.complete(); - }, - () => this._teardown() - ) - ) + const subs = new OperatorSubscriber( + subject as any, + undefined, + (err) => { + this._teardown(); + subject.error(err); + }, + () => { + this._teardown(); + subject.complete(); + }, + () => this._teardown() ); + connection.add(subs); + this.source.subscribe(subs); if (connection.closed) { this._connection = null; diff --git a/src/internal/operators/multicast.ts b/src/internal/operators/multicast.ts index 3f9ae7285b..035e23a19a 100644 --- a/src/internal/operators/multicast.ts +++ b/src/internal/operators/multicast.ts @@ -5,6 +5,7 @@ import { ConnectableObservable } from '../observable/ConnectableObservable'; import { OperatorFunction, UnaryFunction, ObservedValueOf, ObservableInput } from '../types'; import { hasLift, operate } from '../util/lift'; import { isFunction } from '../util/isFunction'; +import { SafeSubscriber } from '../Subscriber'; /* tslint:disable:max-line-length */ export function multicast(subject: Subject): UnaryFunction, ConnectableObservable>; @@ -50,7 +51,10 @@ export function multicast( // that to the resulting subscription. The act of subscribing with `this`, // the primary destination subscriber, will automatically add the subscription // to the result. - selector(subject).subscribe(subscriber).add(source.subscribe(subject)); + const subscription = selector(subject).subscribe(subscriber); + const subjectSubscriber = new SafeSubscriber(subject); + subscription.add(subjectSubscriber); + source.subscribe(subjectSubscriber); }); } diff --git a/src/internal/operators/refCount.ts b/src/internal/operators/refCount.ts index 292cd45aa3..268ab5be3a 100644 --- a/src/internal/operators/refCount.ts +++ b/src/internal/operators/refCount.ts @@ -70,46 +70,21 @@ export function refCount(): MonoTypeOperatorFunction { return; } - /// - // Compare the local RefCountSubscriber's connection Subscription to the - // connection Subscription on the shared ConnectableObservable. In cases - // where the ConnectableObservable source synchronously emits values, and - // the RefCountSubscriber's downstream Observers synchronously unsubscribe, - // execution continues to here before the RefCountOperator has a chance to - // supply the RefCountSubscriber with the shared connection Subscription. - // For example: - // ``` - // range(0, 10).pipe( - // publish(), - // refCount(), - // take(5), - // ) - // .subscribe(); - // ``` - // In order to account for this case, RefCountSubscriber should only dispose - // the ConnectableObservable's shared connection Subscription if the - // connection Subscription exists, *and* either: - // a. RefCountSubscriber doesn't have a reference to the shared connection - // Subscription yet, or, - // b. RefCountSubscriber's connection Subscription reference is identical - // to the shared connection Subscription - /// - const sharedConnection = (source as any)._connection; const conn = connection; connection = null; - if (sharedConnection && (!conn || sharedConnection === conn)) { + if (sharedConnection && sharedConnection === conn) { sharedConnection.unsubscribe(); } - - subscriber.unsubscribe(); }); source.subscribe(refCounter); if (!refCounter.closed) { - connection = (source as ConnectableObservable).connect(); + const connectable = source as ConnectableObservable; + connection = connectable.prepare(); + connectable.connect(); } }); } diff --git a/src/internal/operators/shareReplay.ts b/src/internal/operators/shareReplay.ts index 6570c549ba..be6547c0ee 100644 --- a/src/internal/operators/shareReplay.ts +++ b/src/internal/operators/shareReplay.ts @@ -2,7 +2,7 @@ import { Observable } from '../Observable'; import { ReplaySubject } from '../ReplaySubject'; import { Subscription } from '../Subscription'; import { MonoTypeOperatorFunction, SchedulerLike } from '../types'; -import { Subscriber } from '../Subscriber'; +import { SafeSubscriber, Subscriber } from '../Subscriber'; import { operate } from '../util/lift'; export interface ShareReplayConfig { @@ -150,45 +150,46 @@ function shareReplayOperator({ }: ShareReplayConfig) { let subject: ReplaySubject | undefined; let refCount = 0; - let subscription: Subscription | undefined; + let outerSubscriber: Subscriber | undefined; return (source: Observable, subscriber: Subscriber) => { refCount++; - let innerSub: Subscription; + const innerSub = new Subscription(); + subscriber.add(() => { + refCount--; + innerSub.unsubscribe(); + if (useRefCount && refCount === 0 && outerSubscriber) { + outerSubscriber.unsubscribe(); + outerSubscriber = undefined; + subject = undefined; + } + }); + if (!subject) { subject = new ReplaySubject(bufferSize, windowTime, scheduler); - innerSub = subject.subscribe(subscriber); - subscription = source.subscribe({ + innerSub.add(subject.subscribe(subscriber)); + outerSubscriber = new SafeSubscriber({ next(value) { subject!.next(value); }, error(err) { const dest = subject; - subscription = undefined; + outerSubscriber = undefined; subject = undefined; dest!.error(err); }, complete() { - subscription = undefined; + outerSubscriber = undefined; subject!.complete(); }, - }); + }) + source.subscribe(outerSubscriber); // The following condition is needed because source can complete synchronously // upon subscription. When that happens `subscription` is first set to `undefined` // and right after is set to the "closed subscription" returned by `subscribe` - if (subscription.closed) { - subscription = undefined; + if (outerSubscriber.closed) { + outerSubscriber = undefined; } } else { - innerSub = subject.subscribe(subscriber); + innerSub.add(subject.subscribe(subscriber)); } - - subscriber.add(() => { - refCount--; - innerSub.unsubscribe(); - if (useRefCount && refCount === 0 && subscription) { - subscription.unsubscribe(); - subscription = undefined; - subject = undefined; - } - }); }; } From 48bf9bf6f770d41c5c600fd7140de5ad47f396b8 Mon Sep 17 00:00:00 2001 From: miginmrs Date: Fri, 16 Oct 2020 17:50:27 +0100 Subject: [PATCH 2/2] fix(connectable): update api guardian ConnectableObservable contains new properties: * _waiting: boolean, it indicates whether _connection is a not yet subscription to source or not * prepare: ()=>Subscription, internal, is called to get the subscription used for the next `connect` --- api_guard/dist/types/index.d.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/api_guard/dist/types/index.d.ts b/api_guard/dist/types/index.d.ts index 409f3ef917..c462b6e34c 100644 --- a/api_guard/dist/types/index.d.ts +++ b/api_guard/dist/types/index.d.ts @@ -198,6 +198,7 @@ export declare class ConnectableObservable extends Observable { protected _connection: Subscription | null; protected _refCount: number; protected _subject: Subject | null; + protected _waiting: boolean; source: Observable; protected subjectFactory: () => Subject; constructor(source: Observable, subjectFactory: () => Subject); @@ -205,6 +206,7 @@ export declare class ConnectableObservable extends Observable { protected _teardown(): void; connect(): Subscription; protected getSubject(): Subject; + prepare(): Subscription; refCount(): Observable; }