Skip to content

Commit

Permalink
feat(queue): enhance getJobScheduler method to include template infor…
Browse files Browse the repository at this point in the history
…mation (#2929) ref #2875
  • Loading branch information
roggervalf authored Dec 2, 2024
1 parent c94e2bd commit cb99080
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 68 deletions.
56 changes: 37 additions & 19 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,11 @@ import { parseExpression } from 'cron-parser';
import { RedisClient, RepeatBaseOptions, RepeatOptions } from '../interfaces';
import { JobsOptions, RepeatStrategy } from '../types';
import { Job } from './job';
import { JobSchedulerJson, JobSchedulerTemplateJson } from '../interfaces';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface JobSchedulerJson {
key: string; // key is actually the job scheduler id
name: string;
id?: string | null;
endDate: number | null;
tz: string | null;
pattern: string | null;
every?: string | null;
next?: number;
}
import { optsAsJSON, optsFromJSON } from '../utils';

export class JobScheduler extends QueueBase {
private repeatStrategy: RepeatStrategy;
Expand Down Expand Up @@ -103,6 +94,8 @@ export class JobScheduler extends QueueBase {
(<unknown>multi) as RedisClient,
jobSchedulerId,
nextMillis,
JSON.stringify(typeof jobData === 'undefined' ? {} : jobData),
optsAsJSON(opts),
{
name: jobName,
endDate: endDate ? new Date(endDate).getTime() : undefined,
Expand Down Expand Up @@ -241,22 +234,47 @@ export class JobScheduler extends QueueBase {
};
}

async getJobScheduler(id: string): Promise<JobSchedulerJson> {
async getJobScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const client = await this.client;
const jobData = await client.hgetall(this.toKey('repeat:' + id));
const schedulerAttributes = await client.hgetall(
this.toKey('repeat:' + id),
);

if (jobData) {
if (schedulerAttributes) {
return {
key: id,
name: jobData.name,
endDate: parseInt(jobData.endDate) || null,
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
name: schedulerAttributes.name,
endDate: parseInt(schedulerAttributes.endDate) || null,
tz: schedulerAttributes.tz || null,
pattern: schedulerAttributes.pattern || null,
every: schedulerAttributes.every || null,
...(schedulerAttributes.data || schedulerAttributes.opts
? {
template: this.getTemplateFromJSON<D>(
schedulerAttributes.data,
schedulerAttributes.opts,
),
}
: {}),
};
}
}

private getTemplateFromJSON<D = any>(
rawData?: string,
rawOpts?: string,
): JobSchedulerTemplateJson<D> {
console.log(typeof rawOpts);
const template: JobSchedulerTemplateJson<D> = {};
if (rawData) {
template.data = JSON.parse(rawData);
}
if (rawOpts) {
template.opts = optsFromJSON(rawOpts);
}
return template;
}

async getJobSchedulers(
start = 0,
end = -1,
Expand Down
45 changes: 4 additions & 41 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import {
parseObjectValues,
tryCatch,
removeUndefinedFields,
optsAsJSON,
optsFromJSON,
} from '../utils';
import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
Expand Down Expand Up @@ -324,7 +326,7 @@ export class Job<
jobId?: string,
): Job<T, R, N> {
const data = JSON.parse(json.data || '{}');
const opts = Job.optsFromJSON(json.opts);
const opts = optsFromJSON(json.opts);

const job = new this<T, R, N>(
queue,
Expand Down Expand Up @@ -388,27 +390,6 @@ export class Job<
this.scripts = new Scripts(this.queue);
}

private static optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

/**
* Fetches a Job from the queue given the passed job id.
*
Expand Down Expand Up @@ -469,7 +450,7 @@ export class Job<
id: this.id,
name: this.name,
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data),
opts: removeUndefinedFields<RedisJobOptions>(this.optsAsJSON(this.opts)),
opts: optsAsJSON(this.opts),
parent: this.parent ? { ...this.parent } : undefined,
parentKey: this.parentKey,
progress: this.progress,
Expand All @@ -487,24 +468,6 @@ export class Job<
});
}

private optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<
[keyof JobsOptions, any]
>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as RedisJobOptions;
}

/**
* Prepares a job to be passed to Sandbox.
* @returns
Expand Down
3 changes: 2 additions & 1 deletion src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BaseJobOptions,
BulkJobOptions,
IoredisListener,
JobSchedulerJson,
QueueOptions,
RepeatableJob,
RepeatOptions,
Expand Down Expand Up @@ -571,7 +572,7 @@ export class Queue<
*
* @param id - identifier of scheduler.
*/
async getJobScheduler(id: string): Promise<RepeatableJob> {
async getJobScheduler(id: string): Promise<JobSchedulerJson<DataType>> {
return (await this.jobScheduler).getJobScheduler(id);
}

Expand Down
12 changes: 11 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ export class Scripts {
client: RedisClient,
jobSchedulerId: string,
nextMillis: number,
templateData: string,
templateOpts: RedisJobOptions,
opts: RepeatableOptions,
): Promise<string> {
const queueKeys = this.queue.keys;
Expand All @@ -319,7 +321,15 @@ export class Scripts {
queueKeys.repeat,
queueKeys.delayed,
];
const args = [nextMillis, pack(opts), jobSchedulerId, queueKeys['']];

const args = [
nextMillis,
pack(opts),
jobSchedulerId,
templateData,
pack(templateOpts),
queueKeys[''],
];
return this.execCommand(client, 'addJobScheduler', keys.concat(args));
}

Expand Down
26 changes: 20 additions & 6 deletions src/commands/addJobScheduler-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
[4] endDate?
[5] every?
ARGV[3] jobs scheduler id
ARGV[4] prefix key
ARGV[4] Json stringified template data
ARGV[5] mspacked template opts
ARGV[6] prefix key
Output:
repeatableKey - OK
Expand All @@ -24,13 +26,14 @@ local delayedKey = KEYS[2]

local nextMillis = ARGV[1]
local jobSchedulerId = ARGV[3]
local prefixKey = ARGV[4]
local templateOpts = cmsgpack.unpack(ARGV[5])
local prefixKey = ARGV[6]

-- Includes
--- @include "includes/removeJob"

local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)
local function storeRepeatableJob(schedulerId, repeatKey, nextMillis, rawOpts, templateData, templateOpts)
rcall("ZADD", repeatKey, nextMillis, schedulerId)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
Expand All @@ -54,7 +57,18 @@ local function storeRepeatableJob(repeatKey, nextMillis, rawOpts)
table.insert(optionalValues, opts['every'])
end

rcall("HMSET", repeatKey .. ":" .. jobSchedulerId, "name", opts['name'],
local jsonTemplateOpts = cjson.encode(templateOpts)
if jsonTemplateOpts and jsonTemplateOpts ~= '{}' then
table.insert(optionalValues, "opts")
table.insert(optionalValues, jsonTemplateOpts)
end

if templateData and templateData ~= '{}' then
table.insert(optionalValues, "data")
table.insert(optionalValues, templateData)
end

rcall("HMSET", repeatKey .. ":" .. schedulerId, "name", opts['name'],
unpack(optionalValues))
end

Expand All @@ -74,4 +88,4 @@ if prevMillis ~= false then
end
end

return storeRepeatableJob(repeatKey, nextMillis, ARGV[2])
return storeRepeatableJob(jobSchedulerId, repeatKey, nextMillis, ARGV[2], ARGV[4], templateOpts)
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export * from './debounce-options';
export * from './flow-job';
export * from './ioredis-events';
export * from './job-json';
export * from './job-scheduler-json';
export * from './keep-jobs';
export * from './metrics-options';
export * from './metrics';
Expand Down
18 changes: 18 additions & 0 deletions src/interfaces/job-scheduler-json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { JobsOptions } from '../types';

export interface JobSchedulerTemplateJson<D = any> {
data?: D;
opts?: Omit<JobsOptions, 'jobId' | 'repeat' | 'delay'>;
}

export interface JobSchedulerJson<D = any> {
key: string; // key is actually the job scheduler id
name: string;
id?: string | null;
endDate: number | null;
tz: string | null;
pattern: string | null;
every?: string | null;
next?: number;
template?: JobSchedulerTemplateJson<D>;
}
52 changes: 52 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { EventEmitter } from 'events';
import * as semver from 'semver';

import { SpanKind, TelemetryAttributes } from './enums';
import { JobsOptions, RedisJobOptions } from './types';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -270,6 +271,57 @@ export const toString = (value: any): string => {

export const QUEUE_EVENT_SUFFIX = ':qe';

const optsDecodeMap = {
de: 'deduplication',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata',
};

const optsEncodeMap = invertObject(optsDecodeMap);
optsEncodeMap.debounce = 'de';

export function optsAsJSON(opts: JobsOptions = {}): RedisJobOptions {
const optionEntries = Object.entries(opts) as Array<[keyof JobsOptions, any]>;
const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if (value !== undefined) {
if ((optsEncodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsEncodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}
}

return options as RedisJobOptions;
}

export function optsFromJSON(rawOpts?: string): JobsOptions {
const opts = JSON.parse(rawOpts || '{}');

const optionEntries = Object.entries(opts) as Array<
[keyof RedisJobOptions, any]
>;

const options: Partial<Record<string, any>> = {};
for (const item of optionEntries) {
const [attributeName, value] = item;
if ((optsDecodeMap as Record<string, any>)[<string>attributeName]) {
options[(optsDecodeMap as Record<string, any>)[<string>attributeName]] =
value;
} else {
options[<string>attributeName] = value;
}
}

return options as JobsOptions;
}

export function removeUndefinedFields<T extends Record<string, any>>(
obj: Record<string, any>,
) {
Expand Down
16 changes: 16 additions & 0 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ describe('Job Scheduler', function () {
tz: null,
pattern: '*/2 * * * * *',
every: null,
template: {
data: {
foo: 'bar',
},
},
});

this.clock.tick(nextTick);
Expand Down Expand Up @@ -687,6 +692,17 @@ describe('Job Scheduler', function () {
name: 'rrule',
});

const scheduler = await queue.getJobScheduler('rrule');

expect(scheduler).to.deep.equal({
key: 'rrule',
name: 'rrule',
endDate: null,
tz: null,
pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO',
every: null,
});

this.clock.tick(nextTick);

let prev: any;
Expand Down

0 comments on commit cb99080

Please sign in to comment.