perf(marker): add base markers while consuming jobs to get workers busy (#2904)

fixes #2842
roggervalf authored Nov 19, 2024
1 parent db84ad5 commit 1759c8b
Showing 4 changed files with 58 additions and 9 deletions.
7 changes: 6 additions & 1 deletion src/commands/includes/prepareJobForProcessing.lua
@@ -7,8 +7,11 @@
   opts - limiter
 ]]
 
+-- Includes
+--- @include "addBaseMarkerIfNeeded"
+
 local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey,
-                                       jobId, processedOn, maxJobs, opts)
+                                       jobId, processedOn, maxJobs, markerKey, opts)
   local jobKey = keyPrefix .. jobId
 
   -- Check if we need to perform rate limiting.
@@ -41,5 +44,7 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey
   rcall("HMSET", jobKey, "processedOn", processedOn, unpack(optionalValues))
   rcall("HINCRBY", jobKey, "ats", 1)
 
+  addBaseMarkerIfNeeded(markerKey, false)
+
   return {rcall("HGETALL", jobKey), jobId, 0, 0} -- get job data
 end
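
The `addBaseMarkerIfNeeded` include referenced above is not shown in this diff. As a rough sketch of the idea (an assumption about the helper, not the shipped include): unless the queue is paused or maxed, it writes the base marker into the marker sorted set so that workers blocking on that key wake up and try to fetch the next job.

-- Hypothetical sketch of the included helper, not taken from this commit.
-- "0" is assumed to be the base marker member; rcall is the redis.call
-- wrapper available to these scripts.
local function addBaseMarkerIfNeeded(markerKey, isPausedOrMaxed)
    if not isPausedOrMaxed then
        rcall("ZADD", markerKey, 0, "0")
    end
end

Threading markerKey through moveToActive-11.lua and moveToFinished-14.lua (below) lets this run every time a job is prepared for processing, which is what keeps otherwise idle workers busy.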
4 changes: 2 additions & 2 deletions src/commands/moveToActive-11.lua
@@ -77,12 +77,12 @@ end
 
 if jobId then
     return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
-                                   maxJobs, opts)
+                                   maxJobs, markerKey, opts)
 else
     jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
     if jobId then
         return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
-                                       maxJobs, opts)
+                                       maxJobs, markerKey, opts)
     end
 end
 
9 changes: 5 additions & 4 deletions src/commands/moveToFinished-14.lua
@@ -207,8 +207,9 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
 
     local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8])
 
+    local markerKey = KEYS[14]
     -- Check if there are delayed jobs that can be promoted
-    promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix,
+    promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix,
                        timestamp, KEYS[10], isPausedOrMaxed)
 
     local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
@@ -233,19 +234,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists
             jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2],
                                                 KEYS[10])
             return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
-                                           timestamp, maxJobs,
+                                           timestamp, maxJobs, markerKey,
                                            opts)
         end
     else
         return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
-                                       timestamp, maxJobs,
+                                       timestamp, maxJobs, markerKey,
                                        opts)
     end
 else
     jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10])
     if jobId then
         return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId,
-                                       timestamp, maxJobs,
+                                       timestamp, maxJobs, markerKey,
                                        opts)
     end
 end
47 changes: 45 additions & 2 deletions tests/test_bulk.ts
@@ -1,9 +1,10 @@
 import { expect } from 'chai';
 import { default as IORedis } from 'ioredis';
+import { after as afterNumExecutions } from 'lodash';
 import { after, beforeEach, describe, it, before } from 'mocha';
 import { v4 } from 'uuid';
-import { Queue, Worker, Job } from '../src/classes';
-import { removeAllQueueData } from '../src/utils';
+import { Queue, QueueEvents, Worker, Job } from '../src/classes';
+import { removeAllQueueData, delay } from '../src/utils';
 
 describe('bulk jobs', () => {
   const redisHost = process.env.REDIS_HOST || 'localhost';
@@ -119,6 +120,48 @@ describe('bulk jobs', () => {
     await removeAllQueueData(new IORedis(redisHost), parentQueueName);
   });
 
+  it('should keep workers busy', async () => {
+    const numJobs = 6;
+    const queue2 = new Queue(queueName, { connection, markerCount: 2, prefix });
+
+    const queueEvents = new QueueEvents(queueName, { connection, prefix });
+    await queueEvents.waitUntilReady();
+
+    const worker = new Worker(
+      queueName,
+      async () => {
+        await delay(1000);
+      },
+      { connection, prefix },
+    );
+    const worker2 = new Worker(
+      queueName,
+      async () => {
+        await delay(1000);
+      },
+      { connection, prefix },
+    );
+    await worker.waitUntilReady();
+    await worker2.waitUntilReady();
+
+    const completed = new Promise(resolve => {
+      queueEvents.on('completed', afterNumExecutions(numJobs, resolve));
+    });
+
+    const jobs = Array.from(Array(numJobs).keys()).map(index => ({
+      name: 'test',
+      data: { index },
+    }));
+
+    await queue2.addBulk(jobs);
+
+    await completed;
+    await queue2.close();
+    await worker.close();
+    await worker2.close();
+    await queueEvents.close();
+  });
+
   it('should process jobs with custom ids', async () => {
     const name = 'test';
     let processor;
