Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): enhance getJobSchedulers method to include template information #2956

Merged
merged 7 commits into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { Job } from './job';
import { QueueBase } from './queue-base';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';
import { optsAsJSON, optsFromJSON } from '../utils';
import { array2obj, optsAsJSON, optsFromJSON } from '../utils';

export class JobScheduler extends QueueBase {
private repeatStrategy: RepeatStrategy;
Expand Down Expand Up @@ -202,13 +202,21 @@ export class JobScheduler extends QueueBase {
return this.scripts.removeJobScheduler(jobSchedulerId);
}

private async getSchedulerData(
private async getSchedulerData<D>(
client: RedisClient,
key: string,
next?: number,
): Promise<JobSchedulerJson> {
): Promise<JobSchedulerJson<D>> {
const jobData = await client.hgetall(this.toKey('repeat:' + key));

return this.transformSchedulerData<D>(key, jobData, next);
}

private async transformSchedulerData<D>(
key: string,
jobData: any,
next?: number,
): Promise<JobSchedulerJson<D>> {
if (jobData) {
return {
key,
Expand All @@ -217,6 +225,11 @@ export class JobScheduler extends QueueBase {
tz: jobData.tz || null,
pattern: jobData.pattern || null,
every: jobData.every || null,
...(jobData.data || jobData.opts
? {
template: this.getTemplateFromJSON<D>(jobData.data, jobData.opts),
}
: {}),
next,
};
}
Expand All @@ -239,30 +252,14 @@ export class JobScheduler extends QueueBase {
};
}

async getJobScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const client = await this.client;
const schedulerAttributes = await client.hgetall(
this.toKey('repeat:' + id),
);
async getScheduler<D = any>(id: string): Promise<JobSchedulerJson<D>> {
const [rawJobData, next] = await this.scripts.getJobScheduler(id);

if (schedulerAttributes) {
return {
key: id,
name: schedulerAttributes.name,
endDate: parseInt(schedulerAttributes.endDate) || null,
tz: schedulerAttributes.tz || null,
pattern: schedulerAttributes.pattern || null,
every: schedulerAttributes.every || null,
...(schedulerAttributes.data || schedulerAttributes.opts
? {
template: this.getTemplateFromJSON<D>(
schedulerAttributes.data,
schedulerAttributes.opts,
),
}
: {}),
};
}
return this.transformSchedulerData<D>(
id,
rawJobData ? array2obj(rawJobData) : null,
next ? parseInt(next) : null,
);
}

private getTemplateFromJSON<D = any>(
Expand All @@ -279,11 +276,11 @@ export class JobScheduler extends QueueBase {
return template;
}

async getJobSchedulers(
async getJobSchedulers<D = any>(
start = 0,
end = -1,
asc = false,
): Promise<JobSchedulerJson[]> {
): Promise<JobSchedulerJson<D>[]> {
const client = await this.client;
const jobSchedulersKey = this.keys.repeat;

Expand All @@ -294,7 +291,7 @@ export class JobScheduler extends QueueBase {
const jobs = [];
for (let i = 0; i < result.length; i += 2) {
jobs.push(
this.getSchedulerData(client, result[i], parseInt(result[i + 1])),
this.getSchedulerData<D>(client, result[i], parseInt(result[i + 1])),
);
}
return Promise.all(jobs);
Expand Down
10 changes: 7 additions & 3 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ export class Queue<
* @param id - identifier of scheduler.
*/
async getJobScheduler(id: string): Promise<JobSchedulerJson<DataType>> {
return (await this.jobScheduler).getJobScheduler(id);
return (await this.jobScheduler).getScheduler<DataType>(id);
}

/**
Expand All @@ -588,8 +588,12 @@ export class Queue<
start?: number,
end?: number,
asc?: boolean,
): Promise<RepeatableJob[]> {
return (await this.jobScheduler).getJobSchedulers(start, end, asc);
): Promise<JobSchedulerJson<DataType>[]> {
return (await this.jobScheduler).getJobSchedulers<DataType>(
start,
end,
asc,
);
}

/**
Expand Down
14 changes: 14 additions & 0 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,20 @@ export class Scripts {
]);
}

getJobSchedulerArgs(id: string): string[] {
const keys: string[] = [this.queue.keys.repeat];

return keys.concat([id]);
}

async getJobScheduler(id: string): Promise<[any, string | null]> {
const client = await this.queue.client;

const args = this.getJobSchedulerArgs(id);

return this.execCommand(client, 'getJobScheduler', args);
}

retryJobArgs(
jobId: string,
lifo: boolean,
Expand Down
19 changes: 19 additions & 0 deletions src/commands/getJobScheduler-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
--[[
Get job scheduler record.

Input:
KEYS[1] 'repeat' key

ARGV[1] id
]]

local rcall = redis.call
local jobSchedulerKey = KEYS[1] .. ":" .. ARGV[1]

local score = rcall("ZSCORE", KEYS[1], ARGV[1])

if score then
return {rcall("HGETALL", jobSchedulerKey), score} -- get job data
end

return {nil, nil}
26 changes: 0 additions & 26 deletions src/commands/updateJobOption-1.lua

This file was deleted.

16 changes: 8 additions & 8 deletions src/interfaces/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ export interface ContextManager<Context = any> {
/**
* Creates a new context and sets it as active for the fn passed as last argument
*
* @param context
* @param fn
* @param context -
* @param fn -
*/
with<A extends (...args: any[]) => any>(
context: Context,
Expand All @@ -54,16 +54,16 @@ export interface ContextManager<Context = any> {
* is the mechanism used to propagate the context across a distributed
* application.
*
* @param context
* @param context -
*/
getMetadata(context: Context): string;

/**
* Creates a new context from a serialized version effectively
* linking the new context to the parent context.
*
* @param activeContext
* @param metadata
* @param activeContext -
* @param metadata -
*/
fromMetadata(activeContext: Context, metadata: string): Context;
}
Expand All @@ -78,9 +78,9 @@ export interface Tracer<Context = any> {
* context. If the context is not provided, the current active context should be
* used.
*
* @param name
* @param options
* @param context
* @param name -
* @param options -
* @param context -
*/
startSpan(name: string, options?: SpanOptions, context?: Context): Span;
}
Expand Down
34 changes: 31 additions & 3 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ describe('Job Scheduler', function () {
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

const date = new Date('2017-02-07 9:24:00');
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);

await queue.upsertJobScheduler(
Expand All @@ -360,6 +360,7 @@ describe('Job Scheduler', function () {
tz: null,
pattern: '*/2 * * * * *',
every: null,
next: 1486481042000,
template: {
data: {
foo: 'bar',
Expand Down Expand Up @@ -682,7 +683,7 @@ describe('Job Scheduler', function () {
);
const delayStub = sinon.stub(worker, 'delay').callsFake(async () => {});

const date = new Date('2017-02-07 9:24:00');
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);

const repeat = {
Expand All @@ -698,6 +699,7 @@ describe('Job Scheduler', function () {
key: 'rrule',
name: 'rrule',
endDate: null,
next: 1486481042000,
tz: null,
pattern: 'RRULE:FREQ=SECONDLY;INTERVAL=2;WKST=MO',
every: null,
Expand Down Expand Up @@ -1424,8 +1426,11 @@ describe('Job Scheduler', function () {

describe('when repeatable job fails', function () {
it('should continue repeating', async function () {
const date = new Date('2017-02-07T15:24:00.000Z');
this.clock.setSystemTime(date);
const repeatOpts = {
pattern: '0 * 1 * *',
tz: 'Asia/Calcutta',
};

const worker = new Worker(
Expand All @@ -1445,7 +1450,11 @@ describe('Job Scheduler', function () {
});
});

const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts);
const repeatableJob = await queue.upsertJobScheduler('test', repeatOpts, {
name: 'a',
data: { foo: 'bar' },
opts: { priority: 1 },
});
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

Expand All @@ -1463,6 +1472,25 @@ describe('Job Scheduler', function () {
const count = await queue.count();
expect(count).to.be.equal(1);
expect(jobSchedulers).to.have.length(1);

expect(jobSchedulers[0]).to.deep.equal({
key: 'test',
name: 'a',
endDate: null,
tz: 'Asia/Calcutta',
pattern: '0 * 1 * *',
every: null,
next: 1488310200000,
template: {
data: {
foo: 'bar',
},
opts: {
priority: 1,
},
},
});

await worker.close();
});

Expand Down
2 changes: 1 addition & 1 deletion tests/test_metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('metrics', function () {
});

beforeEach(function () {
this.clock = sinon.useFakeTimers();
this.clock = sinon.useFakeTimers({ shouldClearNativeTimers: true });
});

beforeEach(async function () {
Expand Down
Loading