Skip to content

Commit

Permalink
fix(share): Ensure proper memory clean up
Browse files Browse the repository at this point in the history
* test: add shareReplay leak test

* fix: null inner subscription in share

* fix(share): don't have the subscriber capture itself in a closure

Resolves an issue where we were inadvertently returning a teardown that was effectively adding a closure to a subscriber to itself.

Basically it was doing something like this:

```ts
new Observable(subscriber => {
    return () => {
        subscriber.unsubscribe();
    };
});
```

Co-authored-by: Ben Lesh <ben@benlesh.com>
  • Loading branch information
cartant and benlesh authored Feb 19, 2021
1 parent 912f5d7 commit 1aa400a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
22 changes: 22 additions & 0 deletions spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,26 @@ describe('shareReplay operator', () => {
expect(sideEffects).to.deep.equal([0, 1, 2]);
});

const FinalizationRegistry = (global as any).FinalizationRegistry;
if (FinalizationRegistry) {

it('should not leak the subscriber for sync sources', (done) => {
const registry = new FinalizationRegistry((value: any) => {
expect(value).to.equal('callback');
done();
});
let callback: (() => void) | undefined = () => { /* noop */ };
registry.register(callback, 'callback');

const shared = of(42).pipe(shareReplay(1));
shared.subscribe(callback);

callback = undefined;
global.gc();
});

} else {
console.warn(`No support for FinalizationRegistry in Node ${process.version}`);
}

});
15 changes: 5 additions & 10 deletions spec/support/.mocharc.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
module.exports = {
require: [
'spec/support/mocha-path-mappings.js',
'dist/spec/helpers/polyfills.js',
'dist/spec/helpers/testScheduler-ui.js'
],
ui: [
'dist/spec/helpers/testScheduler-ui.js'
],
require: ['spec/support/mocha-path-mappings.js', 'dist/spec/helpers/polyfills.js', 'dist/spec/helpers/testScheduler-ui.js'],
ui: ['dist/spec/helpers/testScheduler-ui.js'],
reporter: 'dot',
timeout: 5000,
recursive: true,
'enable-source-maps': true
};
'enable-source-maps': true,
'expose-gc': true,
};
26 changes: 20 additions & 6 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,24 +95,30 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
let hasCompleted = false;
let hasErrored = false;

// Used to reset the internal state to a "cold"
// state, as though it had never been subscribed to.
const reset = () => {
connection = subject = null;
hasCompleted = hasErrored = false;
};

return operate((source, subscriber) => {
refCount++;
if (!subject) {
subject = connector!();
}

const castSubscription = subject.subscribe(subscriber);
// Create the subject if we don't have one yet.
subject = subject ?? connector();

// The following line adds the subscription to the subscriber passed.
// Basically, `subscriber === subject.subscribe(subscriber)` is `true`.
subject.subscribe(subscriber);

if (!connection) {
connection = from(source).subscribe({
next: (value) => subject!.next(value),
error: (err) => {
hasErrored = true;
// We need to capture the subject before
// we reset (if we need to reset).
const dest = subject!;
if (resetOnError) {
reset();
Expand All @@ -122,6 +128,8 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
complete: () => {
hasCompleted = true;
const dest = subject!;
// We need to capture the subject before
// we reset (if we need to reset).
if (resetOnComplete) {
reset();
}
Expand All @@ -130,10 +138,16 @@ export function share<T>(options?: ShareConfig<T>): OperatorFunction<T, T> {
});
}

// This is also added to `subscriber`, technically.
return () => {
refCount--;
castSubscription.unsubscribe();
if (!refCount && resetOnRefCountZero && !hasErrored && !hasCompleted) {

// If we're resetting on refCount === 0, and it's 0, we only want to do
// that on "unsubscribe", really. Resetting on error or completion is a different
// configuration.
if (resetOnRefCountZero && !refCount && !hasErrored && !hasCompleted) {
// We need to capture the connection before
// we reset (if we need to reset).
const conn = connection;
reset();
conn?.unsubscribe();
Expand Down

0 comments on commit 1aa400a

Please sign in to comment.