From 9c66e79ade025dcf0482e9568743d64cb97b9657 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Tue, 10 Dec 2024 15:56:32 -0700 Subject: [PATCH 1/4] feat: add payload.jobs.runByID --- packages/payload/src/queues/localAPI.ts | 14 ++++++ .../src/queues/operations/runJobs/index.ts | 47 ++++++++++++++----- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index 45c7f53063f..7755b02b7e0 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -80,4 +80,18 @@ export const getJobsLocalAPI = (payload: Payload) => ({ }) return result }, + + runByID: async (args: { + id: string + overrideAccess?: boolean + req?: PayloadRequest + }): Promise> => { + const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload)) + const result = await runJobs({ + id: args.id, + overrideAccess: args?.overrideAccess !== false, + req: newReq, + }) + return result + }, }) diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index 650c2707e95..bcdabb91cb3 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -17,6 +17,10 @@ import { runJob } from './runJob/index.js' import { runJSONJob } from './runJSONJob/index.js' export type RunJobsArgs = { + /** + * ID of the job to run + */ + id?: string limit?: number overrideAccess?: boolean queue?: string @@ -36,6 +40,7 @@ export type RunJobsResult = { } export const runJobs = async ({ + id, limit = 10, overrideAccess, queue, @@ -91,18 +96,36 @@ export const runJobs = async ({ // Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of // the same job being picked up by another worker - const jobsQuery = (await req.payload.update({ - collection: 'payload-jobs', - data: { - processing: true, - seenByWorker: true, - }, - depth: req.payload.config.jobs.depth, - disableTransaction: true, - limit, - showHiddenFields: true, - where, - })) as unknown as PaginatedDocs + const jobsQuery: { + docs: BaseJob[] + } = id + ? { + docs: [ + (await req.payload.update({ + id, + collection: 'payload-jobs', + data: { + processing: true, + seenByWorker: true, + }, + depth: req.payload.config.jobs.depth, + disableTransaction: true, + showHiddenFields: true, + })) as BaseJob, + ], + } + : ((await req.payload.update({ + collection: 'payload-jobs', + data: { + processing: true, + seenByWorker: true, + }, + depth: req.payload.config.jobs.depth, + disableTransaction: true, + limit, + showHiddenFields: true, + where, + })) as unknown as PaginatedDocs) /** * Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried). From 47ddd731f68c7cfffba544402dea71d126636627 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Tue, 10 Dec 2024 16:02:37 -0700 Subject: [PATCH 2/4] add int test --- test/queues/int.spec.ts | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 05324edb22b..46847d2c9d9 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -925,4 +925,44 @@ describe('Queues', () => { expect(allSimples.totalDocs).toBe(1) expect(allSimples.docs[0].title).toBe('externalWorkflow') }) + + it('ensure payload.jobs.runByID works and only runs the specified job', async () => { + payload.config.jobs.deleteJobOnComplete = false + + let lastJobID: string = null + for (let i = 0; i < 3; i++) { + const job = await payload.jobs.queue({ + task: 'CreateSimple', + input: { + message: 'from single task', + }, + }) + lastJobID = job.id + } + + await payload.jobs.runByID({ + id: lastJobID, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples.docs[0].title).toBe('from single task') + + const allCompletedJobs = await payload.find({ + collection: 'payload-jobs', + limit: 100, + where: { + completedAt: { + exists: true, + }, + }, + }) + + expect(allCompletedJobs.totalDocs).toBe(1) + expect(allCompletedJobs.docs[0].id).toBe(lastJobID) + }) }) From 4a95028307b6fbc01e082fbf1daa66441e419435 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Tue, 10 Dec 2024 16:04:31 -0700 Subject: [PATCH 3/4] add docs --- docs/jobs-queue/queues.mdx | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/docs/jobs-queue/queues.mdx b/docs/jobs-queue/queues.mdx index d8c63b347a6..68f3f852aa3 100644 --- a/docs/jobs-queue/queues.mdx +++ b/docs/jobs-queue/queues.mdx @@ -98,6 +98,8 @@ After the project is deployed to Vercel, the Vercel Cron job will automatically If you want to process jobs programmatically from your server-side code, you can use the Local API: +**Run all jobs:** + ```ts const results = await payload.jobs.run() @@ -105,6 +107,14 @@ const results = await payload.jobs.run() await payload.jobs.run({ queue: 'nightly', limit: 100 }) ``` +**Run a single job:** + +```ts +const results = await payload.jobs.runByID({ + id: myJobID +}) +``` + #### Bin script Finally, you can process jobs via the bin script that comes with Payload out of the box. From 2452e66aec6987d8a59967d926b4b55479408b0e Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Tue, 10 Dec 2024 16:09:49 -0700 Subject: [PATCH 4/4] fix id type --- packages/payload/src/queues/localAPI.ts | 2 +- packages/payload/src/queues/operations/runJobs/index.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index 7755b02b7e0..db4d0d40713 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -82,7 +82,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ }, runByID: async (args: { - id: string + id: number | string overrideAccess?: boolean req?: PayloadRequest }): Promise> => { diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index bcdabb91cb3..8e858b23f53 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -20,7 +20,7 @@ export type RunJobsArgs = { /** * ID of the job to run */ - id?: string + id?: number | string limit?: number overrideAccess?: boolean queue?: string