Skip to content

Commit

Permalink
separate worker data & lock buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
JakobJingleheimer committed Dec 31, 2022
1 parent 2a54826 commit c8ea77a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 24 deletions.
33 changes: 19 additions & 14 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -446,23 +446,19 @@ ObjectSetPrototypeOf(Hooks.prototype, null);

class HooksProxy {
/**
* The communication vehicle constituting all memory shared between the main and the worker.
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
*/
#commsChannel = new SharedArrayBuffer(2048); // maybe use buffer.constants.MAX_LENGTH ?
#data;
/**
* The lock/unlock segment of the shared memory. Atomics require this to be a Int32Array. This
* segment is used to tell the main to sleep when the worker is processing, and vice versa
* (for the worker to sleep whilst the main thread is processing).
* 0 -> main sleeps
* 1 -> worker sleeps
*/
#lock = new Int32Array(this.#commsChannel, 0, 4);
/**
* The request & response segment of the shared memory. TextEncoder/Decoder (needed to convert
* requests & responses into a format supported by the comms channel) reads and writes with
* Uint8Array.
*/
#requestResponseData = new Uint8Array(this.#commsChannel, 4, 2044);
#lock;

#isReady = false;

Expand All @@ -472,12 +468,21 @@ class HooksProxy {
*/
constructor() {
const { InternalWorker } = require('internal/worker');

const data = new SharedArrayBuffer(2048);
const lock = new SharedArrayBuffer(4);
this.#data = new Uint8Array(data);
this.#lock = new Int32Array(lock);

const worker = this.worker = new InternalWorker('internal/modules/esm/worker', {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: { commsChannel: this.#commsChannel },
workerData: {
data,
lock,
},
});
worker.unref(); // ! Allows the process to eventually exit when worker is in its final sleep.
}
Expand All @@ -492,20 +497,20 @@ class HooksProxy {
this.#isReady = true;
}

TypedArrayPrototypeFill(this.#requestResponseData, 0); // Erase handled request/response data
TypedArrayPrototypeFill(this.#data, 0); // Erase handled request/response data

const request = serialize({ method, args });
TypedArrayPrototypeSet(this.#requestResponseData, request);
TypedArrayPrototypeSet(this.#data, request);

Atomics.store(this.#lock, 0, 0); // Send request to worker
Atomics.notify(this.#lock, 0); // Notify worker of new request
Atomics.wait(this.#lock, 0, 0); // Sleep until worker responds

let response;
try {
response = deserialize(this.#requestResponseData);
response = deserialize(this.#data);
} catch (exception) {
if (this.#requestResponseData.every((byte) => byte === 0)) {
if (this.#data.every((byte) => byte === 0)) {
throw new ERR_INVALID_RETURN_VALUE('an object', method, undefined);
} else {
throw exception;
Expand Down
18 changes: 8 additions & 10 deletions lib/internal/modules/esm/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ internalBinding('module_wrap').callbackMap = new SafeWeakMap();

const { isMainThread, workerData } = require('worker_threads');
if (isMainThread) { return; } // Needed to pass some tests that happen to load this file on the main thread
const { commsChannel } = workerData;

// lock = 0 -> main sleeps
// lock = 1 -> worker sleeps
const lock = new Int32Array(commsChannel, 0, 4); // Required by Atomics
const requestResponseDataSize = 2044;
const requestResponseData = new Uint8Array(commsChannel, 4, requestResponseDataSize); // For v8.deserialize/serialize
const lock = new Int32Array(workerData.lock); // Required by Atomics
const data = new Uint8Array(workerData.data); // For v8.deserialize/serialize

function releaseLock() {
Atomics.store(lock, 0, 1); // Send response to main
Expand Down Expand Up @@ -58,8 +56,8 @@ function releaseLock() {
while (true) {
Atomics.wait(lock, 0, 1); // This pauses the while loop

const { method, args } = deserialize(requestResponseData);
TypedArrayPrototypeFill(requestResponseData, 0);
const { method, args } = deserialize(data);
TypedArrayPrototypeFill(data, 0);

// Each potential exception needs to be caught individually so that the correct error is sent to the main thread
let response, serializedResponse;
Expand All @@ -78,19 +76,19 @@ function releaseLock() {

try {
serializedResponse = serialize(response);
if (serializedResponse.byteLength > requestResponseDataSize) {
throw new ERR_OUT_OF_RANGE('serializedResponse.byteLength', `<= ${requestResponseDataSize}`, serializedResponse.byteLength);
if (serializedResponse.byteLength > data.length) {
throw new ERR_OUT_OF_RANGE('serializedResponse.byteLength', `<= ${data.length}`, serializedResponse.byteLength);
}
} catch (exception) {
serializedResponse = serialize(exception);
}

// Send the method response (or exception) to the main thread
try {
TypedArrayPrototypeSet(requestResponseData, serializedResponse);
TypedArrayPrototypeSet(data, serializedResponse);
} catch (exception) {
// Or send the exception thrown when trying to send the response
TypedArrayPrototypeSet(requestResponseData, serialize(exception));
TypedArrayPrototypeSet(data, serialize(exception));
}
releaseLock();
}
Expand Down

0 comments on commit c8ea77a

Please sign in to comment.