Skip to content

Commit

Permalink
refactor: preparation for custom runtimes
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio committed Jun 16, 2024
1 parent 474ac9f commit 5777c59
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 69 deletions.
21 changes: 18 additions & 3 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ export interface TinypoolChannel {
postMessage(message: any): void
}

type Listener = (...args: any[]) => void

export interface TinypoolWorker {
runtime: string
initialize(options: {
Expand All @@ -19,12 +21,25 @@ export interface TinypoolWorker {
workerData: TinypoolData
trackUnmanagedFds?: boolean
}): void

/** Terminates the worker */
terminate(): Promise<any>

/** Send message to the worker */
postMessage(message: any, transferListItem?: TransferListItem[]): void

/** Listen on ready messages */
onReady(listener: Listener): void

/** Listen on errors */
onError(listener: Listener): void

/** Listen on exit. Called only **once**. */
onExit(listener: Listener): void

/** Set's channel for 'main <-> worker' communication */
setChannel?: (channel: TinypoolChannel) => void
on(event: string, listener: (...args: any[]) => void): void
once(event: string, listener: (...args: any[]) => void): void
emit(event: string, ...data: any[]): void

ref?: () => void
unref?: () => void
threadId: number
Expand Down
45 changes: 18 additions & 27 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
type MessagePort,
receiveMessageOnPort,
} from 'node:worker_threads'
import { once, EventEmitterAsyncResource } from 'node:events'
import { EventEmitterAsyncResource } from 'node:events'
import { AsyncResource } from 'node:async_hooks'
import { fileURLToPath, URL } from 'node:url'
import { join } from 'node:path'
Expand All @@ -13,7 +13,6 @@ import { performance } from 'node:perf_hooks'
import { readFileSync } from 'node:fs'
import { amount as physicalCpuCount } from './physicalCpuCount'
import {
type ReadyMessage,
type RequestMessage,
type ResponseMessage,
type StartupMessage,
Expand Down Expand Up @@ -694,7 +693,7 @@ class ThreadPool {
})
const tinypoolPrivateData = { workerId: workerId! }

const worker =
const worker: TinypoolWorker =
this.options.runtime === 'child_process'
? new ProcessWorker()
: new ThreadWorker()
Expand Down Expand Up @@ -761,25 +760,17 @@ class ThreadPool {

worker.postMessage(message, [port2])

worker.on('message', (message: ReadyMessage) => {
if (message.ready === true) {
if (workerInfo.currentUsage() === 0) {
workerInfo.unref()
}

if (!workerInfo.isReady()) {
workerInfo.markAsReady()
}
return
worker.onReady(() => {
if (workerInfo.currentUsage() === 0) {
workerInfo.unref()
}

worker.emit(
'error',
new Error(`Unexpected message on Worker: ${inspect(message)}`)
)
if (!workerInfo.isReady()) {
workerInfo.markAsReady()
}
})

worker.on('error', (err: Error) => {
worker.onError((err: Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {}

Expand Down Expand Up @@ -809,12 +800,12 @@ class ThreadPool {
}
})

worker.unref()
worker.unref?.()
port1.on('close', () => {
// The port is only closed if the Worker stops for some reason, but we
// always .unref() the Worker itself. We want to receive e.g. 'error'
// events on it, so we ref it once we know it's going to exit anyway.
worker.ref()
worker.ref?.()
})

this.workers.add(workerInfo)
Expand Down Expand Up @@ -1056,13 +1047,14 @@ class ThreadPool {
taskInfo.done(new Error('Terminating worker thread'))
}

const exitEvents: Promise<any[]>[] = []
const exitEvents: Promise<void>[] = []
while (this.workers.size > 0) {
const [workerInfo] = this.workers
// @ts-expect-error -- TODO Fix
exitEvents.push(once(workerInfo.worker, 'exit'))
// @ts-expect-error -- TODO Fix
void this._removeWorker(workerInfo)

if (workerInfo) {
exitEvents.push(new Promise((r) => workerInfo.worker.onExit(r)))
void this._removeWorker(workerInfo)
}
}

await Promise.all(exitEvents)
Expand All @@ -1087,8 +1079,7 @@ class ThreadPool {
Array.from(this.workers).filter((workerInfo) => {
// Remove idle workers
if (workerInfo.currentUsage() === 0) {
// @ts-expect-error -- TODO Fix
exitEvents.push(once(workerInfo.worker, 'exit'))
exitEvents.push(new Promise((r) => workerInfo.worker.onExit(r)))
void this._removeWorker(workerInfo)
}
// Mark on-going workers for recycling.
Expand Down
50 changes: 34 additions & 16 deletions src/runtime/process-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { type ChildProcess, fork } from 'node:child_process'
import { MessagePort, type TransferListItem } from 'node:worker_threads'
import { fileURLToPath } from 'node:url'
import {
type ReadyMessage,
type TinypoolChannel,
type TinypoolWorker,
type TinypoolWorkerMessage,
Expand Down Expand Up @@ -36,6 +37,16 @@ export default class ProcessWorker implements TinypoolWorker {

this.process.on('exit', this.onUnexpectedExit)
this.waitForExit = new Promise((r) => this.process.on('exit', r))

this.process.on('message', (data: TinypoolWorkerMessage) => {
if (!data || !data.__tinypool_worker_message__) {
return this.channel?.postMessage(data)
}

if (data.source === 'port') {
this.port!.postMessage(data)
}
})
}

onUnexpectedExit = () => {
Expand Down Expand Up @@ -98,27 +109,34 @@ export default class ProcessWorker implements TinypoolWorker {
})
}

on(event: string, callback: (...args: any[]) => void) {
return this.process.on(event, (data: TinypoolWorkerMessage) => {
// All errors should be forwarded to the pool
if (event === 'error') {
return callback(data)
}

if (!data || !data.__tinypool_worker_message__) {
return this.channel?.postMessage(data)
onReady(callback: (...args: any[]) => void) {
return this.process.on(
'message',
(data: TinypoolWorkerMessage & ReadyMessage) => {
if (
data.__tinypool_worker_message__ === true &&
data.source === 'pool' &&
data.ready === true
) {
callback()
}
}
)
}

if (data.source === 'pool') {
callback(data)
} else if (data.source === 'port') {
this.port!.postMessage(data)
}
onError(callback: (...args: any[]) => void) {
return this.process.on('error', (data) => {
// All errors should be forwarded to the pool
return callback(data)
})
}

once(event: string, callback: (...args: any[]) => void) {
return this.process.once(event, callback)
onExit(callback: (...args: any[]) => void) {
if (this.isTerminating) {
return callback()
}

return this.process.once('exit', callback)
}

emit(event: string, ...data: any[]) {
Expand Down
24 changes: 17 additions & 7 deletions src/runtime/thread-worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { fileURLToPath } from 'node:url'
import { inspect } from 'node:util'
import { type TransferListItem, Worker } from 'node:worker_threads'
import { type TinypoolWorker } from '../common'
import { type ReadyMessage, type TinypoolWorker } from '../common'

export default class ThreadWorker implements TinypoolWorker {
name = 'ThreadWorker'
Expand All @@ -24,16 +25,25 @@ export default class ThreadWorker implements TinypoolWorker {
return this.thread.postMessage(message, transferListItem)
}

on(event: string, callback: (...args: any[]) => void) {
return this.thread.on(event, callback)
onReady(callback: (...args: any[]) => void) {
return this.thread.on('message', (message: ReadyMessage) => {
if (message.ready === true) {
return callback()
}

this.thread.emit(
'error',
new Error(`Unexpected message on Worker: ${inspect(message)}`)
)
})
}

once(event: string, callback: (...args: any[]) => void) {
return this.thread.once(event, callback)
onError(callback: (...args: any[]) => void) {
return this.thread.on('error', callback)
}

emit(event: string, ...data: any[]) {
return this.thread.emit(event, ...data)
onExit(callback: (...args: any[]) => void) {
return this.thread.once('exit', callback)
}

ref() {
Expand Down
34 changes: 19 additions & 15 deletions test/termination.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,24 @@ test('writing to terminating worker does not crash', async () => {
await destroyed
})

test('recycling workers while closing pool does not crash', async () => {
const pool = new Tinypool({
runtime: 'child_process',
filename: resolve(__dirname, 'fixtures/nested-pool.mjs'),
isolateWorkers: true,
minThreads: 1,
maxThreads: 1,
})
test(
'recycling workers while closing pool does not crash',
{ timeout: 10_000 },
async () => {
const pool = new Tinypool({
runtime: 'child_process',
filename: resolve(__dirname, 'fixtures/nested-pool.mjs'),
isolateWorkers: true,
minThreads: 1,
maxThreads: 1,
})

await Promise.all(
(Array(10) as (() => Promise<any>)[])
.fill(() => pool.run({}))
.map((fn) => fn())
)
await Promise.all(
(Array(10) as (() => Promise<any>)[])
.fill(() => pool.run({}))
.map((fn) => fn())
)

await pool.destroy()
})
await pool.destroy()
}
)
2 changes: 1 addition & 1 deletion test/uncaught-exception-from-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ test('uncaught exception in immediate after task yields error event', async () =

// Hack a bit to make sure we get the 'exit'/'error' events.
expect(pool.threads.length).toBe(1)
pool.threads[0]!.ref?.()
pool.threads[0]!.ref!()

// This is the main aassertion here.
expect((await errorEvent)[0]!.message).toEqual('not_caught')
Expand Down

0 comments on commit 5777c59

Please sign in to comment.