diff --git a/src/v3/lib/queue.ts b/src/v3/lib/queue.ts index 015cb3e..1bc8a7d 100644 --- a/src/v3/lib/queue.ts +++ b/src/v3/lib/queue.ts @@ -2,7 +2,7 @@ import { Pipe } from "./pipe" import { exec, Job } from "./job" import type { Storage } from "./storage" import { registration, type RegistrationContext } from "./context" -import { hash, serializeError } from "./utils" +import { isPromise, serializeError } from "./utils" type SafeKeys = { [k in K]: { id: k } } @@ -24,6 +24,8 @@ export class Queue< public readonly storage: Storage /** @public */ public readonly parallel: number + /** @public */ + public readonly ready: Promise constructor(opts: { id: string, @@ -69,11 +71,24 @@ export class Queue< ])) as Pipes } - for (const job of Object.values(this.jobs)) { - job.start() + const init = this.storage.init() + if (isPromise(init)) { + this.ready = init + init.then(() => { + for (const job of Object.values(this.jobs)) { + job.start() + } + this.#loop() + this.#cron(opts.cronScheduler) + }) + } else { + this.ready = Promise.resolve() + for (const job of Object.values(this.jobs)) { + job.start() + } + this.#loop() + this.#cron(opts.cronScheduler) } - this.#loop() - this.#cron(opts.cronScheduler) } #registrationContext: RegistrationContext = { diff --git a/src/v3/lib/storage.ts b/src/v3/lib/storage.ts index 339e8e2..e66a0c7 100644 --- a/src/v3/lib/storage.ts +++ b/src/v3/lib/storage.ts @@ -1,6 +1,6 @@ import BetterSqlite3 from "better-sqlite3" -type TaskStatus = +export type TaskStatus = /** task is ready to be picked up */ | 'pending' /** task is being processed, do not pick up */ @@ -32,7 +32,7 @@ export type Task = { data: string | null } -type StepStatus = +export type StepStatus = /** step is a promise, currently resolving */ | 'running' /** step ran (at least) once, but needs to re-run */ @@ -84,6 +84,8 @@ export type Step = { } export interface Storage { + /** sets up the db, useful for async version of storage as a continuation of the constructor */ + init(): void | Promise /** Close the database connection if any. This should only close it if the database is not external to the Storage instance */ close(): void | Promise /** Simply return the full Task based on unique index queue+job+key */ @@ -146,6 +148,11 @@ export interface Storage { export class SQLiteStorage implements Storage { #db: BetterSqlite3.Database #externalDb: boolean + #initialized = false + + #tasksTable: string + #stepsTable: string + #eventsTable: string constructor({ db, @@ -162,20 +169,32 @@ export class SQLiteStorage implements Storage { } = {}) { if (!db || typeof db === 'string') { this.#db = new BetterSqlite3(db) + this.#externalDb = false + } else { + this.#db = db + this.#externalDb = true + } + + this.#tasksTable = typeof tables === 'object' ? tables.tasks ?? 'tasks' : tables ? `${tables}_tasks` : 'tasks' + this.#stepsTable = typeof tables === 'object' ? tables.steps ?? 'steps' : tables ? `${tables}_steps` : 'steps' + this.#eventsTable = typeof tables === 'object' ? tables.events ?? 'events' : tables ? `${tables}_events` : 'events' + } + + init(): void { + if (this.#initialized) return + this.#initialized = true + + if (!this.#externalDb) { this.#db.pragma('journal_mode = WAL') this.#db.pragma('busy_timeout = 100') this.#db.pragma('synchronous = NORMAL') this.#db.pragma('cache_size = 2000') this.#db.pragma('temp_store = MEMORY') - this.#externalDb = false - } else { - this.#db = db - this.#externalDb = true } - const tasksTable = typeof tables === 'object' ? tables.tasks : tables ? `${tables}_tasks` : 'tasks' - const stepsTable = typeof tables === 'object' ? tables.steps : tables ? `${tables}_steps` : 'steps' - const eventsTable = typeof tables === 'object' ? tables.events : tables ? `${tables}_events` : 'events' + const tasksTable = this.#tasksTable + const stepsTable = this.#stepsTable + const eventsTable = this.#eventsTable this.#db.exec(/* sql */ ` CREATE TABLE IF NOT EXISTS ${tasksTable} ( @@ -752,12 +771,12 @@ export class SQLiteStorage implements Storage { `) } - #getTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string }, Task | undefined> - #getNextTaskTx: BetterSqlite3.Transaction<(queue: string) => [task: Task, steps: Step[], hasNext: boolean] | undefined> - #getNextFutureTaskStmt: BetterSqlite3.Statement<{ queue: string }, { ms: number | null }> - #resolveTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string, status: TaskStatus, data: string | null }> - #loopTaskStmt: BetterSqlite3.Statement<{ queue: string, job: string, key: string }> - #addTaskTx: BetterSqlite3.Transaction<(task: { + #getTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string }, Task | undefined> + #getNextTaskTx!: BetterSqlite3.Transaction<(queue: string) => [task: Task, steps: Step[], hasNext: boolean] | undefined> + #getNextFutureTaskStmt!: BetterSqlite3.Statement<{ queue: string }, { ms: number | null }> + #resolveTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string, status: TaskStatus, data: string | null }> + #loopTaskStmt!: BetterSqlite3.Statement<{ queue: string, job: string, key: string }> + #addTaskTx!: BetterSqlite3.Transaction<(task: { queue: string, job: string, key: string, @@ -769,7 +788,7 @@ export class SQLiteStorage implements Storage { throttle: { s: number, id: string } | null rateLimit: { s: number, id: string } | null }) => [rateLimitMs: number | null, inserted: boolean, cancelled?: Task]> - #recordStepStmt: BetterSqlite3.Statement<{ + #recordStepStmt!: BetterSqlite3.Statement<{ queue: string job: string key: string @@ -785,7 +804,7 @@ export class SQLiteStorage implements Storage { wait_retroactive: number | null data: string | null }> - #recordEventStmt: BetterSqlite3.Statement<{ queue: string, key: string, input: string, data: string }> + #recordEventStmt!: BetterSqlite3.Statement<{ queue: string, key: string, input: string, data: string }> getTask(queue: string, job: string, key: string, cb: (task: Task | undefined) => T): T { const task = this.#getTaskStmt.get({ queue, job, key })