Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(queue): allow to add more than one base marker #2841

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/gitbook/guide/queues/adding-bulks.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ jobs = await queue.addBulk([

This call can only succeed or fail, and all or none of the jobs will be added.

{% hint style="warning" %}
A new marker will be added per each job in the array, unless you provide **markerCount** option as the maximum quantity.
{% endhint %}

## Read more:

- 💡 [Add Bulk API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#addBulk)
21 changes: 20 additions & 1 deletion src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
BackoffOptions,
BulkJobOptions,
DependenciesOpts,
JobBulkOptions,
JobJson,
JobJsonRaw,
MinimalJob,
Expand Down Expand Up @@ -278,6 +279,7 @@ export class Job<
data: T;
opts?: BulkJobOptions;
}[],
opts: JobBulkOptions,
): Promise<Job<T, R, N>[]> {
const client = await queue.client;

Expand All @@ -297,8 +299,25 @@ export class Job<
});
}

// minus 1 base marker that is added when calling addJob scripts
const markerCount =
(opts.markerCount
? Math.min(jobInstances.length, opts.markerCount)
: jobInstances.length) - 1;
if (markerCount > 0) {
const markers: (number | string)[] = [];
Array(markerCount)
.fill(0)
.forEach(index => {
markers.push(0, index + 1);
});
pipeline.zadd(queue.toKey('marker'), ...markers);
}

const results = (await pipeline.exec()) as [null | Error, string][];
for (let index = 0; index < results.length; ++index) {
const jobResultLength =
opts.markerCount > 1 ? results.length - 1 : results.length;
for (let index = 0; index < jobResultLength; ++index) {
const [err, id] = results[index];
if (err) {
throw err;
Expand Down
4 changes: 4 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BaseJobOptions,
BulkJobOptions,
IoredisListener,
JobBulkOptions,
QueueOptions,
RepeatableJob,
RepeatOptions,
Expand Down Expand Up @@ -308,9 +309,11 @@ export class Queue<
*
* @param jobs - The array of jobs to add to the queue. Each job is defined by 3
* properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'.
* @param opts -
*/
async addBulk(
jobs: { name: NameType; data: DataType; opts?: BulkJobOptions }[],
opts: JobBulkOptions = { markerCount: 1 },
): Promise<Job<DataType, ResultType, NameType>[]> {
return this.trace<Job<DataType, ResultType, NameType>[]>(
SpanKind.PRODUCER,
Expand All @@ -336,6 +339,7 @@ export class Queue<
tm: span && srcPropagationMedatada,
},
})),
opts,
);
},
);
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/addDelayMarkerIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ local function addDelayMarkerIfNeeded(markerKey, delayedKey)
if nextTimestamp ~= nil then
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "1")
rcall("ZADD", markerKey, nextTimestamp, "-1")
end
end
7 changes: 7 additions & 0 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ export interface QueueOptions extends QueueBaseOptions {
telemetry?: Telemetry;
}

export interface JobBulkOptions {
/**
* Max quantity of base markers to be added.
*/
markerCount?: number;
}

/**
* Options for the Repeat class.
*/
Expand Down
49 changes: 47 additions & 2 deletions tests/test_bulk.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { expect } from 'chai';
import { default as IORedis } from 'ioredis';
import { after as afterNumExecutions } from 'lodash';
import { after, beforeEach, describe, it, before } from 'mocha';
import { v4 } from 'uuid';
import { Queue, Worker, Job } from '../src/classes';
import { removeAllQueueData } from '../src/utils';
import { Queue, QueueEvents, Worker, Job } from '../src/classes';
import { removeAllQueueData, delay } from '../src/utils';

describe('bulk jobs', () => {
const redisHost = process.env.REDIS_HOST || 'localhost';
Expand Down Expand Up @@ -119,6 +120,50 @@ describe('bulk jobs', () => {
await removeAllQueueData(new IORedis(redisHost), parentQueueName);
});

it('should keep workers busy', async () => {
const numJobs = 6;
const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix });

const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

const worker = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
const worker2 = new Worker(
queueName,
async () => {
await delay(1000);
},
{ connection, prefix },
);
await worker.waitUntilReady();
await worker2.waitUntilReady();

const completed = new Promise(resolve => {
queueEvents.on('completed', afterNumExecutions(numJobs, resolve));
});

const jobs = Array.from(Array(numJobs).keys()).map(index => ({
name: 'test',
data: { index },
}));

await queue2.addBulk(jobs, {
markerCount: 2,
});

await completed;
await queue2.close();
await worker.close();
await worker2.close();
await queueEvents.close();
});

it('should process jobs with custom ids', async () => {
const name = 'test';
let processor;
Expand Down
Loading