Skip to content

Commit 73b5c16

Browse files
ShogunPandamarco-ippolito
authored andcommitted
worker: add postMessageToThread
PR-URL: #53682 Backport-PR-URL: #57101 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
1 parent 9e975f1 commit 73b5c16

14 files changed

+685
-5
lines changed

doc/api/errors.md

+49
Original file line numberDiff line numberDiff line change
@@ -3213,6 +3213,54 @@ The `Worker` instance terminated because it reached its memory limit.
32133213
The path for the main script of a worker is neither an absolute path
32143214
nor a relative path starting with `./` or `../`.
32153215

3216+
<a id="ERR_WORKER_MESSAGING_ERRORED"></a>
3217+
3218+
### `ERR_WORKER_MESSAGING_ERRORED`
3219+
3220+
<!-- YAML
3221+
added: REPLACEME
3222+
-->
3223+
3224+
> Stability: 1.1 - Active development
3225+
3226+
The destination thread threw an error while processing a message sent via [`postMessageToThread()`][].
3227+
3228+
<a id="ERR_WORKER_MESSAGING_FAILED"></a>
3229+
3230+
### `ERR_WORKER_MESSAGING_FAILED`
3231+
3232+
<!-- YAML
3233+
added: REPLACEME
3234+
-->
3235+
3236+
> Stability: 1.1 - Active development
3237+
3238+
The thread requested in [`postMessageToThread()`][] is invalid or has no `workerMessage` listener.
3239+
3240+
<a id="ERR_WORKER_MESSAGING_SAME_THREAD"></a>
3241+
3242+
### `ERR_WORKER_MESSAGING_SAME_THREAD`
3243+
3244+
<!-- YAML
3245+
added: REPLACEME
3246+
-->
3247+
3248+
> Stability: 1.1 - Active development
3249+
3250+
The thread id requested in [`postMessageToThread()`][] is the current thread id.
3251+
3252+
<a id="ERR_WORKER_MESSAGING_TIMEOUT"></a>
3253+
3254+
### `ERR_WORKER_MESSAGING_TIMEOUT`
3255+
3256+
<!-- YAML
3257+
added: REPLACEME
3258+
-->
3259+
3260+
> Stability: 1.1 - Active development
3261+
3262+
Sending a message via [`postMessageToThread()`][] timed out.
3263+
32163264
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
32173265

32183266
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -3954,6 +4002,7 @@ An error occurred trying to allocate memory. This should never happen.
39544002
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
39554003
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
39564004
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
4005+
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
39574006
[`process.on('exit')`]: process.md#event-exit
39584007
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
39594008
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn

doc/api/process.md

+13
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
327327
likely best for long-running application) or upon process exit (which is likely
328328
most convenient for scripts).
329329

330+
### Event: `'workerMessage'`
331+
332+
<!-- YAML
333+
added: REPLACEME
334+
-->
335+
336+
* `value` {any} A value transmitted using [`postMessageToThread()`][].
337+
* `source` {number} The transmitting worker thread ID or `0` for the main thread.
338+
339+
The `'workerMessage'` event is emitted for any incoming message send by the other
340+
party by using [`postMessageToThread()`][].
341+
330342
### Event: `'uncaughtException'`
331343

332344
<!-- YAML
@@ -4173,6 +4185,7 @@ cases:
41734185
[`net.Server`]: net.md#class-netserver
41744186
[`net.Socket`]: net.md#class-netsocket
41754187
[`os.constants.dlopen`]: os.md#dlopen-constants
4188+
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
41764189
[`process.argv`]: #processargv
41774190
[`process.config`]: #processconfig
41784191
[`process.execPath`]: #processexecpath

doc/api/worker_threads.md

+116
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,118 @@ if (isMainThread) {
220220
}
221221
```
222222

223+
## `worker.postMessageToThread(threadId, value[, transferList][, timeout])`
224+
225+
<!-- YAML
226+
added: REPLACEME
227+
-->
228+
229+
> Stability: 1.1 - Active development
230+
231+
* `destination` {number} The target thread ID. If the thread ID is invalid, a
232+
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
233+
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
234+
* `value` {any} The value to send.
235+
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
236+
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
237+
See [`port.postMessage()`][] for more information.
238+
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
239+
By default it's `undefined`, which means wait forever. If the operation times out,
240+
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
241+
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.
242+
243+
Sends a value to another worker, identified by its thread ID.
244+
245+
If the target thread has no listener for the `workerMessage` event, then the operation will throw
246+
a [`ERR_WORKER_MESSAGING_FAILED`][] error.
247+
248+
If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
249+
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.
250+
251+
This method should be used when the target thread is not the direct
252+
parent or child of the current thread.
253+
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
254+
and the [`worker.postMessage()`][] to let the threads communicate.
255+
256+
The example below shows the use of of `postMessageToThread`: it creates 10 nested threads,
257+
the last one will try to communicate with the main thread.
258+
259+
```mjs
260+
import { fileURLToPath } from 'node:url';
261+
import { once } from 'node:events';
262+
import process from 'node:process';
263+
import {
264+
isMainThread,
265+
postMessageToThread,
266+
threadId,
267+
workerData,
268+
Worker,
269+
} from 'node:worker_threads';
270+
271+
const channel = new BroadcastChannel('sync');
272+
const level = workerData?.level ?? 0;
273+
274+
if (level < 10) {
275+
const worker = new Worker(fileURLToPath(import.meta.url), {
276+
workerData: { level: level + 1 },
277+
});
278+
}
279+
280+
if (level === 0) {
281+
process.on('workerMessage', (value, source) => {
282+
console.log(`${source} -> ${threadId}:`, value);
283+
postMessageToThread(source, { message: 'pong' });
284+
});
285+
} else if (level === 10) {
286+
process.on('workerMessage', (value, source) => {
287+
console.log(`${source} -> ${threadId}:`, value);
288+
channel.postMessage('done');
289+
channel.close();
290+
});
291+
292+
await postMessageToThread(0, { message: 'ping' });
293+
}
294+
295+
channel.onmessage = channel.close;
296+
```
297+
298+
```cjs
299+
const { once } = require('node:events');
300+
const {
301+
isMainThread,
302+
postMessageToThread,
303+
threadId,
304+
workerData,
305+
Worker,
306+
} = require('node:worker_threads');
307+
308+
const channel = new BroadcastChannel('sync');
309+
const level = workerData?.level ?? 0;
310+
311+
if (level < 10) {
312+
const worker = new Worker(__filename, {
313+
workerData: { level: level + 1 },
314+
});
315+
}
316+
317+
if (level === 0) {
318+
process.on('workerMessage', (value, source) => {
319+
console.log(`${source} -> ${threadId}:`, value);
320+
postMessageToThread(source, { message: 'pong' });
321+
});
322+
} else if (level === 10) {
323+
process.on('workerMessage', (value, source) => {
324+
console.log(`${source} -> ${threadId}:`, value);
325+
channel.postMessage('done');
326+
channel.close();
327+
});
328+
329+
postMessageToThread(0, { message: 'ping' });
330+
}
331+
332+
channel.onmessage = channel.close;
333+
```
334+
223335
## `worker.receiveMessageOnPort(port)`
224336
225337
<!-- YAML
@@ -1361,6 +1473,10 @@ thread spawned will spawn another until the application crashes.
13611473
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
13621474
[`Buffer`]: buffer.md
13631475
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
1476+
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
1477+
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
1478+
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
1479+
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
13641480
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
13651481
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
13661482
[`FileHandle`]: fs.md#class-filehandle

lib/internal/errors.js

+4
Original file line numberDiff line numberDiff line change
@@ -1911,6 +1911,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
19111911
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
19121912
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
19131913
Error);
1914+
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
1915+
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
1916+
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
1917+
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
19141918
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
19151919
E('ERR_WORKER_OUT_OF_MEMORY',
19161920
'Worker terminated due to reaching memory limit: %s', Error);

lib/internal/main/worker_thread.js

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ const {
4444
kStdioWantsMoreDataCallback,
4545
} = workerIo;
4646

47+
const { setupMainThreadPort } = require('internal/worker/messaging');
48+
4749
const {
4850
onGlobalUncaughtException,
4951
} = require('internal/process/execution');
@@ -99,6 +101,7 @@ port.on('message', (message) => {
99101
manifestURL,
100102
publicPort,
101103
workerData,
104+
mainThreadPort,
102105
} = message;
103106

104107
if (doEval !== 'internal') {
@@ -112,6 +115,7 @@ port.on('message', (message) => {
112115
}
113116

114117
require('internal/worker').assignEnvironmentData(environmentData);
118+
setupMainThreadPort(mainThreadPort);
115119

116120
if (SharedArrayBuffer !== undefined && Atomics !== undefined) {
117121
// The counter is only passed to the workers created by the main thread,

lib/internal/worker.js

+11-4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const {
5656
ReadableWorkerStdio,
5757
WritableWorkerStdio,
5858
} = workerIo;
59+
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
5960
const { deserializeError } = require('internal/error_serdes');
6061
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6162
const { kEmptyObject } = require('internal/util');
@@ -251,14 +252,18 @@ class Worker extends EventEmitter {
251252

252253
this[kParentSideStdio] = { stdin, stdout, stderr };
253254

254-
const { port1, port2 } = new MessageChannel();
255-
const transferList = [port2];
255+
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
256+
const {
257+
port1: publicPortToParent,
258+
port2: publicPortToWorker,
259+
} = new MessageChannel();
260+
const transferList = [mainThreadPortToWorker, publicPortToWorker];
256261
// If transferList is provided.
257262
if (options.transferList)
258263
ArrayPrototypePush(transferList,
259264
...new SafeArrayIterator(options.transferList));
260265

261-
this[kPublicPort] = port1;
266+
this[kPublicPort] = publicPortToParent;
262267
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
263268
this[kPublicPort].on(event, (message) => this.emit(event, message));
264269
});
@@ -272,14 +277,15 @@ class Worker extends EventEmitter {
272277
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
273278
workerData: options.workerData,
274279
environmentData,
275-
publicPort: port2,
276280
manifestURL: getOptionValue('--experimental-policy') ?
277281
require('internal/process/policy').url :
278282
null,
279283
manifestSrc: getOptionValue('--experimental-policy') ?
280284
require('internal/process/policy').src :
281285
null,
282286
hasStdin: !!options.stdin,
287+
publicPort: publicPortToWorker,
288+
mainThreadPort: mainThreadPortToWorker,
283289
}, transferList);
284290
// Use this to cache the Worker's loopStart value once available.
285291
this[kLoopStartTime] = -1;
@@ -302,6 +308,7 @@ class Worker extends EventEmitter {
302308
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
303309
drainMessagePort(this[kPublicPort]);
304310
drainMessagePort(this[kPort]);
311+
destroyMainThreadPort(this.threadId);
305312
this.removeAllListeners('message');
306313
this.removeAllListeners('messageerrors');
307314
this[kPublicPort].unref();

0 commit comments

Comments
 (0)