Skip to content

Commit

Permalink
avoid some unnecessary work
Browse files Browse the repository at this point in the history
  • Loading branch information
Sheraff committed Jun 29, 2024
1 parent b73e481 commit c863cb6
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions src/v3/lib/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ export class Queue<
#willRun = false
#sleepTimeout: NodeJS.Timeout | null = null

#drain(): void | Promise<void> {
/** @returns `true` if it managed to drain the queue */
#drain(): boolean | Promise<boolean> {
return this.storage.startNextTask(this.id, (result) => {
if (!result) return
if (!result) return true

const [task, steps, hasNext] = result
const job = this.jobs[task.job]
Expand All @@ -153,8 +154,12 @@ export class Queue<
this.#loop()
})

if (hasNext && this.#running.size < this.parallel) return this.#drain()
}) as void | Promise<void>
if (!hasNext) return true
if (this.#running.size >= this.parallel) return false
if (this.#closed) return false

return this.#drain()
}) as boolean | Promise<boolean>
}

// TODO: should this be public? Might be useful for cases where multiple queue workers are writing to the same storage, and we don't want to poll and would rather have a manual trigger.
Expand All @@ -168,8 +173,9 @@ export class Queue<
}
setImmediate(async () => {
if (this.#closed) return
await this.#drain()
const drained = await this.#drain()
this.#willRun = false
if (!drained || this.#closed) return // can't do anything else, we're at the limit of `parallel` jobs
setImmediate(() => {
if (this.#willRun || this.#closed) return
this.storage.nextFutureTask(this.id, (result) => {
Expand Down

0 comments on commit c863cb6

Please sign in to comment.