From bedb6cfcf1edbb1123da7045556604bcb96c5e88 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Wed, 17 Feb 2021 08:57:48 +1000 Subject: [PATCH 1/3] test: add shareReplay leak test --- spec/operators/shareReplay-spec.ts | 22 ++++++++++++++++++++++ spec/support/.mocharc.js | 15 +++++---------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index be308b5784..bedbdafeb1 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -319,4 +319,26 @@ describe('shareReplay operator', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + const FinalizationRegistry = (global as any).FinalizationRegistry; + if (FinalizationRegistry) { + + it('should not leak the subscriber for sync sources', (done) => { + const registry = new FinalizationRegistry((value: any) => { + expect(value).to.equal('callback'); + done(); + }); + let callback: (() => void) | undefined = () => { /* noop */ }; + registry.register(callback, 'callback'); + + const shared = of(42).pipe(shareReplay(1)); + shared.subscribe(callback); + + callback = undefined; + global.gc(); + }); + + } else { + console.warn(`No support for FinalizationRegistry in Node ${process.version}`); + } + }); diff --git a/spec/support/.mocharc.js b/spec/support/.mocharc.js index 608e61a81a..d65a70fc12 100644 --- a/spec/support/.mocharc.js +++ b/spec/support/.mocharc.js @@ -1,14 +1,9 @@ module.exports = { - require: [ - 'spec/support/mocha-path-mappings.js', - 'dist/spec/helpers/polyfills.js', - 'dist/spec/helpers/testScheduler-ui.js' - ], - ui: [ - 'dist/spec/helpers/testScheduler-ui.js' - ], + require: ['spec/support/mocha-path-mappings.js', 'dist/spec/helpers/polyfills.js', 'dist/spec/helpers/testScheduler-ui.js'], + ui: ['dist/spec/helpers/testScheduler-ui.js'], reporter: 'dot', timeout: 5000, recursive: true, - 'enable-source-maps': true -}; \ No newline at end of file + 'enable-source-maps': true, + 'expose-gc': true, +}; From 285c6fd2eb11b10cb08e51580724dd2fb9012a08 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Wed, 17 Feb 2021 08:59:03 +1000 Subject: [PATCH 2/3] fix: null inner subscription in share --- src/internal/operators/share.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index acf1599622..b2c8fcdb64 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,6 +1,6 @@ import { Subject } from '../Subject'; -import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike, Unsubscribable } from '../types'; import { Subscription } from '../Subscription'; import { from } from '../observable/from'; import { operate } from '../util/lift'; @@ -106,7 +106,7 @@ export function share(options?: ShareConfig): OperatorFunction { subject = connector!(); } - const castSubscription = subject.subscribe(subscriber); + let castSubscription: Unsubscribable | null = subject.subscribe(subscriber); if (!connection) { connection = from(source).subscribe({ @@ -132,7 +132,8 @@ export function share(options?: ShareConfig): OperatorFunction { return () => { refCount--; - castSubscription.unsubscribe(); + castSubscription!.unsubscribe(); + castSubscription = null; if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) { const conn = connection; reset(); From 67b80350f45cb2bc2e95b084282f9b6bef1f42d2 Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 18 Feb 2021 19:25:57 -0600 Subject: [PATCH 3/3] fix(share): don't have the subscriber capture itself in a closure Resolves an issue where we were inadvertantly returning a teardown that was effectively adding a closure to a subscriber to itself. Basically it was doing something like this: ```ts new Observable(subscriber => { return () => { subscriber.unsubscribe(); }; }); ``` --- src/internal/operators/share.ts | 29 +++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index b2c8fcdb64..25a0d4b263 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -1,6 +1,6 @@ import { Subject } from '../Subject'; -import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike, Unsubscribable } from '../types'; +import { MonoTypeOperatorFunction, OperatorFunction, SubjectLike } from '../types'; import { Subscription } from '../Subscription'; import { from } from '../observable/from'; import { operate } from '../util/lift'; @@ -95,6 +95,8 @@ export function share(options?: ShareConfig): OperatorFunction { let hasCompleted = false; let hasErrored = false; + // Used to reset the internal state to a "cold" + // state, as though it had never been subscribed to. const reset = () => { connection = subject = null; hasCompleted = hasErrored = false; @@ -102,17 +104,21 @@ export function share(options?: ShareConfig): OperatorFunction { return operate((source, subscriber) => { refCount++; - if (!subject) { - subject = connector!(); - } - let castSubscription: Unsubscribable | null = subject.subscribe(subscriber); + // Create the subject if we don't have one yet. + subject = subject ?? connector(); + + // The following line adds the subscription to the subscriber passed. + // Basically, `subscriber === subject.subscribe(subscriber)` is `true`. + subject.subscribe(subscriber); if (!connection) { connection = from(source).subscribe({ next: (value) => subject!.next(value), error: (err) => { hasErrored = true; + // We need to capture the subject before + // we reset (if we need to reset). const dest = subject!; if (resetOnError) { reset(); @@ -122,6 +128,8 @@ export function share(options?: ShareConfig): OperatorFunction { complete: () => { hasCompleted = true; const dest = subject!; + // We need to capture the subject before + // we reset (if we need to reset). if (resetOnComplete) { reset(); } @@ -130,11 +138,16 @@ export function share(options?: ShareConfig): OperatorFunction { }); } + // This is also added to `subscriber`, technically. return () => { refCount--; - castSubscription!.unsubscribe(); - castSubscription = null; - if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) { + + // If we're resetting on refCount === 0, and it's 0, we only want to do + // that on "unsubscribe", really. Resetting on error or completion is a different + // configuration. + if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { + // We need to capture the connection before + // we reset (if we need to reset). const conn = connection; reset(); conn?.unsubscribe();