Skip to content

Commit

Permalink
fix(worker): use 0.002 as minimum timeout for redis version lower tha…
Browse files Browse the repository at this point in the history
…n 7.0.8 (#2515) fixes #2466
  • Loading branch information
roggervalf authored Apr 10, 2024
1 parent 9c25995 commit 44f7d21
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 2 deletions.
3 changes: 3 additions & 0 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const deprecationMessage =

interface RedisCapabilities {
canDoubleTimeout: boolean;
canBlockFor1Ms: boolean;
}

export interface RawCommand {
Expand All @@ -39,6 +40,7 @@ export class RedisConnection extends EventEmitter {
closing: boolean;
capabilities: RedisCapabilities = {
canDoubleTimeout: false,
canBlockFor1Ms: true,
};

status: 'initializing' | 'ready' | 'closing' | 'closed' = 'initializing';
Expand Down Expand Up @@ -251,6 +253,7 @@ export class RedisConnection extends EventEmitter {

this.capabilities = {
canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'),
canBlockFor1Ms: !isRedisVersionLowerThan(this.version, '7.0.8'),
};

this.status = 'ready';
Expand Down
10 changes: 8 additions & 2 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,12 @@ export class Worker<
);
}

get minimumBlockTimeout(): number {
return this.blockingConnection.capabilities.canBlockFor1Ms
? minimumBlockTimeout
: 0.002;
}

protected async moveToActive(
client: RedisClient,
token: string,
Expand Down Expand Up @@ -653,7 +659,7 @@ export class Worker<
if (blockUntil) {
const blockDelay = blockUntil - Date.now();
// when we reach the time to get new jobs
if (blockDelay < 1) {
if (blockDelay < this.minimumBlockTimeout * 1000) {
blockTimeout = minimumBlockTimeout;
} else {
blockTimeout = blockDelay / 1000;
Expand All @@ -664,7 +670,7 @@ export class Worker<
// reference: https://github.com/taskforcesh/bullmq/issues/1658
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
} else {
blockTimeout = Math.max(opts.drainDelay, minimumBlockTimeout);
blockTimeout = Math.max(opts.drainDelay, this.minimumBlockTimeout);
}

return blockTimeout;
Expand Down
19 changes: 19 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,25 @@ describe('workers', function () {
await worker.close();
});

describe('when 0.002 is used as blocktimeout', () => {
it('should not block forever', async () => {
const worker = new Worker(queueName, async () => {}, {
connection,
prefix,
});
await worker.waitUntilReady();
const client = await worker.client;
if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) {
await client.bzpopmin(`key`, 0.002);
} else {
await client.bzpopmin(`key`, 0.001);
}

expect(true).to.be.true;
await worker.close();
});
});

describe('when closing a worker', () => {
it('process a job that throws an exception after worker close', async () => {
const jobError = new Error('Job Failed');
Expand Down

0 comments on commit 44f7d21

Please sign in to comment.