Skip to content

Commit

Permalink
feat(jobs): support for removing jobs and getting job info on queueing
Browse files Browse the repository at this point in the history
  • Loading branch information
adlerfaulkner committed Oct 3, 2023
1 parent 958e69a commit 13adaf2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 21 deletions.
4 changes: 4 additions & 0 deletions src/jobs/JobInfo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface JobInfo {
id: string;
queue: string;
}
21 changes: 19 additions & 2 deletions src/jobs/adapter/BullQueueAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Queue, Job as BullJob } from 'bull';
import Bull from 'bull';
import { getLoggerFor } from '../../logging/LogUtil';
import type { Job } from '../Job';
import type { JobInfo } from '../JobInfo';
import type { JobOptions } from '../JobOptions';
import type { BullQueueProcessor } from '../processor/BullQueueProcessor';
import type { QueueAdapter } from './QueueAdapter';
Expand Down Expand Up @@ -102,7 +103,7 @@ export class BullQueueAdapter implements QueueAdapter {
jobName: string,
data: Record<string, any> = {},
overrideOptions: Partial<JobOptions> = {},
): Promise<void> {
): Promise<JobInfo> {
const job = this.jobs.find((jobIter): boolean => jobIter.name === jobName);
if (!job) {
throw new Error(`Job '${jobName}' is not defined`);
Expand All @@ -116,7 +117,15 @@ export class BullQueueAdapter implements QueueAdapter {
}

const bullOptions = this.jobOptionsToBullOptions(options);
await queue.add(jobName, data, bullOptions);
const queuedJob = await queue.add(jobName, data, bullOptions);
return this.bullJobToJobInfo(queuedJob);
}

private bullJobToJobInfo(queuedJob: BullJob): JobInfo {
return {
id: queuedJob.id as string,
queue: queuedJob.queue.name,
};
}

private jobOptionsToBullOptions(options: JobOptions): Bull.JobOptions {
Expand Down Expand Up @@ -167,6 +176,14 @@ export class BullQueueAdapter implements QueueAdapter {
}
}

public async removeJob(jobId: string, queueName: string): Promise<void> {
const queue = this.queues[queueName];
const job = await queue.getJob(jobId);
if (job) {
await job.remove();
}
}

public async deleteQueue(queueName: string): Promise<void> {
const queue = this.queues[queueName];
if (!queue) {
Expand Down
5 changes: 4 additions & 1 deletion src/jobs/adapter/QueueAdapter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Finalizable } from '../../init/finalize/Finalizable';
import type { JobInfo } from '../JobInfo';
import type { JobOptions } from '../JobOptions';

/**
Expand All @@ -9,7 +10,9 @@ export interface QueueAdapter extends Finalizable {
jobName: string,
data?: Record<string, any>,
overrideOptions?: Partial<JobOptions>,
) => Promise<void>;
) => Promise<JobInfo>;

removeJob: (jobId: string, queueName: string) => Promise<void>;

deleteQueue: (queueName: string) => Promise<void>;

Expand Down
14 changes: 10 additions & 4 deletions test/integration/ConfiguredJobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ describe('An http server with preconfigured jobs', (): void => {
let add: any;
let on: any;
let bullConstructor: any;
let registeredJobs: Record<string, (data: any) => Promise<void>>;
let registeredJobs: Record<string, any>;

beforeEach(async(): Promise<void> => {
registeredJobs = {};
process = jest.fn().mockImplementation(
(jobName: string, concurrency: number, processFn: (bullJob: any) => Promise<void>): void => {
registeredJobs[jobName] = processFn;
registeredJobs[jobName] = {
perform: processFn,
id: 'jobId',
queue: { name: 'default' },
};
},
);

add = jest.fn().mockImplementation(
async(jobName: string, data: any): Promise<void> => {
await registeredJobs[jobName]({ data });
async(jobName: string, data: any): Promise<any> => {
const job = registeredJobs[jobName];
await registeredJobs[jobName].perform({ data });
return job;
},
);

Expand Down
49 changes: 35 additions & 14 deletions test/unit/jobs/adapter/BullQueueAdapter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const today = Date.now();
const tomorrow = new Date(today + dayInMs);

describe('A BullQueueAdapter', (): void => {
const jobId = 'somejobid';
const queue = 'default';
let queues: Record<string, BullQueueSettings>;
let perform: Job['perform'];
Expand All @@ -21,6 +22,8 @@ describe('A BullQueueAdapter', (): void => {
let add: any;
let obliterate: any;
let getCompleted: any;
let getJob: any;
let bullJob: any;
const redisConfig = { port: 6379, host: '127.0.0.1' };
let adapter: BullQueueAdapter;
let queueProcessor: BullQueueProcessor;
Expand All @@ -34,15 +37,24 @@ describe('A BullQueueAdapter', (): void => {
});
job = { name: 'example', perform, options: { queue }};
jobs = [ job ];

close = jest.fn();
obliterate = jest.fn();
getCompleted = jest.fn();
add = jest.fn();
bullJob = { remove: jest.fn() };
getJob = jest.fn().mockResolvedValue(bullJob);
add = jest.fn().mockResolvedValue({ id: jobId, queue: { name: queue }});
queueProcessor = { processJobsOnQueues: jest.fn() } as any;

(Bull as jest.Mock).mockImplementation(
(name: string): Bull.Queue => ({ process, add, name, close, obliterate, getCompleted } as any),
(name: string): Bull.Queue => ({
process,
add,
name,
close,
obliterate,
getCompleted,
getJob,
} as any),
);
});

Expand Down Expand Up @@ -89,7 +101,7 @@ describe('A BullQueueAdapter', (): void => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
const data = { alpha: 1 };
await expect(adapter.performLater('example', data))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -99,7 +111,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job on a cron schedule.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { every: '5 4 * * *' }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -112,7 +124,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job on a cron schedule with a start time at a specific date.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { at: tomorrow, every: '5 4 * * *' }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -125,7 +137,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job on a cron schedule with a start time after a delay.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { in: 1000, every: '5 4 * * *' }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -138,7 +150,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job on a certain millisecond schedule.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { every: 1000 }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -151,7 +163,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job to be run at a specific date.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { at: tomorrow }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -164,7 +176,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job to be run after a delay.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { in: 1000 }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -177,7 +189,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job with an auto-retry backoff setting.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { retryAttempts: 3 }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -191,7 +203,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job with remove on complete setting disabled.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { disableRemoveOnComplete: true }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -203,7 +215,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job with a removeOnFail age.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { removeOnFailAge: 1000 }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -219,7 +231,7 @@ describe('A BullQueueAdapter', (): void => {
it('adds the job with a removeOnFail count.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.performLater('example', {}, { removeOnFailCount: 1000 }))
.resolves.toBeUndefined();
.resolves.toEqual({ id: jobId, queue });
expect(Bull).toHaveBeenCalledTimes(1);
expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}});
expect(add).toHaveBeenCalledTimes(1);
Expand All @@ -232,6 +244,15 @@ describe('A BullQueueAdapter', (): void => {
});
});

it('removes a job from a queue.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.removeJob('somejobid', 'default'))
.resolves.toBeUndefined();
expect(getJob).toHaveBeenCalledTimes(1);
expect(getJob).toHaveBeenCalledWith(jobId);
expect(bullJob.remove).toHaveBeenCalledTimes(1);
});

it('deletes a queue.', async(): Promise<void> => {
adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor);
await expect(adapter.deleteQueue('default')).resolves.toBeUndefined();
Expand Down

0 comments on commit 13adaf2

Please sign in to comment.