Skip to content

Commit

Permalink
feat: add shouldRestore config to job queue tasks (#10059)
Browse files Browse the repository at this point in the history
By default, if a task has passed previously and a workflow is re-run,
the task will not be re-run. Instead, the output from the previous task
run will be returned. This is to prevent unnecessary re-runs of tasks
that have already passed.

This PR allows you to configure this behavior through the
`retries.shouldRestore` property. This property accepts a boolean or a
function for more complex restore behaviors.
  • Loading branch information
AlessioGr authored Dec 18, 2024
1 parent 1446fe4 commit 194a8c1
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 21 deletions.
62 changes: 62 additions & 0 deletions docs/jobs-queue/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,65 @@ export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job,
}
}
```

### Configuring task restoration

By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed.

You can configure this behavior through the `retries.shouldRestore` property. This property accepts a boolean or a function.

If `shouldRestore` is set to true, the task will only be re-run if it previously failed. This is the default behavior.

If `shouldRestore` this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.

If `shouldRestore` is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic, e.g you may want to re-run a task up to X amount of times and then restore it for consecutive runs, or only re-run a task if the input has changed.

Example:

```ts
export default buildConfig({
// ...
jobs: {
tasks: [
{
slug: 'myTask',
retries: {
shouldRestore: false,
}
// ...
} as TaskConfig<'myTask'>,
]
}
})
```

Example - determine whether a task should be restored based on the input data:

```ts
export default buildConfig({
// ...
jobs: {
tasks: [
{
slug: 'myTask',
inputSchema: [
{
name: 'someDate',
type: 'date',
required: true,
},
],
retries: {
shouldRestore: ({ input }) => {
if(new Date(input.someDate) > new Date()) {
return false
}
return true
},
}
// ...
} as TaskConfig<'myTask'>,
]
}
})
```
32 changes: 30 additions & 2 deletions packages/payload/src/queues/config/types/taskTypes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
import type { RunningJob, RunningJobSimple } from './workflowTypes.js'
import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js'

export type TaskInputOutput = {
input: object
Expand Down Expand Up @@ -101,8 +101,23 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
},
) => Promise<TTaskOutput>

export type ShouldRestoreFn = (args: {
/**
* Input data passed to the task
*/
input: object
job: BaseJob
req: PayloadRequest
taskStatus: SingleTaskStatus<string>
}) => boolean | Promise<boolean>

export type RetryConfig = {
attempts: number
/**
* This controls how many times the task should be retried if it fails.
*
* @default undefined - attempts are either inherited from the workflow retry config or set to 0.
*/
attempts?: number
/**
* The backoff strategy to use when retrying the task. This determines how long to wait before retrying the task.
*
Expand Down Expand Up @@ -137,6 +152,19 @@ export type RetryConfig = {
*/
type: 'exponential' | 'fixed'
}
/**
* This controls whether the task output should be restored if the task previously succeeded and the workflow is being retried.
*
* If this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.
*
* If this is set to true, the task will only be re-run if it previously failed.
*
* If this is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic,
* e.g you may want to re-run a task up until a certain point and then restore it, or only re-run a task if the input has changed.
*
* @default true - the task output will be restored if the task previously succeeded.
*/
shouldRestore?: boolean | ShouldRestoreFn
}

export type TaskConfig<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,39 +170,47 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
inlineRunner = task
}

let retriesConfig: number | RetryConfig = retries
let taskConfig: TaskConfig<string>
if (!isInline) {
taskConfig = req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug)
if (!retriesConfig) {
retriesConfig = taskConfig.retries
}

if (!taskConfig) {
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
}
}
let maxRetries: number =
typeof retriesConfig === 'object' ? retriesConfig?.attempts : retriesConfig

if (maxRetries === undefined || maxRetries === null) {
// Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
maxRetries =
typeof workflowConfig.retries === 'object'
? workflowConfig.retries.attempts
: workflowConfig.retries
} else {
maxRetries = 0
}
const retriesConfigFromPropsNormalized =
retries == undefined || retries == null
? {}
: typeof retries === 'number'
? { attempts: retries }
: retries
const retriesConfigFromTaskConfigNormalized = taskConfig
? typeof taskConfig.retries === 'number'
? { attempts: taskConfig.retries }
: taskConfig.retries
: {}

const finalRetriesConfig: RetryConfig = {
...retriesConfigFromTaskConfigNormalized,
...retriesConfigFromPropsNormalized, // Retry config from props takes precedence
}

const taskStatus: null | SingleTaskStatus<string> = job?.taskStatus?.[taskSlug]
? job.taskStatus[taskSlug][taskID]
: null

// Handle restoration of task if it succeeded in a previous run
if (taskStatus && taskStatus.complete === true) {
return taskStatus.output
let shouldRestore = true
if (finalRetriesConfig?.shouldRestore === false) {
shouldRestore = false
} else if (typeof finalRetriesConfig?.shouldRestore === 'function') {
shouldRestore = await finalRetriesConfig.shouldRestore({ input, job, req, taskStatus })
}
if (shouldRestore) {
return taskStatus.output
}
}

let runner: TaskHandler<TaskType>
Expand Down Expand Up @@ -245,6 +253,20 @@ export const getRunTaskFunction = <TIsInline extends boolean>(

let output: object = {}

let maxRetries: number | undefined = finalRetriesConfig?.attempts

if (maxRetries === undefined || maxRetries === null) {
// Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
maxRetries =
typeof workflowConfig.retries === 'object'
? workflowConfig.retries.attempts
: workflowConfig.retries
} else {
maxRetries = 0
}
}

try {
const runnerOutput = await runner({
input,
Expand All @@ -260,7 +282,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
maxRetries,
output,
req,
retriesConfig,
retriesConfig: finalRetriesConfig,
runnerOutput,
state,
taskConfig,
Expand All @@ -282,7 +304,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
maxRetries,
output,
req,
retriesConfig,
retriesConfig: finalRetriesConfig,
state,
taskConfig,
taskID,
Expand Down

0 comments on commit 194a8c1

Please sign in to comment.