Skip to content

Commit

Permalink
feat(queue): add removeRateLimitKey method (#2806)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Nov 19, 2024
1 parent 8204ea3 commit ff70613
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 2 deletions.
17 changes: 17 additions & 0 deletions docs/gitbook/guide/rate-limiting.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,24 @@ if (ttl > 0) {
}
```

### Remove Rate Limit Key

Sometimes is useful to stop a rate limit delay.

For this purpose, you can use the **`removeRateLimitKey`** method like this:

```typescript
import { Queue } from 'bullmq';

const queue = new Queue('myQueue', { connection });

await queue.removeRateLimitKey();
```

By removing rate limit key, workers will be able to pick jobs again and your rate limit counter is reset to zero.

## Read more:

- 💡 [Rate Limit API Reference](https://api.docs.bullmq.io/classes/v5.Worker.html#rateLimit)
- 💡 [Get Rate Limit Ttl API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#getRateLimitTtl)
- 💡 [Remove Rate Limit Key API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#removeRateLimitKey)
9 changes: 9 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,15 @@ export class Queue<
);
}

/**
* Removes rate limit key.
*/
async removeRateLimitKey(): Promise<number> {
const client = await this.client;

return client.del(this.keys.limiter);
}

/**
* Removes a repeatable job by its key. Note that the key is the one used
* to store the repeatable job metadata and not one of the job iterations
Expand Down
2 changes: 0 additions & 2 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,13 @@ export interface WorkerOptions extends QueueBaseOptions, SandboxedOptions {
skipLockRenewal?: boolean;

/**
*
* Number of seconds to long poll for jobs when the queue is empty.
*
* @default 5
*/
drainDelay?: number;

/**
*
* Duration of the lock for the job in milliseconds. The lock represents that
* a worker is processing the job. If the lock is lost, the job will be eventually
* be picked up by the stalled checker and move back to wait so that another worker
Expand Down
58 changes: 58 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,64 @@ describe('Rate Limiter', function () {
await worker.close();
});
});

describe('when removing rate limit', () => {
it('should process jobs normally', async function () {
this.timeout(5000);

const numJobs = 2;
const dynamicLimit = 10000;
const duration = 1000;

const ttl = await queue.getRateLimitTtl();
expect(ttl).to.be.equal(-2);

const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
limiter: {
max: 1,
duration,
},
});

await worker.rateLimit(dynamicLimit);

await queue.removeRateLimitKey();
const result = new Promise<void>((resolve, reject) => {
queueEvents.on(
'completed',
// after every job has been completed
after(numJobs, async () => {
try {
const timeDiff = new Date().getTime() - startTime;
expect(timeDiff).to.be.gte((numJobs - 1) * duration);
expect(timeDiff).to.be.lte(numJobs * duration);
resolve();
} catch (err) {
reject(err);
}
}),
);

queueEvents.on('failed', async err => {
reject(err);
});
});

const startTime = new Date().getTime();
const jobs = Array.from(Array(numJobs).keys()).map(() => ({
name: 'rate test',
data: {},
}));
await queue.addBulk(jobs);

worker.run();
await result;
await worker.close();
});
});
});

describe('when there are more added jobs than max limiter', () => {
Expand Down

0 comments on commit ff70613

Please sign in to comment.