# fix(workflows): implement backoff for timeouts (#1111)

## Changes

- Carry the error count in `WorkflowError::ActivityTimeout` and `WorkflowError::OperationTimeout` so timed-out activities and operations get the same exponential backoff as `ActivityFailure` instead of a fixed retry delay.
- Enforce `Operation::TIMEOUT` in `ctx/common.rs` by wrapping operation execution in `tokio::time::timeout`.
- Group the dead-workflow Grafana panel by `workflow_name` in addition to `error_code`.
- Add `Into<HashableMap<K, V>>` and `FromIterator` impls for `HashableMap` in `lib/util/core`.
- Move signal/workflow call sites to the builder-style `signal(..).tag(..).send()`, `workflow(..).dispatch()`, and `.output()` APIs, and reuse the pre-fetched `linode_token` in the cluster server ops.

MasterPtato committed Sep 3, 2024
1 parent 0079be8 commit 6659b34
Showing 14 changed files with 74 additions and 59 deletions.
4 changes: 2 additions & 2 deletions infra/tf/grafana/grafana_dashboards/chirp-workflow.json
@@ -408,9 +408,9 @@
"uid": "prometheus"
},
"editorMode": "code",
"expr": "sum by (error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
"expr": "sum by (workflow_name, error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
"instant": false,
"legendFormat": "{{error_code}}",
"legendFormat": "(workflow_name) {{error_code}}",
"range": true,
"refId": "A"
}
3 changes: 2 additions & 1 deletion lib/chirp-workflow/core/src/ctx/common.rs
@@ -63,8 +63,9 @@ where
I::Operation::NAME,
);

let res = I::Operation::run(&ctx, &input)
let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
.await
.map_err(|_| WorkflowError::OperationTimeout(0))?
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw);

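The hunk above enforces `I::Operation::TIMEOUT` by wrapping the operation future in `tokio::time::timeout` and mapping an elapsed timer to `WorkflowError::OperationTimeout(0)`. Below is a minimal, standalone sketch of that pattern; the `run_operation` function, the 30-second limit, and the simplified error enum are illustrative assumptions, not the crate's real definitions.

```rust
use std::time::Duration;

#[derive(Debug)]
enum WorkflowError {
    // Carries the current error count so the caller can compute a backoff.
    OperationTimeout(usize),
    OperationFailure(String),
}

// Hypothetical operation body used only for illustration.
async fn run_operation(input: u32) -> Result<u32, String> {
    Ok(input * 2)
}

async fn run_with_timeout(input: u32, error_count: usize) -> Result<u32, WorkflowError> {
    // `tokio::time::timeout` yields `Err(Elapsed)` if the inner future does not
    // finish in time; that outer error becomes `OperationTimeout`, while an
    // error from the operation itself becomes `OperationFailure`.
    tokio::time::timeout(Duration::from_secs(30), run_operation(input))
        .await
        .map_err(|_| WorkflowError::OperationTimeout(error_count))?
        .map_err(WorkflowError::OperationFailure)
}

#[tokio::main]
async fn main() {
    println!("{:?}", run_with_timeout(21, 0).await);
}
```
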
20 changes: 14 additions & 6 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -300,7 +300,7 @@ impl WorkflowCtx

let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
.await
.map_err(|_| WorkflowError::ActivityTimeout);
.map_err(|_| WorkflowError::ActivityTimeout(0));

let dt = start_instant.elapsed().as_secs_f64();

@@ -499,15 +499,24 @@ impl WorkflowCtx
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err)
} else {
// Add error count to the error
// Add error count to the error for backoff calculation
WorkflowError::ActivityFailure(err, error_count)
}
}
WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
WorkflowError::ActivityTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
} else {
err
// Add error count to the error for backoff calculation
WorkflowError::ActivityTimeout(error_count)
}
}
WorkflowError::OperationTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
} else {
// Add error count to the error for backoff calculation
WorkflowError::OperationTimeout(error_count)
}
}
_ => err,
@@ -822,8 +831,7 @@ impl WorkflowCtx
pub async fn sleep<T: DurationToMillis>(&mut self, duration: T) -> GlobalResult<()> {
let ts = rivet_util::timestamp::now() as u64 + duration.to_millis()?;

self.sleep_until(ts as i64)
.await
self.sleep_until(ts as i64).await
}

pub async fn sleep_until<T: TsToMillis>(&mut self, time: T) -> GlobalResult<()> {
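In the retry handling above, a timed-out activity or operation is promoted to `ActivityMaxFailuresReached` once the next attempt would exceed `MAX_RETRIES`; otherwise the current error count is stamped onto the error so the backoff can grow with each attempt. A rough sketch of that decision, assuming a fixed retry budget and simplified error types:

```rust
const MAX_RETRIES: usize = 4; // assumed value; the real budget comes from the activity definition

#[derive(Debug)]
enum WorkflowError {
    ActivityTimeout(usize),
    ActivityMaxFailuresReached(String),
}

fn classify_timeout(err: WorkflowError, error_count: usize) -> WorkflowError {
    match err {
        // The next attempt would blow the budget: the error becomes terminal.
        WorkflowError::ActivityTimeout(_) if error_count + 1 >= MAX_RETRIES => {
            WorkflowError::ActivityMaxFailuresReached("activity timed out".to_string())
        }
        // Otherwise, carry the attempt count for the backoff calculation.
        WorkflowError::ActivityTimeout(_) => WorkflowError::ActivityTimeout(error_count),
        other => other,
    }
}

fn main() {
    println!("{:?}", classify_timeout(WorkflowError::ActivityTimeout(0), 1));
    println!("{:?}", classify_timeout(WorkflowError::ActivityTimeout(0), 3));
}
```
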
20 changes: 9 additions & 11 deletions lib/chirp-workflow/core/src/error.rs
@@ -124,10 +124,10 @@ pub enum WorkflowError {
Pools(#[from] rivet_pools::Error),

#[error("activity timed out")]
ActivityTimeout,
ActivityTimeout(usize),

#[error("operation timed out")]
OperationTimeout,
OperationTimeout(usize),

#[error("duplicate registered workflow: {0}")]
DuplicateRegisteredWorkflow(String),
@@ -140,7 +140,9 @@ impl WorkflowError {
/// Returns the next deadline for a workflow to be woken up again based on the error.
pub(crate) fn deadline_ts(&self) -> Option<i64> {
match self {
WorkflowError::ActivityFailure(_, error_count) => {
WorkflowError::ActivityFailure(_, error_count)
| WorkflowError::ActivityTimeout(error_count)
| WorkflowError::OperationTimeout(error_count) => {
// NOTE: Max retry is handled in `WorkflowCtx::activity`
let mut backoff =
rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count);
@@ -157,10 +159,6 @@

Some(deadline_ts)
}
// TODO: Add backoff
WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
}
WorkflowError::Sleep(ts) => Some(*ts),
_ => None,
}
@@ -169,8 +167,8 @@
pub fn is_recoverable(&self) -> bool {
match self {
WorkflowError::ActivityFailure(_, _)
| WorkflowError::ActivityTimeout
| WorkflowError::OperationTimeout
| WorkflowError::ActivityTimeout(_)
| WorkflowError::OperationTimeout(_)
| WorkflowError::NoSignalFound(_)
| WorkflowError::SubWorkflowIncomplete(_)
| WorkflowError::Sleep(_) => true,
@@ -181,8 +179,8 @@
pub(crate) fn is_retryable(&self) -> bool {
match self {
WorkflowError::ActivityFailure(_, _)
| WorkflowError::ActivityTimeout
| WorkflowError::OperationTimeout => true,
| WorkflowError::ActivityTimeout(_)
| WorkflowError::OperationTimeout(_) => true,
_ => false,
}
}
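Because the timeout variants now carry an error count, `deadline_ts` can route them through the same `rivet_util::Backoff` path as `ActivityFailure`, replacing the old fixed `RETRY_TIMEOUT_MS` delay. The sketch below shows the general shape of deriving a wake-up timestamp from an attempt count with capped exponential backoff; the base delay, cap, and doubling policy are assumptions and do not reproduce `rivet_util::Backoff` exactly.

```rust
// Assumed base delay, not the crate's real RETRY_TIMEOUT_MS.
const RETRY_TIMEOUT_MS: u64 = 2_000;

fn deadline_ts(now_ms: i64, error_count: usize) -> i64 {
    // Double the delay per prior failure, capping the exponent so the wait
    // stays bounded even after many retries.
    let exponent = error_count.min(8) as u32;
    let delay_ms = RETRY_TIMEOUT_MS.saturating_mul(2u64.saturating_pow(exponent));
    now_ms + delay_ms as i64
}

fn main() {
    let now_ms = 1_725_000_000_000; // example "current" timestamp in ms
    for error_count in 0..4 {
        println!("attempt {error_count}: wake at {}", deadline_ts(now_ms, error_count));
    }
}
```
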
18 changes: 18 additions & 0 deletions lib/util/core/src/lib.rs
@@ -218,6 +218,24 @@ impl<K: Eq + Clone + Hash, V: Clone + Hash> AsHashableExt<K, V> for HashMap<K, V
}
}

impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for HashMap<K, V> {
fn into(self) -> HashableMap<K, V> {
HashableMap(self.into_iter().collect())
}
}

impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for &HashMap<K, V> {
fn into(self) -> HashableMap<K, V> {
HashableMap(self.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
}
}

impl<K: Eq + Hash, V: Hash> FromIterator<(K, V)> for HashableMap<K, V> {
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
HashableMap(iter.into_iter().collect())
}
}

#[cfg(test)]
mod tests {
use std::time::Instant;
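The new impls above let call sites turn an owned or borrowed `HashMap` into a `HashableMap` with `.into()`, or collect one straight from an iterator. A self-contained sketch of the same conversions, using a hypothetical `BTreeMap`-backed stand-in so the wrapper can derive `Hash` (the real type lives in `lib/util/core` and may be backed differently); the sketch implements `From`, which supplies `Into` for free, whereas the diff writes the `Into` impls directly.

```rust
use std::collections::{BTreeMap, HashMap};

// Hypothetical stand-in: ordering the entries makes the wrapper hashable.
#[derive(Debug, Hash)]
struct HashableMap<K, V>(BTreeMap<K, V>);

impl<K: Ord + Clone, V: Clone> From<&HashMap<K, V>> for HashableMap<K, V> {
    fn from(map: &HashMap<K, V>) -> Self {
        // Clone the entries into an ordered map.
        HashableMap(map.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
    }
}

impl<K: Ord, V> FromIterator<(K, V)> for HashableMap<K, V> {
    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
        HashableMap(iter.into_iter().collect())
    }
}

fn main() {
    let mut tags = HashMap::new();
    tags.insert("server_id", "abc123");

    // Borrowed conversion, mirroring the `&HashMap` impl in the diff.
    let from_ref: HashableMap<_, _> = (&tags).into();

    // Collected directly from an iterator, mirroring the `FromIterator` impl.
    let from_iter: HashableMap<_, _> = tags.into_iter().collect();

    println!("{from_ref:?} {from_iter:?}");
}
```
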
5 changes: 1 addition & 4 deletions svc/pkg/cluster/src/ops/server/lost_list.rs
@@ -49,10 +49,7 @@ pub async fn cluster_server_lost_list(ctx: &OperationCtx, input: &Input) -> Glob
.await?
.into_iter()
.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
.chain(std::iter::once((
Provider::Linode,
util::env::read_secret(&["linode", "token"]).await?,
)))
.chain(std::iter::once((Provider::Linode, linode_token)))
.collect::<HashSet<_>>();

// Filter by namespace
5 changes: 1 addition & 4 deletions svc/pkg/cluster/src/ops/server/prune_with_filter.rs
@@ -47,10 +47,7 @@ pub async fn cluster_server_prune_with_filter(
.await?
.into_iter()
.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
.chain(std::iter::once((
Provider::Linode,
util::env::read_secret(&["linode", "token"]).await?,
)))
.chain(std::iter::once((Provider::Linode, linode_token)))
.collect::<HashSet<_>>();

// Filter by namespace
3 changes: 1 addition & 2 deletions svc/pkg/cluster/src/workflows/server/drain.rs
@@ -3,8 +3,7 @@ use nomad_client::{
apis::{configuration::Configuration, nodes_api},
models,
};
use rivet_operation::prelude::proto::backend::pkg::mm;
use serde_json::json;
use rivet_operation::prelude::proto::backend::pkg::*;

use crate::types::PoolType;

9 changes: 4 additions & 5 deletions svc/pkg/ds/src/workers/drain_all.rs
@@ -2,7 +2,6 @@ use std::convert::TryInto;

use chirp_worker::prelude::*;
use proto::backend::pkg::*;
use serde_json::json;

#[worker(name = "ds-drain-all")]
async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalResult<()> {
@@ -24,16 +23,16 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe
.await?;

for (server_id, kill_timeout_ms) in server_rows {
chirp_workflow::compat::tagged_signal(
chirp_workflow::compat::signal(
ctx,
&json!({
"server_id": server_id,
}),
crate::workflows::server::Destroy {
override_kill_timeout_ms: (drain_timeout < kill_timeout_ms)
.then_some(drain_timeout),
},
)
.await?
.tag("server_id", server_id)
.send()
.await?;
}

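This worker moves from `tagged_signal` with an ad-hoc `json!` tag map to the builder-style API, where the tag is attached with `.tag(..)` and the signal is dispatched with `.send()`. The mock below only sketches that builder shape with hypothetical stand-in types; the real builder is part of chirp-workflow and returns its own error type rather than printing.

```rust
// Hypothetical stand-in types illustrating the `signal(..).tag(..).send()` shape.
struct SignalBuilder<T> {
    body: T,
    tags: Vec<(String, String)>,
}

impl<T: std::fmt::Debug> SignalBuilder<T> {
    fn new(body: T) -> Self {
        Self { body, tags: Vec::new() }
    }

    // Each `.tag(..)` call adds one tag used to route the signal to a workflow.
    fn tag(mut self, key: &str, value: impl ToString) -> Self {
        self.tags.push((key.to_string(), value.to_string()));
        self
    }

    async fn send(self) -> Result<(), String> {
        // Stand-in for publishing the signal to the workflow engine.
        println!("sending {:?} with tags {:?}", self.body, self.tags);
        Ok(())
    }
}

#[derive(Debug)]
struct Destroy {
    override_kill_timeout_ms: Option<i64>,
}

#[tokio::main]
async fn main() -> Result<(), String> {
    SignalBuilder::new(Destroy { override_kill_timeout_ms: None })
        .tag("server_id", "11111111-2222-3333-4444-555555555555")
        .send()
        .await
}
```
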
5 changes: 5 additions & 0 deletions svc/pkg/ds/src/workflows/server/mod.rs
@@ -112,6 +112,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
server_id: input.server_id,
eval: sig.eval,
})
.output()
.await?;

if let EvalStatus::Failed = eval_status {
@@ -121,6 +122,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
server_id: input.server_id,
override_kill_timeout_ms: None,
})
.output()
.await?;
}
}
@@ -131,6 +133,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
server_id: input.server_id,
override_kill_timeout_ms: sig.override_kill_timeout_ms,
})
.output()
.await?;

return Ok(());
@@ -148,6 +151,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
server_id,
alloc: sig.alloc,
})
.output()
.await?;
}
Main::NomadAllocUpdate(sig) => {
@@ -156,6 +160,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
server_id,
alloc: sig.alloc,
})
.output()
.await?;

if finished {
4 changes: 3 additions & 1 deletion svc/pkg/job-run/src/workers/drain_all.rs
@@ -3,13 +3,15 @@ use proto::backend::pkg::*;

#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
chirp_workflow::compat::dispatch_workflow(
chirp_workflow::compat::workflow(
ctx,
crate::workflows::drain_all::Input {
nomad_node_id: ctx.nomad_node_id.clone(),
drain_timeout: ctx.drain_timeout,
},
)
.await?
.dispatch()
.await?;

Ok(())
13 changes: 5 additions & 8 deletions svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs
@@ -23,14 +23,11 @@ pub async fn handle(
})
.await?;
} else if ds::util::is_nomad_ds(job_id) {
ctx.tagged_signal(
&json!({
"nomad_dispatched_job_id": job_id,
}),
ds::workflows::server::NomadAllocPlan {
alloc: alloc.clone(),
},
)
ctx.signal(ds::workflows::server::NomadAllocPlan {
alloc: alloc.clone(),
})
.tag("nomad_dispatched_job_id", job_id)
.send()
.await?;
} else {
tracing::info!(%job_id, "disregarding event");
13 changes: 5 additions & 8 deletions svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs
@@ -23,14 +23,11 @@ pub async fn handle(
})
.await?;
} else if ds::util::is_nomad_ds(job_id) {
ctx.tagged_signal(
&json!({
"nomad_dispatched_job_id": job_id,
}),
ds::workflows::server::NomadAllocUpdate {
alloc: alloc.clone(),
},
)
ctx.signal(ds::workflows::server::NomadAllocUpdate {
alloc: alloc.clone(),
})
.tag("nomad_dispatched_job_id", job_id)
.send()
.await?;
} else {
tracing::info!(%job_id, "disregarding event");
11 changes: 4 additions & 7 deletions svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs
@@ -41,13 +41,10 @@ pub async fn handle(
})
.await?;
} else if ds::util::is_nomad_ds(job_id) {
ctx.tagged_signal(
&json!({
"nomad_dispatched_job_id": job_id,
}),
ds::workflows::server::NomadEvalUpdate { eval: eval.clone() },
)
.await?;
ctx.signal(ds::workflows::server::NomadEvalUpdate { eval: eval.clone() })
.tag("nomad_dispatched_job_id", job_id)
.send()
.await?;
} else {
tracing::info!(%job_id, "disregarding event");
}
