Skip to content

Commit 91d4d65

Browse files
committed
worker: add connect and setConnectionsListener
1 parent 2333573 commit 91d4d65

14 files changed

+476
-11
lines changed

doc/api/errors.md

+31
Original file line numberDiff line numberDiff line change
@@ -3072,6 +3072,16 @@ added: v18.1.0
30723072
The `Response` that has been passed to `WebAssembly.compileStreaming` or to
30733073
`WebAssembly.instantiateStreaming` is not a valid WebAssembly response.
30743074

3075+
<a id="ERR_WORKER_CONNECTION_REFUSED"></a>
3076+
3077+
### `ERR_WORKER_CONNECTION_REFUSED`
3078+
3079+
<!-- YAML
3080+
added: REPLACEME
3081+
-->
3082+
3083+
The thread requested in [`connect()`][] refused the connection or has no connections listener provided.
3084+
30753085
<a id="ERR_WORKER_INIT_FAILED"></a>
30763086

30773087
### `ERR_WORKER_INIT_FAILED`
@@ -3085,6 +3095,16 @@ The `Worker` initialization failed.
30853095
The `execArgv` option passed to the `Worker` constructor contains
30863096
invalid flags.
30873097

3098+
<a id="ERR_WORKER_INVALID_ID"></a>
3099+
3100+
### `ERR_WORKER_INVALID_ID`
3101+
3102+
<!-- YAML
3103+
added: REPLACEME
3104+
-->
3105+
3106+
The thread id requested in [`connect()`][] is invalid.
3107+
30883108
<a id="ERR_WORKER_NOT_RUNNING"></a>
30893109

30903110
### `ERR_WORKER_NOT_RUNNING`
@@ -3104,6 +3124,16 @@ The `Worker` instance terminated because it reached its memory limit.
31043124
The path for the main script of a worker is neither an absolute path
31053125
nor a relative path starting with `./` or `../`.
31063126

3127+
<a id="ERR_WORKER_SAME_THREAD"></a>
3128+
3129+
### `ERR_WORKER_SAME_THREAD`
3130+
3131+
<!-- YAML
3132+
added: REPLACEME
3133+
-->
3134+
3135+
The thread id requested in [`connect()`][] is the current thread id.
3136+
31073137
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
31083138

31093139
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
@@ -3999,6 +4029,7 @@ An error occurred trying to allocate memory. This should never happen.
39994029
[`Writable`]: stream.md#class-streamwritable
40004030
[`child_process`]: child_process.md
40014031
[`cipher.getAuthTag()`]: crypto.md#ciphergetauthtag
4032+
[`connect()`]: worker_threads.md#workerconnecttarget-data
40024033
[`crypto.getDiffieHellman()`]: crypto.md#cryptogetdiffiehellmangroupname
40034034
[`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback
40044035
[`crypto.scryptSync()`]: crypto.md#cryptoscryptsyncpassword-salt-keylen-options

doc/api/worker_threads.md

+44
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,24 @@ Worker threads inherit non-process-specific options by default. Refer to
6161
[`Worker constructor options`][] to know how to customize worker thread options,
6262
specifically `argv` and `execArgv` options.
6363

64+
## `worker.connect(target, data)`
65+
66+
<!-- YAML
67+
added: REPLACEME
68+
-->
69+
70+
> Stability: 1.1 - Active development
71+
72+
* `target` {number} The target thread id.
73+
* `data` {any} Any arbitrary, cloneable JavaScript value.
74+
* Returns: {Promise} A promise for a `MessagePort`.
75+
76+
Establishes a connection to another worker thread in the same process, returning a
77+
`MessagePort` that can be used for the communication.
78+
79+
The target thread must have a connection listener setup via [`worker.setConnectionsListener()`][]
80+
otherwise the connection request will fail.
81+
6482
## `worker.getEnvironmentData(key)`
6583

6684
<!-- YAML
@@ -325,6 +343,30 @@ new Worker('process.env.SET_IN_WORKER = "foo"', { eval: true, env: SHARE_ENV })
325343
});
326344
```
327345

346+
## `worker.setConnectionsListener(fn)`
347+
348+
<!-- YAML
349+
added: REPLACEME
350+
-->
351+
352+
> Stability: 1.1 - Active development
353+
354+
* `fn` {Function} A callback to be executed when [`worker.connect()`][] is called from another thread.
355+
The function will receive the following arguments:
356+
357+
* `sender` {number} The other thread id.
358+
* `port` {MessagePort} The port than can be used to communicate with the other thread.
359+
* `data` {any} The data passed to [`worker.connect()`][].
360+
361+
The function must return `true` to accept the connection or any other value to
362+
refuse the connection. If the function returns a `Promise`, it will be awaited.
363+
364+
Sets the callback that handles connection from other worker threads in the same process.
365+
If the callback is `null` or `undefined` then the current listener is removed.
366+
367+
When no listeners are present (the default) all connection requests are immediately
368+
refused.
369+
328370
## `worker.setEnvironmentData(key[, value])`
329371

330372
<!-- YAML
@@ -1437,8 +1479,10 @@ thread spawned will spawn another until the application crashes.
14371479
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
14381480
[`vm`]: vm.md
14391481
[`worker.SHARE_ENV`]: #workershare_env
1482+
[`worker.connect()`]: #workerconnecttarget-data
14401483
[`worker.on('message')`]: #event-message_1
14411484
[`worker.postMessage()`]: #workerpostmessagevalue-transferlist
1485+
[`worker.setConnectionsListener()`]: #workersetconnectionslistenerfn
14421486
[`worker.terminate()`]: #workerterminate
14431487
[`worker.threadId`]: #workerthreadid_1
14441488
[async-resource-worker-pool]: async_context.md#using-asyncresource-for-a-worker-thread-pool

lib/internal/errors.js

+3
Original file line numberDiff line numberDiff line change
@@ -1858,10 +1858,12 @@ E('ERR_VM_MODULE_NOT_MODULE',
18581858
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
18591859
E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
18601860
E('ERR_WEBASSEMBLY_RESPONSE', 'WebAssembly response %s', TypeError);
1861+
E('ERR_WORKER_CONNECTION_REFUSED', 'Connection refused from worker', Error);
18611862
E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
18621863
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
18631864
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
18641865
Error);
1866+
E('ERR_WORKER_INVALID_ID', 'Invalid worker id %d', Error);
18651867
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
18661868
E('ERR_WORKER_OUT_OF_MEMORY',
18671869
'Worker terminated due to reaching memory limit: %s', Error);
@@ -1876,6 +1878,7 @@ E('ERR_WORKER_PATH', (filename) =>
18761878
) +
18771879
` Received "${filename}"`,
18781880
TypeError);
1881+
E('ERR_WORKER_SAME_THREAD', 'Cannot connect to the same thread', Error);
18791882
E('ERR_WORKER_UNSERIALIZABLE_ERROR',
18801883
'Serializing an uncaught exception failed', Error);
18811884
E('ERR_WORKER_UNSUPPORTED_OPERATION',

lib/internal/main/worker_thread.js

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const {
2828
getEnvMessagePort,
2929
} = internalBinding('worker');
3030

31+
const { processConnectionRequest } = require('internal/worker');
3132
const workerIo = require('internal/worker/io');
3233
const {
3334
messageTypes: {
@@ -40,6 +41,7 @@ const {
4041
// Messages that may be either received or posted
4142
STDIO_PAYLOAD,
4243
STDIO_WANTS_MORE_DATA,
44+
CONNECT,
4345
},
4446
kStdioWantsMoreDataCallback,
4547
} = workerIo;
@@ -182,6 +184,8 @@ port.on('message', (message) => {
182184
break;
183185
}
184186
}
187+
} else if (message.type === CONNECT) {
188+
processConnectionRequest(message);
185189
} else if (message.type === STDIO_PAYLOAD) {
186190
const { stream, chunks } = message;
187191
ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {

lib/internal/worker.js

+97-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ const {
55
ArrayPrototypeMap,
66
ArrayPrototypePush,
77
AtomicsAdd,
8+
AtomicsNotify,
9+
AtomicsStore,
10+
AtomicsWaitAsync,
811
Float64Array,
912
FunctionPrototypeBind,
13+
Int32Array,
1014
JSONStringify,
1115
MathMax,
1216
ObjectEntries,
@@ -34,12 +38,14 @@ const {
3438

3539
const errorCodes = require('internal/errors').codes;
3640
const {
41+
ERR_INVALID_ARG_TYPE,
42+
ERR_INVALID_ARG_VALUE,
43+
ERR_WORKER_CONNECTION_REFUSED,
44+
ERR_WORKER_INVALID_EXEC_ARGV,
3745
ERR_WORKER_NOT_RUNNING,
3846
ERR_WORKER_PATH,
47+
ERR_WORKER_SAME_THREAD,
3948
ERR_WORKER_UNSERIALIZABLE_ERROR,
40-
ERR_WORKER_INVALID_EXEC_ARGV,
41-
ERR_INVALID_ARG_TYPE,
42-
ERR_INVALID_ARG_VALUE,
4349
} = errorCodes;
4450

4551
const workerIo = require('internal/worker/io');
@@ -59,7 +65,7 @@ const {
5965
const { deserializeError } = require('internal/error_serdes');
6066
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
6167
const { kEmptyObject } = require('internal/util');
62-
const { validateArray, validateString } = require('internal/validators');
68+
const { validateArray, validateFunction, validateString } = require('internal/validators');
6369
const {
6470
throwIfBuildingSnapshot,
6571
} = require('internal/v8/startup_snapshot');
@@ -74,6 +80,8 @@ const {
7480
kCodeRangeSizeMb,
7581
kStackSizeMb,
7682
kTotalResourceLimitCount,
83+
sendToWorker,
84+
setMainPort,
7785
} = internalBinding('worker');
7886

7987
const kHandle = Symbol('kHandle');
@@ -100,6 +108,14 @@ let cwdCounter;
100108

101109
const environmentData = new SafeMap();
102110

111+
// SharedArrayBuffer must always be Int32, so it's * 4.
112+
// We need one for the operation status (performing / performed) and one for the result (success / failure).
113+
const WORKER_MESSAGING_SHARED_DATA = 2 * 4;
114+
const WORKER_MESSAGING_STATUS_INDEX = 0;
115+
const WORKER_MESSAGING_RESULT_INDEX = 1;
116+
let connectionsListener = null;
117+
let mainPortWasSetup = false;
118+
103119
// SharedArrayBuffers can be disabled with --enable-sharedarraybuffer-per-context.
104120
if (isMainThread && SharedArrayBuffer !== undefined) {
105121
cwdCounter = new Uint32Array(new SharedArrayBuffer(4));
@@ -527,6 +543,79 @@ function eventLoopUtilization(util1, util2) {
527543
);
528544
}
529545

546+
function setConnectionsListener(fn) {
547+
if (isMainThread && !mainPortWasSetup) {
548+
setupMainPort();
549+
mainPortWasSetup = true;
550+
}
551+
552+
if (typeof fn === 'undefined' || fn === null) {
553+
connectionsListener = null;
554+
return;
555+
}
556+
557+
validateFunction(fn, 'fn');
558+
connectionsListener = fn;
559+
}
560+
561+
async function processConnectionRequest(request) {
562+
const status = new Int32Array(request.memory);
563+
564+
try {
565+
const result = await connectionsListener?.(request.sender, request.port, request.data);
566+
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, result === true ? 0 : 1);
567+
} catch (e) {
568+
debug('connections listener rejected', e);
569+
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, 2);
570+
} finally {
571+
AtomicsNotify(status, WORKER_MESSAGING_STATUS_INDEX, 1);
572+
}
573+
}
574+
575+
async function connect(target, data) {
576+
if (target === threadId) {
577+
throw new ERR_WORKER_SAME_THREAD();
578+
}
579+
580+
// Create a shared array to exchange the status and the result
581+
const memory = new SharedArrayBuffer(WORKER_MESSAGING_SHARED_DATA);
582+
const status = new Int32Array(memory);
583+
const promise = AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0).value;
584+
585+
// Create the channel and send it to the other thread
586+
const { port1, port2 } = new MessageChannel();
587+
sendToWorker(target, { type: messageTypes.CONNECT, sender: threadId, port: port2, memory, data }, [port2]);
588+
589+
// Wait for the response
590+
await promise;
591+
592+
if (status[WORKER_MESSAGING_RESULT_INDEX] === 1) {
593+
port1.close();
594+
port2.close();
595+
throw new ERR_WORKER_CONNECTION_REFUSED();
596+
}
597+
598+
return port1;
599+
}
600+
601+
function setupMainPort() {
602+
const { port1, port2 } = new MessageChannel();
603+
setMainPort(port2);
604+
605+
// Set message management
606+
port1.on('message', (message) => {
607+
if (message.type === messageTypes.CONNECT) {
608+
processConnectionRequest(message);
609+
} else {
610+
assert(message.type === messageTypes.CONNECT, `Unknown worker message type ${message.type}`);
611+
}
612+
});
613+
614+
// Never block the process on this channel
615+
port1.unref();
616+
port2.unref();
617+
}
618+
530619
module.exports = {
531620
ownsProcessState,
532621
kIsOnline,
@@ -537,6 +626,10 @@ module.exports = {
537626
setEnvironmentData,
538627
getEnvironmentData,
539628
assignEnvironmentData,
629+
setConnectionsListener,
630+
setupMainPort,
631+
processConnectionRequest,
632+
connect,
540633
threadId,
541634
InternalWorker,
542635
Worker,

lib/internal/worker/io.js

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ const messageTypes = {
8383
STDIO_PAYLOAD: 'stdioPayload',
8484
STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData',
8585
LOAD_SCRIPT: 'loadScript',
86+
CONNECT: 'connect',
8687
};
8788

8889
// createFastMessageEvent skips webidl argument validation when the arguments

lib/worker_threads.js

+4
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ const {
66
resourceLimits,
77
setEnvironmentData,
88
getEnvironmentData,
9+
connect,
910
threadId,
1011
Worker,
12+
setConnectionsListener,
1113
} = require('internal/worker');
1214

1315
const {
@@ -40,4 +42,6 @@ module.exports = {
4042
BroadcastChannel,
4143
setEnvironmentData,
4244
getEnvironmentData,
45+
setConnectionsListener,
46+
connect,
4347
};

src/node_errors.h

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
102102
V(ERR_VM_MODULE_LINK_FAILURE, Error) \
103103
V(ERR_WASI_NOT_STARTED, Error) \
104104
V(ERR_WORKER_INIT_FAILED, Error) \
105+
V(ERR_WORKER_INVALID_ID, Error) \
105106
V(ERR_PROTO_ACCESS, Error)
106107

107108
#define V(code, type) \

src/node_messaging.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ class MessagePort;
1919

2020
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
2121

22+
// Get a transfer list out of an array of Javascript objects.
23+
bool GetTransferList(Environment* env,
24+
v8::Local<v8::Context> context,
25+
v8::Local<v8::Value> transfer_list_v,
26+
TransferList* transfer_list_out);
27+
2228
// Used to represent the in-flight structure of an object that is being
2329
// transferred or cloned using postMessage().
2430
class TransferData : public MemoryRetainer {
@@ -288,7 +294,7 @@ class MessagePort : public HandleWrap {
288294
// alone is often not enough, since the backing C++ MessagePort object may
289295
// have been deleted already. For all intents and purposes, an object with a
290296
// NULL pointer to the C++ MessagePort object is also detached.
291-
inline bool IsDetached() const;
297+
bool IsDetached() const;
292298

293299
BaseObject::TransferMode GetTransferMode() const override;
294300
std::unique_ptr<TransferData> TransferForMessaging() override;

0 commit comments

Comments
 (0)