Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable passthrough IPC in watch mode #50890

Merged
merged 9 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions lib/internal/main/watch_mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ function start() {
process.stdout.write(`${red}Failed running ${kCommandStr}${white}\n`);
}
});
return child;
}

async function killAndWait(signal = kKillSignal, force = false) {
Expand Down Expand Up @@ -113,34 +114,43 @@ function reportGracefulTermination() {
};
}

async function stop() {
async function stop(child) {
// Without this line, the child process is still able to receive IPC, but is unable to send additional messages
watcher.destroyIPC(child);
watcher.clearFileFilters();
const clearGraceReport = reportGracefulTermination();
await killAndWait();
clearGraceReport();
}

let restarting = false;
async function restart() {
async function restart(child) {
if (restarting) return;
restarting = true;
try {
if (!kPreserveOutput) process.stdout.write(clear);
process.stdout.write(`${green}Restarting ${kCommandStr}${white}\n`);
await stop();
start();
await stop(child);
return start();
} finally {
restarting = false;
}
}

start();
watcher
.on('changed', restart)
.on('error', (error) => {
watcher.off('changed', restart);
triggerUncaughtException(error, true /* fromPromise */);
});
async function init() {
let child = start();
const restartChild = async () => {
child = await restart(child);
};
watcher
.on('changed', restartChild)
.on('error', (error) => {
watcher.off('changed', restartChild);
triggerUncaughtException(error, true /* fromPromise */);
});
}

init();

// Exiting gracefully to avoid stdout/stderr getting written after
// parent process is killed.
Expand Down
29 changes: 29 additions & 0 deletions lib/internal/watch_mode/files_watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
const {
ArrayIsArray,
ArrayPrototypeForEach,
Boolean,
SafeMap,
znewsham marked this conversation as resolved.
Show resolved Hide resolved
SafeSet,
SafeWeakMap,
StringPrototypeStartsWith,
} = primordials;

Expand All @@ -31,6 +33,8 @@ class FilesWatcher extends EventEmitter {
#debounce;
#mode;
#signal;
#passthroughIPC = false;
#ipcHandlers = new SafeWeakMap();

constructor({ debounce = 200, mode = 'filter', signal } = kEmptyObject) {
super({ __proto__: null, captureRejections: true });
Expand All @@ -40,6 +44,7 @@ class FilesWatcher extends EventEmitter {
this.#debounce = debounce;
this.#mode = mode;
this.#signal = signal;
this.#passthroughIPC = Boolean(process.send);

if (signal) {
addAbortListener(signal, () => this.clear());
Expand Down Expand Up @@ -128,7 +133,31 @@ class FilesWatcher extends EventEmitter {
this.#ownerDependencies.set(owner, dependencies);
}
}


#setupIPC(child) {
const handlers = {
__proto__: null,
parentToChild: (message) => child.send(message),
childToParent: (message) => process.send(message),
};
this.#ipcHandlers.set(child, handlers);
process.on('message', handlers.parentToChild);
child.on('message', handlers.childToParent);
}

destroyIPC(child) {
const handlers = this.#ipcHandlers.get(child);
if (this.#passthroughIPC && handlers !== undefined) {
process.off('message', handlers.parentToChild);
child.off('message', handlers.childToParent);
}
}

watchChildProcessModules(child, key = null) {
if (this.#passthroughIPC) {
this.#setupIPC(child);
}
if (this.#mode !== 'filter') {
return;
}
Expand Down
81 changes: 81 additions & 0 deletions test/sequential/test-watch-mode.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { spawn } from 'node:child_process';
import { writeFileSync, readFileSync, mkdirSync } from 'node:fs';
import { inspect } from 'node:util';
import { pathToFileURL } from 'node:url';
import { once } from 'node:events';
import { createInterface } from 'node:readline';

if (common.isIBMi)
Expand Down Expand Up @@ -574,4 +575,84 @@ console.log(values.random);
`Completed running ${inspect(file)}`,
]);
});
znewsham marked this conversation as resolved.
Show resolved Hide resolved

it('should pass IPC messages from a spawning parent to the child and back', async () => {
const file = createTmpFile(`console.log('running');
process.on('message', (message) => {
if (message === 'exit') {
process.exit(0);
} else {
console.log('Received:', message);
process.send(message);
}
})`);

const child = spawn(
znewsham marked this conversation as resolved.
Show resolved Hide resolved
execPath,
[
'--watch',
'--no-warnings',
file,
],
{
encoding: 'utf8',
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
);

let stderr = '';
let stdout = '';

child.stdout.on('data', (data) => stdout += data);
child.stderr.on('data', (data) => stderr += data);
async function waitForEcho(msg) {
const receivedPromise = new Promise((resolve) => {
const fn = (message) => {
if (message === msg) {
child.off('message', fn);
resolve();
}
};
child.on('message', fn);
});
child.send(msg);
await receivedPromise;
}

async function waitForText(text) {
const seenPromise = new Promise((resolve) => {
const fn = (data) => {
if (data.toString().includes(text)) {
resolve();
child.stdout.off('data', fn);
}
};
child.stdout.on('data', fn);
});
await seenPromise;
}

await waitForText('running');
await waitForEcho('first message');
const stopRestarts = restart(file);
await waitForText('running');
stopRestarts();
await waitForEcho('second message');
const exitedPromise = once(child, 'exit');
child.send('exit');
await waitForText('Completed');
child.disconnect();
child.kill();
await exitedPromise;
assert.strictEqual(stderr, '');
const lines = stdout.split(/\r?\n/).filter(Boolean);
assert.deepStrictEqual(lines, [
'running',
'Received: first message',
`Restarting ${inspect(file)}`,
'running',
'Received: second message',
`Completed running ${inspect(file)}`,
]);
});
});