diff --git a/docs/gitbook/guide/jobs/repeatable.md b/docs/gitbook/guide/jobs/repeatable.md index a3ce39bcb4..8e6ccd802d 100644 --- a/docs/gitbook/guide/jobs/repeatable.md +++ b/docs/gitbook/guide/jobs/repeatable.md @@ -189,6 +189,47 @@ As you may notice, the repeat strategy setting should be provided in `Queue` and The repeat strategy function receives an optional `jobName` third parameter. {% endhint %} +### Custom Repeatable Key + +By default, we are generating repeatable keys base on repeat options and job name. + +In some cases, it is desired to pass a custom key to be able to differentiate your repeatable jobs even when they have same repeat options: + +```typescript +import { Queue } from 'bullmq'; + +const myQueue = new Queue('Paint', { connection }); + +// Repeat job every 10 seconds +await myQueue.add( + 'bird', + { color: 'bird' }, + { + repeat: { + every: 1000, + }, + key: 'colibri', + }, +); + +// Repeat job every 10 seconds +await myQueue.add( + 'bird', + { color: 'bird' }, + { + repeat: { + every: 1000, + }, + key: 'eagle', + }, +); + +``` + +{% hint style="warning" %} +While adding a new repeatable job with same key but different repeat options, you will override your previous record. +{% endhint %} + ### Read more: * 💡 [Repeat Strategy API Reference](https://api.docs.bullmq.io/types/v5.RepeatStrategy.html) diff --git a/src/classes/repeat.ts b/src/classes/repeat.ts index 1138789455..868844ebaf 100644 --- a/src/classes/repeat.ts +++ b/src/classes/repeat.ts @@ -1,6 +1,11 @@ import { parseExpression } from 'cron-parser'; import { createHash } from 'crypto'; -import { RepeatBaseOptions, RepeatableJob, RepeatOptions } from '../interfaces'; +import { + RedisClient, + RepeatBaseOptions, + RepeatableJob, + RepeatOptions, +} from '../interfaces'; import { JobsOptions, RepeatStrategy } from '../types'; import { Job } from './job'; import { QueueBase } from './queue-base'; @@ -31,7 +36,7 @@ export class Repeat extends QueueBase { skipCheckExists?: boolean, ): Promise | undefined> { // HACK: This is a temporary fix to enable easy migration from bullmq <3.0.0 - // to >= 3.0.0. It should be removed when moving to 4.x. + // to >= 3.0.0. TODO: It should be removed when moving to 4.x. const repeatOpts: RepeatOptions & { cron?: string } = { ...opts.repeat }; repeatOpts.pattern ??= repeatOpts.cron; delete repeatOpts.cron; @@ -70,23 +75,27 @@ export class Repeat extends QueueBase { repeatOpts.jobId = opts.jobId; } - const repeatJobKey = getRepeatKey(name, repeatOpts); + const qualifiedName = getRepeatCocatOptions(name, repeatOpts); - let repeatableExists = true; + const repeatJobKey = await this.scripts.addRepeatableJob( + opts.repeat.key ?? this.hash(qualifiedName), + nextMillis, + { + name, + endDate: repeatOpts.endDate + ? new Date(repeatOpts.endDate).getTime() + : undefined, + tz: repeatOpts.tz, + pattern: repeatOpts.pattern, + every: repeatOpts.every, + }, + qualifiedName, + skipCheckExists, + ); - if (!skipCheckExists) { - // Check that the repeatable job hasn't been removed - // TODO: a lua script would be better here - const client = await this.client; - repeatableExists = !!(await client.zscore( - this.keys.repeat, - repeatJobKey, - )); - } const { immediately, ...filteredRepeatOpts } = repeatOpts; - // The job could have been deleted since this check - if (repeatableExists) { + if (repeatJobKey) { return this.createNextJob( name, nextMillis, @@ -109,17 +118,12 @@ export class Repeat extends QueueBase { currentCount: number, hasImmediately: boolean, ) { - const client = await this.client; - // // Generate unique job id for this iteration. // - const jobId = this.getRepeatJobId({ - name, + const jobId = this.getRepeatDelayedJobId({ + customKey: repeatJobKey, nextMillis, - namespace: this.hash(repeatJobKey), - jobId: opts.repeat.jobId, - key: opts.repeat.key, }); const now = Date.now(); const delay = @@ -136,8 +140,6 @@ export class Repeat extends QueueBase { mergedOpts.repeat = { ...opts.repeat, count: currentCount }; - await client.zadd(this.keys.repeat, nextMillis.toString(), repeatJobKey); - return this.Job.create(this, name, data, mergedOpts); } @@ -146,29 +148,56 @@ export class Repeat extends QueueBase { repeat: RepeatOptions, jobId?: string, ): Promise { - const repeatJobKey = getRepeatKey(name, { ...repeat, jobId }); - const repeatJobId = this.getRepeatJobId({ + const qualifiedName = getRepeatCocatOptions(name, { ...repeat, jobId }); + const repeatJobKey = repeat.key ?? this.hash(qualifiedName); + const legacyRepeatJobId = this.getRepeatJobId({ name, nextMillis: '', - namespace: this.hash(repeatJobKey), + namespace: this.hash(qualifiedName), jobId: jobId ?? repeat.jobId, key: repeat.key, }); - return this.scripts.removeRepeatable(repeatJobId, repeatJobKey); + return this.scripts.removeRepeatable( + legacyRepeatJobId, + qualifiedName, + repeatJobKey, + ); } async removeRepeatableByKey(repeatJobKey: string): Promise { const data = this.keyToData(repeatJobKey); - const repeatJobId = this.getRepeatJobId({ + const legacyRepeatJobId = this.getRepeatJobId({ name: data.name, nextMillis: '', namespace: this.hash(repeatJobKey), jobId: data.id, }); - return this.scripts.removeRepeatable(repeatJobId, repeatJobKey); + return this.scripts.removeRepeatable(legacyRepeatJobId, '', repeatJobKey); + } + + private async getRepeatableData( + client: RedisClient, + key: string, + next?: number, + ): Promise { + const jobData = await client.hgetall(this.toKey('repeat:' + key)); + + if (jobData) { + return { + key, + name: jobData.name, + endDate: parseInt(jobData.endDate) || null, + tz: jobData.tz || null, + pattern: jobData.pattern || null, + every: jobData.every || null, + next, + }; + } + + return this.keyToData(key, next); } private keyToData(key: string, next?: number): RepeatableJob { @@ -200,9 +229,11 @@ export class Repeat extends QueueBase { const jobs = []; for (let i = 0; i < result.length; i += 2) { - jobs.push(this.keyToData(result[i], parseInt(result[i + 1]))); + jobs.push( + this.getRepeatableData(client, result[i], parseInt(result[i + 1])), + ); } - return jobs; + return Promise.all(jobs); } async getRepeatableCount(): Promise { @@ -214,6 +245,16 @@ export class Repeat extends QueueBase { return createHash(this.repeatKeyHashAlgorithm).update(str).digest('hex'); } + private getRepeatDelayedJobId({ + nextMillis, + customKey, + }: { + customKey: string; + nextMillis: number | string; + }) { + return `repeat:${customKey}:${nextMillis}`; + } + private getRepeatJobId({ name, nextMillis, @@ -229,12 +270,10 @@ export class Repeat extends QueueBase { }) { const checksum = key ?? this.hash(`${name}${jobId || ''}${namespace}`); return `repeat:${checksum}:${nextMillis}`; - // return `repeat:${jobId || ''}:${name}:${namespace}:${nextMillis}`; - //return `repeat:${name}:${namespace}:${nextMillis}`; } } -function getRepeatKey(name: string, repeat: RepeatOptions) { +function getRepeatCocatOptions(name: string, repeat: RepeatOptions) { const endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : ''; const tz = repeat.tz || ''; const pattern = repeat.pattern; diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index cdb215839f..25e8fadac7 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -23,6 +23,7 @@ import { WorkerOptions, KeepJobs, MoveToDelayedOpts, + RepeatableOptions, } from '../interfaces'; import { JobState, @@ -259,25 +260,78 @@ export class Scripts { return (client).pause(args); } + protected addRepeatableJobArgs( + customKey: string, + nextMillis: number, + opts: RepeatableOptions, + legacyCustomKey: string, + skipCheckExists: boolean, + ): (string | number | Buffer)[] { + const keys: (string | number | Buffer)[] = [ + this.queue.keys.repeat, + customKey, + ]; + + const args = [ + nextMillis, + pack(opts), + legacyCustomKey, + skipCheckExists ? '1' : '0', + ]; + + return keys.concat(args); + } + + async addRepeatableJob( + customKey: string, + nextMillis: number, + opts: RepeatableOptions, + legacyCustomKey: string, + skipCheckExists: boolean, + ): Promise { + const client = await this.queue.client; + + const args = this.addRepeatableJobArgs( + customKey, + nextMillis, + opts, + legacyCustomKey, + skipCheckExists, + ); + + return (client).addRepeatableJob(args); + } + private removeRepeatableArgs( - repeatJobId: string, + legacyRepeatJobId: string, + qualifiedName: string, repeatJobKey: string, ): string[] { const queueKeys = this.queue.keys; const keys = [queueKeys.repeat, queueKeys.delayed]; - const args = [repeatJobId, repeatJobKey, queueKeys['']]; + const args = [ + legacyRepeatJobId, + qualifiedName, + repeatJobKey, + queueKeys[''], + ]; return keys.concat(args); } async removeRepeatable( - repeatJobId: string, + legacyRepeatJobId: string, + qualifiedName: string, repeatJobKey: string, ): Promise { const client = await this.queue.client; - const args = this.removeRepeatableArgs(repeatJobId, repeatJobKey); + const args = this.removeRepeatableArgs( + legacyRepeatJobId, + qualifiedName, + repeatJobKey, + ); return (client).removeRepeatable(args); } diff --git a/src/commands/addRepeatableJob-2.lua b/src/commands/addRepeatableJob-2.lua new file mode 100644 index 0000000000..d97126e997 --- /dev/null +++ b/src/commands/addRepeatableJob-2.lua @@ -0,0 +1,69 @@ +--[[ + Adds a repeatable job + + Input: + KEYS[1] 'repeat' key + KEYS[2] custom key + + ARGV[1] next milliseconds + ARGV[2] msgpacked options + [1] name + [2] tz? + [3] patten? + [4] endDate? + [5] every? + ARGV[3] legacy custom key TODO: remove this logic in next breaking change + ARGV[4] skipCheckExists + + Output: + repeatableKey - OK +]] +local rcall = redis.call +local repeatKey = KEYS[1] +local customKey = KEYS[2] +local legacyCustomKey = ARGV[3] +local nextMilli = ARGV[1] + +local function storeRepeatableJob(repeatKey, customKey, nextMilli, rawOpts) + rcall("ZADD", repeatKey, nextMilli, customKey) + local opts = cmsgpack.unpack(rawOpts) + + local optionalValues = {} + if opts['tz'] then + table.insert(optionalValues, "tz") + table.insert(optionalValues, opts['tz']) + end + + if opts['pattern'] then + table.insert(optionalValues, "pattern") + table.insert(optionalValues, opts['pattern']) + end + + if opts['endDate'] then + table.insert(optionalValues, "endDate") + table.insert(optionalValues, opts['endDate']) + end + + if opts['every'] then + table.insert(optionalValues, "every") + table.insert(optionalValues, opts['every']) + end + + rcall("HMSET", repeatKey .. ":" .. customKey, "name", opts['name'], + unpack(optionalValues)) + + return customKey +end + +if ARGV[4] == '0' then + if rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then + rcall("ZADD", repeatKey, nextMilli, legacyCustomKey) + return legacyCustomKey + elseif rcall("ZSCORE", repeatKey, customKey) ~= false then + return storeRepeatableJob(repeatKey, customKey, nextMilli, ARGV[2]) + end +else + return storeRepeatableJob(repeatKey, customKey, nextMilli, ARGV[2]) +end + +return '' \ No newline at end of file diff --git a/src/commands/obliterate-2.lua b/src/commands/obliterate-2.lua index 085d968399..1a7be36393 100644 --- a/src/commands/obliterate-2.lua +++ b/src/commands/obliterate-2.lua @@ -1,5 +1,12 @@ --[[ Completely obliterates a queue and all of its contents + This command completely destroys a queue including all of its jobs, current or past + leaving no trace of its existence. Since this script needs to iterate to find all the job + keys, consider that this call may be slow for very large queues. + + The queue needs to be "paused" or it will return an error + If the queue has currently active jobs then the script by default will return error, + however this behaviour can be overrided using the 'force' option. Input: KEYS[1] meta @@ -9,13 +16,6 @@ ARGV[2] force ]] --- This command completely destroys a queue including all of its jobs, current or past --- leaving no trace of its existence. Since this script needs to iterate to find all the job --- keys, consider that this call may be slow for very large queues. - --- The queue needs to be "paused" or it will return an error --- If the queue has currently active jobs then the script by default will return error, --- however this behaviour can be overrided using the 'force' option. local maxCount = tonumber(ARGV[1]) local baseKey = KEYS[2] @@ -59,6 +59,22 @@ if(maxCount <= 0) then return 1 end +local repeatKey = baseKey .. 'repeat' +local repeatJobsIds = getZSetItems(repeatKey, maxCount) +for i, key in ipairs(repeatJobsIds) do + local jobKey = repeatKey .. ":" .. key + rcall("DEL", jobKey) +end +if(#repeatJobsIds > 0) then + for from, to in batches(#repeatJobsIds, 7000) do + rcall("ZREM", repeatKey, unpack(repeatJobsIds, from, to)) + end +end +maxCount = maxCount - #repeatJobsIds +if(maxCount <= 0) then + return 1 +end + local completedKey = baseKey .. 'completed' maxCount = removeZSetJobs(completedKey, true, baseKey, maxCount) if(maxCount <= 0) then @@ -92,7 +108,6 @@ if(maxCount > 0) then baseKey .. 'id', baseKey .. 'pc', baseKey .. 'meta', - baseKey .. 'repeat', baseKey .. 'metrics:completed', baseKey .. 'metrics:completed:data', baseKey .. 'metrics:failed', diff --git a/src/commands/removeRepeatable-2.lua b/src/commands/removeRepeatable-2.lua index e69ca8fb25..05a0bd21d8 100644 --- a/src/commands/removeRepeatable-2.lua +++ b/src/commands/removeRepeatable-2.lua @@ -5,9 +5,10 @@ KEYS[1] repeat jobs key KEYS[2] delayed jobs key - ARGV[1] repeat job id - ARGV[2] repeat job key - ARGV[3] queue key + ARGV[1] old repeat job id + ARGV[2] options concat + ARGV[3] repeat job key + ARGV[4] prefix key Output: 0 - OK @@ -19,12 +20,16 @@ local rcall = redis.call local millis = rcall("ZSCORE", KEYS[1], ARGV[2]) -if(millis) then +-- Includes +--- @include "includes/removeJobKeys" + +-- legacy removal TODO: remove in next breaking change +if millis then -- Delete next programmed job. local repeatJobId = ARGV[1] .. millis if(rcall("ZREM", KEYS[2], repeatJobId) == 1) then - rcall("DEL", ARGV[3] .. repeatJobId) - rcall("XADD", ARGV[3] .. "events", "*", "event", "removed", "jobId", repeatJobId, "prev", "delayed"); + removeJobKeys(ARGV[4] .. repeatJobId) + rcall("XADD", ARGV[4] .. "events", "*", "event", "removed", "jobId", repeatJobId, "prev", "delayed"); end end @@ -32,4 +37,20 @@ if(rcall("ZREM", KEYS[1], ARGV[2]) == 1) then return 0 end +-- new removal +millis = rcall("ZSCORE", KEYS[1], ARGV[3]) + +if millis then + -- Delete next programmed job. + local repeatJobId = "repeat:" .. ARGV[3] .. ":" .. millis + if(rcall("ZREM", KEYS[2], repeatJobId) == 1) then + removeJobKeys(ARGV[4] .. repeatJobId) + rcall("XADD", ARGV[4] .. "events", "*", "event", "removed", "jobId", repeatJobId, "prev", "delayed"); + end +end + +if(rcall("ZREM", KEYS[1], ARGV[3]) == 1) then + return 0 +end + return 1 diff --git a/src/interfaces/index.ts b/src/interfaces/index.ts index ace81ce3ff..533fa81b31 100644 --- a/src/interfaces/index.ts +++ b/src/interfaces/index.ts @@ -17,6 +17,7 @@ export * from './rate-limiter-options'; export * from './redis-options'; export * from './redis-streams'; export * from './repeatable-job'; +export * from './repeatable-options'; export * from './repeat-options'; export * from './sandboxed-job-processor'; export * from './sandboxed-job'; diff --git a/src/interfaces/repeatable-job.ts b/src/interfaces/repeatable-job.ts index 1e1e45bd03..e5a2963bf7 100644 --- a/src/interfaces/repeatable-job.ts +++ b/src/interfaces/repeatable-job.ts @@ -1,9 +1,10 @@ export type RepeatableJob = { key: string; name: string; - id: string | null; + id?: string | null; endDate: number | null; tz: string | null; - pattern: string; + pattern: string | null; + every?: string | null; next: number; }; diff --git a/src/interfaces/repeatable-options.ts b/src/interfaces/repeatable-options.ts new file mode 100644 index 0000000000..6b2da11583 --- /dev/null +++ b/src/interfaces/repeatable-options.ts @@ -0,0 +1,7 @@ +export type RepeatableOptions = { + name: string; + endDate?: number; + tz?: string; + pattern?: string; + every?: number; +}; diff --git a/tests/test_repeat.ts b/tests/test_repeat.ts index 54369a1690..6de28e2f32 100644 --- a/tests/test_repeat.ts +++ b/tests/test_repeat.ts @@ -184,17 +184,32 @@ describe('repeat', function () { delayStub.restore(); }); - it('should create multiple jobs if they have the same cron pattern and different name', async function () { - const cron = '*/10 * * * * *'; + describe('when jobs have the same cron pattern and different name', function () { + it('should create multiple jobs', async function () { + const cron = '*/10 * * * * *'; - await Promise.all([ - queue.add('test1', {}, { repeat: { pattern: cron } }), - queue.add('test2', {}, { repeat: { pattern: cron } }), - queue.add('test3', {}, { repeat: { pattern: cron } }), - ]); + await Promise.all([ + queue.add('test1', {}, { repeat: { pattern: cron } }), + queue.add('test2', {}, { repeat: { pattern: cron } }), + queue.add('test3', {}, { repeat: { pattern: cron } }), + ]); - const count = await queue.count(); - expect(count).to.be.eql(3); + const count = await queue.count(); + expect(count).to.be.eql(3); + }); + }); + + describe('when jobs have same key and different every pattern', function () { + it('should create only one repeatable job', async function () { + await Promise.all([ + queue.add('test1', {}, { repeat: { every: 1000, key: 'test' } }), + queue.add('test2', {}, { repeat: { every: 2000, key: 'test' } }), + queue.add('test3', {}, { repeat: { every: 3000, key: 'test' } }), + ]); + + const repeatableJobs = await queue.getRepeatableJobs(); + expect(repeatableJobs.length).to.be.eql(1); + }); }); it('should get repeatable jobs with different cron pattern', async function () { @@ -239,40 +254,49 @@ describe('repeat', function () { .to.be.and.an('array') .and.have.length(5) .and.to.deep.include({ - key: 'first::12345::10 * * * * *', + key: '81e7865a899dddf47c3ad19649304bac', name: 'first', - id: null, endDate: 12345, tz: null, pattern: '10 * * * * *', + every: null, next: 10000, }) .and.to.deep.include({ - key: 'second::610000::2 10 * * * *', + key: '47f7425312b6adf8db58ebd37c7ad8be', name: 'second', - id: null, endDate: 610000, tz: null, pattern: '2 10 * * * *', + every: null, next: 602000, }) .and.to.deep.include({ - key: 'fourth:::Africa/Accra:2 * * 4 * *', + key: 'f1e05411209310794fb4b34ec2a8df6b', name: 'fourth', - id: null, endDate: null, tz: 'Africa/Accra', pattern: '2 * * 4 * *', + every: null, next: 259202000, }) .and.to.deep.include({ - key: 'third:::Africa/Abidjan:1 * * 5 * *', + key: 'd58b8d085ba529d423d59e220a813f82', name: 'third', - id: null, endDate: null, tz: 'Africa/Abidjan', pattern: '1 * * 5 * *', + every: null, next: 345601000, + }) + .and.to.deep.include({ + key: 'e891826d68ad4ffbd7243b7f98d88614', + name: 'fifth', + endDate: null, + tz: 'Europa/Copenhaguen', + pattern: null, + every: '5000', + next: 5000, }); }); @@ -434,76 +458,78 @@ describe('repeat', function () { delayStub.restore(); }); - it('should remove repeated job when using removeOnComplete', async function () { - this.timeout(10000); - const queueName2 = `test-${v4()}`; - const queue2 = new Queue(queueName2, { - connection, - prefix, - defaultJobOptions: { - removeOnComplete: true, - }, - }); + describe('when using removeOnComplete', function () { + it('should remove repeated job', async function () { + this.timeout(10000); + const queueName2 = `test-${v4()}`; + const queue2 = new Queue(queueName2, { + connection, + prefix, + defaultJobOptions: { + removeOnComplete: true, + }, + }); - const date = new Date('2017-02-07 9:24:00'); - this.clock.setSystemTime(date); - const nextTick = 2 * ONE_SECOND + 500; - const delay = 5 * ONE_SECOND + 500; + const date = new Date('2017-02-07 9:24:00'); + this.clock.setSystemTime(date); + const nextTick = 2 * ONE_SECOND + 500; + const delay = 5 * ONE_SECOND + 500; - const worker = new Worker( - queueName, - async () => { - this.clock.tick(nextTick); - }, - { autorun: false, connection, prefix }, - ); - const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); + const worker = new Worker( + queueName, + async () => { + this.clock.tick(nextTick); + }, + { autorun: false, connection, prefix }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {}); - await queue.add( - 'test', - { foo: 'bar' }, - { - repeat: { - pattern: '*/2 * * * * *', - startDate: new Date('2017-02-07 9:24:05'), + await queue.add( + 'test', + { foo: 'bar' }, + { + repeat: { + pattern: '*/2 * * * * *', + startDate: new Date('2017-02-07 9:24:05'), + }, }, - }, - ); + ); - this.clock.tick(nextTick + delay); + this.clock.tick(nextTick + delay); - let prev: Job; - let counter = 0; + let prev: Job; + let counter = 0; - const completing = new Promise((resolve, reject) => { - worker.on('completed', async job => { - if (prev) { - expect(prev.timestamp).to.be.lt(job.timestamp); - expect(job.timestamp - prev.timestamp).to.be.gte(2000); - } - prev = job; - counter++; - if (counter == 5) { - const counts = await queue2.getJobCounts('completed'); - expect(counts.completed).to.be.equal(0); - resolve(); - } + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + if (prev) { + expect(prev.timestamp).to.be.lt(job.timestamp); + expect(job.timestamp - prev.timestamp).to.be.gte(2000); + } + prev = job; + counter++; + if (counter == 5) { + const counts = await queue2.getJobCounts('completed'); + expect(counts.completed).to.be.equal(0); + resolve(); + } + }); }); - }); - worker.run(); + worker.run(); - await completing; + await completing; - await queue2.close(); - await worker.close(); - await removeAllQueueData(new IORedis(redisHost), queueName2); - delayStub.restore(); + await queue2.close(); + await worker.close(); + await removeAllQueueData(new IORedis(redisHost), queueName2); + delayStub.restore(); + }); }); describe('when custom cron strategy is provided', function () { it('should repeat every 2 seconds', async function () { - this.timeout(20000); + this.timeout(15000); const settings = { repeatStrategy: (millis, opts) => { const currentDate = @@ -703,7 +729,7 @@ describe('repeat', function () { }); it('should have repeatable job key with sha256 hashing when sha256 hash algorithm is provided', async function () { - this.timeout(20000); + this.timeout(15000); const settings = { repeatKeyHashAlgorithm: 'sha256', }; @@ -733,11 +759,17 @@ describe('repeat', function () { ); const keyPrefix = getRepeatableJobKeyPrefix(prefix, queueName); - const jobsRedisKeys = await new IORedis(redisHost).keys(`${keyPrefix}*`); - expect(jobsRedisKeys.length).to.be.equal(1); + const client = await worker.client; + + const jobsRedisKeys = await client.keys(`${keyPrefix}*`); + expect(jobsRedisKeys.length).to.be.equal(2); const actualHashedRepeatableJobKey = - extractRepeatableJobChecksumFromRedisKey(jobsRedisKeys[0]); + extractRepeatableJobChecksumFromRedisKey( + jobsRedisKeys[0].length > jobsRedisKeys[1].length + ? jobsRedisKeys[1] + : jobsRedisKeys[0], + ); const expectedRawKey = createRepeatableJobKey( jobName, jobId, @@ -746,10 +778,8 @@ describe('repeat', function () { suffix, ); const expectedRepeatJobIdCheckum = getRepeatJobIdCheckum( - jobName, expectedRawKey, settings.repeatKeyHashAlgorithm, - jobId, ); expect(actualHashedRepeatableJobKey).to.be.equal( @@ -1011,7 +1041,7 @@ describe('repeat', function () { }); it('should repeat 7:th day every month at 9:25', async function () { - this.timeout(18000); + this.timeout(15000); const date = new Date('2017-02-02 7:21:42'); this.clock.setSystemTime(date); @@ -1561,7 +1591,7 @@ describe('repeat', function () { queueEvents.on('waiting', function ({ jobId }) { try { expect(jobId).to.be.equal( - `repeat:c602b9b36e4beddd9e7db39a3ef2ea4c:${ + `repeat:16db7a9b166154f5c636abf3c8fe3364:${ date.getTime() + 1 * ONE_SECOND }`, ); diff --git a/tests/utils/repeat_utils.ts b/tests/utils/repeat_utils.ts index cef9e72534..eabb8e5d47 100644 --- a/tests/utils/repeat_utils.ts +++ b/tests/utils/repeat_utils.ts @@ -25,11 +25,8 @@ export function hash(repeatKeyHashAlgorithm: string, payload: string) { } export function getRepeatJobIdCheckum( - name: string, repeatJobKey: string, repeatKeyHashAlgorithm: string, - jobId?: string, ) { - const namespace = hash(repeatKeyHashAlgorithm, repeatJobKey); - return hash(repeatKeyHashAlgorithm, `${name}${jobId || ''}${namespace}`); + return hash(repeatKeyHashAlgorithm, repeatJobKey); }