Skip to content

Commit

Permalink
feat(workflows): add operations service type (#898)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 18, 2024
1 parent d6d05f6 commit 0a0d377
Show file tree
Hide file tree
Showing 20 changed files with 227 additions and 81 deletions.
61 changes: 59 additions & 2 deletions docs/libraries/workflow/ERRORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,62 @@ because they will never succeed (the state is consistent up the point of error).

Sub workflow errors cannot be caught because it's up to the workflow to handle its own errors gracefully.

We return OK responses from workflows for failure cases we explicitly handle (e.g. linode server provision
cleaning itself up)
We return OK responses from workflows for failure cases that we will explicitly handle (e.g. linode server
provision cleaning itself up). See
[Errors that are meant to be propagated up](#errors-that-are-meant-to-be-propagated-up).

## Propagation

There are 3 classes of errors in workflows:

1. Errors that can't be retried
2. Errors that can be retried
3. Errors that are meant to be propagated up

### Errors that can't be retried

Certain errors cannot be retried by the workflow system. These are usually problems with the internal
mechanisms of the workflow system itself.

### Errors that can be retried

All user errors thrown in an activity will cause a workflow retry. While this is good for errors meant to be
retried, it causes unnecessary retries for errors that you know can't be recovered from (like assertions). We
don't currently have a way to mitigate this besides propagating the errors manually (see below) or letting the
useless retries happen.

### Errors that are meant to be propagated up

To propagate an error, you must manually serialize it in the activity/workflow output. The workflow itself
will succeed, but the output data will have the error you want to propagate up.

You can use nested `Result`'s for this:

```rust
#[derive(...)]
struct MyActivityInput { }

type MyActivityOutput = Result<MyActivityOutputOk, MyActivityOutputErr>;

#[derive(...)]
struct MyActivityOutputOk {
foo: String,
}

#[derive(...)]
struct MyActivityOutputErr {
bar: u32,
}

fn activity(input: MyActivityInput) -> GlobalResult<MyActivityOutput> {
if ... {
return Ok(Err(MyActivityOutputErr {
bar: 404,
}));
}

Ok(Ok(MyActivityOutputOk {
foo: "all good".to_string(),
}))
}
```
46 changes: 27 additions & 19 deletions docs/libraries/workflow/GLOSSARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@

## Worker

A process that queries for pending workflows with a specific filter. Filter is based on which workflows are registered in the given worker's registry.
The queried workflows are run on the same machine as the worker but given their own thread.
A process that queries for pending workflows with a specific filter. Filter is based on which workflows are
registered in the given worker's registry. The queried workflows are run on the same machine as the worker but
given their own thread.

## Registry

A collection of registered workflows. This is solely used for the worker to fetch workflows from the database.

## Workflow

A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or sub workflow triggers.
A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or
sub workflow triggers.

Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow.
Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should
be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow.

Upon an activity failure, workflow code can be reran without duplicate side effects because activities are cached and re-read after they succeed.
Upon an activity failure, workflow code can be reran without duplicate side effects because activities are
cached and re-read after they succeed.

## Activity

Expand All @@ -25,41 +29,45 @@ workflow fails.

## Operation

Effectively a native rust function. Can fail or not fail, used simply for tidiness (as you would with any other function).
Operations can only be called from activities, not from workflows.
Effectively a native rust function. Can fail or not fail. Used for widely used operations like fetching a
user. Operations cannot be called from workflows.

Examples include:

- most `get` operations (`user-get`)
- any complex logic you'd want in it's own function (fetching some http data and parsing it)
- most `get` operations (`user-get`)
- any complex logic you'd want in it's own function (fetching some http data and parsing it)

Operations are not required; all of their functionality can be put into an activity instead.

## Workflow Event

An action that gets executed in a workflow. An event can be a:

- Activity
- Received signal
- Dispatched sub-workflow
- Activity
- Received signal
- Dispatched sub-workflow

Events store the output from activities and are used to ensure activities are ran only once.

## Workflow Event History

List of events that have executed in this workflow. These are used in replays to verify that the workflow has not changed to an invalid state.
List of events that have executed in this workflow. These are used in replays to verify that the workflow has
not changed to an invalid state.

## Workflow Replay

After the first run of a workflow, subsequent runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will not actually run any code and instead use the output from the previous run.
After the first run of a workflow, subsequent runs will replay the activities and compare against the event
history. If an activity has already been ran successfully, the activity will not actually run any code and
instead use the output from the previous run.

## Workflow Wake Condition

If a workflow is not currently running an activity, wake conditions define when the workflow should be ran again.
If a workflow is not currently running an activity, wake conditions define when the workflow should be ran
again.

The available conditions are:

- **Immediately** Run immediately by the first available node
- **Deadline** Run at a given timestamp.
- **Signal** Run once any one of the listed signals is received.
- **Sub workflow** Run once the given sub workflow is completed.
- **Immediately** Run immediately by the first available node
- **Deadline** Run at a given timestamp.
- **Signal** Run once any one of the listed signals is received.
- **Sub workflow** Run once the given sub workflow is completed.
5 changes: 5 additions & 0 deletions lib/bolt/config/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ pub enum ServiceKind {
#[serde(rename = "operation")]
Operation {},

#[serde(rename = "operations")]
Operations {},

// TODO: Rename to worker
#[serde(rename = "consumer")]
Consumer {
Expand Down Expand Up @@ -359,6 +362,7 @@ impl ServiceKind {
ServiceKind::Static { .. } => "static",
ServiceKind::Database { .. } => "database",
ServiceKind::Cache { .. } => "cache",
ServiceKind::Operations { .. } => "operations",
}
}

Expand All @@ -371,6 +375,7 @@ impl ServiceKind {
| ServiceKind::Api { .. } => ComponentClass::Executable,

ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::Consumer { .. }
| ServiceKind::ApiRoutes { .. } => ComponentClass::NonExecutable,
ServiceKind::Database { .. } => ComponentClass::Database,
Expand Down
22 changes: 21 additions & 1 deletion lib/bolt/core/src/context/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,27 @@ impl ProjectContextData {
.await;

// Read ops
Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("ops")).await;
// Check if service config exists
if fs::metadata(pkg.path().join("ops").join("Service.toml"))
.await
.is_ok()
{
// Load the ops directory as a single service
let svc_ctx = context::service::ServiceContextData::from_path(
Weak::new(),
svc_ctxs_map,
&workspace_path,
&pkg.path().join("ops"),
)
.await
.unwrap();

svc_ctxs_map.insert(svc_ctx.name(), svc_ctx.clone());
} else {
// Load all individual ops
Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("ops"))
.await;
}

// Read dbs
Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("db")).await;
Expand Down
3 changes: 3 additions & 0 deletions lib/bolt/core/src/context/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,20 +512,23 @@ impl ServiceContextData {
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::Consumer { .. }
)
} else if matches!(self.config().kind, ServiceKind::Api { .. }) {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::ApiRoutes { .. }
)
} else {
matches!(
dep.config().kind,
ServiceKind::Database { .. }
| ServiceKind::Cache { .. } | ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
)
};

Expand Down
1 change: 1 addition & 0 deletions lib/bolt/core/src/dep/k8s/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ pub async fn gen_svc(exec_ctx: &ExecServiceContext) -> Vec<serde_json::Value> {
ServiceKind::Oneshot { .. } => SpecType::Job,
ServiceKind::Periodic { .. } => SpecType::CronJob,
ServiceKind::Operation { .. }
| ServiceKind::Operations { .. }
| ServiceKind::Database { .. }
| ServiceKind::Cache { .. }
| ServiceKind::ApiRoutes { .. } => {
Expand Down
47 changes: 32 additions & 15 deletions lib/bolt/core/src/tasks/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,23 +129,40 @@ async fn generate_root(path: &Path) {
}
}
}
// Iterate through `ops` folder
let ops_path = pkg.path().join("ops");
if fs::metadata(&ops_path).await.is_ok() {
let mut dir = fs::read_dir(ops_path).await.unwrap();
while let Some(entry) = dir.next_entry().await.unwrap() {
if entry.metadata().await.unwrap().is_dir() {
workspace_members.push(format!(
r#""pkg/{pkg}/ops/{entry}""#,
pkg = pkg.file_name().into_string().unwrap(),
entry = entry.file_name().into_string().unwrap()
));

// Remove services' Cargo.lock files in favor of the shared svc
// Cargo.toml
let _ = fs::remove_file(entry.path().join("Cargo.lock")).await;
// Check if service config exists
if fs::metadata(pkg.path().join("ops").join("Service.toml"))
.await
.is_ok()
{
workspace_members.push(format!(
r#""pkg/{pkg}/ops""#,
pkg = pkg.file_name().into_string().unwrap(),
));

set_license(&entry.path().join("Cargo.toml")).await;
let _ = fs::remove_file(pkg.path().join("ops").join("Cargo.lock")).await;

set_license(&pkg.path().join("ops").join("Cargo.toml")).await;
}
// Iterate through `ops` folder
else {
let ops_path = pkg.path().join("ops");
if fs::metadata(&ops_path).await.is_ok() {
let mut dir = fs::read_dir(ops_path).await.unwrap();
while let Some(entry) = dir.next_entry().await.unwrap() {
if entry.metadata().await.unwrap().is_dir() {
workspace_members.push(format!(
r#""pkg/{pkg}/ops/{entry}""#,
pkg = pkg.file_name().into_string().unwrap(),
entry = entry.file_name().into_string().unwrap()
));

// Remove services' Cargo.lock files in favor of the shared svc
// Cargo.toml
let _ = fs::remove_file(entry.path().join("Cargo.lock")).await;

set_license(&entry.path().join("Cargo.toml")).await;
}
}
}
}
Expand Down
17 changes: 17 additions & 0 deletions lib/bolt/core/src/tasks/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ pub async fn generate(ctx: &mut ProjectContext, opts: TemplateOpts) -> Result<()
);
}

// Check for new operations service type
if matches!(template_type, TemplateType::Operation)
&& fs::metadata(
base_path
.join("svc")
.join("pkg")
.join(&pkg_name)
.join("Service.toml"),
)
.await
.is_ok()
{
bail!(
"Creating operations in new `operations` service type ({pkg_name}/ops) not yet supported.",
);
}

// Touch types lib to force it to rebuild generated proto when making a new package
if create_pkg {
let lib_file = base_path
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ where
<I as OperationInput>::Operation: Operation<Input = I>,
B: Debug + Clone,
{
let mut ctx = OperationCtx::new(
let ctx = OperationCtx::new(
db_from_ctx(ctx).await?,
ctx.conn(),
ctx.ray_id(),
Expand All @@ -137,7 +137,7 @@ where
I::Operation::NAME,
);

I::Operation::run(&mut ctx, &input)
I::Operation::run(&ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
Expand Down
5 changes: 3 additions & 2 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use uuid::Uuid;

use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError};

#[derive(Clone)]
pub struct ActivityCtx {
ray_id: Uuid,
name: &'static str,
Expand Down Expand Up @@ -60,7 +61,7 @@ impl ActivityCtx {
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
let mut ctx = OperationCtx::new(
let ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
self.ray_id,
Expand All @@ -69,7 +70,7 @@ impl ActivityCtx {
I::Operation::NAME,
);

tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&mut ctx, &input))
tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
.await
.map_err(|_| WorkflowError::OperationTimeout)?
.map_err(WorkflowError::OperationFailure)
Expand Down
6 changes: 3 additions & 3 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,14 @@ impl ApiCtx {
}

pub async fn op<I>(
&mut self,
&self,
input: I,
) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
where
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
let mut ctx = OperationCtx::new(
let ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
self.ray_id,
Expand All @@ -168,7 +168,7 @@ impl ApiCtx {
I::Operation::NAME,
);

I::Operation::run(&mut ctx, &input)
I::Operation::run(&ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
Expand Down
Loading

0 comments on commit 0a0d377

Please sign in to comment.