From 5d09180f0896193d192da70607a0ed140c95af31 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 18 Oct 2024 23:40:21 -0500 Subject: [PATCH 1/7] perf(queue): allow to add more than one base marker --- src/classes/queue.ts | 2 + src/classes/scripts.ts | 2 + src/commands/addStandardJob-8.lua | 4 +- .../includes/addBaseMarkerIfNeeded.lua | 4 +- src/commands/includes/addJobInTargetList.lua | 4 +- src/interfaces/queue-options.ts | 7 +++ tests/test_bulk.ts | 47 ++++++++++++++++++- 7 files changed, 63 insertions(+), 7 deletions(-) diff --git a/src/classes/queue.ts b/src/classes/queue.ts index bb1dae7ae5..908dc06d81 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -104,6 +104,7 @@ export class Queue< private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler private _jobScheduler?: JobScheduler; + private markerCount: number; constructor( name: string, @@ -120,6 +121,7 @@ export class Queue< this.jobsOpts = opts?.defaultJobOptions ?? {}; + this.markerCount = opts?.markerCount || 1; this.waitUntilReady() .then(client => { if (!this.closing && !opts?.skipMetasUpdate) { diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index c9156b291d..fab4299cfc 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -175,6 +175,7 @@ export class Scripts { parentOpts: ParentOpts = {}, ): Promise { const queueKeys = this.queue.keys; + const markerCount = (this.queue as any).markerCount as number; const parent: Record = job.parent ? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof } @@ -191,6 +192,7 @@ export class Scripts { parent, job.repeatJobKey, job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null, + markerCount || 1, ]; let encodedOpts; diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 7005e91af7..5f4e39ea05 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -35,6 +35,7 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key + [11] marker count ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -58,6 +59,7 @@ local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] local deduplicationKey = args[10] +local markerCount = args[11] local parentData -- Includes @@ -108,7 +110,7 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index af10026589..5b36c62358 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. ]] -local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) if not isPausedOrMaxed then - rcall("ZADD", markerKey, 0, "0") + rcall("ZADD", markerKey, (jobCounter or 1) % (markerCount or 1), "0") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index 80f7bc0173..08e8e35946 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) end diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index a1459059e5..5a6e38aedd 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -46,6 +46,13 @@ export interface QueueBaseOptions { export interface QueueOptions extends QueueBaseOptions { defaultJobOptions?: DefaultJobOptions; + /** + * Max quantity of base markers to be added. It's recommend to be the same + * as the quantity of worker instances for this specific queue + * @default 1 + */ + markerCount?: number; + /** * Options for the streams used internally in BullMQ. */ diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index 5cff0f2122..1d6b8eb4ec 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; +import { after as afterNumExecutions } from 'lodash'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { Queue, Worker, Job } from '../src/classes'; -import { removeAllQueueData } from '../src/utils'; +import { Queue, QueueEvents, Worker, Job } from '../src/classes'; +import { removeAllQueueData, delay } from '../src/utils'; describe('bulk jobs', () => { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -119,6 +120,48 @@ describe('bulk jobs', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + it('should keep workers busy', async () => { + const numJobs = 6; + const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix }); + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + const worker = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + const worker2 = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + await worker2.waitUntilReady(); + + const completed = new Promise(resolve => { + queueEvents.on('completed', afterNumExecutions(numJobs, resolve)); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue2.addBulk(jobs); + + await completed; + await queue2.close(); + await worker.close(); + await worker2.close(); + await queueEvents.close(); + }); + it('should process jobs with custom ids', async () => { const name = 'test'; let processor; From 2009d77308cc5cd45eaf6bae66eb125aacc03335 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Fri, 25 Oct 2024 00:51:05 -0500 Subject: [PATCH 2/7] chore: pass marker score --- src/commands/addPrioritizedJob-8.lua | 5 ++++- src/commands/addStandardJob-8.lua | 3 ++- src/commands/changePriority-7.lua | 4 ++-- src/commands/includes/addBaseMarkerIfNeeded.lua | 4 ++-- src/commands/includes/addJobInTargetList.lua | 4 ++-- src/commands/includes/addJobWithPriority.lua | 4 ++-- src/commands/includes/moveParentToWaitIfNeeded.lua | 4 ++-- src/commands/includes/promoteDelayedJobs.lua | 4 ++-- src/commands/includes/removeParentDependencyKey.lua | 2 +- src/commands/moveJobFromActiveToWait-10.lua | 2 +- src/commands/moveJobsToWait-8.lua | 2 +- src/commands/moveStalledJobsToWait-9.lua | 2 +- src/commands/promote-9.lua | 4 ++-- src/commands/reprocessJob-8.lua | 2 +- src/commands/retryJob-11.lua | 4 ++-- 15 files changed, 27 insertions(+), 23 deletions(-) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 4fa6dbc616..dfa57b5343 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -25,6 +25,7 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key + [11] marker count ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -55,6 +56,7 @@ local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] local deduplicationKey = args[10] +local markerCount = args[11] local parentData -- Includes @@ -103,7 +105,8 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) -addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed) +local markerScore = (jobCounter or 1) % (markerCount or 1) +addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerScore) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 5f4e39ea05..e445005626 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -110,7 +110,8 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) +local markerScore = (jobCounter or 1) % (markerCount or 1) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerScore) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/changePriority-7.lua b/src/commands/changePriority-7.lua index 3350f54fbd..89bff5137a 100644 --- a/src/commands/changePriority-7.lua +++ b/src/commands/changePriority-7.lua @@ -33,13 +33,13 @@ local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, priorityCounter, lifo, priority, jobId, isPausedOrMaxed) if priority == 0 then local pushCmd = lifo and 'RPUSH' or 'LPUSH' - addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) + addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, '0') else if lifo then pushBackJobWithPriority(prioritizedKey, priority, jobId) else addJobWithPriority(markerKey, prioritizedKey, priority, jobId, - priorityCounter, isPausedOrMaxed) + priorityCounter, isPausedOrMaxed, '0') end end end diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index 5b36c62358..53c528fd96 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. ]] -local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) if not isPausedOrMaxed then - rcall("ZADD", markerKey, (jobCounter or 1) % (markerCount or 1), "0") + rcall("ZADD", markerKey, markerScore, "0") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index 08e8e35946..7bd799239a 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, jobCounter, markerCount) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, markerScore) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, jobCounter, markerCount) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index f5c334f62e..b7de10289a 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -6,9 +6,9 @@ --- @include "addBaseMarkerIfNeeded" local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, - isPausedOrMaxed) + isPausedOrMaxed, markerScore) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + prioCounter % 0x100000000 rcall("ZADD", prioritizedKey, score, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) end diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 33d9bcb031..7e7f7aeef3 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -40,12 +40,12 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, parentPausedKey) addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, - parentId) + parentId, '0') else local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, - parentId, parentQueueKey .. ":pc", isPausedOrMaxed) + parentId, parentQueueKey .. ":pc", isPausedOrMaxed, '0') end rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", diff --git a/src/commands/includes/promoteDelayedJobs.lua b/src/commands/includes/promoteDelayedJobs.lua index 56cb209c61..6daccc2d7c 100644 --- a/src/commands/includes/promoteDelayedJobs.lua +++ b/src/commands/includes/promoteDelayedJobs.lua @@ -25,10 +25,10 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK if priority == 0 then -- LIFO or FIFO - addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId) + addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId, '0') else addJobWithPriority(markerKey, prioritizedKey, priority, - jobId, priorityCounterKey, isPaused) + jobId, priorityCounterKey, isPaused, '0') end -- Emit waiting event diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 87262850c9..7f9fb98ced 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -13,7 +13,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", parentPrefix .. "wait", parentPrefix .. "paused") - addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) + addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId, '0') if emitEvent then local parentEventStream = parentPrefix .. "events" diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-10.lua index e90d6d2d10..987c20206a 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-10.lua @@ -44,7 +44,7 @@ if lockToken == token then if priority > 0 then pushBackJobWithPriority(KEYS[8], priority, jobId) else - addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId, '0') end rcall("DEL", lockKey) diff --git a/src/commands/moveJobsToWait-8.lua b/src/commands/moveJobsToWait-8.lua index 15e99c6295..cdb27824d4 100644 --- a/src/commands/moveJobsToWait-8.lua +++ b/src/commands/moveJobsToWait-8.lua @@ -63,7 +63,7 @@ if (#jobs > 0) then rcall("LPUSH", target, unpack(jobs, from, to)) end - addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed) + addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed, '0') end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index 2e6161ebee..c667681a37 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -153,7 +153,7 @@ if (#stalling > 0) then getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. - addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId, '0') rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active') diff --git a/src/commands/promote-9.lua b/src/commands/promote-9.lua index 2143e52aa0..682ec9bfc8 100644 --- a/src/commands/promote-9.lua +++ b/src/commands/promote-9.lua @@ -45,9 +45,9 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then if priority == 0 then -- LIFO or FIFO - addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId, '0') else - addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) + addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed, '0') end -- Emit waiting event (wait..ing@token) diff --git a/src/commands/reprocessJob-8.lua b/src/commands/reprocessJob-8.lua index 300ab6a1e8..af82b4f45c 100644 --- a/src/commands/reprocessJob-8.lua +++ b/src/commands/reprocessJob-8.lua @@ -34,7 +34,7 @@ if rcall("EXISTS", KEYS[1]) == 1 then rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3]) local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6]) - addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId) + addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId, '0') local maxEvents = getOrSetMaxEvents(KEYS[5]) -- Emit waiting event diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-11.lua index 33d1f7a85a..f35bfd2b4a 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-11.lua @@ -63,9 +63,9 @@ if rcall("EXISTS", KEYS[4]) == 1 then -- Standard or priority add if priority == 0 then - addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) + addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4], '0') else - addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) + addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed, '0') end rcall("HINCRBY", KEYS[4], "atm", 1) From 5cae7cff161c8e06a582e0ff7a135609081e21ac Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 30 Oct 2024 21:31:27 -0500 Subject: [PATCH 3/7] chore: remove or logic --- src/commands/addPrioritizedJob-8.lua | 2 +- src/commands/addStandardJob-8.lua | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index dfa57b5343..8d3105bd72 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -105,7 +105,7 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) -local markerScore = (jobCounter or 1) % (markerCount or 1) +local markerScore = jobCounter % (markerCount or 1) addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerScore) -- Emit waiting event diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index e445005626..eed0df802a 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -110,7 +110,7 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -local markerScore = (jobCounter or 1) % (markerCount or 1) +local markerScore = jobCounter % (markerCount or 1) addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerScore) -- Emit waiting event From f491b82b442342260d5f700d2a9aa27e441c3e49 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 30 Oct 2024 22:05:26 -0500 Subject: [PATCH 4/7] refactor: change delay marker to -1 --- src/commands/addPrioritizedJob-8.lua | 4 ++-- src/commands/addStandardJob-8.lua | 4 ++-- src/commands/includes/addBaseMarkerIfNeeded.lua | 4 ++-- src/commands/includes/addDelayMarkerIfNeeded.lua | 2 +- src/commands/includes/addJobInTargetList.lua | 4 ++-- src/commands/includes/addJobWithPriority.lua | 4 ++-- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 8d3105bd72..3610d0f94f 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -105,8 +105,8 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) -local markerScore = jobCounter % (markerCount or 1) -addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerScore) +local markerMember = jobCounter % (markerCount or 1) +addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerMember) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index eed0df802a..033af98abb 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -110,8 +110,8 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -local markerScore = jobCounter % (markerCount or 1) -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerScore) +local markerMember = jobCounter % (markerCount or 1) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerMember) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index 53c528fd96..036ac45d08 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. ]] -local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) if not isPausedOrMaxed then - rcall("ZADD", markerKey, markerScore, "0") + rcall("ZADD", markerKey, 0, markerMember) end end diff --git a/src/commands/includes/addDelayMarkerIfNeeded.lua b/src/commands/includes/addDelayMarkerIfNeeded.lua index 2f985004c9..7912232d64 100644 --- a/src/commands/includes/addDelayMarkerIfNeeded.lua +++ b/src/commands/includes/addDelayMarkerIfNeeded.lua @@ -10,6 +10,6 @@ local function addDelayMarkerIfNeeded(markerKey, delayedKey) if nextTimestamp ~= nil then -- Replace the score of the marker with the newest known -- next timestamp. - rcall("ZADD", markerKey, nextTimestamp, "1") + rcall("ZADD", markerKey, nextTimestamp, "-1") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index 7bd799239a..c915bc1b45 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, markerScore) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, markerMember) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index b7de10289a..fb723293a9 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -6,9 +6,9 @@ --- @include "addBaseMarkerIfNeeded" local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, - isPausedOrMaxed, markerScore) + isPausedOrMaxed, markerMember) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + prioCounter % 0x100000000 rcall("ZADD", prioritizedKey, score, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerScore) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) end From 1dfa2da6c30aae634d473bfbb297d95013c1bf3b Mon Sep 17 00:00:00 2001 From: roggervalf Date: Mon, 11 Nov 2024 19:57:21 -0500 Subject: [PATCH 5/7] refactor: add markerCount option in addBulk --- src/classes/job.ts | 14 +++++++++++++- src/classes/queue.ts | 6 ++++-- src/classes/scripts.ts | 2 -- src/commands/addPrioritizedJob-8.lua | 5 +---- src/commands/addStandardJob-8.lua | 5 +---- src/commands/changePriority-7.lua | 4 ++-- src/commands/includes/addBaseMarkerIfNeeded.lua | 4 ++-- src/commands/includes/addJobInTargetList.lua | 4 ++-- src/commands/includes/addJobWithPriority.lua | 4 ++-- .../includes/moveParentToWaitIfNeeded.lua | 4 ++-- src/commands/includes/promoteDelayedJobs.lua | 4 ++-- .../includes/removeParentDependencyKey.lua | 2 +- src/commands/moveJobFromActiveToWait-10.lua | 2 +- src/commands/moveJobsToWait-8.lua | 2 +- src/commands/moveStalledJobsToWait-9.lua | 2 +- src/commands/promote-9.lua | 4 ++-- src/commands/reprocessJob-8.lua | 2 +- src/commands/retryJob-11.lua | 4 ++-- src/interfaces/queue-options.ts | 16 +++++++++------- tests/test_bulk.ts | 4 +++- 20 files changed, 52 insertions(+), 42 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index b63e5cdf49..5b5d215ed2 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -4,6 +4,7 @@ import { BackoffOptions, BulkJobOptions, DependenciesOpts, + JobBulkOptions, JobJson, JobJsonRaw, MinimalJob, @@ -278,6 +279,7 @@ export class Job< data: T; opts?: BulkJobOptions; }[], + opts: JobBulkOptions, ): Promise[]> { const client = await queue.client; @@ -297,8 +299,18 @@ export class Job< }); } + if (opts.markerCount > 1) { + const markers: (number | string)[] = []; + Array.from(Array(opts.markerCount - 1).keys()).forEach(index => { + markers.push(0, index + 1); + }); + pipeline.zadd(queue.toKey('marker'), ...markers); + } + const results = (await pipeline.exec()) as [null | Error, string][]; - for (let index = 0; index < results.length; ++index) { + const jobResultLength = + opts.markerCount > 1 ? results.length - 1 : results.length; + for (let index = 0; index < jobResultLength; ++index) { const [err, id] = results[index]; if (err) { throw err; diff --git a/src/classes/queue.ts b/src/classes/queue.ts index 908dc06d81..07d877df5b 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -3,6 +3,7 @@ import { BaseJobOptions, BulkJobOptions, IoredisListener, + JobBulkOptions, QueueOptions, RepeatableJob, RepeatOptions, @@ -104,7 +105,6 @@ export class Queue< private _repeat?: Repeat; // To be deprecated in v6 in favor of JobScheduler private _jobScheduler?: JobScheduler; - private markerCount: number; constructor( name: string, @@ -121,7 +121,6 @@ export class Queue< this.jobsOpts = opts?.defaultJobOptions ?? {}; - this.markerCount = opts?.markerCount || 1; this.waitUntilReady() .then(client => { if (!this.closing && !opts?.skipMetasUpdate) { @@ -310,9 +309,11 @@ export class Queue< * * @param jobs - The array of jobs to add to the queue. Each job is defined by 3 * properties, 'name', 'data' and 'opts'. They follow the same signature as 'Queue.add'. + * @param opts - */ async addBulk( jobs: { name: NameType; data: DataType; opts?: BulkJobOptions }[], + opts: JobBulkOptions = { markerCount: 1 }, ): Promise[]> { return this.trace[]>( SpanKind.PRODUCER, @@ -338,6 +339,7 @@ export class Queue< tm: span && srcPropagationMedatada, }, })), + opts, ); }, ); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index fab4299cfc..c9156b291d 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -175,7 +175,6 @@ export class Scripts { parentOpts: ParentOpts = {}, ): Promise { const queueKeys = this.queue.keys; - const markerCount = (this.queue as any).markerCount as number; const parent: Record = job.parent ? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof } @@ -192,7 +191,6 @@ export class Scripts { parent, job.repeatJobKey, job.deduplicationId ? `${queueKeys.de}:${job.deduplicationId}` : null, - markerCount || 1, ]; let encodedOpts; diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-8.lua index 3610d0f94f..4fa6dbc616 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-8.lua @@ -25,7 +25,6 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key - [11] marker count ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -56,7 +55,6 @@ local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] local deduplicationKey = args[10] -local markerCount = args[11] local parentData -- Includes @@ -105,8 +103,7 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], -- Add the job to the prioritized set local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) -local markerMember = jobCounter % (markerCount or 1) -addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed, markerMember) +addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-8.lua index 033af98abb..7005e91af7 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-8.lua @@ -35,7 +35,6 @@ [8] parent? {id, queueKey} [9] repeat job key [10] deduplication key - [11] marker count ARGV[2] Json stringified job data ARGV[3] msgpacked options @@ -59,7 +58,6 @@ local parentKey = args[5] local parent = args[8] local repeatJobKey = args[9] local deduplicationKey = args[10] -local markerCount = args[11] local parentData -- Includes @@ -110,8 +108,7 @@ local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KE -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -local markerMember = jobCounter % (markerCount or 1) -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId, markerMember) +addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/changePriority-7.lua b/src/commands/changePriority-7.lua index 89bff5137a..3350f54fbd 100644 --- a/src/commands/changePriority-7.lua +++ b/src/commands/changePriority-7.lua @@ -33,13 +33,13 @@ local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, priorityCounter, lifo, priority, jobId, isPausedOrMaxed) if priority == 0 then local pushCmd = lifo and 'RPUSH' or 'LPUSH' - addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, '0') + addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) else if lifo then pushBackJobWithPriority(prioritizedKey, priority, jobId) else addJobWithPriority(markerKey, prioritizedKey, priority, jobId, - priorityCounter, isPausedOrMaxed, '0') + priorityCounter, isPausedOrMaxed) end end end diff --git a/src/commands/includes/addBaseMarkerIfNeeded.lua b/src/commands/includes/addBaseMarkerIfNeeded.lua index 036ac45d08..af10026589 100644 --- a/src/commands/includes/addBaseMarkerIfNeeded.lua +++ b/src/commands/includes/addBaseMarkerIfNeeded.lua @@ -2,8 +2,8 @@ Add marker if needed when a job is available. ]] -local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) +local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) if not isPausedOrMaxed then - rcall("ZADD", markerKey, 0, markerMember) + rcall("ZADD", markerKey, 0, "0") end end diff --git a/src/commands/includes/addJobInTargetList.lua b/src/commands/includes/addJobInTargetList.lua index c915bc1b45..80f7bc0173 100644 --- a/src/commands/includes/addJobInTargetList.lua +++ b/src/commands/includes/addJobInTargetList.lua @@ -5,7 +5,7 @@ -- Includes --- @include "addBaseMarkerIfNeeded" -local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId, markerMember) +local function addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) rcall(pushCmd, targetKey, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end diff --git a/src/commands/includes/addJobWithPriority.lua b/src/commands/includes/addJobWithPriority.lua index fb723293a9..f5c334f62e 100644 --- a/src/commands/includes/addJobWithPriority.lua +++ b/src/commands/includes/addJobWithPriority.lua @@ -6,9 +6,9 @@ --- @include "addBaseMarkerIfNeeded" local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, - isPausedOrMaxed, markerMember) + isPausedOrMaxed) local prioCounter = rcall("INCR", priorityCounterKey) local score = priority * 0x100000000 + prioCounter % 0x100000000 rcall("ZADD", prioritizedKey, score, jobId) - addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed, markerMember) + addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed) end diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 7e7f7aeef3..33d9bcb031 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -40,12 +40,12 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, parentPausedKey) addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, - parentId, '0') + parentId) else local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) addJobWithPriority(parentMarkerKey, parentQueueKey .. ":prioritized", priority, - parentId, parentQueueKey .. ":pc", isPausedOrMaxed, '0') + parentId, parentQueueKey .. ":pc", isPausedOrMaxed) end rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", diff --git a/src/commands/includes/promoteDelayedJobs.lua b/src/commands/includes/promoteDelayedJobs.lua index 6daccc2d7c..56cb209c61 100644 --- a/src/commands/includes/promoteDelayedJobs.lua +++ b/src/commands/includes/promoteDelayedJobs.lua @@ -25,10 +25,10 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK if priority == 0 then -- LIFO or FIFO - addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId, '0') + addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId) else addJobWithPriority(markerKey, prioritizedKey, priority, - jobId, priorityCounterKey, isPaused, '0') + jobId, priorityCounterKey, isPaused) end -- Emit waiting event diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 7f9fb98ced..87262850c9 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -13,7 +13,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", parentPrefix .. "wait", parentPrefix .. "paused") - addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId, '0') + addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then local parentEventStream = parentPrefix .. "events" diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-10.lua index 987c20206a..e90d6d2d10 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-10.lua @@ -44,7 +44,7 @@ if lockToken == token then if priority > 0 then pushBackJobWithPriority(KEYS[8], priority, jobId) else - addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId, '0') + addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) end rcall("DEL", lockKey) diff --git a/src/commands/moveJobsToWait-8.lua b/src/commands/moveJobsToWait-8.lua index cdb27824d4..15e99c6295 100644 --- a/src/commands/moveJobsToWait-8.lua +++ b/src/commands/moveJobsToWait-8.lua @@ -63,7 +63,7 @@ if (#jobs > 0) then rcall("LPUSH", target, unpack(jobs, from, to)) end - addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed, '0') + addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed) end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-9.lua index c667681a37..2e6161ebee 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-9.lua @@ -153,7 +153,7 @@ if (#stalling > 0) then getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. - addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId, '0') + addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active') diff --git a/src/commands/promote-9.lua b/src/commands/promote-9.lua index 682ec9bfc8..2143e52aa0 100644 --- a/src/commands/promote-9.lua +++ b/src/commands/promote-9.lua @@ -45,9 +45,9 @@ if rcall("ZREM", KEYS[1], jobId) == 1 then if priority == 0 then -- LIFO or FIFO - addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId, '0') + addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) else - addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed, '0') + addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) end -- Emit waiting event (wait..ing@token) diff --git a/src/commands/reprocessJob-8.lua b/src/commands/reprocessJob-8.lua index af82b4f45c..300ab6a1e8 100644 --- a/src/commands/reprocessJob-8.lua +++ b/src/commands/reprocessJob-8.lua @@ -34,7 +34,7 @@ if rcall("EXISTS", KEYS[1]) == 1 then rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3]) local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6]) - addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId, '0') + addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId) local maxEvents = getOrSetMaxEvents(KEYS[5]) -- Emit waiting event diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-11.lua index f35bfd2b4a..33d1f7a85a 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-11.lua @@ -63,9 +63,9 @@ if rcall("EXISTS", KEYS[4]) == 1 then -- Standard or priority add if priority == 0 then - addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4], '0') + addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) else - addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed, '0') + addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) end rcall("HINCRBY", KEYS[4], "atm", 1) diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index 5a6e38aedd..684a7eaa8c 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -46,13 +46,6 @@ export interface QueueBaseOptions { export interface QueueOptions extends QueueBaseOptions { defaultJobOptions?: DefaultJobOptions; - /** - * Max quantity of base markers to be added. It's recommend to be the same - * as the quantity of worker instances for this specific queue - * @default 1 - */ - markerCount?: number; - /** * Options for the streams used internally in BullMQ. */ @@ -89,6 +82,15 @@ export interface QueueOptions extends QueueBaseOptions { telemetry?: Telemetry; } +export interface JobBulkOptions { + /** + * Max quantity of base markers to be added. It's recommend to be the same + * as the quantity of worker instances for this specific queue + * @default 1 + */ + markerCount: number; +} + /** * Options for the Repeat class. */ diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index 1d6b8eb4ec..43e209aed7 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -153,7 +153,9 @@ describe('bulk jobs', () => { data: { index }, })); - await queue2.addBulk(jobs); + await queue2.addBulk(jobs, { + markerCount: 2, + }); await completed; await queue2.close(); From 63355ca460c86c7e0cf869964a3190184ac66085 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Tue, 12 Nov 2024 23:18:53 -0500 Subject: [PATCH 6/7] refactor: change markerCount as optional parameter --- src/classes/job.ts | 15 +++++++++++---- src/interfaces/queue-options.ts | 6 ++---- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/classes/job.ts b/src/classes/job.ts index 5b5d215ed2..3d29c40ebb 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -299,11 +299,18 @@ export class Job< }); } - if (opts.markerCount > 1) { + // minus 1 base marker that is added when calling addJob scripts + const markerCount = + (opts.markerCount + ? Math.min(jobInstances.length, opts.markerCount) + : jobInstances.length) - 1; + if (markerCount > 0) { const markers: (number | string)[] = []; - Array.from(Array(opts.markerCount - 1).keys()).forEach(index => { - markers.push(0, index + 1); - }); + Array(markerCount) + .fill(0) + .forEach(index => { + markers.push(0, index + 1); + }); pipeline.zadd(queue.toKey('marker'), ...markers); } diff --git a/src/interfaces/queue-options.ts b/src/interfaces/queue-options.ts index 684a7eaa8c..ee864666e0 100644 --- a/src/interfaces/queue-options.ts +++ b/src/interfaces/queue-options.ts @@ -84,11 +84,9 @@ export interface QueueOptions extends QueueBaseOptions { export interface JobBulkOptions { /** - * Max quantity of base markers to be added. It's recommend to be the same - * as the quantity of worker instances for this specific queue - * @default 1 + * Max quantity of base markers to be added. */ - markerCount: number; + markerCount?: number; } /** From 5ec8b8327d7809682969c4521e95dc41cf608d17 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 13 Nov 2024 22:11:33 -0500 Subject: [PATCH 7/7] docs(add-bulk): add warning statement --- docs/gitbook/guide/queues/adding-bulks.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/gitbook/guide/queues/adding-bulks.md b/docs/gitbook/guide/queues/adding-bulks.md index a3f218ce62..5bff53759f 100644 --- a/docs/gitbook/guide/queues/adding-bulks.md +++ b/docs/gitbook/guide/queues/adding-bulks.md @@ -39,6 +39,10 @@ jobs = await queue.addBulk([ This call can only succeed or fail, and all or none of the jobs will be added. +{% hint style="warning" %} +A new marker will be added per each job in the array, unless you provide **markerCount** option as the maximum quantity. +{% endhint %} + ## Read more: - 💡 [Add Bulk API Reference](https://api.docs.bullmq.io/classes/v5.Queue.html#addBulk)