From 13adaf246e66ca1dfc136837df309a83b8bd848e Mon Sep 17 00:00:00 2001 From: Adler Faulkner Date: Mon, 2 Oct 2023 17:41:58 -0700 Subject: [PATCH] feat(jobs): support for removing jobs and getting job info on queueing --- src/jobs/JobInfo.ts | 4 ++ src/jobs/adapter/BullQueueAdapter.ts | 21 +++++++- src/jobs/adapter/QueueAdapter.ts | 5 +- test/integration/ConfiguredJobs.test.ts | 14 ++++-- .../jobs/adapter/BullQueueAdapter.test.ts | 49 +++++++++++++------ 5 files changed, 72 insertions(+), 21 deletions(-) create mode 100644 src/jobs/JobInfo.ts diff --git a/src/jobs/JobInfo.ts b/src/jobs/JobInfo.ts new file mode 100644 index 0000000..293148c --- /dev/null +++ b/src/jobs/JobInfo.ts @@ -0,0 +1,4 @@ +export interface JobInfo { + id: string; + queue: string; +} diff --git a/src/jobs/adapter/BullQueueAdapter.ts b/src/jobs/adapter/BullQueueAdapter.ts index 0fb093c..5f1a50c 100644 --- a/src/jobs/adapter/BullQueueAdapter.ts +++ b/src/jobs/adapter/BullQueueAdapter.ts @@ -3,6 +3,7 @@ import type { Queue, Job as BullJob } from 'bull'; import Bull from 'bull'; import { getLoggerFor } from '../../logging/LogUtil'; import type { Job } from '../Job'; +import type { JobInfo } from '../JobInfo'; import type { JobOptions } from '../JobOptions'; import type { BullQueueProcessor } from '../processor/BullQueueProcessor'; import type { QueueAdapter } from './QueueAdapter'; @@ -102,7 +103,7 @@ export class BullQueueAdapter implements QueueAdapter { jobName: string, data: Record = {}, overrideOptions: Partial = {}, - ): Promise { + ): Promise { const job = this.jobs.find((jobIter): boolean => jobIter.name === jobName); if (!job) { throw new Error(`Job '${jobName}' is not defined`); @@ -116,7 +117,15 @@ export class BullQueueAdapter implements QueueAdapter { } const bullOptions = this.jobOptionsToBullOptions(options); - await queue.add(jobName, data, bullOptions); + const queuedJob = await queue.add(jobName, data, bullOptions); + return this.bullJobToJobInfo(queuedJob); + } + + private bullJobToJobInfo(queuedJob: BullJob): JobInfo { + return { + id: queuedJob.id as string, + queue: queuedJob.queue.name, + }; } private jobOptionsToBullOptions(options: JobOptions): Bull.JobOptions { @@ -167,6 +176,14 @@ export class BullQueueAdapter implements QueueAdapter { } } + public async removeJob(jobId: string, queueName: string): Promise { + const queue = this.queues[queueName]; + const job = await queue.getJob(jobId); + if (job) { + await job.remove(); + } + } + public async deleteQueue(queueName: string): Promise { const queue = this.queues[queueName]; if (!queue) { diff --git a/src/jobs/adapter/QueueAdapter.ts b/src/jobs/adapter/QueueAdapter.ts index c4bb64d..e31aa6c 100644 --- a/src/jobs/adapter/QueueAdapter.ts +++ b/src/jobs/adapter/QueueAdapter.ts @@ -1,4 +1,5 @@ import type { Finalizable } from '../../init/finalize/Finalizable'; +import type { JobInfo } from '../JobInfo'; import type { JobOptions } from '../JobOptions'; /** @@ -9,7 +10,9 @@ export interface QueueAdapter extends Finalizable { jobName: string, data?: Record, overrideOptions?: Partial, - ) => Promise; + ) => Promise; + + removeJob: (jobId: string, queueName: string) => Promise; deleteQueue: (queueName: string) => Promise; diff --git a/test/integration/ConfiguredJobs.test.ts b/test/integration/ConfiguredJobs.test.ts index 4d14eb1..5a0c30d 100644 --- a/test/integration/ConfiguredJobs.test.ts +++ b/test/integration/ConfiguredJobs.test.ts @@ -19,19 +19,25 @@ describe('An http server with preconfigured jobs', (): void => { let add: any; let on: any; let bullConstructor: any; - let registeredJobs: Record Promise>; + let registeredJobs: Record; beforeEach(async(): Promise => { registeredJobs = {}; process = jest.fn().mockImplementation( (jobName: string, concurrency: number, processFn: (bullJob: any) => Promise): void => { - registeredJobs[jobName] = processFn; + registeredJobs[jobName] = { + perform: processFn, + id: 'jobId', + queue: { name: 'default' }, + }; }, ); add = jest.fn().mockImplementation( - async(jobName: string, data: any): Promise => { - await registeredJobs[jobName]({ data }); + async(jobName: string, data: any): Promise => { + const job = registeredJobs[jobName]; + await registeredJobs[jobName].perform({ data }); + return job; }, ); diff --git a/test/unit/jobs/adapter/BullQueueAdapter.test.ts b/test/unit/jobs/adapter/BullQueueAdapter.test.ts index 00595f8..c23144b 100644 --- a/test/unit/jobs/adapter/BullQueueAdapter.test.ts +++ b/test/unit/jobs/adapter/BullQueueAdapter.test.ts @@ -12,6 +12,7 @@ const today = Date.now(); const tomorrow = new Date(today + dayInMs); describe('A BullQueueAdapter', (): void => { + const jobId = 'somejobid'; const queue = 'default'; let queues: Record; let perform: Job['perform']; @@ -21,6 +22,8 @@ describe('A BullQueueAdapter', (): void => { let add: any; let obliterate: any; let getCompleted: any; + let getJob: any; + let bullJob: any; const redisConfig = { port: 6379, host: '127.0.0.1' }; let adapter: BullQueueAdapter; let queueProcessor: BullQueueProcessor; @@ -34,15 +37,24 @@ describe('A BullQueueAdapter', (): void => { }); job = { name: 'example', perform, options: { queue }}; jobs = [ job ]; - close = jest.fn(); obliterate = jest.fn(); getCompleted = jest.fn(); - add = jest.fn(); + bullJob = { remove: jest.fn() }; + getJob = jest.fn().mockResolvedValue(bullJob); + add = jest.fn().mockResolvedValue({ id: jobId, queue: { name: queue }}); queueProcessor = { processJobsOnQueues: jest.fn() } as any; (Bull as jest.Mock).mockImplementation( - (name: string): Bull.Queue => ({ process, add, name, close, obliterate, getCompleted } as any), + (name: string): Bull.Queue => ({ + process, + add, + name, + close, + obliterate, + getCompleted, + getJob, + } as any), ); }); @@ -89,7 +101,7 @@ describe('A BullQueueAdapter', (): void => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); const data = { alpha: 1 }; await expect(adapter.performLater('example', data)) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -99,7 +111,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job on a cron schedule.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { every: '5 4 * * *' })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -112,7 +124,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job on a cron schedule with a start time at a specific date.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { at: tomorrow, every: '5 4 * * *' })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -125,7 +137,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job on a cron schedule with a start time after a delay.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { in: 1000, every: '5 4 * * *' })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -138,7 +150,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job on a certain millisecond schedule.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { every: 1000 })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -151,7 +163,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job to be run at a specific date.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { at: tomorrow })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -164,7 +176,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job to be run after a delay.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { in: 1000 })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -177,7 +189,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job with an auto-retry backoff setting.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { retryAttempts: 3 })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -191,7 +203,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job with remove on complete setting disabled.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { disableRemoveOnComplete: true })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -203,7 +215,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job with a removeOnFail age.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { removeOnFailAge: 1000 })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -219,7 +231,7 @@ describe('A BullQueueAdapter', (): void => { it('adds the job with a removeOnFail count.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.performLater('example', {}, { removeOnFailCount: 1000 })) - .resolves.toBeUndefined(); + .resolves.toEqual({ id: jobId, queue }); expect(Bull).toHaveBeenCalledTimes(1); expect(Bull).toHaveBeenCalledWith('default', { redis: redisConfig, settings: {}}); expect(add).toHaveBeenCalledTimes(1); @@ -232,6 +244,15 @@ describe('A BullQueueAdapter', (): void => { }); }); + it('removes a job from a queue.', async(): Promise => { + adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); + await expect(adapter.removeJob('somejobid', 'default')) + .resolves.toBeUndefined(); + expect(getJob).toHaveBeenCalledTimes(1); + expect(getJob).toHaveBeenCalledWith(jobId); + expect(bullJob.remove).toHaveBeenCalledTimes(1); + }); + it('deletes a queue.', async(): Promise => { adapter = new BullQueueAdapter(jobs, queues, redisConfig, queueProcessor); await expect(adapter.deleteQueue('default')).resolves.toBeUndefined();