Skip to content

Commit

Permalink
worker: add postMessageToThread
Browse files Browse the repository at this point in the history
PR-URL: #53682
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
  • Loading branch information
ShogunPanda authored and aduh95 committed Jul 16, 2024
1 parent 5ae8ea4 commit 22ca334
Show file tree
Hide file tree
Showing 14 changed files with 677 additions and 5 deletions.
49 changes: 49 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -3105,6 +3105,54 @@ The `Worker` instance terminated because it reached its memory limit.
The path for the main script of a worker is neither an absolute path
nor a relative path starting with `./` or `../`.

<a id="ERR_WORKER_MESSAGING_ERRORED"></a>

### `ERR_WORKER_MESSAGING_ERRORED`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
The destination thread threw an error while processing a message sent via [`postMessageToThread()`][].

<a id="ERR_WORKER_MESSAGING_FAILED"></a>

### `ERR_WORKER_MESSAGING_FAILED`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
The thread requested in [`postMessageToThread()`][] is invalid or has no `workerMessage` listener.

<a id="ERR_WORKER_MESSAGING_SAME_THREAD"></a>

### `ERR_WORKER_MESSAGING_SAME_THREAD`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
The thread id requested in [`postMessageToThread()`][] is the current thread id.

<a id="ERR_WORKER_MESSAGING_TIMEOUT"></a>

### `ERR_WORKER_MESSAGING_TIMEOUT`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
Sending a message via [`postMessageToThread()`][] timed out.

<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>

### `ERR_WORKER_UNSERIALIZABLE_ERROR`
Expand Down Expand Up @@ -4017,6 +4065,7 @@ An error occurred trying to allocate memory. This should never happen.
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
[`process.on('exit')`]: process.md#event-exit
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn
Expand Down
13 changes: 13 additions & 0 deletions doc/api/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
likely best for long-running application) or upon process exit (which is likely
most convenient for scripts).

### Event: `'workerMessage'`

<!-- YAML
added: REPLACEME
-->

* `value` {any} A value transmitted using [`postMessageToThread()`][].
* `source` {number} The transmitting worker thread ID or `0` for the main thread.

The `'workerMessage'` event is emitted for any incoming message send by the other
party by using [`postMessageToThread()`][].

### Event: `'uncaughtException'`

<!-- YAML
Expand Down Expand Up @@ -4065,6 +4077,7 @@ cases:
[`net.Server`]: net.md#class-netserver
[`net.Socket`]: net.md#class-netsocket
[`os.constants.dlopen`]: os.md#dlopen-constants
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
[`process.argv`]: #processargv
[`process.config`]: #processconfig
[`process.execPath`]: #processexecpath
Expand Down
116 changes: 116 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,118 @@ if (isMainThread) {
}
```

## `worker.postMessageToThread(threadId, value[, transferList][, timeout])`

<!-- YAML
added: REPLACEME
-->

> Stability: 1.1 - Active development
* `destination` {number} The target thread ID. If the thread ID is invalid, a
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
* `value` {any} The value to send.
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
See [`port.postMessage()`][] for more information.
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
By default it's `undefined`, which means wait forever. If the operation times out,
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.

Sends a value to another worker, identified by its thread ID.

If the target thread has no listener for the `workerMessage` event, then the operation will throw
a [`ERR_WORKER_MESSAGING_FAILED`][] error.

If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.

This method should be used when the target thread is not the direct
parent or child of the current thread.
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
and the [`worker.postMessage()`][] to let the threads communicate.

The example below shows the use of of `postMessageToThread`: it creates 10 nested threads,
the last one will try to communicate with the main thread.

```mjs
import { fileURLToPath } from 'node:url';
import { once } from 'node:events';
import process from 'node:process';
import {
isMainThread,
postMessageToThread,
threadId,
workerData,
Worker,
} from 'node:worker_threads';

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
const worker = new Worker(fileURLToPath(import.meta.url), {
workerData: { level: level + 1 },
});
}

if (level === 0) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
postMessageToThread(source, { message: 'pong' });
});
} else if (level === 10) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
channel.postMessage('done');
channel.close();
});

await postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
```
```cjs
const { once } = require('node:events');
const {
isMainThread,
postMessageToThread,
threadId,
workerData,
Worker,
} = require('node:worker_threads');

const channel = new BroadcastChannel('sync');
const level = workerData?.level ?? 0;

if (level < 10) {
const worker = new Worker(__filename, {
workerData: { level: level + 1 },
});
}

if (level === 0) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
postMessageToThread(source, { message: 'pong' });
});
} else if (level === 10) {
process.on('workerMessage', (value, source) => {
console.log(`${source} -> ${threadId}:`, value);
channel.postMessage('done');
channel.close();
});

postMessageToThread(0, { message: 'ping' });
}

channel.onmessage = channel.close;
```
## `worker.receiveMessageOnPort(port)`
<!-- YAML
Expand Down Expand Up @@ -1399,6 +1511,10 @@ thread spawned will spawn another until the application crashes.
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
[`Buffer`]: buffer.md
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
[`FileHandle`]: fs.md#class-filehandle
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
Error);
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
E('ERR_WORKER_OUT_OF_MEMORY',
'Worker terminated due to reaching memory limit: %s', Error);
Expand Down
4 changes: 4 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const {
kStdioWantsMoreDataCallback,
} = workerIo;

const { setupMainThreadPort } = require('internal/worker/messaging');

const {
onGlobalUncaughtException,
} = require('internal/process/execution');
Expand Down Expand Up @@ -96,6 +98,7 @@ port.on('message', (message) => {
hasStdin,
publicPort,
workerData,
mainThreadPort,
} = message;

if (doEval !== 'internal') {
Expand All @@ -109,6 +112,7 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
setupMainThreadPort(mainThreadPort);

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
15 changes: 11 additions & 4 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const {
ReadableWorkerStdio,
WritableWorkerStdio,
} = workerIo;
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
const { deserializeError } = require('internal/error_serdes');
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
Expand Down Expand Up @@ -250,14 +251,18 @@ class Worker extends EventEmitter {

this[kParentSideStdio] = { stdin, stdout, stderr };

const { port1, port2 } = new MessageChannel();
const transferList = [port2];
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
const {
port1: publicPortToParent,
port2: publicPortToWorker,
} = new MessageChannel();
const transferList = [mainThreadPortToWorker, publicPortToWorker];
// If transferList is provided.
if (options.transferList)
ArrayPrototypePush(transferList,
...new SafeArrayIterator(options.transferList));

this[kPublicPort] = port1;
this[kPublicPort] = publicPortToParent;
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
this[kPublicPort].on(event, (message) => this.emit(event, message));
});
Expand All @@ -271,8 +276,9 @@ class Worker extends EventEmitter {
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
environmentData,
publicPort: port2,
hasStdin: !!options.stdin,
publicPort: publicPortToWorker,
mainThreadPort: mainThreadPortToWorker,
}, transferList);
// Use this to cache the Worker's loopStart value once available.
this[kLoopStartTime] = -1;
Expand All @@ -295,6 +301,7 @@ class Worker extends EventEmitter {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
drainMessagePort(this[kPublicPort]);
drainMessagePort(this[kPort]);
destroyMainThreadPort(this.threadId);
this.removeAllListeners('message');
this.removeAllListeners('messageerrors');
this[kPublicPort].unref();
Expand Down
Loading

0 comments on commit 22ca334

Please sign in to comment.