Skip to content

Commit

Permalink
fix(worker): set blockTimeout as 0.001 when reach the time to get del…
Browse files Browse the repository at this point in the history
…ayed jobs (#2455) fixes #2450
  • Loading branch information
roggervalf authored Mar 1, 2024
1 parent 82dfc48 commit 2de15ca
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -595,37 +595,30 @@ export class Worker<
}

try {
const opts: WorkerOptions = <WorkerOptions>this.opts;

if (!this.closing) {
let blockTimeout = Math.max(
blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay,
0,
);
let blockTimeout = this.getBlockTimeout(blockUntil);

// Blocking for less than 50ms is useless.
if (blockTimeout > 0.05) {
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);

// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);

if (result) {
const [_key, member, score] = result;
if (result) {
const [_key, member, score] = result;

if (member) {
return parseInt(score);
}
if (member) {
return parseInt(score);
}
}

return 0;
}
} catch (error) {
Expand All @@ -641,6 +634,23 @@ export class Worker<
return Infinity;
}

protected getBlockTimeout(blockUntil: number): number {
const opts: WorkerOptions = <WorkerOptions>this.opts;

// when there are delayed jobs
if (blockUntil) {
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < 1) {
return 0.001;
} else {
return blockDelay / 1000;
}
} else {
return Math.max(opts.drainDelay, 0);
}
}

/**
*
* This function is exposed only for testing purposes.
Expand Down

0 comments on commit 2de15ca

Please sign in to comment.