Skip to content

Commit

Permalink
feat(workflows): clean up dispatching syntax (#1079)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Sep 3, 2024
1 parent 600d4fb commit 233efcc
Show file tree
Hide file tree
Showing 14 changed files with 717 additions and 641 deletions.
907 changes: 523 additions & 384 deletions lib/chirp-workflow/core/src/ctx/workflow.rs

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion lib/chirp-workflow/core/src/db/pg_nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,14 @@ impl Database for DatabasePgNats {
})
.await?;

self.wake_worker();
// Wake worker again if the deadline is before the next tick
if let Some(deadline_ts) = deadline_ts {
if deadline_ts
< rivet_util::timestamp::now() + worker::TICK_INTERVAL.as_millis() as i64 + 1
{
self.wake_worker();
}
}

Ok(())
}
Expand Down
4 changes: 3 additions & 1 deletion lib/chirp-workflow/core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ impl Worker {
util::time::sleep_until_ts(wake_deadline_ts as u64).await;
}

ctx.run().await;
if let Err(err) = ctx.run().await {
tracing::error!(?err, "unhandled error");
}
}
.in_current_span(),
);
Expand Down
60 changes: 25 additions & 35 deletions svc/pkg/cluster/src/workflows/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use chirp_workflow::prelude::*;
use futures_util::FutureExt;
use serde_json::json;

use crate::types::{BuildDeliveryMethod, Pool, Provider};

Expand All @@ -20,13 +19,10 @@ pub async fn cluster(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
})
.await?;

ctx.msg(
json!({
"cluster_id": input.cluster_id,
}),
CreateComplete {},
)
.await?;
ctx.msg(CreateComplete {})
.tag("cluster_id", input.cluster_id)
.send()
.await?;

ctx.repeat(|ctx| {
let cluster_id = input.cluster_id;
Expand All @@ -40,35 +36,29 @@ pub async fn cluster(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()> {
})
.await?;

ctx.msg(
json!({
"cluster_id": cluster_id,
}),
GameLinkComplete {},
)
.await?;
ctx.msg(GameLinkComplete {})
.tag("cluster_id", cluster_id)
.send()
.await?;
}
Main::DatacenterCreate(sig) => {
ctx.dispatch_tagged_workflow(
&json!({
"datacenter_id": sig.datacenter_id,
}),
crate::workflows::datacenter::Input {
cluster_id,
datacenter_id: sig.datacenter_id,
name_id: sig.name_id,
display_name: sig.display_name,

provider: sig.provider,
provider_datacenter_id: sig.provider_datacenter_id,
provider_api_token: sig.provider_api_token,

pools: sig.pools,

build_delivery_method: sig.build_delivery_method,
prebakes_enabled: sig.prebakes_enabled,
},
)
ctx.workflow(crate::workflows::datacenter::Input {
cluster_id,
datacenter_id: sig.datacenter_id,
name_id: sig.name_id,
display_name: sig.display_name,

provider: sig.provider,
provider_datacenter_id: sig.provider_datacenter_id,
provider_api_token: sig.provider_api_token,

pools: sig.pools,

build_delivery_method: sig.build_delivery_method,
prebakes_enabled: sig.prebakes_enabled,
})
.tag("datacenter_id", sig.datacenter_id)
.dispatch()
.await?;
}
}
Expand Down
40 changes: 18 additions & 22 deletions svc/pkg/cluster/src/workflows/datacenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::convert::TryInto;
use chirp_workflow::prelude::*;
use futures_util::FutureExt;
use rivet_operation::prelude::{proto::backend, Message};
use serde_json::json;

pub mod scale;
pub mod tls_issue;
Expand Down Expand Up @@ -51,20 +50,19 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) ->
datacenter_id: input.datacenter_id,
renew: false,
})
.run()
.await?;

ctx.msg(
json!({
"datacenter_id": input.datacenter_id,
}),
CreateComplete {},
)
.await?;
ctx.msg(CreateComplete {})
.tag("datacenter_id", input.datacenter_id)
.send()
.await?;

// Scale initially
ctx.workflow(scale::Input {
datacenter_id: input.datacenter_id,
})
.run()
.await?;

ctx.repeat(|ctx| {
Expand All @@ -81,30 +79,28 @@ pub(crate) async fn cluster_datacenter(ctx: &mut WorkflowCtx, input: &Input) ->
.await?;

// Scale
ctx.workflow(scale::Input { datacenter_id }).await?;
ctx.workflow(scale::Input { datacenter_id }).run().await?;
}
Main::Scale(_) => {
ctx.workflow(scale::Input { datacenter_id }).await?;
ctx.workflow(scale::Input { datacenter_id }).run().await?;
}
Main::ServerCreate(sig) => {
ctx.dispatch_tagged_workflow(
&json!({
"server_id": sig.server_id,
}),
crate::workflows::server::Input {
datacenter_id,
server_id: sig.server_id,
pool_type: sig.pool_type,
tags: sig.tags,
},
)
ctx.workflow(crate::workflows::server::Input {
datacenter_id,
server_id: sig.server_id,
pool_type: sig.pool_type,
tags: sig.tags,
})
.tag("server_id", sig.server_id)
.dispatch()
.await?;
}
Main::TlsRenew(_) => {
ctx.dispatch_workflow(tls_issue::Input {
ctx.workflow(tls_issue::Input {
datacenter_id,
renew: true,
})
.dispatch()
.await?;
}
}
Expand Down
53 changes: 20 additions & 33 deletions svc/pkg/cluster/src/workflows/datacenter/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{

use chirp_workflow::prelude::*;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use serde_json::json;

use crate::types::{Datacenter, PoolType, Provider};

Expand Down Expand Up @@ -143,45 +142,33 @@ impl Action {
server_id,
pool_type,
} => {
ctx.dispatch_tagged_workflow(
&json!({
"server_id": server_id,
}),
crate::workflows::server::Input {
datacenter_id,
server_id,
pool_type,
tags: Vec::new(),
},
)
ctx.workflow(crate::workflows::server::Input {
datacenter_id,
server_id,
pool_type,
tags: Vec::new(),
})
.tag("server_id", server_id)
.dispatch()
.await?;
}
Action::Drain { server_id } => {
ctx.tagged_signal(
&json!({
"server_id": server_id,
}),
crate::workflows::server::Drain {},
)
.await?;
ctx.signal(crate::workflows::server::Drain {})
.tag("server_id", server_id)
.send()
.await?;
}
Action::Undrain { server_id } => {
ctx.tagged_signal(
&json!({
"server_id": server_id,
}),
crate::workflows::server::Undrain {},
)
.await?;
ctx.signal(crate::workflows::server::Undrain {})
.tag("server_id", server_id)
.send()
.await?;
}
Action::Destroy { server_id } => {
ctx.tagged_signal(
&json!({
"server_id": server_id,
}),
crate::workflows::server::Destroy {},
)
.await?;
ctx.signal(crate::workflows::server::Destroy {})
.tag("server_id", server_id)
.send()
.await?;
}
}

Expand Down
64 changes: 28 additions & 36 deletions svc/pkg/cluster/src/workflows/prebake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,22 @@ pub async fn cluster_prebake(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
match input.provider {
Provider::Linode => {
let workflow_id = ctx
.dispatch_tagged_workflow(
&json!({
"server_id": prebake_server_id,
}),
linode::workflows::server::Input {
server_id: prebake_server_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
custom_image: None,
api_token: dc.provider_api_token.clone(),
hardware: linode::util::consts::PREBAKE_HARDWARE.to_string(),
firewall_preset: match input.pool_type {
PoolType::Job => linode::types::FirewallPreset::Job,
PoolType::Gg => linode::types::FirewallPreset::Gg,
PoolType::Ats => linode::types::FirewallPreset::Ats,
},
vlan_ip: None,
tags,
.workflow(linode::workflows::server::Input {
server_id: prebake_server_id,
provider_datacenter_id: dc.provider_datacenter_id.clone(),
custom_image: None,
api_token: dc.provider_api_token.clone(),
hardware: linode::util::consts::PREBAKE_HARDWARE.to_string(),
firewall_preset: match input.pool_type {
PoolType::Job => linode::types::FirewallPreset::Job,
PoolType::Gg => linode::types::FirewallPreset::Gg,
PoolType::Ats => linode::types::FirewallPreset::Ats,
},
)
vlan_ip: None,
tags,
})
.tag("server_id", prebake_server_id)
.dispatch()
.await?;

match ctx.listen::<Linode>().await? {
Expand All @@ -62,21 +59,19 @@ pub async fn cluster_prebake(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
pool_type: input.pool_type.clone(),
initialize_immediately: false,
})
.run()
.await?;

// Create image
let workflow_id = ctx
.dispatch_tagged_workflow(
&json!({
"linode_id": sig.linode_id,
}),
linode::workflows::image::Input {
prebake_server_id,
api_token: dc.provider_api_token.clone(),
linode_id: sig.linode_id,
boot_disk_id: sig.boot_disk_id,
},
)
.workflow(linode::workflows::image::Input {
prebake_server_id,
api_token: dc.provider_api_token.clone(),
linode_id: sig.linode_id,
boot_disk_id: sig.boot_disk_id,
})
.tag("linode_id", sig.linode_id)
.dispatch()
.await?;

// Wait for image creation
Expand All @@ -95,13 +90,10 @@ pub async fn cluster_prebake(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResu
.await?;

// Destroy linode server after the image is complete
ctx.tagged_signal(
&json!({
"server_id": prebake_server_id,
}),
linode::workflows::server::Destroy {},
)
.await?;
ctx.signal(linode::workflows::server::Destroy {})
.tag("server_id", prebake_server_id)
.send()
.await?;

// Wait for image workflow to get cleaned up by linode-gc after the image expires
ctx.wait_for_workflow::<linode::workflows::server::Workflow>(workflow_id)
Expand Down
13 changes: 5 additions & 8 deletions svc/pkg/cluster/src/workflows/server/drain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
};
use rivet_operation::prelude::proto::backend::pkg::*;
use rivet_operation::prelude::proto::backend::pkg::mm;
use serde_json::json;

use crate::types::PoolType;
Expand Down Expand Up @@ -38,13 +38,10 @@ pub(crate) async fn cluster_server_drain(ctx: &mut WorkflowCtx, input: &Input) -
.await?;
}
PoolType::Gg => {
ctx.tagged_signal(
&json!({
"server_id": input.server_id,
}),
crate::workflows::server::DnsDelete {},
)
.await?;
ctx.signal(crate::workflows::server::DnsDelete {})
.tag("server_id", input.server_id)
.send()
.await?;
}
PoolType::Ats => {}
}
Expand Down
Loading

0 comments on commit 233efcc

Please sign in to comment.