Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: skip main thread when stdio is inherited #47036

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -1293,30 +1293,6 @@ active handle in the event system. If the worker is already `unref()`ed calling

## Notes

### Synchronous blocking of stdio

`Worker`s utilize message passing via {MessagePort} to implement interactions
with `stdio`. This means that `stdio` output originating from a `Worker` can
get blocked by synchronous code on the receiving end that is blocking the
Node.js event loop.

```mjs
import {
Worker,
isMainThread,
} from 'worker_threads';

if (isMainThread) {
new Worker(new URL(import.meta.url));
for (let n = 0; n < 1e10; n++) {
// Looping to simulate work.
}
} else {
// This output will be blocked by the for loop in the main thread.
console.log('foo');
}
```

```cjs
'use strict';

Expand Down
25 changes: 25 additions & 0 deletions lib/internal/main/worker_thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

const {
ArrayPrototypeForEach,
ArrayPrototypePop,
ArrayPrototypePushApply,
ArrayPrototypeSplice,
ObjectDefineProperty,
Expand All @@ -26,6 +27,7 @@ const {
threadId,
getEnvMessagePort,
} = internalBinding('worker');
const { write } = require('fs');

const workerIo = require('internal/worker/io');
const {
Expand Down Expand Up @@ -97,6 +99,8 @@ port.on('message', (message) => {
manifestSrc,
manifestURL,
hasStdin,
directStdout,
directStderr,
} = message;

if (argv !== undefined) {
Expand Down Expand Up @@ -135,6 +139,27 @@ port.on('message', (message) => {
if (!hasStdin)
process.stdin.push(null);

if (directStdout) {
process.stdout._writev = function _writev(chunks, cb) {
const { chunk, encoding } = ArrayPrototypePop(chunks);
write(1, chunk, { __proto__: null }, encoding, (err) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work on Windows with non-ASCII text, unfortunately

Copy link
Contributor Author

@aduh95 aduh95 Mar 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a bummer. Do we know where does this limitation come from? EDIT: seems that it's explained in #24550.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love to land this PR. Is this non-ASCII thing an issue that needs resolution beyond just documenting as a known limitation?

Assuming that it is, could we use whatever solution console.log itself uses for outputting to stdout for Windows?

if (err) cb(err);
else if (chunks.length === 0) cb();
else this._writev(chunks, cb);
});
};
}
if (directStderr) {
process.stderr._writev = function _writev(chunks, cb) {
const { chunk, encoding } = ArrayPrototypePop(chunks);
write(2, chunk, { __proto__: null }, encoding, (err) => {
if (err) cb(err);
else if (chunks.length === 0) cb();
else this._writev(chunks, cb);
});
};
}

debug(`[${threadId}] starts worker script ${filename} ` +
`(eval = ${doEval}) at cwd = ${process.cwd()}`);
port.postMessage({ type: UP_AND_RUNNING });
Expand Down
16 changes: 2 additions & 14 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,10 @@ class Worker extends EventEmitter {
const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
if (!options.stdout) {
stdout[kIncrementsPortRef] = false;
pipeWithoutWarning(stdout, process.stdout);
}
const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
if (!options.stderr) {
stderr[kIncrementsPortRef] = false;
pipeWithoutWarning(stderr, process.stderr);
}

this[kParentSideStdio] = { stdin, stdout, stderr };
Expand Down Expand Up @@ -263,6 +261,8 @@ class Worker extends EventEmitter {
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin,
directStdout: !options.stdout,
directStderr: !options.stderr,
}, transferList);
// Use this to cache the Worker's loopStart value once available.
this[kLoopStartTime] = -1;
Expand Down Expand Up @@ -441,18 +441,6 @@ class Worker extends EventEmitter {
}
}

function pipeWithoutWarning(source, dest) {
const sourceMaxListeners = source._maxListeners;
const destMaxListeners = dest._maxListeners;
source.setMaxListeners(Infinity);
dest.setMaxListeners(Infinity);

source.pipe(dest);

source._maxListeners = sourceMaxListeners;
dest._maxListeners = destMaxListeners;
}

const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);
function parseResourceLimits(obj) {
const ret = resourceLimitsArray;
Expand Down
13 changes: 13 additions & 0 deletions test/fixtures/worker-stdio.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import {
Worker,
isMainThread,
} from 'worker_threads';

if (isMainThread) {
new Worker(new URL(import.meta.url));
while(true); // never-ending loop
} else {
// This output will be blocked by the for loop in the main thread.
console.log('foo');
console.error('bar');
}
20 changes: 20 additions & 0 deletions test/parallel/test-worker-never-blocked-stdio.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import '../common/index.mjs';
import * as fixtures from '../common/fixtures.mjs';
import assert from 'node:assert';
import { execPath } from 'node:process';
import { spawn } from 'node:child_process';
import { once } from 'node:events';

const cp = spawn(execPath, [fixtures.path('worker-stdio.mjs')]);
try {
await Promise.all([
once(cp.stdout, 'data').then((data) => {
assert.match(data.toString(), /^foo\r?\n$/);
}),
once(cp.stderr, 'data').then((data) => {
assert.match(data.toString(), /^bar\r?\n$/);
}),
]);
} finally {
cp.kill();
}