Skip to content

Commit

Permalink
better setup for async storage interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheraff committed Jun 29, 2024
1 parent 08dda52 commit 4ea30cc
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 22 deletions.
25 changes: 20 additions & 5 deletions src/v3/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Pipe } from "./pipe"
import { exec, Job } from "./job"
import type { Storage } from "./storage"
import { registration, type RegistrationContext } from "./context"
import { hash, serializeError } from "./utils"
import { isPromise, serializeError } from "./utils"

type SafeKeys<K extends string> = { [k in K]: { id: k } }

Expand All @@ -24,6 +24,8 @@ export class Queue<
public readonly storage: Storage
/** @public */
public readonly parallel: number
/** @public */
public readonly ready: Promise<void>

constructor(opts: {
id: string,
Expand Down Expand Up @@ -69,11 +71,24 @@ export class Queue<
])) as Pipes
}

for (const job of Object.values(this.jobs)) {
job.start()
const init = this.storage.init()
if (isPromise(init)) {
this.ready = init
init.then(() => {
for (const job of Object.values(this.jobs)) {
job.start()
}
this.#loop()
this.#cron(opts.cronScheduler)
})
} else {
this.ready = Promise.resolve()
for (const job of Object.values(this.jobs)) {
job.start()
}
this.#loop()
this.#cron(opts.cronScheduler)
}
this.#loop()
this.#cron(opts.cronScheduler)
}

#registrationContext: RegistrationContext = {
Expand Down
53 changes: 36 additions & 17 deletions src/v3/lib/storage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import BetterSqlite3 from "better-sqlite3"

type TaskStatus =
export type TaskStatus =
/** task is ready to be picked up */
| 'pending'
/** task is being processed, do not pick up */
Expand Down Expand Up @@ -32,7 +32,7 @@ export type Task = {
data: string | null
}

type StepStatus =
export type StepStatus =
/** step is a promise, currently resolving */
| 'running'
/** step ran (at least) once, but needs to re-run */
Expand Down Expand Up @@ -84,6 +84,8 @@ export type Step = {
}

export interface Storage {
/** sets up the db, useful for async version of storage as a continuation of the constructor */
init(): void | Promise<void>
/** Close the database connection if any. This should only close it if the database is not external to the Storage instance */
close(): void | Promise<void>
/** Simply return the full Task based on unique index queue+job+key */
Expand Down Expand Up @@ -146,6 +148,11 @@ export interface Storage {
export class SQLiteStorage implements Storage {
#db: BetterSqlite3.Database
#externalDb: boolean
#initialized = false

#tasksTable: string
#stepsTable: string
#eventsTable: string

constructor({
db,
Expand All @@ -162,20 +169,32 @@ export class SQLiteStorage implements Storage {
} = {}) {
if (!db || typeof db === 'string') {
this.#db = new BetterSqlite3(db)
this.#externalDb = false
} else {
this.#db = db
this.#externalDb = true
}

this.#tasksTable = typeof tables === 'object' ? tables.tasks ?? 'tasks' : tables ? `${tables}_tasks` : 'tasks'
this.#stepsTable = typeof tables === 'object' ? tables.steps ?? 'steps' : tables ? `${tables}_steps` : 'steps'
this.#eventsTable = typeof tables === 'object' ? tables.events ?? 'events' : tables ? `${tables}_events` : 'events'
}

init(): void {
if (this.#initialized) return
this.#initialized = true

if (!this.#externalDb) {
this.#db.pragma('journal_mode = WAL')
this.#db.pragma('busy_timeout = 100')
this.#db.pragma('synchronous = NORMAL')
this.#db.pragma('cache_size = 2000')
this.#db.pragma('temp_store = MEMORY')
this.#externalDb = false
} else {
this.#db = db
this.#externalDb = true
}

const tasksTable = typeof tables === 'object' ? tables.tasks : tables ? `${tables}_tasks` : 'tasks'
const stepsTable = typeof tables === 'object' ? tables.steps : tables ? `${tables}_steps` : 'steps'
const eventsTable = typeof tables === 'object' ? tables.events : tables ? `${tables}_events` : 'events'
const tasksTable = this.#tasksTable
const stepsTable = this.#stepsTable
const eventsTable = this.#eventsTable

this.#db.exec(/* sql */ `
CREATE TABLE IF NOT EXISTS ${tasksTable} (
Expand Down Expand Up @@ -752,12 +771,12 @@ export class SQLiteStorage implements Storage {
`)
}

#getTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string }, Task | undefined>
#getNextTaskTx: BetterSqlite3.Transaction<(queue: string) => [task: Task, steps: Step[], hasNext: boolean] | undefined>
#getNextFutureTaskStmt: BetterSqlite3.Statement<{ queue: string }, { ms: number | null }>
#resolveTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string, status: TaskStatus, data: string | null }>
#loopTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string }>
#addTaskTx: BetterSqlite3.Transaction<(task: {
#getTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string }, Task | undefined>
#getNextTaskTx!: BetterSqlite3.Transaction<(queue: string) => [task: Task, steps: Step[], hasNext: boolean] | undefined>
#getNextFutureTaskStmt!: BetterSqlite3.Statement<{ queue: string }, { ms: number | null }>
#resolveTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string, status: TaskStatus, data: string | null }>
#loopTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string }>
#addTaskTx!: BetterSqlite3.Transaction<(task: {
queue: string,
job: string,
key: string,
Expand All @@ -769,7 +788,7 @@ export class SQLiteStorage implements Storage {
throttle: { s: number, id: string } | null
rateLimit: { s: number, id: string } | null
}) => [rateLimitMs: number | null, inserted: boolean, cancelled?: Task]>
#recordStepStmt: BetterSqlite3.Statement<{
#recordStepStmt!: BetterSqlite3.Statement<{
queue: string
job: string
key: string
Expand All @@ -785,7 +804,7 @@ export class SQLiteStorage implements Storage {
wait_retroactive: number | null
data: string | null
}>
#recordEventStmt: BetterSqlite3.Statement<{ queue: string, key: string, input: string, data: string }>
#recordEventStmt!: BetterSqlite3.Statement<{ queue: string, key: string, input: string, data: string }>

getTask<T>(queue: string, job: string, key: string, cb: (task: Task | undefined) => T): T {
const task = this.#getTaskStmt.get({ queue, job, key })
Expand Down

0 comments on commit 4ea30cc

Please sign in to comment.