Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(child_process): prevent writing to terminating process #85

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/runtime/process-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export default class ProcessWorker implements TinypoolWorker {
port?: MessagePort
channel?: TinypoolChannel
waitForExit!: Promise<void>
isTerminating = false

initialize(options: Parameters<TinypoolWorker['initialize']>[0]) {
this.process = fork(
Expand All @@ -42,6 +43,7 @@ export default class ProcessWorker implements TinypoolWorker {
}

async terminate() {
this.isTerminating = true
this.process.off('exit', this.onUnexpectedExit)

const sigkillTimeout = setTimeout(
Expand All @@ -61,10 +63,16 @@ export default class ProcessWorker implements TinypoolWorker {

// Mirror channel's messages to process
this.channel.onMessage((message: any) => {
this.process.send(message)
this.send(message)
})
}

private send(message: Parameters<NonNullable<typeof process['send']>>[0]) {
if (!this.isTerminating) {
this.process.send(message)
}
}

postMessage(message: any, transferListItem?: Readonly<TransferListItem[]>) {
transferListItem?.forEach((item) => {
if (item instanceof MessagePort) {
Expand All @@ -75,15 +83,15 @@ export default class ProcessWorker implements TinypoolWorker {
// Mirror port's messages to process
if (this.port) {
this.port.on('message', (message) =>
this.process.send(<TinypoolWorkerMessage<'port'>>{
this.send(<TinypoolWorkerMessage<'port'>>{
...message,
source: 'port',
__tinypool_worker_message__,
})
)
}

return this.process.send(<TinypoolWorkerMessage<'pool'>>{
return this.send(<TinypoolWorkerMessage<'pool'>>{
...message,
source: 'pool',
__tinypool_worker_message__,
Expand Down
26 changes: 26 additions & 0 deletions test/termination-timeout.test.ts → test/termination.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,29 @@ test('termination timeout throws when worker does not terminate in time', async
'Failed to terminate worker'
)
})

test('writing to terminating worker does not crash', async () => {
const listeners: ((msg: any) => void)[] = []

const pool = new Tinypool({
runtime: 'child_process',
filename: resolve(__dirname, 'fixtures/sleep.js'),
minThreads: 1,
maxThreads: 1,
})

await pool.run(
{},
{
channel: {
onMessage: (listener) => listeners.push(listener),
postMessage: () => {},
},
}
)

const destroyed = pool.destroy()
listeners.forEach((listener) => listener('Hello from main thread'))

await destroyed
})
Loading