Skip to content

Commit

Permalink
Refactor data transport to use named pipes directly
Browse files Browse the repository at this point in the history
Because node's spawn() implementation is broken on windows
Ref:

nodejs/node#14046
https://github.com/mcollina/autocannon/pull/145/files
  • Loading branch information
yurynix committed Jun 19, 2023
1 parent 66c2b50 commit 579a00d
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 13 deletions.
2 changes: 1 addition & 1 deletion lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Worker extends EventEmitter {
this.isTerminating = false;
this.isProcessAlive = false;

const process = this.process = new WorkerProcess(srcFilePath, { stopTimeout, asyncWorkerInitialization, resourceLimits, workerType });
const process = this.process = new WorkerProcess(srcFilePath, { workerId: this.id, stopTimeout, asyncWorkerInitialization, resourceLimits, workerType });

process.once('ready', () => {
this.isProcessAlive = true;
Expand Down
10 changes: 7 additions & 3 deletions lib/worker/child-loader.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ const Profiler = require('../util/cpu-profiler');
const getHeapSnapshot = require('../util/get-heap-snapshot');

let $module;
let $dataTransport;

if (threadId === 0) {
$dataTransport = new Transport(net.connect(process.env.WORKER_NODES_PIPE_NAME));
}

const parentHandle = threadId === 0 ? process : parentPort;
const sendMessageToParent = threadId === 0 ? process.send.bind(process) : parentPort.postMessage.bind(parentPort);
Expand All @@ -15,7 +20,7 @@ let sendDataToParent = null;
process.on('uncaughtException', (err) => {
// This might happen when parent is already exited and we try to send it messages
// It can be commonly seen in tests
if (err.message === "write EPIPE") {
if (err.message === "write EPIPE" || err.message.includes(`connect ENOENT ${process.env.WORKER_NODES_PIPE_NAME}`)) {
return;
}
console.error(`Child PID: ${process.pid} ThreadID: ${threadId} encountered an error: ${err.message}`, err.stack);
Expand All @@ -26,8 +31,7 @@ function setupModule({ modulePath, asyncWorkerInitialization }) {
$module = require(modulePath);

// setup data channel
if (threadId === 0) {
const $dataTransport = new Transport(new net.Socket({ fd: 3 }));
if (threadId === 0 && $dataTransport) {
$dataTransport.on('message', callSplitter);
sendDataToParent = (message) => $dataTransport.send(message);
} else {
Expand Down
60 changes: 52 additions & 8 deletions lib/worker/process.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ const worker = require('worker_threads');
const child_process = require('child_process');
const EventEmitter = require('events');
const Transport = require('./transport');
const path = require('path');
const os = require('os');
const net = require('net');
const fs = require('fs');

const EXIT_WAIT_TIME_MS = 200;

Expand All @@ -20,29 +24,66 @@ const execArgsProcessor = {
}
};

function createChannel(workerId, onConnect) {
const pipeName = `${process.pid}.worker-nodes-${workerId}`;
const socketPath = process.platform === 'win32'
? `\\\\?\\pipe\\${pipeName}`
: path.join(os.tmpdir(), pipeName);

const server = net.createServer((socket) => {
onConnect(socket);
});

server.listen(socketPath);

server.on('close', () => {
try {
fs.unlinkSync(socketPath)
} catch (err) {
console.warn(`Failed to remove pipe: ${err}`);
}
});

server.on("error", (err) => {
console.error(`Error on ${socketPath} of ${workerId}`, err);
})

return { socketPath, server }
}

class WorkerProcess extends EventEmitter {
constructor(modulePath, { asyncWorkerInitialization, resourceLimits, workerType }) {
constructor(modulePath, { asyncWorkerInitialization, resourceLimits, workerType, workerId }) {
super();
this.workerType = workerType;

let child;
let transportSetupPromise = null;

if (workerType === "thread") {
child = new worker.Worker(require.resolve('./child-loader'), {
resourceLimits,
});
this._sendMessageToChild = child.postMessage.bind(child);
this._sendDataToChild = this._sendMessageToChild;
transportSetupPromise = Promise.resolve();
} else if (workerType === "process") {
let transport = null;
let resolveWhenTransportSetup = null;
transportSetupPromise = new Promise((_resolve) => resolveWhenTransportSetup = _resolve);
const { socketPath } = createChannel(workerId, (socket) => {
transport = new Transport(socket);
transport.on('message', message => this.emit('message', message));
resolveWhenTransportSetup();
});
child = child_process.fork(require.resolve('./child-loader'), {
env: process.env,
env: {
...process.env,
"WORKER_NODES_PIPE_NAME": socketPath,
},
cwd: process.cwd(),
execArgv: execArgsProcessor.map(process.execArgv),
stdio: [0, 1, 2, 'pipe', 'ipc']
stdio: [0, 1, 2, 'ipc']
});

const transport = new Transport(child.stdio[3]);
transport.on('message', message => this.emit('message', message));

this._sendMessageToChild = child.send.bind(child);
this._sendDataToChild = (message) => transport.send(message);
Expand All @@ -54,8 +95,11 @@ class WorkerProcess extends EventEmitter {
// report readiness on a first message received from the child
// process (as it means that the child has loaded all the stuff
// into a memory and is ready to handle the calls)
this.emit('ready');
this.startDate = new Date();
// we also wait for the data transport to connect
transportSetupPromise.then(() => {
this.emit('ready');
this.startDate = new Date();
});

child.on('message', message => {
this.emit('message', message);
Expand Down
9 changes: 8 additions & 1 deletion lib/worker/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ class Transport extends EventEmitter {
}
};

pipe.on('error', console.error);
pipe.on('error', error => {
// This might happen when parent is already exited and we try to send it messages
// It can be commonly seen in tests
if (error.message.includes('connect ENOENT')) {
return;
}
console.error(error);
});
pipe.on('data', processChunk);
}

Expand Down

0 comments on commit 579a00d

Please sign in to comment.