fix(workflows): implement backoff for timeouts #1111

Merged
4 changes: 2 additions & 2 deletions infra/tf/grafana/grafana_dashboards/chirp-workflow.json
@@ -408,9 +408,9 @@
          "uid": "prometheus"
        },
        "editorMode": "code",
-        "expr": "sum by (error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
+        "expr": "sum by (workflow_name, error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
        "instant": false,
-        "legendFormat": "{{error_code}}",
+        "legendFormat": "(workflow_name) {{error_code}}",
        "range": true,
        "refId": "A"
      }
3 changes: 2 additions & 1 deletion lib/chirp-workflow/core/src/ctx/common.rs
@@ -63,8 +63,9 @@ where
		I::Operation::NAME,
	);

-	let res = I::Operation::run(&ctx, &input)
+	let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
		.await
+		.map_err(|_| WorkflowError::OperationTimeout(0))?
		.map_err(WorkflowError::OperationFailure)
		.map_err(GlobalError::raw);
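The change wraps the operation future in tokio::time::timeout, mapping an elapsed timer to OperationTimeout(0); the zero is the initial retry count, which the retry logic in workflow.rs overwrites on later attempts. A minimal standalone sketch of the pattern, with hypothetical names (run_op, OpError) standing in for the generated operation types:

	use std::time::Duration;

	// Hypothetical stand-ins for the generated operation types.
	#[derive(Debug)]
	enum OpError {
		Timeout(usize), // carries the retry count used later for backoff
		Failure(String),
	}

	async fn run_op() -> Result<u32, String> {
		Ok(42) // stand-in for I::Operation::run
	}

	async fn call_with_timeout(timeout: Duration) -> Result<u32, OpError> {
		// The outer Err means the timer elapsed; the inner Err is the
		// operation's own failure.
		tokio::time::timeout(timeout, run_op())
			.await
			.map_err(|_| OpError::Timeout(0))? // first attempt, so count starts at 0
			.map_err(OpError::Failure)
	}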
20 changes: 14 additions & 6 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -300,7 +300,7 @@ impl WorkflowCtx

	let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
		.await
-		.map_err(|_| WorkflowError::ActivityTimeout);
+		.map_err(|_| WorkflowError::ActivityTimeout(0));

	let dt = start_instant.elapsed().as_secs_f64();
@@ -499,15 +499,24 @@
			if error_count + 1 >= I::Activity::MAX_RETRIES {
				WorkflowError::ActivityMaxFailuresReached(err)
			} else {
-				// Add error count to the error
+				// Add error count to the error for backoff calculation
				WorkflowError::ActivityFailure(err, error_count)
			}
		}
-		WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
+		WorkflowError::ActivityTimeout(_) => {
			if error_count + 1 >= I::Activity::MAX_RETRIES {
				WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
			} else {
-				err
+				// Add error count to the error for backoff calculation
+				WorkflowError::ActivityTimeout(error_count)
			}
		}
+		WorkflowError::OperationTimeout(_) => {
+			if error_count + 1 >= I::Activity::MAX_RETRIES {
+				WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
+			} else {
+				// Add error count to the error for backoff calculation
+				WorkflowError::OperationTimeout(error_count)
+			}
+		}
		_ => err,
@@ -822,8 +831,7 @@ impl WorkflowCtx {
	pub async fn sleep<T: DurationToMillis>(&mut self, duration: T) -> GlobalResult<()> {
		let ts = rivet_util::timestamp::now() as u64 + duration.to_millis()?;

-		self.sleep_until(ts as i64)
-			.await
+		self.sleep_until(ts as i64).await
	}

	pub async fn sleep_until<T: TsToMillis>(&mut self, time: T) -> GlobalResult<()> {
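The new match arms all apply one rule: if the retry budget is exhausted, the error escalates to ActivityMaxFailuresReached; otherwise it is re-tagged with the current error_count so the backoff in error.rs (below) can scale the next delay. The rule in isolation, with hypothetical names (max_retries stands in for I::Activity::MAX_RETRIES):

	// Hypothetical distillation of the escalation rule above.
	enum Decision {
		Fail,         // out of retries: surface a permanent failure
		Retry(usize), // retry later, carrying the count for backoff
	}

	fn escalate(error_count: usize, max_retries: usize) -> Decision {
		if error_count + 1 >= max_retries {
			Decision::Fail
		} else {
			Decision::Retry(error_count)
		}
	}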
20 changes: 9 additions & 11 deletions lib/chirp-workflow/core/src/error.rs
@@ -124,10 +124,10 @@ pub enum WorkflowError {
	Pools(#[from] rivet_pools::Error),

	#[error("activity timed out")]
-	ActivityTimeout,
+	ActivityTimeout(usize),

	#[error("operation timed out")]
-	OperationTimeout,
+	OperationTimeout(usize),

	#[error("duplicate registered workflow: {0}")]
	DuplicateRegisteredWorkflow(String),
Expand All @@ -140,7 +140,9 @@ impl WorkflowError {
/// Returns the next deadline for a workflow to be woken up again based on the error.
pub(crate) fn deadline_ts(&self) -> Option<i64> {
match self {
WorkflowError::ActivityFailure(_, error_count) => {
WorkflowError::ActivityFailure(_, error_count)
| WorkflowError::ActivityTimeout(error_count)
| WorkflowError::OperationTimeout(error_count) => {
// NOTE: Max retry is handled in `WorkflowCtx::activity`
let mut backoff =
rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count);
@@ -157,10 +159,6 @@

				Some(deadline_ts)
			}
-			// TODO: Add backoff
-			WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
-				Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
-			}
			WorkflowError::Sleep(ts) => Some(*ts),
			_ => None,
		}
@@ -169,8 +167,8 @@
	pub fn is_recoverable(&self) -> bool {
		match self {
			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout
-			| WorkflowError::OperationTimeout
+			| WorkflowError::ActivityTimeout(_)
+			| WorkflowError::OperationTimeout(_)
			| WorkflowError::NoSignalFound(_)
			| WorkflowError::SubWorkflowIncomplete(_)
			| WorkflowError::Sleep(_) => true,
@@ -181,8 +179,8 @@
	pub(crate) fn is_retryable(&self) -> bool {
		match self {
			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout
-			| WorkflowError::OperationTimeout => true,
+			| WorkflowError::ActivityTimeout(_)
+			| WorkflowError::OperationTimeout(_) => true,
			_ => false,
		}
	}
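This is the heart of the PR: timeouts previously retried after a fixed RETRY_TIMEOUT_MS (the removed TODO arm) and now share the exponential backoff already used by ActivityFailure. The exact semantics of rivet_util::Backoff::new_at are not shown in this diff, so the following is an illustrative sketch under the assumption that the delay doubles per retry from a base until a capped exponent; every constant and name here is an assumption, not the library's behavior:

	/// Illustrative only: next wake-up deadline assuming the delay doubles
	/// per retry, i.e. wait = BASE_MS * 2^min(error_count, MAX_EXPONENT).
	/// The real Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, error_count)
	/// may add jitter or interpret its arguments differently.
	fn next_deadline_ms(now_ms: i64, error_count: usize) -> i64 {
		const BASE_MS: i64 = 500; // assumed base delay
		const MAX_EXPONENT: u32 = 8; // assumed cap: 500ms * 2^8 = 128s
		let exp = (error_count as u32).min(MAX_EXPONENT);
		now_ms + BASE_MS * (1i64 << exp)
	}

Under that assumption, a first timeout (error_count = 0) retries after 500ms, a fourth attempt waits 4s, and anything past the eighth holds at 128s.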
18 changes: 18 additions & 0 deletions lib/util/core/src/lib.rs
@@ -218,6 +218,24 @@ impl<K: Eq + Clone + Hash, V: Clone + Hash> AsHashableExt<K, V> for HashMap<K, V
	}
}

+impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for HashMap<K, V> {
+	fn into(self) -> HashableMap<K, V> {
+		HashableMap(self.into_iter().collect())
+	}
+}
+
+impl<K: Eq + Clone + Hash, V: Clone + Hash> Into<HashableMap<K, V>> for &HashMap<K, V> {
+	fn into(self) -> HashableMap<K, V> {
+		HashableMap(self.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
+	}
+}
+
+impl<K: Eq + Hash, V: Hash> FromIterator<(K, V)> for HashableMap<K, V> {
+	fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
+		HashableMap(iter.into_iter().collect())
+	}
+}
+
#[cfg(test)]
mod tests {
	use std::time::Instant;
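These impls let callers turn a HashMap (owned or borrowed) into a HashableMap with .into(), or build one directly with collect(). A usage sketch — the export path rivet_util::HashableMap is assumed, and the map contents are made up:

	use std::collections::HashMap;
	use rivet_util::HashableMap; // assumed export path

	fn demo() {
		let mut tags: HashMap<String, String> = HashMap::new();
		tags.insert("server_id".into(), "abc123".into());

		// Borrowed conversion clones the entries, leaving `tags` usable.
		let by_ref: HashableMap<String, String> = (&tags).into();

		// Owned conversion consumes the map.
		let owned: HashableMap<String, String> = tags.into();

		// FromIterator lets collect() target HashableMap directly.
		let collected: HashableMap<&str, i32> = [("a", 1), ("b", 2)].into_iter().collect();

		let _ = (by_ref, owned, collected);
	}

One design note: clippy's from_over_into lint usually prefers implementing From (which yields Into for free); implementing Into directly works too, it just cannot be used in From positions.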
5 changes: 1 addition & 4 deletions svc/pkg/cluster/src/ops/server/lost_list.rs
@@ -49,10 +49,7 @@ pub async fn cluster_server_lost_list(ctx: &OperationCtx, input: &Input) -> Glob
		.await?
		.into_iter()
		.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
-		.chain(std::iter::once((
-			Provider::Linode,
-			util::env::read_secret(&["linode", "token"]).await?,
-		)))
+		.chain(std::iter::once((Provider::Linode, linode_token)))
		.collect::<HashSet<_>>();

	// Filter by namespace
5 changes: 1 addition & 4 deletions svc/pkg/cluster/src/ops/server/prune_with_filter.rs
@@ -47,10 +47,7 @@ pub async fn cluster_server_prune_with_filter(
		.await?
		.into_iter()
		.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
-		.chain(std::iter::once((
-			Provider::Linode,
-			util::env::read_secret(&["linode", "token"]).await?,
-		)))
+		.chain(std::iter::once((Provider::Linode, linode_token)))
		.collect::<HashSet<_>>();

	// Filter by namespace
3 changes: 1 addition & 2 deletions svc/pkg/cluster/src/workflows/server/drain.rs
@@ -3,8 +3,7 @@ use nomad_client::{
	apis::{configuration::Configuration, nodes_api},
	models,
};
-use rivet_operation::prelude::proto::backend::pkg::mm;
-use serde_json::json;
+use rivet_operation::prelude::proto::backend::pkg::*;

use crate::types::PoolType;
9 changes: 4 additions & 5 deletions svc/pkg/ds/src/workers/drain_all.rs
@@ -2,7 +2,6 @@ use std::convert::TryInto;

use chirp_worker::prelude::*;
use proto::backend::pkg::*;
-use serde_json::json;

#[worker(name = "ds-drain-all")]
async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalResult<()> {
@@ -24,16 +23,16 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe
	.await?;

	for (server_id, kill_timeout_ms) in server_rows {
-		chirp_workflow::compat::tagged_signal(
+		chirp_workflow::compat::signal(
			ctx,
-			&json!({
-				"server_id": server_id,
-			}),
			crate::workflows::server::Destroy {
				override_kill_timeout_ms: (drain_timeout < kill_timeout_ms)
					.then_some(drain_timeout),
			},
		)
		.await?
+		.tag("server_id", server_id)
+		.send()
+		.await?;
	}
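The remaining files migrate from tagged_signal and dispatch_workflow, which took tags as a serde_json::json!() blob, to a builder style where tags are attached with .tag(key, value) and the message fires on .send() (or .dispatch() for workflows, as in job-run/drain_all.rs below). A rough mock of the builder shape — hypothetical types, not the actual chirp-workflow API:

	use serde_json::Value;

	// Hypothetical mock; the real builder lives inside chirp-workflow.
	struct SignalBuilder<T> {
		signal: T,
		tags: Vec<(String, Value)>,
	}

	impl<T> SignalBuilder<T> {
		fn tag(mut self, key: &str, value: impl Into<Value>) -> Self {
			self.tags.push((key.to_string(), value.into()));
			self
		}

		async fn send(self) -> Result<(), String> {
			// The real implementation would serialize the tags and publish the signal.
			Ok(())
		}
	}

Compared with the json!() blob, the chained form keeps each tag key next to its value and drops serde_json from call sites, which is why the use serde_json::json; imports disappear throughout this PR.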
5 changes: 5 additions & 0 deletions svc/pkg/ds/src/workflows/server/mod.rs
@@ -112,6 +112,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
			server_id: input.server_id,
			eval: sig.eval,
		})
+		.output()
		.await?;

		if let EvalStatus::Failed = eval_status {
@@ -121,6 +122,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
				server_id: input.server_id,
				override_kill_timeout_ms: None,
			})
+			.output()
			.await?;
		}
	}
@@ -131,6 +133,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
			server_id: input.server_id,
			override_kill_timeout_ms: sig.override_kill_timeout_ms,
		})
+		.output()
		.await?;

		return Ok(());
@@ -148,6 +151,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
			server_id,
			alloc: sig.alloc,
		})
+		.output()
		.await?;
	}
	Main::NomadAllocUpdate(sig) => {
@@ -156,6 +160,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
			server_id,
			alloc: sig.alloc,
		})
+		.output()
		.await?;

		if finished {
4 changes: 3 additions & 1 deletion svc/pkg/job-run/src/workers/drain_all.rs
@@ -3,13 +3,15 @@ use proto::backend::pkg::*;

#[worker(name = "job-run-drain-all")]
async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
-	chirp_workflow::compat::dispatch_workflow(
+	chirp_workflow::compat::workflow(
		ctx,
		crate::workflows::drain_all::Input {
			nomad_node_id: ctx.nomad_node_id.clone(),
			drain_timeout: ctx.drain_timeout,
		},
	)
	.await?
+	.dispatch()
+	.await?;

	Ok(())
13 changes: 5 additions & 8 deletions svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs
@@ -23,14 +23,11 @@ pub async fn handle(
		})
		.await?;
	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadAllocPlan {
-				alloc: alloc.clone(),
-			},
-		)
+		ctx.signal(ds::workflows::server::NomadAllocPlan {
+			alloc: alloc.clone(),
+		})
+		.tag("nomad_dispatched_job_id", job_id)
+		.send()
		.await?;
	} else {
		tracing::info!(%job_id, "disregarding event");
13 changes: 5 additions & 8 deletions svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs
@@ -23,14 +23,11 @@ pub async fn handle(
		})
		.await?;
	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadAllocUpdate {
-				alloc: alloc.clone(),
-			},
-		)
+		ctx.signal(ds::workflows::server::NomadAllocUpdate {
+			alloc: alloc.clone(),
+		})
+		.tag("nomad_dispatched_job_id", job_id)
+		.send()
		.await?;
	} else {
		tracing::info!(%job_id, "disregarding event");
11 changes: 4 additions & 7 deletions svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs
@@ -41,13 +41,10 @@ pub async fn handle(
		})
		.await?;
	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadEvalUpdate { eval: eval.clone() },
-		)
-		.await?;
+		ctx.signal(ds::workflows::server::NomadEvalUpdate { eval: eval.clone() })
+			.tag("nomad_dispatched_job_id", job_id)
+			.send()
+			.await?;
	} else {
		tracing::info!(%job_id, "disregarding event");
	}