Requirements doc for task (queue) management API

(Committed by arielshaqed on Sep 29, 2020 as commit a6d2d6e, adding `design/task-management-api.md`.)
# Task management API: requirements

Multiple subsystems require reliable long-lived distributed operations. We shall support them
with a task queue subsystem. Concepts of the API are intended to parallel concepts in the
[Celery][celery-api] and/or [Machinery][machinery-api] APIs. We use neither of them, to
reduce the number of required dependencies. A future version may support external queues to
achieve better performance or robustness, at the price of increased ops effort or cost. These
will likely be queues such as Kafka or SQS rather than full Celery or Machinery.

In practice this generally means providing APIs similar to those of Machinery (which is more
Go-like than Celery) for constructing task flows and for registering workers.

In particular:
1. We provide similar concepts for building task flows as do existing
task queues.
1. We use similar terminology.
1. We do *not* require the entire API of an existing task queue.
1. We do *not* use the verbs or API calls of an existing task queue.

This API definition comes with implementation sketches for how to use these APIs to implement
the branch export and concurrent dedupe "user" stories.

## API

### Concepts

#### Tasks

A task is the basic atom of task management. It represents a single unit of work to
perform, and can succeed or fail. Tasks may be retried on failure, so _executing a task
must be idempotent_.

Tasks include these attributes:
- `Id`: a unique identifier for the task. Use a known-unique substring in the identifier
(e.g. a UUID or [nanoid][nanoid]) to avoid collisions, or a well-known identifier to ensure
only one task of a type can exist.
- `Action`: the type of action to perform for this task. Workers pick tasks to perform and the
actions to perform on them according to this field.
- `Body`: a description of parameters for this task. E.g. in a "copy file" task the body might
specify source key, ETag and destination key.
- `StatusCode`: the internally-used state of the task in its lifecycle, see [life of a
task](#life-of-a-task) below.
- `Status`: a textual description of the current status, generated by application code.
- `ToSignal`: an array of task IDs that cannot start before this task ends, and will therefore
be signalled when it does.
- `NumSignals`: number of tasks that must signal this task before it can be performed.
Initially equal to the number of tasks on which it appears in the `ToSignal` array.
- `MaxTries`: the maximal number of times to try to execute the task if it keeps being returned
to state `pending`.
- `ActorId`: the unique string identifier chosen by a worker which is currently performing the
task. Useful for monitoring.
- `ActionDeadline`: a time by which the worker currently performing the task has committed to
finish it.

Tasks provide these additional facilities (and include fields not listed here to support them):
- **Retries**. A task placed back into state `pending` more than `MaxTries` times will not be
retried again.
- **Dependencies**. A task can be configured to run only after some other tasks are done.

A task is performed by a single worker; if that worker does not finish processing it and an
action deadline was set, it will be given to another worker.

#### Life of a task

```
                     |
                     | InsertTasks
                     |
                     |
               +-----v-----+
           +-->|  pending  |
           |   +-----+-----+
ReturnTask |         |
(to        |         | OwnTasks
 pending)  |         |
           |   +-----v-----+
           +---+in-progress|
               +-----+-----+
                     |
        +------------+------------+ ReturnTask
        |                         |
   +----v---+                +----v----+
   |aborted |                |completed|
   +--------+                +---------+
```

A task arrives complete with its dependencies: a count of the number of preceding tasks that
must "signal" it before it may be executed. When a task completes it signals all of its
dependent tasks.

Tasks are inserted in state `pending`. Multiple workers call `OwnTasks` to get tasks. A
task may only be claimed by a call to `OwnTasks` if:
* Its action is specified as acceptable to that call.
* All dependencies of the task have been settled: all tasks specifying its task ID in their
`ToSignal` list have completed.
* The task is not claimed by another worker. Either:
- the task is in state `pending`, or
- the task is in state `in-progress`, but its `ActionDeadline` has elapsed (see "ownership
expiry", below).

`OwnTasks` returns task IDs and a "performance token" for this performance of the task.
Both ID and token must be provided to _return_ the task from ownership. (The
performance token is used to resolve conflicts during "ownership expiry", below.)

Once a worker owns a task, it performs it. It can decide to return the task to the task
queue and _complete_, _abort_ or _retry_ it by calling `ReturnTask`. Once completed, all
dependents of the task are signalled, causing any dependent that has received all its
required signals to be eligible for return by `OwnTasks`.
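As a sketch, the signalling rules above can be modeled in a few lines of Go. All names here (`task`, `ownable`, `complete`) are illustrative, not part of the proposed API:

```go
package main

import "fmt"

type state int

const (
	pending state = iota
	inProgress
	completed
)

type task struct {
	ID         string
	State      state
	NumSignals int      // remaining signals before the task may be owned
	ToSignal   []string // dependents to signal on completion
}

// ownable reports whether a task may be claimed: it must be pending and have
// received all its required signals.
func ownable(t *task) bool {
	return t.State == pending && t.NumSignals == 0
}

// complete marks t completed and signals each dependent, decrementing its
// remaining-signal count.
func complete(t *task, all map[string]*task) {
	t.State = completed
	for _, id := range t.ToSignal {
		all[id].NumSignals--
	}
}

func main() {
	a := &task{ID: "a", State: pending, ToSignal: []string{"b"}}
	b := &task{ID: "b", State: pending, NumSignals: 1}
	all := map[string]*task{"a": a, "b": b}

	fmt.Println(ownable(a), ownable(b)) // true false: b is blocked on a
	complete(a, all)
	fmt.Println(ownable(b)) // true: b has received its signal
}
```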

#### Ownership expiry

Processes can fail. To allow recovery from a failed worker, calls to `OwnTasks` may specify a
deadline. The lease granted to an owning worker expires after this deadline, allowing
another worker to own the task. Only the _last_ worker granted ownership may call
`ReturnTask` on the task. A delayed worker should still return the task, in case it
has not yet been granted to another worker.

#### Basic API

This is a sample API. All details are fully subject to change, of course! Note that most
`func`s are probably going to be methods on some object, which we assume will carry DB
connection information etc.

##### TaskData

```go
type TaskId string

type ActorId string

type PerformanceToken pgtype.UUID // With added stringifiers

// TaskData describes a task to perform.
type TaskData struct {
Id TaskId // Unique ID of task
Action string // Action to perform, used to fetch in OwnTasks
Body *string // Body containing details of action, used by clients only
Status *string // Human- and client-readable status
StatusCode TaskStatusCodeValue // Status code, used by task queue
NumTries int // Number of times this task has moved from pending to in-progress
MaxTries *int // Maximal number of times to try this task
// Dependencies might be stored or handled differently, depending on what gives reasonable
// performance.
TotalDependencies *int // Number of tasks which must signal before this task can be owned
ToSignal []TaskId // Tasks to signal after this task is done
ActorId ActorId // ID of current actor performing this task (if in-progress)
ActionDeadline *time.Time // Deadline for current actor to finish performing this task (if in-progress)
PerformanceToken *PerformanceToken // Token to allow ReturnTask
PostResult bool // If set allow waiting for this task
}
```

##### InsertTasks

```go
// InsertTasks atomically adds all tasks to the queue: if any task cannot be added (typically because
// it re-uses an existing key) then no tasks will be added. If PostResult was set on any tasks then
// they can be waited upon after InsertTasks returns.
func InsertTasks(ctx context.Context, source *taskDataIterator) error
```

A variant allows inserting tasks _by force_:

```go
// ReplaceTasks atomically adds all tasks to the queue. If a task with the same ID already
// exists and is not yet in progress, it is _replaced_, as though it were atomically aborted
// before this insert. If PostResult was set on any tasks then they can be waited upon after
// ReplaceTasks returns.
func ReplaceTasks(ctx context.Context, source *taskDataIterator) error
```

##### OwnTasks

```go
// OwnedTaskData is a task returned from OwnTasks
type OwnedTaskData struct {
Id TaskId `db:"task_id"`
Token PerformanceToken `db:"token"`
Action string
Body *string
}

// OwnTasks owns, on behalf of actor, up to maxTasks tasks for performing any of actions,
// setting the lifetime of each returned owned task to maxDuration.
func OwnTasks(ctx context.Context, actor ActorId, maxTasks int, actions []string, maxDuration *time.Duration) ([]OwnedTaskData, error)
```

##### ReturnTask

```go
// ReturnTask returns taskId which was acquired using the specified performanceToken, giving it
// resultStatus and resultStatusCode. It returns InvalidTokenError if the performanceToken is
// invalid; this happens when ReturnTask is called after its deadline expires, or due to a logic
// error. If resultStatusCode is ABORT, abort all succeeding tasks.
func ReturnTask(ctx context.Context, taskId TaskId, token PerformanceToken, resultStatus string, resultStatusCode TaskStatusCodeValue) error
```

##### WaitForTask

```go
// WaitForTask waits for taskId (which must have been inserted with PostResult) to finish and
// returns it. It returns immediately if the task has already finished.
func WaitForTask(ctx context.Context, taskId TaskId) (TaskData, error)
```

##### AddDependencies

```go
// TaskDependency specifies a single dependency: task Run must run after task After.
type TaskDependency struct {
	After, Run TaskId
}

// AddDependencies atomically adds all dependencies: for every dependency, task Run will only
// run after task After ends.
func AddDependencies(ctx context.Context, dependencies []TaskDependency) error
```

##### Monitoring

We also require some routine as a basis for monitoring: it gives the number and status of tasks
for each of a number of actions and task IDs, possibly with some filtering. The exact nature
depends on the implementation chosen, but we _do_ require its availability.

#### Differences from the Celery model

This task management model is at a somewhat lower level than the Celery model:
* **Workers explicitly loop to own and handle tasks.** Emulate the Celery model by writing an
explicit function that takes "handlers" for the different actions. We may well do this.

_Why change?_ Writing the loop is rarely an important factor. Flexibility in specifying the
action parameter of OwnTasks allows variable actions, for instance handling particular action
types only when a particular environmental condition is met (say, system load), or
incorporating side data in action names (and not only in task IDs). Flexibility in timing
allows per-process rate limiters for particular actions: filter out expensive actions when
their token bucket runs out. Flexibility in specifying _when_ OwnTasks is called allows
controlling load on the queuing component. Flexibility in specifying action dispatch allows
controlling _how many goroutines_ run particular actions concurrently. All this without
having to add configurable structures to the task manager.
* **No explicit graph structures.** Emulate these using the section [Structures](#structures)
below.
* **No implicit argument serialization.** Rather than flatten an "args" array we pass a stringy
"body". In practice "args" anyway require serialization; incorporating them into the queue
requires either configuring the queue with relevant serialization or allowing only primitives.
Celery selects the first, Machinery the second. In both cases a Go client library must place
most of the serialization burden on application code -- simplest is to do so explicitly.

#### Structures

We can implement what Celery calls _Chains_, _Chords_ and _Groups_ using the basic API: these
are just ways to describe structured dependencies which form [parallel/serial
networks][parallel-series]. Drawings appear below.

##### Chains

```
+----------+
| task 1 |
+----------+
|
|
+----v-----+
| task 2 |
+----+-----+
|
|
+----v-----+
| task 3 |
+----------+
```
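A chain is just each task signalling the next. A minimal sketch, using a pared-down stand-in for `TaskData` (`chainTask` and `makeChain` are illustrative names):

```go
package main

import "fmt"

// chainTask holds only the wiring fields of TaskData.
type chainTask struct {
	Id         string
	ToSignal   []string
	NumSignals int
}

// makeChain wires tasks into a Celery-style chain: each task signals the next,
// and every task after the first needs exactly one signal.
func makeChain(ids ...string) []chainTask {
	tasks := make([]chainTask, len(ids))
	for i, id := range ids {
		tasks[i] = chainTask{Id: id}
		if i > 0 {
			tasks[i-1].ToSignal = []string{id}
			tasks[i].NumSignals = 1
		}
	}
	return tasks
}

func main() {
	for _, t := range makeChain("task-1", "task-2", "task-3") {
		fmt.Printf("%s -> %v (needs %d signals)\n", t.Id, t.ToSignal, t.NumSignals)
	}
}
```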

##### Chords (and Groups)

```
+--------+
+--->| task1 +-----+
| +--------+ |
| |
| |
| +--------+ |
+--->| task2 +-----+
| +--------+ |
+-------+ | | +-------------+
| prev |-----+ +---->|(spontaneous)|
+-------+ | +--------+ | +-------------+
+--->| task3 +-----+
| +--------+ |
| |
| |
| +--------+ |
+--->| task4 +-----+
+--------+
```
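A chord wires `prev` to every parallel task and each parallel task to the join, which therefore needs one signal per branch. A sketch with illustrative names (`chordTask`, `makeChord`):

```go
package main

import "fmt"

// chordTask holds only the wiring fields of TaskData.
type chordTask struct {
	Id         string
	ToSignal   []string
	NumSignals int
}

// makeChord builds prev -> {parallel...} -> join: prev signals every parallel
// task, each parallel task signals the join, and the join waits for all of them.
func makeChord(prev string, parallel []string, join string) []chordTask {
	tasks := []chordTask{{Id: prev, ToSignal: append([]string{}, parallel...)}}
	for _, id := range parallel {
		tasks = append(tasks, chordTask{Id: id, ToSignal: []string{join}, NumSignals: 1})
	}
	tasks = append(tasks, chordTask{Id: join, NumSignals: len(parallel)})
	return tasks
}

func main() {
	for _, t := range makeChord("prev", []string{"task1", "task2", "task3", "task4"}, "join") {
		fmt.Printf("%s -> %v (needs %d signals)\n", t.Id, t.ToSignal, t.NumSignals)
	}
}
```

A group is the same wiring without the join task.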

## Implementing "user" stories with the API

### Branch export

1. Under the merge/commit lock for the branch:
   1. Insert a task with ID `start-export-{branch}` to start generating export tasks and a task
      `done-export-{branch}` that depends on it.
   1. If insertion failed, _replace_ the task with ID `next-export-{branch}` with a task to
      export _this_ commit ID, and add a dependency on `done-export-{branch}` (which may fail if
      that task has completed).
1. To handle `start-export-{branch}`:
   1. Generate a task to copy or delete each file object (this is an opportunity to batch
      multiple file objects if performance requires it). `done-export-{branch}` depends on
      each of these tasks (and cannot have been deleted, since `start-export-{branch}` has not
      yet been returned). For every prefix for which such an object is configured, add a task
      to generate its `.../_SUCCESS` object on S3, dependent on all the objects under that
      prefix (or, to handle objects in sub-prefixes, on just the `_SUCCESS` of that sub-prefix).
   1. Add a task to generate manifests, dependent on `done-export-{branch}`.
   1. Return `start-export-{branch}` as completed.
1. To handle a copy or delete operation, perform it.
1. To handle `done-export-{branch}`: just return it, it can be spontaneous (if the task queue
   supports that).
1. To handle `next-export-{branch}`: create `start-export-{branch}` (the previous one must have
   ended), and return the task.

`next-export-{branch}` is used to serialize branch exports, achieving the requirement of a
single concurrent export per branch. Per-prefix `_SUCCESS` objects are generated on time due to
their dependencies. (As an option, we could set priorities and return tasks in priority order
from `OwnTasks`, to allow `_SUCCESS` objects to be created before copying other objects.)
Retries are handled by setting multiple per-copy attempts.
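The well-known task IDs used in this story can be derived from the branch name; a trivial sketch (the helper names are invented, the ID format follows the story above):

```go
package main

import "fmt"

// Using a fixed, well-known ID per branch means InsertTasks fails when an
// export is already running, which is exactly the signal used to fall back to
// replacing next-export-{branch}.
func startExportID(branch string) string { return fmt.Sprintf("start-export-%s", branch) }
func doneExportID(branch string) string  { return fmt.Sprintf("done-export-%s", branch) }
func nextExportID(branch string) string  { return fmt.Sprintf("next-export-%s", branch) }

func main() {
	fmt.Println(startExportID("main"), doneExportID("main"), nextExportID("main"))
}
```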

### Concurrent dedupe batching

1. For every created object, create a dedupe task.
2. To perform a dedupe task, acquire the object's checksum (from computation or its ETag) and
   create a task to write a dedupe record.
3. Own _many_ dedupe-record tasks at a time, and process them in batches.
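Step 3 is plain batching of owned tasks; a sketch, with `batch` an illustrative helper:

```go
package main

import "fmt"

// batch splits owned dedupe-record task IDs into fixed-size batches for bulk
// record writes.
func batch(ids []string, size int) [][]string {
	var out [][]string
	for len(ids) > 0 {
		n := size
		if len(ids) < n {
			n = len(ids)
		}
		out = append(out, ids[:n])
		ids = ids[n:]
	}
	return out
}

func main() {
	fmt.Println(batch([]string{"d1", "d2", "d3", "d4", "d5"}, 2)) // [[d1 d2] [d3 d4] [d5]]
}
```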

## References

### Well-known task queues

1. [Celery][celery-api]
2. [Machinery][machinery-api]

### Modules

1. [nanoid][nanoid]

### Graphs

1. [Parallel series][parallel-series]

[celery-api]: https://docs.celeryproject.org/en/stable/userguide/index.html
[machinery-api]: https://github.com/RichardKnop/machinery#readme
[nanoid]: https://www.npmjs.com/package/nanoid
[parallel-series]: https://www.cpp.edu/~elab/projects/project_05/index.html
