From 1aa400a5214325bc843a74602022a7912da20166 Mon Sep 17 00:00:00 2001 From: Nicholas Jamieson Date: Sat, 20 Feb 2021 00:58:09 +1000 Subject: [PATCH] fix(share): Ensure proper memory clean up * test: add shareReplay leak test * fix: null inner subscription in share * fix(share): don't have the subscriber capture itself in a closure Resolves an issue where we were inadvertently returning a teardown that was effectively adding a closure to a subscriber to itself. Basically it was doing something like this: ```ts new Observable(subscriber => { return () => { subscriber.unsubscribe(); }; }); ``` Co-authored-by: Ben Lesh --- spec/operators/shareReplay-spec.ts | 22 ++++++++++++++++++++++ spec/support/.mocharc.js | 15 +++++---------- src/internal/operators/share.ts | 26 ++++++++++++++++++++------ 3 files changed, 47 insertions(+), 16 deletions(-) diff --git a/spec/operators/shareReplay-spec.ts b/spec/operators/shareReplay-spec.ts index be308b5784..bedbdafeb1 100644 --- a/spec/operators/shareReplay-spec.ts +++ b/spec/operators/shareReplay-spec.ts @@ -319,4 +319,26 @@ describe('shareReplay operator', () => { expect(sideEffects).to.deep.equal([0, 1, 2]); }); + const FinalizationRegistry = (global as any).FinalizationRegistry; + if (FinalizationRegistry) { + + it('should not leak the subscriber for sync sources', (done) => { + const registry = new FinalizationRegistry((value: any) => { + expect(value).to.equal('callback'); + done(); + }); + let callback: (() => void) | undefined = () => { /* noop */ }; + registry.register(callback, 'callback'); + + const shared = of(42).pipe(shareReplay(1)); + shared.subscribe(callback); + + callback = undefined; + global.gc(); + }); + + } else { + console.warn(`No support for FinalizationRegistry in Node ${process.version}`); + } + }); diff --git a/spec/support/.mocharc.js b/spec/support/.mocharc.js index 608e61a81a..d65a70fc12 100644 --- a/spec/support/.mocharc.js +++ b/spec/support/.mocharc.js @@ -1,14 +1,9 @@ module.exports = { - require: [ - 'spec/support/mocha-path-mappings.js', - 'dist/spec/helpers/polyfills.js', - 'dist/spec/helpers/testScheduler-ui.js' - ], - ui: [ - 'dist/spec/helpers/testScheduler-ui.js' - ], + require: ['spec/support/mocha-path-mappings.js', 'dist/spec/helpers/polyfills.js', 'dist/spec/helpers/testScheduler-ui.js'], + ui: ['dist/spec/helpers/testScheduler-ui.js'], reporter: 'dot', timeout: 5000, recursive: true, - 'enable-source-maps': true -}; \ No newline at end of file + 'enable-source-maps': true, + 'expose-gc': true, +}; diff --git a/src/internal/operators/share.ts b/src/internal/operators/share.ts index acf1599622..25a0d4b263 100644 --- a/src/internal/operators/share.ts +++ b/src/internal/operators/share.ts @@ -95,6 +95,8 @@ export function share(options?: ShareConfig): OperatorFunction { let hasCompleted = false; let hasErrored = false; + // Used to reset the internal state to a "cold" + // state, as though it had never been subscribed to. const reset = () => { connection = subject = null; hasCompleted = hasErrored = false; @@ -102,17 +104,21 @@ export function share(options?: ShareConfig): OperatorFunction { return operate((source, subscriber) => { refCount++; - if (!subject) { - subject = connector!(); - } - const castSubscription = subject.subscribe(subscriber); + // Create the subject if we don't have one yet. + subject = subject ?? connector(); + + // The following line adds the subscription to the subscriber passed. + // Basically, `subscriber === subject.subscribe(subscriber)` is `true`. + subject.subscribe(subscriber); if (!connection) { connection = from(source).subscribe({ next: (value) => subject!.next(value), error: (err) => { hasErrored = true; + // We need to capture the subject before + // we reset (if we need to reset). const dest = subject!; if (resetOnError) { reset(); @@ -122,6 +128,8 @@ export function share(options?: ShareConfig): OperatorFunction { complete: () => { hasCompleted = true; const dest = subject!; + // We need to capture the subject before + // we reset (if we need to reset). if (resetOnComplete) { reset(); } @@ -130,10 +138,16 @@ export function share(options?: ShareConfig): OperatorFunction { }); } + // This is also added to `subscriber`, technically. return () => { refCount--; - castSubscription.unsubscribe(); - if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) { + + // If we're resetting on refCount === 0, and it's 0, we only want to do + // that on "unsubscribe", really. Resetting on error or completion is a different + // configuration. + if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) { + // We need to capture the connection before + // we reset (if we need to reset). const conn = connection; reset(); conn?.unsubscribe();