From 5b51d5143a6efbd4b6997b8008e45d64e6b7e57d Mon Sep 17 00:00:00 2001 From: Antoine du Hamel Date: Fri, 10 Mar 2023 16:27:57 +0100 Subject: [PATCH] worker: skip main thread when stdio is inherited --- doc/api/worker_threads.md | 24 ------------------ lib/internal/main/worker_thread.js | 25 +++++++++++++++++++ lib/internal/worker.js | 16 ++---------- test/fixtures/worker-stdio.mjs | 13 ++++++++++ .../test-worker-never-blocked-stdio.mjs | 20 +++++++++++++++ 5 files changed, 60 insertions(+), 38 deletions(-) create mode 100644 test/fixtures/worker-stdio.mjs create mode 100644 test/parallel/test-worker-never-blocked-stdio.mjs diff --git a/doc/api/worker_threads.md b/doc/api/worker_threads.md index 674424f6f0bf90..52218fc7955312 100644 --- a/doc/api/worker_threads.md +++ b/doc/api/worker_threads.md @@ -1293,30 +1293,6 @@ active handle in the event system. If the worker is already `unref()`ed calling ## Notes -### Synchronous blocking of stdio - -`Worker`s utilize message passing via {MessagePort} to implement interactions -with `stdio`. This means that `stdio` output originating from a `Worker` can -get blocked by synchronous code on the receiving end that is blocking the -Node.js event loop. - -```mjs -import { - Worker, - isMainThread, -} from 'worker_threads'; - -if (isMainThread) { - new Worker(new URL(import.meta.url)); - for (let n = 0; n < 1e10; n++) { - // Looping to simulate work. - } -} else { - // This output will be blocked by the for loop in the main thread. - console.log('foo'); -} -``` - ```cjs 'use strict'; diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index 2ebfb849663eb6..05480eadd9ea19 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -5,6 +5,7 @@ const { ArrayPrototypeForEach, + ArrayPrototypePop, ArrayPrototypePushApply, ArrayPrototypeSplice, ObjectDefineProperty, @@ -26,6 +27,7 @@ const { threadId, getEnvMessagePort, } = internalBinding('worker'); +const { write } = require('fs'); const workerIo = require('internal/worker/io'); const { @@ -97,6 +99,8 @@ port.on('message', (message) => { manifestSrc, manifestURL, hasStdin, + directStdout, + directStderr, } = message; if (argv !== undefined) { @@ -135,6 +139,27 @@ port.on('message', (message) => { if (!hasStdin) process.stdin.push(null); + if (directStdout) { + process.stdout._writev = function _writev(chunks, cb) { + const { chunk, encoding } = ArrayPrototypePop(chunks); + write(1, chunk, { __proto__: null }, encoding, (err) => { + if (err) cb(err); + else if (chunks.length === 0) cb(); + else this._writev(chunks, cb); + }); + }; + } + if (directStderr) { + process.stderr._writev = function _writev(chunks, cb) { + const { chunk, encoding } = ArrayPrototypePop(chunks); + write(2, chunk, { __proto__: null }, encoding, (err) => { + if (err) cb(err); + else if (chunks.length === 0) cb(); + else this._writev(chunks, cb); + }); + }; + } + debug(`[${threadId}] starts worker script ${filename} ` + `(eval = ${doEval}) at cwd = ${process.cwd()}`); port.postMessage({ type: UP_AND_RUNNING }); diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 3b4b2f6ffaf0dc..eab20329def07b 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -225,12 +225,10 @@ class Worker extends EventEmitter { const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); if (!options.stdout) { stdout[kIncrementsPortRef] = false; - pipeWithoutWarning(stdout, process.stdout); } const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); if (!options.stderr) { stderr[kIncrementsPortRef] = false; - pipeWithoutWarning(stderr, process.stderr); } this[kParentSideStdio] = { stdin, stdout, stderr }; @@ -263,6 +261,8 @@ class Worker extends EventEmitter { require('internal/process/policy').src : null, hasStdin: !!options.stdin, + directStdout: !options.stdout, + directStderr: !options.stderr, }, transferList); // Use this to cache the Worker's loopStart value once available. this[kLoopStartTime] = -1; @@ -441,18 +441,6 @@ class Worker extends EventEmitter { } } -function pipeWithoutWarning(source, dest) { - const sourceMaxListeners = source._maxListeners; - const destMaxListeners = dest._maxListeners; - source.setMaxListeners(Infinity); - dest.setMaxListeners(Infinity); - - source.pipe(dest); - - source._maxListeners = sourceMaxListeners; - dest._maxListeners = destMaxListeners; -} - const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); function parseResourceLimits(obj) { const ret = resourceLimitsArray; diff --git a/test/fixtures/worker-stdio.mjs b/test/fixtures/worker-stdio.mjs new file mode 100644 index 00000000000000..f537117ae3d142 --- /dev/null +++ b/test/fixtures/worker-stdio.mjs @@ -0,0 +1,13 @@ +import { + Worker, + isMainThread, +} from 'worker_threads'; + +if (isMainThread) { + new Worker(new URL(import.meta.url)); + while(true); // never-ending loop +} else { + // This output will be blocked by the for loop in the main thread. + console.log('foo'); + console.error('bar'); +} diff --git a/test/parallel/test-worker-never-blocked-stdio.mjs b/test/parallel/test-worker-never-blocked-stdio.mjs new file mode 100644 index 00000000000000..8fca7d666ced1e --- /dev/null +++ b/test/parallel/test-worker-never-blocked-stdio.mjs @@ -0,0 +1,20 @@ +import '../common/index.mjs'; +import * as fixtures from '../common/fixtures.mjs'; +import assert from 'node:assert'; +import { execPath } from 'node:process'; +import { spawn } from 'node:child_process'; +import { once } from 'node:events'; + +const cp = spawn(execPath, [fixtures.path('worker-stdio.mjs')]); +try { + await Promise.all([ + once(cp.stdout, 'data').then((data) => { + assert.match(data.toString(), /^foo\r?\n$/); + }), + once(cp.stderr, 'data').then((data) => { + assert.match(data.toString(), /^bar\r?\n$/); + }), + ]); +} finally { + cp.kill(); +}