Skip to content

Commit

Permalink
diagnostics_channel: fix ref counting bug when reaching zero subscribers
Browse files Browse the repository at this point in the history
PR-URL: #47520
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
  • Loading branch information
Stephen Belanger authored Apr 13, 2023
1 parent a67cae2 commit 6fb74c7
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
52 changes: 31 additions & 21 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
SafeFinalizationRegistry,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
Expand All @@ -29,14 +30,29 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = internalBinding('util');

function decRef(channel) {
if (channels.get(channel.name).decRef() === 0) {
channels.delete(channel.name);
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
#finalizers = new SafeFinalizationRegistry((key) => {
this.delete(key);
});

set(key, value) {
this.#finalizers.register(value, key);
return super.set(key, new WeakReference(value));
}
}

function incRef(channel) {
channels.get(channel.name).incRef();
get(key) {
return super.get(key)?.get();
}

incRef(key) {
return super.get(key)?.incRef();
}

decRef(key) {
return super.get(key)?.decRef();
}
}

function markActive(channel) {
Expand Down Expand Up @@ -81,7 +97,7 @@ class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');
ArrayPrototypePush(this._subscribers, subscription);
incRef(this);
channels.incRef(this.name);
}

unsubscribe(subscription) {
Expand All @@ -90,15 +106,15 @@ class ActiveChannel {

ArrayPrototypeSplice(this._subscribers, index, 1);

decRef(this);
channels.decRef(this.name);
maybeMarkInactive(this);

return true;
}

bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) incRef(this);
if (!replacing) channels.incRef(this.name);
this._stores.set(store, transform);
}

Expand All @@ -109,7 +125,7 @@ class ActiveChannel {

this._stores.delete(store);

decRef(this);
channels.decRef(this.name);
maybeMarkInactive(this);

return true;
Expand Down Expand Up @@ -154,7 +170,7 @@ class Channel {
this._stores = undefined;
this.name = name;

channels.set(name, new WeakReference(this));
channels.set(name, this);
}

static [SymbolHasInstance](instance) {
Expand Down Expand Up @@ -192,12 +208,10 @@ class Channel {
}
}

const channels = new SafeMap();
const channels = new WeakRefMap();

function channel(name) {
let channel;
const ref = channels.get(name);
if (ref) channel = ref.get();
const channel = channels.get(name);
if (channel) return channel;

if (typeof name !== 'string' && typeof name !== 'symbol') {
Expand All @@ -216,12 +230,8 @@ function unsubscribe(name, subscription) {
}

function hasSubscribers(name) {
let channel;
const ref = channels.get(name);
if (ref) channel = ref.get();
if (!channel) {
return false;
}
const channel = channels.get(name);
if (!channel) return false;

return channel.hasSubscribers;
}
Expand Down
7 changes: 7 additions & 0 deletions test/parallel/test-diagnostics-channel-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ assert.ok(!dc.unsubscribe(name, subscriber));
assert.throws(() => {
dc.subscribe(name, null);
}, { code: 'ERR_INVALID_ARG_TYPE' });

// Reaching zero subscribers should not delete from the channels map as there
// will be no more weakref to incRef if another subscribe happens while the
// channel object itself exists.
channel.subscribe(subscriber);
channel.unsubscribe(subscriber);
channel.subscribe(subscriber);

0 comments on commit 6fb74c7

Please sign in to comment.