Skip to content

Commit

Permalink
feat: support Bun Worker runtime
Browse files Browse the repository at this point in the history
- Instead of using Node APIs like in tinylibs#70, use Bun's native Workers API
  • Loading branch information
AriPerkkio committed Sep 26, 2023
1 parent a5b6669 commit 8c10873
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 25 deletions.
5 changes: 5 additions & 0 deletions benchmark/fixtures/wrap-add-bun.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import add from './add.mjs'

self.onmessage = (event) => {
postMessage(add(event.data))
}
132 changes: 111 additions & 21 deletions benchmark/isolate-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,35 +1,66 @@
/*
* Benchmark for testing whether Tinypool's worker creation and teardown is expensive.
* Benchmark focusing on the performance `isolateWorkers` option
*
* Options:
* - `--rounds` (optional) - Specify how many iterations to run
* - `--threads` (optional) - Specify how many threads to use
*/
import { cpus } from 'node:os'
import { Worker } from 'node:worker_threads'

import * as os from 'node:os'
import * as WorkerThreads from 'node:worker_threads'

import Tinypool from '../dist/esm/index.js'

const THREADS = cpus().length - 1
const ROUNDS = 5_000
const IS_BUN = process.versions.bun !== undefined
const USE_ATOMICS = !IS_BUN
const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10)
const ROUNDS = parseInt(getArgument('--rounds') ?? '5_000', 10)

console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n')

if (IS_BUN) {
await logTime(
"Tinypool { runtime: 'bun_workers' }",
runTinypool('bun_workers')
)

await logTime('Native Bun workers', runBunWorkers())
process.exit(0)
}

await logTime(
"Tinypool { runtime: 'worker_threds' }",
runTinypool('worker_threds')
)
await logTime(
"Tinypool { runtime: 'child_process' }",
runTinypool('child_process')
)

await logTime('Tinypool', runTinypool)
await logTime('Worker threads', runWorkerThreads)
await logTime('Native node:worker_threads', runNodeWorkerThreads())

async function runTinypool() {
function runTinypool(runtime) {
const pool = new Tinypool({
runtime,
filename: new URL('./fixtures/add.mjs', import.meta.url).href,
isolateWorkers: true,
minThreads: THREADS,
maxThreads: THREADS,
useAtomics: USE_ATOMICS,
})

await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
return async function run() {
await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
}
}

async function runWorkerThreads() {
function runNodeWorkerThreads() {
async function task() {
const worker = new Worker('./fixtures/wrap-add.mjs')
const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) =>
Expand All @@ -50,16 +81,75 @@ async function runWorkerThreads() {
}
}

await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function runBunWorkers() {
async function task() {
const worker = new Worker('./fixtures/wrap-add-bun.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) => {
worker.onmessage = (event) =>
event.data === 3 ? resolve() : reject('Not 3')
})

await worker.terminate()
}

const pool = Array(ROUNDS).fill(task)

async function execute() {
const task = pool.shift()

if (task) {
await task()
return execute()
}
}

return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function getArgument(flag) {
const index = process.argv.indexOf(flag)
if (index === -1) return

return process.argv[index + 1]
}

function getMaxThreads() {
return os.availableParallelism?.() || os.cpus().length - 1
}

async function logTime(label, method) {
console.log(`${label} | START`)

const start = process.hrtime.bigint()
await method()
const end = process.hrtime.bigint()
console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms')

console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`)

console.log('Cooling down for 2s')
const interval = setInterval(() => process.stdout.write('.'), 100)
await sleep(2_000)
clearInterval(interval)
console.log(' ✓\n')
}

async function sleep(ms) {
await new Promise((resolve) => setTimeout(resolve, ms))
}
77 changes: 77 additions & 0 deletions src/entry/bun-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import {
StartupMessage,
ReadyMessage,
RequestMessage,
ResponseMessage,
} from '../common'
import { getHandler, throwInNextTick } from './utils'
import { stderr, stdout } from 'src/utils'

process.__tinypool_state__ = {
isTinypoolWorker: true,
workerData: null,
workerId: 1,
}

self.onmessage = onWorkerMessage

function onWorkerMessage(event: MessageEvent<StartupMessage>) {
const { filename, name } = event.data

;(async function () {
if (filename !== null) {
await getHandler(filename, name)
}

const readyMessage: ReadyMessage = { ready: true }
self.postMessage(readyMessage, '')
})().catch(throwInNextTick)

if (event.ports?.[0]) {
event.ports[0].start()
event.ports[0].onmessage = onPortMessage.bind(null, event.ports[0])
}
}

function onPortMessage(port: MessagePort, event: MessageEvent<RequestMessage>) {
const message = event.data
const { taskId, task, filename, name } = message

;(async function () {
let response: ResponseMessage

try {
const handler = await getHandler(filename, name)
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`)
}
let result = await handler(task)
response = {
taskId,
result: result,
error: null,
usedMemory: process.memoryUsage().heapUsed,
}

// If the task used e.g. console.log(), wait for the stream to drain
// before potentially entering the `Atomics.wait()` loop, and before
// returning the result so that messages will always be printed even
// if the process would otherwise be ready to exit.
if (stdout()?.writableLength! > 0) {
await new Promise((resolve) => process.stdout.write('', resolve))
}
if (stderr()?.writableLength! > 0) {
await new Promise((resolve) => process.stderr.write('', resolve))
}
} catch (error) {
response = {
taskId,
result: null,
error,
usedMemory: process.memoryUsage().heapUsed,
}
}

port.postMessage(response)
})().catch(throwInNextTick)
}
5 changes: 4 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
} from './common'
import ThreadWorker from './runtime/thread-worker'
import ProcessWorker from './runtime/process-worker'
import BunWorker from './runtime/bun-worker'

declare global {
namespace NodeJS {
Expand Down Expand Up @@ -135,7 +136,7 @@ class ArrayTaskQueue implements TaskQueue {

interface Options {
filename?: string | null
runtime?: 'worker_threads' | 'child_process'
runtime?: 'worker_threads' | 'child_process' | 'bun_workers'
name?: string
minThreads?: number
maxThreads?: number
Expand Down Expand Up @@ -699,6 +700,8 @@ class ThreadPool {
const worker =
this.options.runtime === 'child_process'
? new ProcessWorker()
: this.options.runtime === 'bun_workers'
? new BunWorker()
: new ThreadWorker()

worker.initialize({
Expand Down
76 changes: 76 additions & 0 deletions src/runtime/bun-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { fileURLToPath } from 'url'
import { dirname, resolve } from 'path'
import { TransferListItem } from 'worker_threads'
import { TinypoolWorker, TinypoolChannel } from '../common'

let ids = 1

export default class BunWorker implements TinypoolWorker {
name = 'BunWorker'
runtime = 'bun_workers'
worker!: Worker
threadId!: number
port?: MessagePort
channel?: TinypoolChannel
waitForExit!: Promise<void>
onExit!: () => void

initialize(_: Parameters<TinypoolWorker['initialize']>[0]) {
const __dirname = dirname(fileURLToPath(import.meta.url))

this.worker = new Worker(resolve(__dirname, './entry/bun-worker.js'))
this.threadId = ids++
this.waitForExit = new Promise((resolve) => {
this.onExit = resolve
})
}

async terminate() {
console.log('Terminating')
this.worker.terminate()
this.onExit()

return this.waitForExit
}

postMessage(message: any, transferListItem?: Readonly<TransferListItem[]>) {
return this.worker.postMessage(
message,
getOnlyMessagePorts(transferListItem)
)
}

on(event: string, callback: (...args: any[]) => void) {
if (event === 'message') {
this.worker.onmessage = (e) => callback(e.data)
}
if (event === 'error') {
this.worker.onerror = callback
}
if (event === 'exit') {
this.waitForExit.then(callback)
}
}

once(event: string, callback: (...args: any[]) => void) {
if (event === 'exit') {
this.waitForExit.then(callback)
}
}

emit(_event: string, ..._data: any[]) {}

ref() {}

unref() {}

setChannel() {
throw new Error('BunWorker does not support channel')
}
}

function getOnlyMessagePorts(list?: Readonly<unknown[]>): MessagePort[] {
return (list || []).filter(
(item): item is MessagePort => item instanceof MessagePort
)
}
6 changes: 3 additions & 3 deletions tsup.config.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { defineConfig } from 'tsup'

export default defineConfig({
export default defineConfig((mode) => ({
entryPoints: ['src/index.ts', 'src/entry/*.ts'],
splitting: true,
legacyOutput: true,
Expand All @@ -9,5 +9,5 @@ export default defineConfig({
tsconfig: './tsconfig.json',
target: 'es2020',
clean: true,
dts: true,
})
dts: mode.watch ? false : true,
}))

0 comments on commit 8c10873

Please sign in to comment.