Skip to content

Commit

Permalink
feat: support Bun
Browse files Browse the repository at this point in the history
  • Loading branch information
AriPerkkio committed Sep 16, 2023
1 parent a5b6669 commit 6f72bda
Show file tree
Hide file tree
Showing 9 changed files with 213 additions and 49 deletions.
5 changes: 5 additions & 0 deletions benchmark/fixtures/wrap-add-bun.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import add from './add.mjs'

self.onmessage = (event) => {
postMessage(add(event.data))
}
126 changes: 105 additions & 21 deletions benchmark/isolate-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -1,35 +1,60 @@
/*
* Benchmark for testing whether Tinypool's worker creation and teardown is expensive.
* Benchmark focusing on the performance `isolateWorkers` option
*
* Options:
* - `--rounds` (optional) - Specify how many iterations to run
* - `--threads` (optional) - Specify how many threads to use
*/
import { cpus } from 'node:os'
import { Worker } from 'node:worker_threads'

import * as os from 'node:os'
import * as WorkerThreads from 'node:worker_threads'

import Tinypool from '../dist/esm/index.js'

const THREADS = cpus().length - 1
const ROUNDS = 5_000
const IS_BUN = process.versions.bun !== undefined
const USE_ATOMICS = !IS_BUN
const THREADS = parseInt(getArgument('--threads') ?? getMaxThreads(), 10)
const ROUNDS = parseInt(getArgument('--rounds') ?? '5_000', 10)

console.log('Options:', { THREADS, ROUNDS, IS_BUN }, '\n')

await logTime(
"Tinypool { runtime: 'worker_threds' }",
runTinypool('worker_threds')
)
await logTime(
"Tinypool { runtime: 'child_process' }",
runTinypool('child_process')
)

await logTime('Tinypool', runTinypool)
await logTime('Worker threads', runWorkerThreads)
if (IS_BUN) {
await logTime('Native Bun workers', runBunWorkers())
}

await logTime('Native node:worker_threads', runNodeWorkerThreads())

async function runTinypool() {
function runTinypool(runtime) {
const pool = new Tinypool({
runtime,
filename: new URL('./fixtures/add.mjs', import.meta.url).href,
isolateWorkers: true,
minThreads: THREADS,
maxThreads: THREADS,
useAtomics: USE_ATOMICS,
})

await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
return async function run() {
await Promise.all(
Array(ROUNDS)
.fill()
.map(() => pool.run({ a: 1, b: 2 }))
)
}
}

async function runWorkerThreads() {
function runNodeWorkerThreads() {
async function task() {
const worker = new Worker('./fixtures/wrap-add.mjs')
const worker = new WorkerThreads.Worker('./fixtures/wrap-add.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) =>
Expand All @@ -50,16 +75,75 @@ async function runWorkerThreads() {
}
}

await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function runBunWorkers() {
async function task() {
const worker = new Worker('./fixtures/wrap-add-bun.mjs')
worker.postMessage({ a: 1, b: 2 })

await new Promise((resolve, reject) => {
worker.onmessage = (event) =>
event.data === 3 ? resolve() : reject('Not 3')
})

await worker.terminate()
}

const pool = Array(ROUNDS).fill(task)

async function execute() {
const task = pool.shift()

if (task) {
await task()
return execute()
}
}

return async function run() {
await Promise.all(
Array(THREADS)
.fill(execute)
.map((task) => task())
)
}
}

function getArgument(flag) {
const index = process.argv.indexOf(flag)
if (index === -1) return

return process.argv[index + 1]
}

function getMaxThreads() {
return os.availableParallelism?.() || os.cpus().length - 1
}

async function logTime(label, method) {
console.log(`${label} | START`)

const start = process.hrtime.bigint()
await method()
const end = process.hrtime.bigint()
console.log(label, 'took', ((end - start) / 1_000_000n).toString(), 'ms')

console.log(`${label} | END ${((end - start) / 1_000_000n).toString()} ms`)

console.log('Cooling down for 2s')
const interval = setInterval(() => process.stdout.write('.'), 100)
await sleep(2_000)
clearInterval(interval)
console.log(' ✓\n')
}

async function sleep(ms) {
await new Promise((resolve) => setTimeout(resolve, ms))
}
6 changes: 5 additions & 1 deletion src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export interface TinypoolWorker {
resourceLimits?: any
workerData?: TinypoolData
trackUnmanagedFds?: boolean
}): void
}): Promise<void>
terminate(): Promise<any>
postMessage(message: any, transferListItem?: TransferListItem[]): void
setChannel?: (channel: TinypoolChannel) => void
Expand Down Expand Up @@ -57,6 +57,10 @@ export interface RequestMessage {
name: string
}

export interface SpawnMessage {
spawned: true
}

export interface ReadyMessage {
ready: true
}
Expand Down
7 changes: 7 additions & 0 deletions src/entry/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
ReadyMessage,
RequestMessage,
ResponseMessage,
SpawnMessage,
StartupMessage,
TinypoolWorkerMessage,
} from '../common'
Expand All @@ -23,6 +24,12 @@ process.__tinypool_state__ = {
workerId: process.pid,
}

let emittedReady = false
if (!emittedReady) {
process.send!(<SpawnMessage>{ spawned: true })
emittedReady = true
}

process.on('message', (message: IncomingMessage) => {
// Message was not for port or pool
// It's likely end-users own communication between main and worker
Expand Down
7 changes: 7 additions & 0 deletions src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ parentPort!.on('message', (message: StartupMessage) => {
useAtomics =
process.env.PISCINA_DISABLE_ATOMICS === '1' ? false : message.useAtomics

if (useAtomics && process.versions.bun) {
const error = 'useAtomics cannot be used with Bun at the moment.'
console.error(error)
throw new Error(error)
}

const { port, sharedBuffer, filename, name } = message

;(async function () {
Expand All @@ -48,6 +54,7 @@ parentPort!.on('message', (message: StartupMessage) => {
const readyMessage: ReadyMessage = { ready: true }
parentPort!.postMessage(readyMessage)

port.start()
port.on('message', onMessage.bind(null, port, sharedBuffer))
atomicsWaitLoop(port, sharedBuffer)
})().catch(throwInNextTick)
Expand Down
36 changes: 19 additions & 17 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -658,31 +658,32 @@ class ThreadPool {
this.workers.onAvailable((w: WorkerInfo) => this._onWorkerAvailable(w))

this.startingUp = true
this._ensureMinimumWorkers()
this.startingUp = false
this._ensureMinimumWorkers().then(() => {
this.startingUp = false
})
}
_ensureEnoughWorkersForTaskQueue(): void {
async _ensureEnoughWorkersForTaskQueue(): Promise<void> {
while (
this.workers.size < this.taskQueue.size &&
this.workers.size < this.options.maxThreads
) {
this._addNewWorker()
await this._addNewWorker()
}
}

_ensureMaximumWorkers(): void {
async _ensureMaximumWorkers(): Promise<void> {
while (this.workers.size < this.options.maxThreads) {
this._addNewWorker()
await this._addNewWorker()
}
}

_ensureMinimumWorkers(): void {
async _ensureMinimumWorkers(): Promise<void> {
while (this.workers.size < this.options.minThreads) {
this._addNewWorker()
await this._addNewWorker()
}
}

_addNewWorker(): void {
async _addNewWorker(): Promise<void> {
const pool = this
const workerIds = this.workerIds

Expand All @@ -701,7 +702,7 @@ class ThreadPool {
? new ProcessWorker()
: new ThreadWorker()

worker.initialize({
await worker.initialize({
env: this.options.env,
argv: this.options.argv,
execArgv: this.options.execArgv,
Expand Down Expand Up @@ -740,6 +741,7 @@ class ThreadPool {
}

const { port1, port2 } = new MessageChannel()
port1.start()
const workerInfo = new WorkerInfo(
worker,
port1,
Expand Down Expand Up @@ -781,7 +783,7 @@ class ThreadPool {
)
})

worker.on('error', (err: Error) => {
worker.on('error', async (err: Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {}

Expand All @@ -795,7 +797,7 @@ class ThreadPool {
this._removeWorker(workerInfo)

if (workerInfo.isReady() && !this.workerFailsDuringBootstrap) {
this._ensureMinimumWorkers()
await this._ensureMinimumWorkers()
} else {
// Do not start new workers over and over if they already fail during
// bootstrap, there's no point.
Expand Down Expand Up @@ -881,7 +883,7 @@ class ThreadPool {
}
}

runTask(task: any, options: RunOptions): Promise<any> {
async runTask(task: any, options: RunOptions): Promise<any> {
let { filename, name } = options
const { transferList = [], signal = null, channel } = options

Expand Down Expand Up @@ -944,7 +946,7 @@ class ThreadPool {
if (taskInfo.workerInfo !== null) {
// Already running: We cancel the Worker this is running on.
this._removeWorker(taskInfo.workerInfo)
this._ensureMinimumWorkers()
void this._ensureMinimumWorkers()
} else {
// Not yet running: Remove it from the queue.
this.taskQueue.remove(taskInfo)
Expand All @@ -965,7 +967,7 @@ class ThreadPool {
}
} else {
if (this.workers.size < this.options.maxThreads) {
this._addNewWorker()
await this._addNewWorker()
}
this.taskQueue.push(taskInfo)
}
Expand All @@ -989,7 +991,7 @@ class ThreadPool {
(workerInfo === null || workerInfo.currentUsage() > 0) &&
this.workers.size < this.options.maxThreads
) {
this._addNewWorker()
await this._addNewWorker()
waitingForNewWorker = true
}

Expand Down Expand Up @@ -1099,7 +1101,7 @@ class ThreadPool {

await Promise.all(exitEvents)

this._ensureMinimumWorkers()
await this._ensureMinimumWorkers()
}
}

Expand Down
Loading

0 comments on commit 6f72bda

Please sign in to comment.