From 194a8c189a614f8646e395f6e41ce069a72c2a8a Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 18 Dec 2024 13:16:13 -0700 Subject: [PATCH] feat: add shouldRestore config to job queue tasks (#10059) By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed. This PR allows you to configure this behavior through the `retries.shouldRestore` property. This property accepts a boolean or a function for more complex restore behaviors. --- docs/jobs-queue/tasks.mdx | 62 +++++++++++++++++++ .../src/queues/config/types/taskTypes.ts | 32 +++++++++- .../runJobs/runJob/getRunTaskFunction.ts | 60 ++++++++++++------ 3 files changed, 133 insertions(+), 21 deletions(-) diff --git a/docs/jobs-queue/tasks.mdx b/docs/jobs-queue/tasks.mdx index 14119c5b0ed..490cd9dfc0a 100644 --- a/docs/jobs-queue/tasks.mdx +++ b/docs/jobs-queue/tasks.mdx @@ -141,3 +141,65 @@ export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job, } } ``` + +### Configuring task restoration + +By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed. + +You can configure this behavior through the `retries.shouldRestore` property. This property accepts a boolean or a function. + +If `shouldRestore` is set to true, the task will only be re-run if it previously failed. This is the default behavior. + +If `shouldRestore` this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries. + +If `shouldRestore` is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic, e.g you may want to re-run a task up to X amount of times and then restore it for consecutive runs, or only re-run a task if the input has changed. + +Example: + +```ts +export default buildConfig({ + // ... + jobs: { + tasks: [ + { + slug: 'myTask', + retries: { + shouldRestore: false, + } + // ... + } as TaskConfig<'myTask'>, + ] + } +}) +``` + +Example - determine whether a task should be restored based on the input data: + +```ts +export default buildConfig({ + // ... + jobs: { + tasks: [ + { + slug: 'myTask', + inputSchema: [ + { + name: 'someDate', + type: 'date', + required: true, + }, + ], + retries: { + shouldRestore: ({ input }) => { + if(new Date(input.someDate) > new Date()) { + return false + } + return true + }, + } + // ... + } as TaskConfig<'myTask'>, + ] + } +}) +``` diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index accca3ac760..ea9a1e6127d 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -1,5 +1,5 @@ import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' -import type { RunningJob, RunningJobSimple } from './workflowTypes.js' +import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js' export type TaskInputOutput = { input: object @@ -101,8 +101,23 @@ export type RunInlineTaskFunction = Promise +export type ShouldRestoreFn = (args: { + /** + * Input data passed to the task + */ + input: object + job: BaseJob + req: PayloadRequest + taskStatus: SingleTaskStatus +}) => boolean | Promise + export type RetryConfig = { - attempts: number + /** + * This controls how many times the task should be retried if it fails. + * + * @default undefined - attempts are either inherited from the workflow retry config or set to 0. + */ + attempts?: number /** * The backoff strategy to use when retrying the task. This determines how long to wait before retrying the task. * @@ -137,6 +152,19 @@ export type RetryConfig = { */ type: 'exponential' | 'fixed' } + /** + * This controls whether the task output should be restored if the task previously succeeded and the workflow is being retried. + * + * If this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries. + * + * If this is set to true, the task will only be re-run if it previously failed. + * + * If this is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic, + * e.g you may want to re-run a task up until a certain point and then restore it, or only re-run a task if the input has changed. + * + * @default true - the task output will be restored if the task previously succeeded. + */ + shouldRestore?: boolean | ShouldRestoreFn } export type TaskConfig< diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 6ba74134c24..8ef3bda7c44 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -170,39 +170,47 @@ export const getRunTaskFunction = ( inlineRunner = task } - let retriesConfig: number | RetryConfig = retries let taskConfig: TaskConfig if (!isInline) { taskConfig = req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug) - if (!retriesConfig) { - retriesConfig = taskConfig.retries - } if (!taskConfig) { throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`) } } - let maxRetries: number = - typeof retriesConfig === 'object' ? retriesConfig?.attempts : retriesConfig - if (maxRetries === undefined || maxRetries === null) { - // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured - if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) { - maxRetries = - typeof workflowConfig.retries === 'object' - ? workflowConfig.retries.attempts - : workflowConfig.retries - } else { - maxRetries = 0 - } + const retriesConfigFromPropsNormalized = + retries == undefined || retries == null + ? {} + : typeof retries === 'number' + ? { attempts: retries } + : retries + const retriesConfigFromTaskConfigNormalized = taskConfig + ? typeof taskConfig.retries === 'number' + ? { attempts: taskConfig.retries } + : taskConfig.retries + : {} + + const finalRetriesConfig: RetryConfig = { + ...retriesConfigFromTaskConfigNormalized, + ...retriesConfigFromPropsNormalized, // Retry config from props takes precedence } const taskStatus: null | SingleTaskStatus = job?.taskStatus?.[taskSlug] ? job.taskStatus[taskSlug][taskID] : null + // Handle restoration of task if it succeeded in a previous run if (taskStatus && taskStatus.complete === true) { - return taskStatus.output + let shouldRestore = true + if (finalRetriesConfig?.shouldRestore === false) { + shouldRestore = false + } else if (typeof finalRetriesConfig?.shouldRestore === 'function') { + shouldRestore = await finalRetriesConfig.shouldRestore({ input, job, req, taskStatus }) + } + if (shouldRestore) { + return taskStatus.output + } } let runner: TaskHandler @@ -245,6 +253,20 @@ export const getRunTaskFunction = ( let output: object = {} + let maxRetries: number | undefined = finalRetriesConfig?.attempts + + if (maxRetries === undefined || maxRetries === null) { + // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured + if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) { + maxRetries = + typeof workflowConfig.retries === 'object' + ? workflowConfig.retries.attempts + : workflowConfig.retries + } else { + maxRetries = 0 + } + } + try { const runnerOutput = await runner({ input, @@ -260,7 +282,7 @@ export const getRunTaskFunction = ( maxRetries, output, req, - retriesConfig, + retriesConfig: finalRetriesConfig, runnerOutput, state, taskConfig, @@ -282,7 +304,7 @@ export const getRunTaskFunction = ( maxRetries, output, req, - retriesConfig, + retriesConfig: finalRetriesConfig, state, taskConfig, taskID,