diff --git a/src/commands/includes/prepareJobForProcessing.lua b/src/commands/includes/prepareJobForProcessing.lua index b1e96ffb49..7bceac146e 100644 --- a/src/commands/includes/prepareJobForProcessing.lua +++ b/src/commands/includes/prepareJobForProcessing.lua @@ -7,8 +7,11 @@ opts - limiter ]] +-- Includes +--- @include "addBaseMarkerIfNeeded" + local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey, - jobId, processedOn, maxJobs, opts) + jobId, processedOn, maxJobs, markerKey, opts) local jobKey = keyPrefix .. jobId -- Check if we need to perform rate limiting. @@ -41,5 +44,7 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues)) rcall("HINCRBY", jobKey, "ats", 1) + addBaseMarkerIfNeeded(markerKey, false) + return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data end diff --git a/src/commands/moveToActive-11.lua b/src/commands/moveToActive-11.lua index 946091e84a..064f3fecef 100644 --- a/src/commands/moveToActive-11.lua +++ b/src/commands/moveToActive-11.lua @@ -77,12 +77,12 @@ end if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], - maxJobs, opts) + maxJobs, markerKey, opts) else jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10]) if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], - maxJobs, opts) + maxJobs, markerKey, opts) end end diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-14.lua index 06631b52d9..616ee5dada 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-14.lua @@ -207,8 +207,9 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) + local markerKey = KEYS[14] -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix, + promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix, timestamp, KEYS[10], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) @@ -233,19 +234,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end else return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end else jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) if jobId then return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, - timestamp, maxJobs, + timestamp, maxJobs, markerKey, opts) end end diff --git a/tests/test_bulk.ts b/tests/test_bulk.ts index 5cff0f2122..1d6b8eb4ec 100644 --- a/tests/test_bulk.ts +++ b/tests/test_bulk.ts @@ -1,9 +1,10 @@ import { expect } from 'chai'; import { default as IORedis } from 'ioredis'; +import { after as afterNumExecutions } from 'lodash'; import { after, beforeEach, describe, it, before } from 'mocha'; import { v4 } from 'uuid'; -import { Queue, Worker, Job } from '../src/classes'; -import { removeAllQueueData } from '../src/utils'; +import { Queue, QueueEvents, Worker, Job } from '../src/classes'; +import { removeAllQueueData, delay } from '../src/utils'; describe('bulk jobs', () => { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -119,6 +120,48 @@ describe('bulk jobs', () => { await removeAllQueueData(new IORedis(redisHost), parentQueueName); }); + it('should keep workers busy', async () => { + const numJobs = 6; + const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix }); + + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + const worker = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + const worker2 = new Worker( + queueName, + async () => { + await delay(1000); + }, + { connection, prefix }, + ); + await worker.waitUntilReady(); + await worker2.waitUntilReady(); + + const completed = new Promise(resolve => { + queueEvents.on('completed', afterNumExecutions(numJobs, resolve)); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { index }, + })); + + await queue2.addBulk(jobs); + + await completed; + await queue2.close(); + await worker.close(); + await worker2.close(); + await queueEvents.close(); + }); + it('should process jobs with custom ids', async () => { const name = 'test'; let processor;