Skip to content

Commit

Permalink
watch: enable passthrough ipc in watch mode
Browse files Browse the repository at this point in the history
PR-URL: #50890
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
  • Loading branch information
znewsham authored and marco-ippolito committed Jun 17, 2024
1 parent 3df3e37 commit 1ff8f31
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 11 deletions.
32 changes: 21 additions & 11 deletions lib/internal/main/watch_mode.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ function start() {
process.stdout.write(`${red}Failed running ${kCommandStr}${white}\n`);
}
});
return child;
}

async function killAndWait(signal = kKillSignal, force = false) {
Expand Down Expand Up @@ -113,34 +114,43 @@ function reportGracefulTermination() {
};
}

async function stop() {
async function stop(child) {
// Without this line, the child process is still able to receive IPC, but is unable to send additional messages
watcher.destroyIPC(child);
watcher.clearFileFilters();
const clearGraceReport = reportGracefulTermination();
await killAndWait();
clearGraceReport();
}

let restarting = false;
async function restart() {
async function restart(child) {
if (restarting) return;
restarting = true;
try {
if (!kPreserveOutput) process.stdout.write(clear);
process.stdout.write(`${green}Restarting ${kCommandStr}${white}\n`);
await stop();
start();
await stop(child);
return start();
} finally {
restarting = false;
}
}

start();
watcher
.on('changed', restart)
.on('error', (error) => {
watcher.off('changed', restart);
triggerUncaughtException(error, true /* fromPromise */);
});
async function init() {
let child = start();
const restartChild = async () => {
child = await restart(child);
};
watcher
.on('changed', restartChild)
.on('error', (error) => {
watcher.off('changed', restartChild);
triggerUncaughtException(error, true /* fromPromise */);
});
}

init();

// Exiting gracefully to avoid stdout/stderr getting written after
// parent process is killed.
Expand Down
29 changes: 29 additions & 0 deletions lib/internal/watch_mode/files_watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
const {
ArrayIsArray,
ArrayPrototypeForEach,
Boolean,
SafeMap,
SafeSet,
SafeWeakMap,
StringPrototypeStartsWith,
} = primordials;

Expand All @@ -31,6 +33,8 @@ class FilesWatcher extends EventEmitter {
#debounce;
#mode;
#signal;
#passthroughIPC = false;
#ipcHandlers = new SafeWeakMap();

constructor({ debounce = 200, mode = 'filter', signal } = kEmptyObject) {
super({ __proto__: null, captureRejections: true });
Expand All @@ -40,6 +44,7 @@ class FilesWatcher extends EventEmitter {
this.#debounce = debounce;
this.#mode = mode;
this.#signal = signal;
this.#passthroughIPC = Boolean(process.send);

if (signal) {
addAbortListener(signal, () => this.clear());
Expand Down Expand Up @@ -128,7 +133,31 @@ class FilesWatcher extends EventEmitter {
this.#ownerDependencies.set(owner, dependencies);
}
}


#setupIPC(child) {
const handlers = {
__proto__: null,
parentToChild: (message) => child.send(message),
childToParent: (message) => process.send(message),
};
this.#ipcHandlers.set(child, handlers);
process.on('message', handlers.parentToChild);
child.on('message', handlers.childToParent);
}

destroyIPC(child) {
const handlers = this.#ipcHandlers.get(child);
if (this.#passthroughIPC && handlers !== undefined) {
process.off('message', handlers.parentToChild);
child.off('message', handlers.childToParent);
}
}

watchChildProcessModules(child, key = null) {
if (this.#passthroughIPC) {
this.#setupIPC(child);
}
if (this.#mode !== 'filter') {
return;
}
Expand Down
81 changes: 81 additions & 0 deletions test/sequential/test-watch-mode.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { spawn } from 'node:child_process';
import { writeFileSync, readFileSync, mkdirSync } from 'node:fs';
import { inspect } from 'node:util';
import { pathToFileURL } from 'node:url';
import { once } from 'node:events';
import { createInterface } from 'node:readline';

if (common.isIBMi)
Expand Down Expand Up @@ -574,4 +575,84 @@ console.log(values.random);
`Completed running ${inspect(file)}`,
]);
});

it('should pass IPC messages from a spawning parent to the child and back', async () => {
const file = createTmpFile(`console.log('running');
process.on('message', (message) => {
if (message === 'exit') {
process.exit(0);
} else {
console.log('Received:', message);
process.send(message);
}
})`);

const child = spawn(
execPath,
[
'--watch',
'--no-warnings',
file,
],
{
encoding: 'utf8',
stdio: ['pipe', 'pipe', 'pipe', 'ipc'],
},
);

let stderr = '';
let stdout = '';

child.stdout.on('data', (data) => stdout += data);
child.stderr.on('data', (data) => stderr += data);
async function waitForEcho(msg) {
const receivedPromise = new Promise((resolve) => {
const fn = (message) => {
if (message === msg) {
child.off('message', fn);
resolve();
}
};
child.on('message', fn);
});
child.send(msg);
await receivedPromise;
}

async function waitForText(text) {
const seenPromise = new Promise((resolve) => {
const fn = (data) => {
if (data.toString().includes(text)) {
resolve();
child.stdout.off('data', fn);
}
};
child.stdout.on('data', fn);
});
await seenPromise;
}

await waitForText('running');
await waitForEcho('first message');
const stopRestarts = restart(file);
await waitForText('running');
stopRestarts();
await waitForEcho('second message');
const exitedPromise = once(child, 'exit');
child.send('exit');
await waitForText('Completed');
child.disconnect();
child.kill();
await exitedPromise;
assert.strictEqual(stderr, '');
const lines = stdout.split(/\r?\n/).filter(Boolean);
assert.deepStrictEqual(lines, [
'running',
'Received: first message',
`Restarting ${inspect(file)}`,
'running',
'Received: second message',
`Completed running ${inspect(file)}`,
]);
});
});

0 comments on commit 1ff8f31

Please sign in to comment.