Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "module: have a single hooks thread for all workers" #53183

Merged
merged 2 commits into from
Jun 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ port.on('message', (message) => {
filename,
hasStdin,
publicPort,
hooksPort,
workerData,
} = message;

Expand All @@ -110,7 +109,6 @@ port.on('message', (message) => {
}

require('internal/worker').assignEnvironmentData(environmentData);
require('internal/worker').hooksPort = hooksPort;

if (SharedArrayBuffer !== undefined) {
// The counter is only passed to the workers created by the main thread,
Expand Down
127 changes: 43 additions & 84 deletions lib/internal/modules/esm/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const {
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
const { URL } = require('internal/url');
const { canParse: URLCanParse } = internalBinding('url');
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
const { receiveMessageOnPort } = require('worker_threads');
const {
isAnyArrayBuffer,
isArrayBufferView,
Expand Down Expand Up @@ -482,8 +482,6 @@ class HooksProxy {
*/
#worker;

#portToHooksThread;

/**
* The last notification ID received from the worker. This is used to detect
* if the worker has already sent a notification before putting the main
Expand All @@ -501,38 +499,26 @@ class HooksProxy {
#isReady = false;

constructor() {
const { InternalWorker, hooksPort } = require('internal/worker');
const { InternalWorker } = require('internal/worker');
MessageChannel ??= require('internal/worker/io').MessageChannel;

const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
this.#lock = new Int32Array(lock);

if (isMainThread) {
// Main thread is the only one that creates the internal single hooks worker
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
this.#portToHooksThread = this.#worker;
} else {
this.#portToHooksThread = hooksPort;
}
this.#worker = new InternalWorker(loaderWorkerId, {
stderr: false,
stdin: false,
stdout: false,
trackUnmanagedFds: false,
workerData: {
lock,
},
});
this.#worker.unref(); // ! Allows the process to eventually exit.
this.#worker.on('exit', process.exit);
}

waitForWorker() {
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
// has an InternalWorker. That was the Hooks instance created for the main thread.
// It means for all Hooks instances that are not on the main thread => they are ready because they
// delegate to the single InternalWorker anyway.
if (!isMainThread) {
return;
}

if (!this.#isReady) {
const { kIsOnline } = require('internal/worker');
if (!this.#worker[kIsOnline]) {
Expand All @@ -549,37 +535,6 @@ class HooksProxy {
}
}

#postMessageToWorker(method, type, transferList, args) {
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;

const {
port1: fromHooksThread,
port2: toHooksThread,
} = new MessageChannel();

// Pass work to the worker.
debug(`post ${type} message to worker`, { method, args, transferList });
const usedTransferList = [toHooksThread];
if (transferList) {
ArrayPrototypePushApply(usedTransferList, transferList);
}

this.#portToHooksThread.postMessage(
{
__proto__: null,
args,
lock: this.#lock,
method,
port: toHooksThread,
},
usedTransferList,
);

return fromHooksThread;
}

/**
* Invoke a remote method asynchronously.
* @param {string} method Method to invoke
Expand All @@ -588,7 +543,22 @@ class HooksProxy {
* @returns {Promise<any>}
*/
async makeAsyncRequest(method, transferList, ...args) {
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);
this.waitForWorker();

MessageChannel ??= require('internal/worker/io').MessageChannel;
const asyncCommChannel = new MessageChannel();

// Pass work to the worker.
debug('post async message to worker', { method, args, transferList });
const finalTransferList = [asyncCommChannel.port2];
if (transferList) {
ArrayPrototypePushApply(finalTransferList, transferList);
}
this.#worker.postMessage({
__proto__: null,
method, args,
port: asyncCommChannel.port2,
}, finalTransferList);

if (this.#numberOfPendingAsyncResponses++ === 0) {
// On the next lines, the main thread will await a response from the worker thread that might
Expand All @@ -597,11 +567,7 @@ class HooksProxy {
// However we want to keep the process alive until the worker thread responds (or until the
// event loop of the worker thread is also empty), so we ref the worker until we get all the
// responses back.
if (this.#worker) {
this.#worker.ref();
} else {
this.#portToHooksThread.ref();
}
this.#worker.ref();
}

let response;
Expand All @@ -610,26 +576,18 @@ class HooksProxy {
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(fromHooksThread);
response = receiveMessageOnPort(asyncCommChannel.port1);
} while (response == null);
debug('got async response from worker', { method, args }, this.#lock);

if (--this.#numberOfPendingAsyncResponses === 0) {
// We got all the responses from the worker, its job is done (until next time).
if (this.#worker) {
this.#worker.unref();
} else {
this.#portToHooksThread.unref();
}
}

if (response.message.status === 'exit') {
process.exit(response.message.body);
this.#worker.unref();
}

fromHooksThread.close();

return this.#unwrapMessage(response);
const body = this.#unwrapMessage(response);
asyncCommChannel.port1.close();
return body;
}

/**
Expand All @@ -640,7 +598,11 @@ class HooksProxy {
* @returns {any}
*/
makeSyncRequest(method, transferList, ...args) {
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);
this.waitForWorker();

// Pass work to the worker.
debug('post sync message to worker', { method, args, transferList });
this.#worker.postMessage({ __proto__: null, method, args }, transferList);

let response;
do {
Expand All @@ -649,17 +611,14 @@ class HooksProxy {
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

response = receiveMessageOnPort(fromHooksThread);
response = this.#worker.receiveMessageSync();
} while (response == null);
debug('got sync response from worker', { method, args });
if (response.message.status === 'never-settle') {
process.exit(kUnsettledTopLevelAwait);
} else if (response.message.status === 'exit') {
process.exit(response.message.body);
}

fromHooksThread.close();

return this.#unwrapMessage(response);
}

Expand Down
19 changes: 3 additions & 16 deletions lib/internal/modules/esm/loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
const {
urlToFilename,
} = require('internal/modules/helpers');
const { isMainThread } = require('worker_threads');
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;

/**
Expand Down Expand Up @@ -608,26 +607,21 @@ class CustomizedModuleLoader {
*/
constructor() {
getHooksProxy();
_hasCustomizations = true;
}

/**
* Register a loader specifier.
* Register some loader specifier.
* @param {string} originalSpecifier The specified URL path of the loader to
* be registered.
* @param {string} parentURL The parent URL from where the loader will be
* registered if using it package name as specifier
* @param {any} [data] Arbitrary data to be passed from the custom loader
* (user-land) to the worker.
* @param {any[]} [transferList] Objects in `data` that are changing ownership
* @returns {{ format: string, url: URL['href'] } | undefined}
* @returns {{ format: string, url: URL['href'] }}
*/
register(originalSpecifier, parentURL, data, transferList) {
if (isMainThread) {
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
// delegate their hooks to the HooksThread of the main thread.
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
}

/**
Expand Down Expand Up @@ -725,12 +719,6 @@ function getHooksProxy() {
return hooksProxy;
}

let _hasCustomizations = false;
function hasCustomizations() {
return _hasCustomizations;
}


let cascadedLoader;

/**
Expand Down Expand Up @@ -792,7 +780,6 @@ function register(specifier, parentURL = undefined, options) {

module.exports = {
createModuleLoader,
hasCustomizations,
getHooksProxy,
getOrInitializeCascadedLoader,
register,
Expand Down
Loading