diff --git a/.github/workflows/nodejs.yml b/.github/workflows/nodejs.yml index 9bb9b30..a452c82 100644 --- a/.github/workflows/nodejs.yml +++ b/.github/workflows/nodejs.yml @@ -13,14 +13,28 @@ jobs: runs-on: ${{matrix.os}} steps: - uses: actions/checkout@v2 + - name: Use Node.js ${{ matrix.node-version }} uses: actions/setup-node@v1 with: node-version: ${{ matrix.node-version }} + + - uses: oven-sh/setup-bun@v1 + if: ${{ matrix.os != 'windows-latest' }} + with: + bun-version: latest + - uses: pnpm/action-setup@v2 + - name: Install Dependencies run: pnpm install + - name: Build run: pnpm build + - name: Test run: pnpm test:ci + + - name: Test Bun + if: ${{ matrix.os != 'windows-latest' }} + run: pnpm test:bun diff --git a/.gitignore b/.gitignore index 71b423d..8982589 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .nyc_output .vscode node_modules +bun.lockb dist coverage diff --git a/benchmark/fixtures/wrap-add-bun.mjs b/benchmark/fixtures/wrap-add-bun.mjs new file mode 100644 index 0000000..3034084 --- /dev/null +++ b/benchmark/fixtures/wrap-add-bun.mjs @@ -0,0 +1,5 @@ +import add from './add.mjs' + +self.onmessage = (event) => { + postMessage(add(event.data)) +} diff --git a/benchmark/isolate-benchmark.mjs b/benchmark/isolate-benchmark.mjs index 6462a12..3c6fa85 100644 --- a/benchmark/isolate-benchmark.mjs +++ b/benchmark/isolate-benchmark.mjs @@ -1,35 +1,66 @@ /* - * Benchmark for testing whether Tinypool's worker creation and teardown is expensive. + * Benchmark focusing on the performance `isolateWorkers` option + * + * Options: + * - `--rounds` (optional) - Specify how many iterations to run + * - `--threads` (optional) - Specify how many threads to use */ -import { cpus } from 'node:os' -import { Worker } from 'node:worker_threads' + +import * as os from 'node:os' +import * as WorkerThreads from 'node:worker_threads' import Tinypool from '../dist/esm/index.js' -const THREADS = cpus().length - 1 -const ROUNDS = 5_000 +const IS_BUN = process.versions.bun !== undefined +const USE_ATOMICS = !IS_BUN +const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10) +const ROUNDS = parseInt(getArgument('--rounds') ?? '2000', 10) + +console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n') + +if (IS_BUN) { + await logTime( + "Tinypool { runtime: 'bun_workers' }", + runTinypool('bun_workers') + ) + + await logTime('Native Bun workers', runBunWorkers()) + process.exit(0) +} + +await logTime( + "Tinypool { runtime: 'worker_threds' }", + runTinypool('worker_threds') +) +await logTime( + "Tinypool { runtime: 'child_process' }", + runTinypool('child_process') +) -await logTime('Tinypool', runTinypool) -await logTime('Worker threads', runWorkerThreads) +await logTime('Native node:worker_threads', runNodeWorkerThreads()) -async function runTinypool() { +function runTinypool(runtime) { const pool = new Tinypool({ + runtime, filename: new URL('./fixtures/add.mjs', import.meta.url).href, isolateWorkers: true, minThreads: THREADS, maxThreads: THREADS, + useAtomics: USE_ATOMICS, }) - await Promise.all( - Array(ROUNDS) - .fill() - .map(() => pool.run({ a: 1, b: 2 })) - ) + return async function run() { + await Promise.all( + Array(ROUNDS) + .fill() + .map(() => pool.run({ a: 1, b: 2 })) + ) + } } -async function runWorkerThreads() { +function runNodeWorkerThreads() { async function task() { - const worker = new Worker('./fixtures/wrap-add.mjs') + const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs') worker.postMessage({ a: 1, b: 2 }) await new Promise((resolve, reject) => @@ -50,16 +81,75 @@ async function runWorkerThreads() { } } - await Promise.all( - Array(THREADS) - .fill(execute) - .map((task) => task()) - ) + return async function run() { + await Promise.all( + Array(THREADS) + .fill(execute) + .map((task) => task()) + ) + } +} + +function runBunWorkers() { + async function task() { + const worker = new Worker('./fixtures/wrap-add-bun.mjs') + worker.postMessage({ a: 1, b: 2 }) + + await new Promise((resolve, reject) => { + worker.onmessage = (event) => + event.data === 3 ? resolve() : reject('Not 3') + }) + + await worker.terminate() + } + + const pool = Array(ROUNDS).fill(task) + + async function execute() { + const task = pool.shift() + + if (task) { + await task() + return execute() + } + } + + return async function run() { + await Promise.all( + Array(THREADS) + .fill(execute) + .map((task) => task()) + ) + } +} + +function getArgument(flag) { + const index = process.argv.indexOf(flag) + if (index === -1) return + + return process.argv[index + 1] +} + +function getMaxThreads() { + return os.availableParallelism?.() || os.cpus().length - 1 } async function logTime(label, method) { + console.log(`${label} | START`) + const start = process.hrtime.bigint() await method() const end = process.hrtime.bigint() - console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms') + + console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`) + + console.log('Cooling down for 2s') + const interval = setInterval(() => process.stdout.write('.'), 100) + await sleep(2_000) + clearInterval(interval) + console.log(' ✓\n') +} + +async function sleep(ms) { + await new Promise((resolve) => setTimeout(resolve, ms)) } diff --git a/bun-test/fixtures/eval.js b/bun-test/fixtures/eval.js new file mode 100644 index 0000000..91ea2c6 --- /dev/null +++ b/bun-test/fixtures/eval.js @@ -0,0 +1,4 @@ +// eslint-disable-next-line no-eval +export default function (code) { + return eval(code) +} diff --git a/bun-test/runtime.test.ts b/bun-test/runtime.test.ts new file mode 100644 index 0000000..0da0e39 --- /dev/null +++ b/bun-test/runtime.test.ts @@ -0,0 +1,59 @@ +import * as path from 'path' +import { fileURLToPath } from 'url' +import { Tinypool } from '../dist/esm' + +const __dirname = path.dirname(fileURLToPath(import.meta.url)) + +describe('Bun Workers', () => { + test('runs code in Bun Worker', async () => { + const pool = createPool({ runtime: 'bun_workers' }) + + const result = await pool.run(` + (async () => { + return { + sum: 11 + 12, + isMainThread: Bun.isMainThread, + pid: process.pid, + } + })() + `) + expect(result.sum).toBe(23) + expect(result.isMainThread).toBe(false) + expect(result.pid).toBe(process.pid) + }) + + test('sets tinypool state', async () => { + const pool = createPool({ runtime: 'bun_workers' }) + + const result = await pool.run('process.__tinypool_state__') + expect(result.isTinypoolWorker).toBe(true) + expect(result.isBunWorker).toBe(true) + expect(result.isWorkerThread).toBe(undefined) + expect(result.isChildProcess).toBe(undefined) + }) + + test('errors are serialized', async () => { + const pool = createPool({ runtime: 'bun_workers' }) + + const error = await pool + .run("throw new TypeError('Test message');") + .catch((e: Error) => e) + + expect(error.name).toBe('TypeError') + expect(error.message).toBe('Test message') + + // Nope Bun does not do this + // expect(error.stack).toMatch('fixtures/eval.js') + }) +}) + +function createPool(options) { + const pool = new Tinypool({ + filename: path.resolve(__dirname, './fixtures/eval.js'), + minThreads: 1, + maxThreads: 1, + ...options, + }) + + return pool +} diff --git a/package.json b/package.json index 0f83adf..fffaf03 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,7 @@ "scripts": { "test:ci": "node --experimental-vm-modules node_modules/jest/bin/jest.js --no-coverage --runInBand", "test:dev": "node --experimental-vm-modules --trace-warnings node_modules/jest/bin/jest.js --watch --no-coverage", + "test:bun": "bun --bun test bun-test/**", "dev": "tsup --watch", "build": "tsup", "publish": "clean-publish", @@ -72,7 +73,9 @@ "extensionsToTreatAsEsm": [ ".ts" ], - "testRegex": "test.(js|ts|tsx)$", + "testMatch": [ + "**/test/**/*.test.ts" + ], "verbose": true, "coverageDirectory": "./coverage/", "collectCoverage": true, diff --git a/src/entry/bun-worker.ts b/src/entry/bun-worker.ts new file mode 100644 index 0000000..d5a8bf6 --- /dev/null +++ b/src/entry/bun-worker.ts @@ -0,0 +1,78 @@ +import { + StartupMessage, + ReadyMessage, + RequestMessage, + ResponseMessage, +} from '../common' +import { getHandler, throwInNextTick } from './utils' +import { stderr, stdout } from 'src/utils' + +process.__tinypool_state__ = { + isTinypoolWorker: true, + isBunWorker: true, + workerData: null, + workerId: 1, +} + +self.onmessage = onWorkerMessage + +function onWorkerMessage(event: MessageEvent) { + const { filename, name } = event.data + + ;(async function () { + if (filename !== null) { + await getHandler(filename, name) + } + + const readyMessage: ReadyMessage = { ready: true } + self.postMessage(readyMessage, '') + })().catch(throwInNextTick) + + if (event.ports?.[0]) { + event.ports[0].start() + event.ports[0].onmessage = onPortMessage.bind(null, event.ports[0]) + } +} + +function onPortMessage(port: MessagePort, event: MessageEvent) { + const message = event.data + const { taskId, task, filename, name } = message + + ;(async function () { + let response: ResponseMessage + + try { + const handler = await getHandler(filename, name) + if (handler === null) { + throw new Error(`No handler function exported from ${filename}`) + } + let result = await handler(task) + response = { + taskId, + result: result, + error: null, + usedMemory: process.memoryUsage().heapUsed, + } + + // If the task used e.g. console.log(), wait for the stream to drain + // before potentially entering the `Atomics.wait()` loop, and before + // returning the result so that messages will always be printed even + // if the process would otherwise be ready to exit. + if (stdout()?.writableLength! > 0) { + await new Promise((resolve) => process.stdout.write('', resolve)) + } + if (stderr()?.writableLength! > 0) { + await new Promise((resolve) => process.stderr.write('', resolve)) + } + } catch (error) { + response = { + taskId, + result: null, + error, + usedMemory: process.memoryUsage().heapUsed, + } + } + + port.postMessage(response) + })().catch(throwInNextTick) +} diff --git a/src/index.ts b/src/index.ts index 2e10e91..2fdd2ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,6 +36,7 @@ import { } from './common' import ThreadWorker from './runtime/thread-worker' import ProcessWorker from './runtime/process-worker' +import BunWorker from './runtime/bun-worker' declare global { namespace NodeJS { @@ -44,6 +45,7 @@ declare global { isTinypoolWorker: boolean isWorkerThread?: boolean isChildProcess?: boolean + isBunWorker?: boolean workerData: any workerId: number } @@ -135,7 +137,7 @@ class ArrayTaskQueue implements TaskQueue { interface Options { filename?: string | null - runtime?: 'worker_threads' | 'child_process' + runtime?: 'worker_threads' | 'child_process' | 'bun_workers' name?: string minThreads?: number maxThreads?: number @@ -699,6 +701,8 @@ class ThreadPool { const worker = this.options.runtime === 'child_process' ? new ProcessWorker() + : this.options.runtime === 'bun_workers' + ? new BunWorker() : new ThreadWorker() worker.initialize({ @@ -740,6 +744,7 @@ class ThreadPool { } const { port1, port2 } = new MessageChannel() + port1.start() const workerInfo = new WorkerInfo( worker, port1, diff --git a/src/runtime/bun-worker.ts b/src/runtime/bun-worker.ts new file mode 100644 index 0000000..b9d6153 --- /dev/null +++ b/src/runtime/bun-worker.ts @@ -0,0 +1,104 @@ +import { fileURLToPath } from 'url' +import { dirname, resolve } from 'path' +import { + TransferListItem, + MessagePort as NodeMessagePort, +} from 'worker_threads' +import { TinypoolWorker, TinypoolChannel } from '../common' +import { StartupMessage } from 'tinypool' + +let ids = 1 + +export default class BunWorker implements TinypoolWorker { + name = 'BunWorker' + runtime = 'bun_workers' + worker!: Worker + threadId!: number + port?: NodeMessagePort + channel?: TinypoolChannel + waitForExit!: Promise + onExit!: () => void + + initialize(_: Parameters[0]) { + const __dirname = dirname(fileURLToPath(import.meta.url)) + + this.worker = new Worker(resolve(__dirname, './entry/bun-worker.js')) + this.threadId = ids++ + this.waitForExit = new Promise((resolve) => { + this.onExit = resolve + }) + } + + async terminate() { + this.port?.close() + this.worker.terminate() + this.onExit() + + return this.waitForExit + } + + postMessage(message: any, transferListItem?: Readonly) { + transferListItem?.forEach((item) => { + if (item instanceof NodeMessagePort) { + this.port = item + } + }) + + const channel = new MessageChannel() + + // Mirror port's messages to process + this.port!.start() + channel.port1.start() + this.port!.on('message', (message) => { + channel.port1.postMessage(message) + }) + + channel.port1.onmessage = (event) => { + this.port!.postMessage(event.data) + } + this.port!.on('close', () => { + channel.port1.close() + channel.port2.close() + }) + + return this.worker.postMessage( + { + filename: message.filename, + name: message.name, + }, + [channel.port2] + ) + } + + on(event: string, callback: (...args: any[]) => void) { + if (event === 'message') { + this.worker.onmessage = (e) => callback(e.data) + } else if (event === 'error') { + this.worker.onerror = callback + } else if (event === 'exit') { + this.waitForExit.then(callback) + } else { + throw new Error(`Unknown event: ${event}`) + } + } + + once(event: string, callback: (...args: any[]) => void) { + if (event === 'exit') { + this.waitForExit.then(callback) + } else { + throw new Error(`Unknown event: ${event}`) + } + } + + emit(event: string, ..._data: any[]) { + throw new Error(`Unknown emit event: ${event}`) + } + + ref() {} + + unref() {} + + setChannel() { + throw new Error('BunWorker does not support channel') + } +} diff --git a/tsup.config.ts b/tsup.config.ts index b8b30fe..fd4e155 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -1,6 +1,6 @@ import { defineConfig } from 'tsup' -export default defineConfig({ +export default defineConfig((mode) => ({ entryPoints: ['src/index.ts', 'src/entry/*.ts'], splitting: true, legacyOutput: true, @@ -9,5 +9,5 @@ export default defineConfig({ tsconfig: './tsconfig.json', target: 'es2020', clean: true, - dts: true, -}) + dts: mode.watch ? false : true, +}))