Skip to content

Commit

Permalink
feat(dialect-bun-worker): function to create custom OnMessageCallback…
Browse files Browse the repository at this point in the history
… in worker, add stream support

stream support requires `Bun@^1.1.31`
  • Loading branch information
subframe7536 committed Oct 29, 2024
1 parent 611ad1c commit 417504c
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 55 deletions.
24 changes: 24 additions & 0 deletions packages/dialect-bun-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,30 @@ From `v0.7.0`, this dialect requires `Bun@^1.1.14`
bun install kysely kysely-bun-worker
```

## Usage

```ts
import { BunWorkerDialect } from 'kysely-bun-worker'

const dialect = new SqliteWorkerDialect({
url: ':memory:',
})
```

### Custom Worker

in `worker.ts`

```ts
import { createOnMessageCallback } from 'kysely-bun-worker'

onmessage = createOnMessageCallback(
async (db) => {
db.loadExtension(/* ... */)
}
)
```

## Config

```ts
Expand Down
54 changes: 47 additions & 7 deletions packages/dialect-bun-worker/src/driver.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { DatabaseConnection, Driver, QueryResult } from 'kysely'
import type { BunWorkerDialectConfig, EventWithError, MainMsg, WorkerMsg } from './type'
import type { BunWorkerDialectConfig, EventWithError, MainToWorkerMsg, WorkerToMainMsg } from './type'
import { EventEmitter } from 'node:events'
import { CompiledQuery, SelectQueryNode } from 'kysely'

Expand All @@ -18,14 +18,14 @@ export class BunWorkerDriver implements Driver {
{ type: 'module' },
)
this.mitt = new EventEmitter()
this.worker.onmessage = ({ data: [type, data, err] }: MessageEvent<WorkerMsg>) => {
this.worker.onmessage = ({ data: [type, data, err] }: MessageEvent<WorkerToMainMsg>) => {
this.mitt?.emit(type, data, err)
}
this.worker.postMessage([
0, // init
this.config?.url,
this.config?.cacheStatment,
] satisfies MainMsg)
] satisfies MainToWorkerMsg)
await new Promise<void>((resolve, reject) => {
this.mitt?.once(0/* init */, (_, err) => err ? reject(err) : resolve())
})
Expand Down Expand Up @@ -61,7 +61,7 @@ export class BunWorkerDriver implements Driver {
if (!this.worker) {
return
}
this.worker.postMessage([2] satisfies MainMsg)
this.worker.postMessage([2] satisfies MainToWorkerMsg)
return new Promise<void>((resolve, reject) => {
this.mitt?.once(2/* close */, (_, err) => {
if (err) {
Expand Down Expand Up @@ -107,14 +107,54 @@ class BunWorkerConnection implements DatabaseConnection {
private mitt?: EventEmitter<EventWithError>,
) { }

streamQuery<R>(): AsyncIterableIterator<QueryResult<R>> {
throw new Error('Bun:sqlite-worker driver doesn\'t support streaming')
async *streamQuery<R>(compiledQuery: CompiledQuery): AsyncIterableIterator<QueryResult<R>> {
const { parameters, sql, query } = compiledQuery
if (!SelectQueryNode.is(query)) {
throw new Error('WaSqlite dialect only supported SELECT queries')
}
this.worker.postMessage([3, sql, parameters] satisfies MainToWorkerMsg)
let resolver: ((value: IteratorResult<{ rows: QueryResult<R>[] }>) => void) | null = null
let rejecter: ((reason: any) => void) | null = null

this.mitt!.on(3, (data, err) => {
if (err && rejecter) {
rejecter(err)
}
if (resolver) {
resolver({ value: { rows: data! }, done: false })
resolver = null
}
})

this.mitt!.on(4, (_, err) => {
if (err && rejecter) {
rejecter(err)
}
if (resolver) {
resolver({ value: undefined, done: true })
}
})

return {
[Symbol.asyncIterator]() {
return this
},
async next() {
return new Promise<IteratorResult<any>>((resolve, reject) => {
resolver = resolve
rejecter = reject
})
},
async return() {
return { value: undefined, done: true }
},
}
}

async executeQuery<R>(compiledQuery: CompiledQuery<unknown>): Promise<QueryResult<R>> {
const { parameters, sql, query } = compiledQuery
const isSelect = SelectQueryNode.is(query)
this.worker.postMessage([1/* run */, isSelect, sql, parameters] satisfies MainMsg)
this.worker.postMessage([1/* run */, isSelect, sql, parameters] satisfies MainToWorkerMsg)
return new Promise((resolve, reject) => {
if (!this.mitt) {
reject(new Error('kysely instance has been destroyed'))
Expand Down
2 changes: 2 additions & 0 deletions packages/dialect-bun-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { BunWorkerDialectConfig } from './type'
import { SqliteAdapter, SqliteIntrospector, SqliteQueryCompiler } from 'kysely'
import { BunWorkerDriver } from './driver'

export { createOnMessageCallback } from './worker/utils'

export class BunWorkerDialect implements Dialect {
/**
* dialect for `bun:sqlite`, run sql in worker
Expand Down
12 changes: 10 additions & 2 deletions packages/dialect-bun-worker/src/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,24 @@ type InitMsg = [
cache?: boolean,
]

export type MainMsg = InitMsg | RunMsg | CloseMsg
type StreamMsg = [
type: 3,
sql: string,
parameters?: readonly unknown[],
]

export type MainToWorkerMsg = InitMsg | RunMsg | CloseMsg | StreamMsg

export type WorkerMsg = {
export type WorkerToMainMsg = {
[K in keyof Events]: [ type: K, data: Events[K], err: unknown ]
}[keyof Events]

type Events = {
0: null
1: QueryResult<any> | null
2: null
3: QueryResult<any>[] | null
4: null
}

export type EventWithError = {
Expand Down
46 changes: 0 additions & 46 deletions packages/dialect-bun-worker/src/worker.ts

This file was deleted.

3 changes: 3 additions & 0 deletions packages/dialect-bun-worker/src/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { createOnMessageCallback } from './utils'

onmessage = createOnMessageCallback()
77 changes: 77 additions & 0 deletions packages/dialect-bun-worker/src/worker/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import type { QueryResult } from 'kysely'
import type { MainToWorkerMsg, WorkerToMainMsg } from '../type'
import Database from 'bun:sqlite'

let db: Database
let fn: 'query' | 'prepare'
function run(isSelect: boolean, sql: string, parameters: readonly unknown[]): QueryResult<any> {
const stmt = db[fn](sql)
const rows = stmt.all(parameters as any)

if (isSelect || rows.length) {
return { rows }
}
const { changes, lastInsertRowid } = db.query('SELECT 1').run()
return {
rows,
insertId: BigInt(changes),
numAffectedRows: BigInt(lastInsertRowid),
}
}

function stream(onData: (data: any) => void, sql: string, parameters?: readonly unknown[]): void {
const stmt = db[fn](sql)
if (!('iterate' in stmt)) {
throw new Error('Streaming not supported, please upgrade to Bun@^1.1.31')
}
for (const row of stmt.iterate(...parameters as any)) {
onData(row)
}
}

/**
* Handle worker message, support custom callback on initialization
* @example
* // worker.ts
* import { createOnMessageCallback } from 'kysely-bun-worker'
*
* createOnMessageCallback(
* async (db) => {
* db.loadExtension(...)
* }
* )
*/
export function createOnMessageCallback(
onInit?: (db: typeof Database) => void,
): (event: MessageEvent<MainToWorkerMsg>) => void {
return ({ data: [type, data1, data2, data3] }: MessageEvent<MainToWorkerMsg>) => {
const ret: WorkerToMainMsg = [
type,
null,
null,
]

try {
switch (type) {
case 0:
db = new Database(data1, { create: true })
onInit?.(db as any)
fn = data2 ? 'query' : 'prepare'
break
case 1:
ret[1] = run(data1, data2, data3 || [])
break
case 2:
db.close()
break
case 3:
stream(val => postMessage([3, [val], null] satisfies WorkerToMainMsg), data1, data2)
ret[0] = 4
break
}
} catch (error) {
ret[2] = error
}
postMessage(ret)
}
}

0 comments on commit 417504c

Please sign in to comment.