diff --git a/design/task-management-api.md b/design/task-management-api.md
new file mode 100644
index 00000000000..53b1ce83e30
--- /dev/null
+++ b/design/task-management-api.md
@@ -0,0 +1,346 @@
+# Task management API: requirements
+
+Multiple subsystems require reliable long-lived distributed operations. We shall support them
+with a task queue subsystem. Concepts of the API are intended to parallel concepts in the
+[Celery][celery-api] and/or [Machinery][machinery-api] APIs. We use neither of them directly,
+in order to reduce the number of required dependencies. A future version may support external
+queues to achieve better performance or robustness, at the price of increased ops or cost.
+These will likely be queues such as Kafka or SQS rather than full Celery or Machinery.
+
+In practice this generally means providing APIs similar to those of Machinery (which is more
+Go-like than Celery) for constructing task flows and for registering workers.
+
+In particular:
+1. We provide similar concepts for building task flows as do existing
+   task queues.
+1. We use similar terminology.
+1. We do *not* require the entire API of an existing task queue.
+1. We do *not* use the verbs or API calls of an existing task queue.
+
+This API definition comes with implementation sketches for how to use these APIs to implement
+the branch export and concurrent dedupe "user" stories.
+
+## API
+
+### Concepts
+
+#### Tasks
+
+A task is the basic atom of task management. It represents a single unit of work to
+perform, and can succeed or fail. Tasks may be retried on failure, so _executing a task
+must be idempotent_.
+
+Tasks include these attributes:
+- `Id`: a unique identifier for the task. Use a known-unique substring in the identifier
+  (e.g. a UUID or [nanoid][nanoid]) to avoid collisions, or a well-known identifier to ensure
+  only one task of a type can exist.
+- `Action`: the type of action to perform for this task. 
Workers pick tasks to perform and the
+  actions to perform on them according to this field.
+- `Body`: a description of parameters for this task. E.g. in a "copy file" task the body might
+  specify source key, ETag and destination key.
+- `StatusCode`: the internally-used state of the task in its lifecycle, see [life of a
+  task](#life-of-a-task) below.
+- `Status`: a textual description of the current status, generated by application code.
+- `ToSignal`: an array of task IDs that cannot start before this task ends, and will therefore
+  be signalled when it does.
+- `NumSignals`: number of tasks that must signal this task before it can be performed.
+  Initially equal to the number of tasks whose `ToSignal` arrays include its task ID.
+- `MaxTries`: the maximal number of times to try to execute the task if it keeps being returned
+  to state `pending`.
+- `ActorId`: the unique string identifier chosen by a worker which is currently performing the
+  task. Useful for monitoring.
+- `ActionDeadline`: a time by which the worker currently performing the task has committed to
+  finish it.
+
+Tasks provide these additional facilities (and include fields not listed here to support them):
+- **Retries**. A task repeatedly placed back into state `pending` will not be retried more than
+  `MaxTries` times.
+- **Dependencies**. A task can be made to run only after some other tasks
+  are done.
+
+A task is performed by a single worker; if that worker does not finish processing it and an
+action deadline was set, it will be given to another worker.
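For illustration, here is a minimal sketch of how `NumSignals` relates to the `ToSignal`
arrays of a batch of tasks. This is not part of the proposed API; `taskSpec` and
`countSignals` are hypothetical names used only for this example.

```go
package main

import "fmt"

// taskSpec is an illustrative (hypothetical) subset of a task's attributes.
type taskSpec struct {
	Id       string
	Action   string
	ToSignal []string
}

// countSignals derives each task's NumSignals: the number of tasks in the
// batch whose ToSignal arrays include its ID.
func countSignals(tasks []taskSpec) map[string]int {
	counts := make(map[string]int, len(tasks))
	for _, t := range tasks {
		counts[t.Id] += 0 // ensure every task appears, even with no signallers
		for _, dependent := range t.ToSignal {
			counts[dependent]++
		}
	}
	return counts
}

func main() {
	// Two copy tasks both signal a success-marker task, so the marker may
	// only run after both copies complete: its NumSignals is 2.
	batch := []taskSpec{
		{Id: "copy-1", Action: "copy", ToSignal: []string{"success-marker"}},
		{Id: "copy-2", Action: "copy", ToSignal: []string{"success-marker"}},
		{Id: "success-marker", Action: "touch-success"},
	}
	fmt.Println(countSignals(batch))
}
```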
+
+#### Life of a task
+
+```
+                      |
+                      | InsertTasks
+                      |
+                      |
+                +-----v-----+
+            +-->|  pending  |
+            |   +-----+-----+
+  ReturnTask|         |
+  (to       |         | OwnTasks
+  pending)  |         |
+            |   +-----v-----+
+            +---+in-progress|
+                +-----------+
+                      |
+         +------------+------------+ ReturnTask
+         |                         |
+    +----v---+                +----v----+
+    |aborted |                |completed|
+    +--------+                +---------+
+```
+
+A task arrives complete with its dependencies: `NumSignals`, a count of the preceding tasks
+that must "signal" it before it may be executed. When the task completes it signals all of its
+dependent tasks.
+
+Tasks are inserted in state `pending`. Multiple workers call `OwnTasks` to get tasks. A
+task may only be claimed by a call to `OwnTasks` if:
+* Its action is specified as acceptable to that call.
+* All dependencies of the task have been settled: all tasks specifying its task ID in their
+  `ToSignal` list have completed.
+* The task is not claimed by another worker. Either:
+  - the task is in state `pending`, or
+  - the task is in state `in-progress`, but its `ActionDeadline` has elapsed (see "ownership
+    expiry", below).
+
+`OwnTasks` returns task IDs and a "performance token" for this performance of the task.
+Both ID and token must be provided to _return_ the task from ownership. (The
+performance token is used to resolve conflicts during "ownership expiry", below.)
+
+Once a worker owns a task, it performs it. It can decide to return the task to the task
+queue and _complete_, _abort_ or _retry_ it by calling `ReturnTask`. Once completed, all
+dependents of the task are signalled, causing any dependent that has received all its
+required signals to be eligible for return by `OwnTasks`.
+
+#### Ownership expiry
+
+Processes can fail. To allow restarting a failed process, calls to `OwnTasks` may specify a
+deadline. The lease granted to an owning worker will expire after this deadline, allowing
+another worker to own the task. Only the _last_ worker granted ownership may call
+`ReturnTask` on the task. 
A delayed worker should still return the task, in case the task
+has not yet been granted to another worker.
+
+#### Basic API
+
+This is a sample API. All details are fully subject to change, of course! Note that most
+`func`s are probably going to be methods on some object, which we assume will carry DB
+connection information etc.
+
+##### TaskData
+
+```go
+type TaskId string
+
+type ActorId string
+
+type PerformanceToken pgtype.UUID // With added stringifiers
+
+// TaskData describes a task to perform.
+type TaskData struct {
+	Id                TaskId              // Unique ID of task
+	Action            string              // Action to perform, used to fetch in OwnTasks
+	Body              *string             // Body containing details of action, used by clients only
+	Status            *string             // Human- and client-readable status
+	StatusCode        TaskStatusCodeValue // Status code, used by task queue
+	NumTries          int                 // Number of times this task has moved from `pending` to `in-progress`
+	MaxTries          *int                // Maximal number of times to try this task
+	// Dependencies might be stored or handled differently, depending on what gives reasonable
+	// performance.
+	TotalDependencies *int                // Number of tasks which must signal before this task can be owned
+	ToSignal          []TaskId            // Tasks to signal after this task is done
+	ActorId           ActorId             // ID of current actor performing this task (if in-progress)
+	ActionDeadline    *time.Time          // Deadline for current actor to finish performing this task (if in-progress)
+	PerformanceToken  *PerformanceToken   // Token to allow ReturnTask
+	PostResult        bool                // If set, allow waiting for this task
+}
+```
+
+##### InsertTasks
+
+```go
+// InsertTasks atomically adds all tasks to the queue: if any task cannot be added (typically because
+// it re-uses an existing key) then no tasks will be added. If PostResult was set on any tasks then
+// they can be waited upon after InsertTasks returns.
+func InsertTasks(ctx context.Context, source *taskDataIterator) error
+```
+
+A variant allows inserting a task _by force_:
+
+```go
+// ReplaceTasks atomically adds all tasks to the queue. If a not-yet-in-progress task with the same
+// ID already exists then _replace it_ as though it were atomically aborted before this insert. If
+// PostResult was set on any tasks then they can be waited upon after ReplaceTasks returns.
+func ReplaceTasks(ctx context.Context, source *taskDataIterator) error
+```
+
+##### OwnTasks
+
+```go
+// OwnedTaskData is a task returned from OwnTasks
+type OwnedTaskData struct {
+	Id     TaskId           `db:"task_id"`
+	Token  PerformanceToken `db:"token"`
+	Action string
+	Body   *string
+}
+
+// OwnTasks takes ownership for actor of up to maxTasks tasks for performing any of actions,
+// setting the lifetime of each returned owned task to maxDuration.
+func OwnTasks(ctx context.Context, actor ActorId, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
+```
+
+##### ReturnTask
+
+```go
+// ReturnTask returns taskId which was acquired using the specified performanceToken, giving it
+// resultStatus and resultStatusCode. It returns InvalidTokenError if the performanceToken is
+// invalid; this happens when ReturnTask is called after its deadline expires, or due to a logic
+// error. If resultStatusCode is ABORT, abort all succeeding tasks.
+func ReturnTask(ctx context.Context, taskId TaskId, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
+```
+
+##### WaitForTask
+
+```go
+// WaitForTask waits for taskId (which must have been inserted with PostResult set) to finish and
+// returns it. It returns immediately if the task has already finished.
+func WaitForTask(ctx context.Context, taskId TaskId) (TaskData, error)
+```
+
+##### AddDependencies
+
+```go
+// AddDependencies atomically adds dependencies: for every dependency, task Run must run after
+// task After.
+type TaskDependency struct {
+	After, Run TaskId
+}
+
+func AddDependencies(ctx context.Context, dependencies []TaskDependency) error
+```
+
+##### Monitoring
+
+We also provide some routine as a basis for monitoring: it gives the number and status of tasks
+for each of a number of actions and task IDs, possibly with some filtering. The exact nature
+depends on the implementation chosen; however, we _do_ require its availability.
+
+#### Differences from the Celery model
+
+This task management model is at a somewhat lower level than the Celery model:
+* **Workers explicitly loop to own and handle tasks.** Emulate the Celery model by writing an
+  explicit function that takes "handlers" for the different actions. We may well do this.
+
+  _Why change?_ Writing the loop is rarely an important factor. Flexibility in specifying the
+  action parameter of OwnTasks allows variable actions, for instance handling particular action
+  types only when a particular environmental condition is met (say, system load), or
+  incorporating side data in action names (and not only in task IDs). Flexibility in timing
+  allows per-process rate limiters for particular actions: filter out expensive actions when
+  their token bucket runs out. Flexibility in specifying _when_ OwnTasks is called allows
+  controlling load on the queuing component. Flexibility in specifying action dispatch allows
+  controlling _how many goroutines_ run particular actions concurrently. All this without
+  having to add configurable structures to the task manager.
+* **No explicit graph structures.** Emulate these using the section [Structures](#structures)
+  below.
+* **No implicit argument serialization.** Rather than flatten an "args" array we pass a stringy
+  "body". In practice "args" require serialization anyway; incorporating them into the queue
+  requires either configuring the queue with relevant serialization or allowing only primitives.
+  Celery selects the first, Machinery the second. 
In both cases a Go client library must place
+  most of the serialization burden on application code -- simplest is to do so explicitly.
+
+#### Structures
+
+We can implement what Celery calls _Chains_, _Chords_ and _Groups_ using the basic API: these
+are just ways to describe structured dependencies which form
+[parallel/serial][parallel-series] networks. Drawings appear below.
+
+##### Chains
+
+```
+   +----------+
+   |  task 1  |
+   +----------+
+        |
+        |
+   +----v-----+
+   |  task 2  |
+   +----+-----+
+        |
+        |
+   +----v-----+
+   |  task 3  |
+   +----------+
+```
+
+##### Chords (and Groups)
+
+```
+                +--------+
+           +--->| task1  +-----+
+           |    +--------+     |
+           |                   |
+           |                   |
+           |    +--------+     |
+           +--->| task2  +-----+
+           |    +--------+     |
+ +-------+ |                   |     +-------------+
+ | prev  |-+                   +---->|(spontaneous)|
+ +-------+ |    +--------+     |     +-------------+
+           +--->| task3  +-----+
+           |    +--------+     |
+           |                   |
+           |                   |
+           |    +--------+     |
+           +--->| task4  +-----+
+                +--------+
+```
+
+## Implementing "user" stories with the API
+
+### Branch export
+
+1. Under the merge/commit lock for the branch:
+   1. Insert a task with ID `start-export-{branch}` to start generating export tasks and a task
+      `done-export-{branch}` that depends on it.
+   1. If insertion failed, _replace_ the task with ID `next-export-{branch}` with a task to
+      export _this_ commit ID, and add a dependency on `done-export-{branch}` (which may fail if
+      that task has completed).
+1. To handle `start-export-{branch}`:
+   1. Generate a task to copy or delete each file object (this is an opportunity to batch
+      multiple file objects if needed for performance). `done-export-{branch}` depends on
+      each of these tasks (and cannot have been deleted since `start-export-{branch}` has not
+      yet been returned). For every prefix for which such an object is configured, add a task
+      to generate its `.../_SUCCESS` object on S3, dependent on all the objects under that
+      prefix (or, to handle objects in sub-prefixes, on just the `_SUCCESS` of that sub-prefix).
+   2. 
Add a task to generate manifests, dependent on `done-export-{branch}`.
+   3. Return `start-export-{branch}` as completed.
+1. To handle a copy or delete operation, perform it.
+1. To handle `done-export-{branch}`: just return it, it can be spontaneous (if the task queue
+   supports that).
+1. To handle `next-export-{branch}`: create `start-export-{branch}` (the previous one must have
+   ended), and return the task.
+
+`next-export-{branch}` is used to serialize branch exports, achieving the requirement of a
+single concurrent export per branch. Per-prefix `_SUCCESS` objects are generated on time due to
+their dependencies. (As an option, we could set priorities and return tasks in priority order
+from `OwnTasks`, to allow `_SUCCESS` objects to be created before copying other objects.)
+Retries are handled by setting `MaxTries` to allow multiple attempts per copy.
+
+### Concurrent dedupe batching
+
+1. For every created object, create a dedupe task.
+2. To perform a dedupe task, acquire its checksum (from computation or ETag) and create a task
+   to write a dedupe record.
+3. Acquire _many_ dedupe-record tasks at a time, and process them in batches.
+
+## References
+
+### Well-known task queues
+1. [Celery][celery-api]
+2. [Machinery][machinery-api]
+
+### Modules
+1. [nanoid][nanoid]
+
+### Graphs
+1. [Parallel series][parallel-series]
+
+[celery-api]: https://docs.celeryproject.org/en/stable/userguide/index.html
+[machinery-api]: https://github.com/RichardKnop/machinery#readme
+[nanoid]: https://www.npmjs.com/package/nanoid
+[parallel-series]: https://www.cpp.edu/~elab/projects/project_05/index.html