diff --git a/doc/api/deprecations.md b/doc/api/deprecations.md index 9dd2f7628d84b0..64b577b149a17b 100644 --- a/doc/api/deprecations.md +++ b/doc/api/deprecations.md @@ -3145,6 +3145,24 @@ parameter in [`fs.write()`][], [`fs.writeFile()`][], [`fs.appendFile()`][], [`fs.writeFileSync()`][], and [`fs.appendFileSync()`][] is deprecated. Convert them to primitive strings. +### DEP0163: `channel.subscribe(onMessage)`, `channel.unsubscribe(onMessage)` + + + +Type: Documentation-only + +These methods were deprecated because they can be used in a way which does not +hold the channel reference alive long enough to receive the events. + +Use [`diagnostics_channel.subscribe(name, onMessage)`][] or +[`diagnostics_channel.unsubscribe(name, onMessage)`][] which does the same +thing instead. + [Legacy URL API]: url.md#legacy-url-api [NIST SP 800-38D]: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf [RFC 6066]: https://tools.ietf.org/html/rfc6066#section-3 @@ -3185,6 +3203,8 @@ Convert them to primitive strings. [`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback [`decipher.final()`]: crypto.md#decipherfinaloutputencoding [`decipher.setAuthTag()`]: crypto.md#deciphersetauthtagbuffer-encoding +[`diagnostics_channel.subscribe(name, onMessage)`]: diagnostics_channel.md#diagnostics_channelsubscribename-onmessage +[`diagnostics_channel.unsubscribe(name, onMessage)`]: diagnostics_channel.md#diagnostics_channelunsubscribename-onmessage [`dns.lookup()`]: dns.md#dnslookuphostname-options-callback [`dnsPromises.lookup()`]: dns.md#dnspromiseslookuphostname-options [`domain`]: domain.md diff --git a/doc/api/diagnostics_channel.md b/doc/api/diagnostics_channel.md index 75d714d933ab07..04a602195848a3 100644 --- a/doc/api/diagnostics_channel.md +++ b/doc/api/diagnostics_channel.md @@ -43,10 +43,12 @@ import diagnostics_channel from 'node:diagnostics_channel'; // Get a reusable channel object const channel = diagnostics_channel.channel('my-channel'); -// Subscribe to the channel -channel.subscribe((message, name) => { +function onMessage(message, name) { // Received data -}); +} + +// Subscribe to the channel +diagnostics_channel.subscribe('my-channel', onMessage); // Check if the channel has an active subscriber if (channel.hasSubscribers) { @@ -55,6 +57,9 @@ if (channel.hasSubscribers) { some: 'data' }); } + +// Unsubscribe from the channel +diagnostics_channel.unsubscribe('my-channel', onMessage); ``` ```cjs @@ -63,10 +68,12 @@ const diagnostics_channel = require('node:diagnostics_channel'); // Get a reusable channel object const channel = diagnostics_channel.channel('my-channel'); -// Subscribe to the channel -channel.subscribe((message, name) => { +function onMessage(message, name) { // Received data -}); +} + +// Subscribe to the channel +diagnostics_channel.subscribe('my-channel', onMessage); // Check if the channel has an active subscriber if (channel.hasSubscribers) { @@ -75,6 +82,9 @@ if (channel.hasSubscribers) { some: 'data' }); } + +// Unsubscribe from the channel +diagnostics_channel.unsubscribe('my-channel', onMessage); ``` #### `diagnostics_channel.hasSubscribers(name)` @@ -121,7 +131,7 @@ added: * `name` {string|symbol} The channel name * Returns: {Channel} The named channel object -This is the primary entry-point for anyone wanting to interact with a named +This is the primary entry-point for anyone wanting to publish to a named channel. It produces a channel object which is optimized to reduce overhead at publish time as much as possible. @@ -137,6 +147,76 @@ const diagnostics_channel = require('node:diagnostics_channel'); const channel = diagnostics_channel.channel('my-channel'); ``` +#### `diagnostics_channel.subscribe(name, onMessage)` + + + +* `name` {string|symbol} The channel name +* `onMessage` {Function} The handler to receive channel messages + * `message` {any} The message data + * `name` {string|symbol} The name of the channel + +Register a message handler to subscribe to this channel. This message handler +will be run synchronously whenever a message is published to the channel. Any +errors thrown in the message handler will trigger an [`'uncaughtException'`][]. + +```mjs +import diagnostics_channel from 'diagnostics_channel'; + +diagnostics_channel.subscribe('my-channel', (message, name) => { + // Received data +}); +``` + +```cjs +const diagnostics_channel = require('diagnostics_channel'); + +diagnostics_channel.subscribe('my-channel', (message, name) => { + // Received data +}); +``` + +#### `diagnostics_channel.unsubscribe(name, onMessage)` + + + +* `name` {string|symbol} The channel name +* `onMessage` {Function} The previous subscribed handler to remove +* Returns: {boolean} `true` if the handler was found, `false` otherwise. + +Remove a message handler previously registered to this channel with +[`diagnostics_channel.subscribe(name, onMessage)`][]. + +```mjs +import diagnostics_channel from 'diagnostics_channel'; + +function onMessage(message, name) { + // Received data +} + +diagnostics_channel.subscribe('my-channel', onMessage); + +diagnostics_channel.unsubscribe('my-channel', onMessage); +``` + +```cjs +const diagnostics_channel = require('diagnostics_channel'); + +function onMessage(message, name) { + // Received data +} + +diagnostics_channel.subscribe('my-channel', onMessage); + +diagnostics_channel.unsubscribe('my-channel', onMessage); +``` + ### Class: `Channel` +> Stability: 0 - Deprecated: Use [`diagnostics_channel.subscribe(name, onMessage)`][] + * `onMessage` {Function} The handler to receive channel messages * `message` {any} The message data * `name` {string|symbol} The name of the channel @@ -264,6 +347,7 @@ channel.subscribe((message, name) => { added: - v15.1.0 - v14.17.0 +deprecated: REPLACEME changes: - version: - v17.1.0 @@ -273,6 +357,8 @@ changes: description: Added return value. Added to channels without subscribers. --> +> Stability: 0 - Deprecated: Use [`diagnostics_channel.unsubscribe(name, onMessage)`][] + * `onMessage` {Function} The previous subscribed handler to remove * Returns: {boolean} `true` if the handler was found, `false` otherwise. @@ -345,3 +431,5 @@ Emitted when server sends a response. [`'uncaughtException'`]: process.md#event-uncaughtexception [`channel.subscribe(onMessage)`]: #channelsubscribeonmessage [`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname +[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelsubscribename-onmessage +[`diagnostics_channel.unsubscribe(name, onMessage)`]: #diagnostics_channelunsubscribename-onmessage diff --git a/lib/diagnostics_channel.js b/lib/diagnostics_channel.js index c85a9532b4d372..f8c1edb96dfe8a 100644 --- a/lib/diagnostics_channel.js +++ b/lib/diagnostics_channel.js @@ -109,6 +109,22 @@ function channel(name) { return channel; } +function subscribe(name, subscription) { + const chan = channel(name); + channels[name].incRef(); + chan.subscribe(subscription); +} + +function unsubscribe(name, subscription) { + const chan = channel(name); + if (!chan.unsubscribe(subscription)) { + return false; + } + + channels[name].decRef(); + return true; +} + function hasSubscribers(name) { let channel; const ref = channels[name]; @@ -123,5 +139,7 @@ function hasSubscribers(name) { module.exports = { channel, hasSubscribers, + subscribe, + unsubscribe, Channel }; diff --git a/test/parallel/test-diagnostics-channel-pub-sub.js b/test/parallel/test-diagnostics-channel-pub-sub.js new file mode 100644 index 00000000000000..2317d90dbbc554 --- /dev/null +++ b/test/parallel/test-diagnostics-channel-pub-sub.js @@ -0,0 +1,44 @@ +'use strict'; + +const common = require('../common'); +const dc = require('diagnostics_channel'); +const assert = require('assert'); +const { Channel } = dc; + +const name = 'test'; +const input = { + foo: 'bar' +}; + +// Individual channel objects can be created to avoid future lookups +const channel = dc.channel(name); +assert.ok(channel instanceof Channel); + +// No subscribers yet, should not publish +assert.ok(!channel.hasSubscribers); + +const subscriber = common.mustCall((message, name) => { + assert.strictEqual(name, channel.name); + assert.deepStrictEqual(message, input); +}); + +// Now there's a subscriber, should publish +dc.subscribe(name, subscriber); +assert.ok(channel.hasSubscribers); + +// The ActiveChannel prototype swap should not fail instanceof +assert.ok(channel instanceof Channel); + +// Should trigger the subscriber once +channel.publish(input); + +// Should not publish after subscriber is unsubscribed +assert.ok(dc.unsubscribe(name, subscriber)); +assert.ok(!channel.hasSubscribers); + +// unsubscribe() should return false when subscriber is not found +assert.ok(!dc.unsubscribe(name, subscriber)); + +assert.throws(() => { + dc.subscribe(name, null); +}, { code: 'ERR_INVALID_ARG_TYPE' });