From 6659b34f41e9e0e1f31b21ffd45485cfba4d6e09 Mon Sep 17 00:00:00 2001
From: MasterPtato <23087326+MasterPtato@users.noreply.github.com>
Date: Tue, 3 Sep 2024 23:48:55 +0000
Subject: [PATCH] fix(workflows): implement backoff for timeouts (#1111)

## Changes

- Implement exponential backoff when retrying activities and operations that
  time out, replacing the previous fixed `RETRY_TIMEOUT_MS` delay.
  `ActivityTimeout` and `OperationTimeout` now carry the error count so
  `deadline_ts` can compute the next wake-up the same way `ActivityFailure`
  already does.
- Enforce `I::Operation::TIMEOUT` on operations via `tokio::time::timeout`.
- Break down the dead-workflows Grafana panel by `workflow_name`.
- Add `Into<HashableMap<K, V>>` and `FromIterator` impls for `HashableMap`.
- Migrate remaining `tagged_signal`/`dispatch_workflow` call sites to the
  builder API (`.tag(...)`, `.send()`, `.dispatch()`, `.output()`).

(Illustrative sketches of the timeout wrapper, the backoff math, and the
signal builder follow the diff.)

---
 .../grafana_dashboards/chirp-workflow.json    |  4 ++--
 lib/chirp-workflow/core/src/ctx/common.rs     |  3 ++-
 lib/chirp-workflow/core/src/ctx/workflow.rs   | 20 +++++++++++++------
 lib/chirp-workflow/core/src/error.rs          | 20 +++++++++----------
 lib/util/core/src/lib.rs                      | 18 +++++++++++++++++
 svc/pkg/cluster/src/ops/server/lost_list.rs   |  5 +----
 .../src/ops/server/prune_with_filter.rs       |  5 +----
 svc/pkg/cluster/src/workflows/server/drain.rs |  3 +--
 svc/pkg/ds/src/workers/drain_all.rs           |  9 ++++-----
 svc/pkg/ds/src/workflows/server/mod.rs        |  5 +++++
 svc/pkg/job-run/src/workers/drain_all.rs      |  4 +++-
 .../monitor/src/monitors/alloc_plan.rs        | 13 +++++-------
 .../monitor/src/monitors/alloc_update.rs      | 13 +++++-------
 .../monitor/src/monitors/eval_update.rs       | 11 ++++------
 14 files changed, 74 insertions(+), 59 deletions(-)

diff --git a/infra/tf/grafana/grafana_dashboards/chirp-workflow.json b/infra/tf/grafana/grafana_dashboards/chirp-workflow.json
index 0450eaf2e7..a664c5adfb 100644
--- a/infra/tf/grafana/grafana_dashboards/chirp-workflow.json
+++ b/infra/tf/grafana/grafana_dashboards/chirp-workflow.json
@@ -408,9 +408,9 @@
             "uid": "prometheus"
           },
           "editorMode": "code",
-          "expr": "sum by (error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
+          "expr": "sum by (workflow_name, error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
           "instant": false,
-          "legendFormat": "{{error_code}}",
+          "legendFormat": "{{workflow_name}} {{error_code}}",
           "range": true,
           "refId": "A"
         }
diff --git a/lib/chirp-workflow/core/src/ctx/common.rs b/lib/chirp-workflow/core/src/ctx/common.rs
index 6cd2726b37..c342c7fbca 100644
--- a/lib/chirp-workflow/core/src/ctx/common.rs
+++ b/lib/chirp-workflow/core/src/ctx/common.rs
@@ -63,8 +63,9 @@ where
 		I::Operation::NAME,
 	);
 
-	let res = I::Operation::run(&ctx, &input)
+	let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
 		.await
+		.map_err(|_| WorkflowError::OperationTimeout(0))?
 		.map_err(WorkflowError::OperationFailure)
 		.map_err(GlobalError::raw);
 
diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs
index e990e244d4..7037205580 100644
--- a/lib/chirp-workflow/core/src/ctx/workflow.rs
+++ b/lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -300,7 +300,7 @@ impl WorkflowCtx {
 
 		let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
 			.await
-			.map_err(|_| WorkflowError::ActivityTimeout);
+			.map_err(|_| WorkflowError::ActivityTimeout(0));
 
 		let dt = start_instant.elapsed().as_secs_f64();
 
@@ -499,15 +499,24 @@ impl WorkflowCtx {
 					if error_count + 1 >= I::Activity::MAX_RETRIES {
 						WorkflowError::ActivityMaxFailuresReached(err)
 					} else {
-						// Add error count to the error
+						// Add error count to the error for backoff calculation
 						WorkflowError::ActivityFailure(err, error_count)
 					}
 				}
-				WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
+				WorkflowError::ActivityTimeout(_) => {
 					if error_count + 1 >= I::Activity::MAX_RETRIES {
 						WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
 					} else {
-						err
+						// Add error count to the error for backoff calculation
+						WorkflowError::ActivityTimeout(error_count)
+					}
+				}
+				WorkflowError::OperationTimeout(_) => {
+					if error_count + 1 >= I::Activity::MAX_RETRIES {
+						WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
+					} else {
+						// Add error count to the error for backoff calculation
+						WorkflowError::OperationTimeout(error_count)
 					}
 				}
 				_ => err,
@@ -822,8 +831,7 @@ impl WorkflowCtx {
 	pub async fn sleep<T: DurationToMillis>(&mut self, duration: T) -> GlobalResult<()> {
 		let ts = rivet_util::timestamp::now() as u64 + duration.to_millis()?;
 
-		self.sleep_until(ts as i64)
-			.await
+		self.sleep_until(ts as i64).await
 	}
 
 	pub async fn sleep_until<T: TsToMillis>(&mut self, time: T) -> GlobalResult<()> {
diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs
index 65205d31a5..f55f902565 100644
--- a/lib/chirp-workflow/core/src/error.rs
+++ b/lib/chirp-workflow/core/src/error.rs
@@ -124,10 +124,10 @@ pub enum WorkflowError {
 	Pools(#[from] rivet_pools::Error),
 
 	#[error("activity timed out")]
-	ActivityTimeout,
+	ActivityTimeout(usize),
 
 	#[error("operation timed out")]
-	OperationTimeout,
+	OperationTimeout(usize),
 
 	#[error("duplicate registered workflow: {0}")]
 	DuplicateRegisteredWorkflow(String),
@@ -140,7 +140,9 @@ impl WorkflowError {
 	/// Returns the next deadline for a workflow to be woken up again based on the error.
 	pub(crate) fn deadline_ts(&self) -> Option<i64> {
 		match self {
-			WorkflowError::ActivityFailure(_, error_count) => {
+			WorkflowError::ActivityFailure(_, error_count)
+			| WorkflowError::ActivityTimeout(error_count)
+			| WorkflowError::OperationTimeout(error_count) => {
 				// NOTE: Max retry is handled in `WorkflowCtx::activity`
 				let mut backoff =
 					rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count);
@@ -157,10 +159,6 @@ impl WorkflowError {
 
 				Some(deadline_ts)
 			}
-			// TODO: Add backoff
-			WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
-				Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
-			}
 			WorkflowError::Sleep(ts) => Some(*ts),
 			_ => None,
 		}
@@ -169,8 +167,8 @@ impl WorkflowError {
 	pub fn is_recoverable(&self) -> bool {
 		match self {
 			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout
-			| WorkflowError::OperationTimeout
+			| WorkflowError::ActivityTimeout(_)
+			| WorkflowError::OperationTimeout(_)
 			| WorkflowError::NoSignalFound(_)
 			| WorkflowError::SubWorkflowIncomplete(_)
 			| WorkflowError::Sleep(_) => true,
@@ -181,8 +179,8 @@ impl WorkflowError {
 	pub(crate) fn is_retryable(&self) -> bool {
 		match self {
 			WorkflowError::ActivityFailure(_, _)
-			| WorkflowError::ActivityTimeout
-			| WorkflowError::OperationTimeout => true,
+			| WorkflowError::ActivityTimeout(_)
+			| WorkflowError::OperationTimeout(_) => true,
 			_ => false,
 		}
 	}
diff --git a/lib/util/core/src/lib.rs b/lib/util/core/src/lib.rs
index 4a3c8af1c7..0b8523a682 100644
--- a/lib/util/core/src/lib.rs
+++ b/lib/util/core/src/lib.rs
@@ -218,6 +218,24 @@ impl<K: Eq + Ord + Hash, V: Hash> AsHashableExt<K, V> for HashMap<K, V> {
 	}
 }
 
+impl<K: Eq + Ord + Hash, V: Hash> Into<HashableMap<K, V>> for HashMap<K, V> {
+	fn into(self) -> HashableMap<K, V> {
+		HashableMap(self.into_iter().collect())
+	}
+}
+
+impl<K: Eq + Ord + Hash + Clone, V: Hash + Clone> Into<HashableMap<K, V>> for &HashMap<K, V> {
+	fn into(self) -> HashableMap<K, V> {
+		HashableMap(self.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
+	}
+}
+
+impl<K: Eq + Ord + Hash, V: Hash> FromIterator<(K, V)> for HashableMap<K, V> {
+	fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
+		HashableMap(iter.into_iter().collect())
+	}
+}
+
 #[cfg(test)]
 mod tests {
 	use std::time::Instant;
diff --git a/svc/pkg/cluster/src/ops/server/lost_list.rs b/svc/pkg/cluster/src/ops/server/lost_list.rs
index d8d71b4659..aca15c0f9b 100644
--- a/svc/pkg/cluster/src/ops/server/lost_list.rs
+++ b/svc/pkg/cluster/src/ops/server/lost_list.rs
@@ -49,10 +49,7 @@ pub async fn cluster_server_lost_list(ctx: &OperationCtx, input: &Input) -> Glob
 	.await?
 	.into_iter()
 	.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
-	.chain(std::iter::once((
-		Provider::Linode,
-		util::env::read_secret(&["linode", "token"]).await?,
-	)))
+	.chain(std::iter::once((Provider::Linode, linode_token)))
 	.collect::<HashMap<_, _>>();
 
 	// Filter by namespace
diff --git a/svc/pkg/cluster/src/ops/server/prune_with_filter.rs b/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
index 3996b686f4..b9d7ef8ec0 100644
--- a/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
+++ b/svc/pkg/cluster/src/ops/server/prune_with_filter.rs
@@ -47,10 +47,7 @@ pub async fn cluster_server_prune_with_filter(
 	.await?
 	.into_iter()
 	.map(|(provider, provider_api_token)| (provider.0, provider_api_token))
-	.chain(std::iter::once((
-		Provider::Linode,
-		util::env::read_secret(&["linode", "token"]).await?,
-	)))
+	.chain(std::iter::once((Provider::Linode, linode_token)))
 	.collect::<HashMap<_, _>>();
 
 	// Filter by namespace
diff --git a/svc/pkg/cluster/src/workflows/server/drain.rs b/svc/pkg/cluster/src/workflows/server/drain.rs
index a6bb5ff18e..e1242acd11 100644
--- a/svc/pkg/cluster/src/workflows/server/drain.rs
+++ b/svc/pkg/cluster/src/workflows/server/drain.rs
@@ -3,8 +3,7 @@ use nomad_client::{
 	apis::{configuration::Configuration, nodes_api},
 	models,
 };
-use rivet_operation::prelude::proto::backend::pkg::mm;
-use serde_json::json;
+use rivet_operation::prelude::proto::backend::pkg::*;
 
 use crate::types::PoolType;
 
diff --git a/svc/pkg/ds/src/workers/drain_all.rs b/svc/pkg/ds/src/workers/drain_all.rs
index 8dc561652a..69c556a207 100644
--- a/svc/pkg/ds/src/workers/drain_all.rs
+++ b/svc/pkg/ds/src/workers/drain_all.rs
@@ -2,7 +2,6 @@ use std::convert::TryInto;
 
 use chirp_worker::prelude::*;
 use proto::backend::pkg::*;
-use serde_json::json;
 
 #[worker(name = "ds-drain-all")]
 async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalResult<()> {
@@ -24,16 +23,16 @@ async fn worker(ctx: &OperationContext<ds::msg::drain_all::Message>) -> GlobalRe
 	.await?;
 
 	for (server_id, kill_timeout_ms) in server_rows {
-		chirp_workflow::compat::tagged_signal(
+		chirp_workflow::compat::signal(
 			ctx,
-			&json!({
-				"server_id": server_id,
-			}),
 			crate::workflows::server::Destroy {
 				override_kill_timeout_ms: (drain_timeout < kill_timeout_ms)
 					.then_some(drain_timeout),
 			},
 		)
+		.await?
+		.tag("server_id", server_id)
+		.send()
 		.await?;
 	}
 
diff --git a/svc/pkg/ds/src/workflows/server/mod.rs b/svc/pkg/ds/src/workflows/server/mod.rs
index 5708224c54..8e1ace8fd0 100644
--- a/svc/pkg/ds/src/workflows/server/mod.rs
+++ b/svc/pkg/ds/src/workflows/server/mod.rs
@@ -112,6 +112,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
 				server_id: input.server_id,
 				eval: sig.eval,
 			})
+			.output()
 			.await?;
 
 			if let EvalStatus::Failed = eval_status {
@@ -121,6 +122,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
 				server_id: input.server_id,
 				override_kill_timeout_ms: None,
 			})
+			.output()
 			.await?;
 		}
 	}
@@ -131,6 +133,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
 			server_id: input.server_id,
 			override_kill_timeout_ms: sig.override_kill_timeout_ms,
 		})
+		.output()
 		.await?;
 
 		return Ok(());
@@ -148,6 +151,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
 					server_id,
 					alloc: sig.alloc,
 				})
+				.output()
 				.await?;
 			}
 			Main::NomadAllocUpdate(sig) => {
@@ -156,6 +160,7 @@ pub async fn ds_server(ctx: &mut WorkflowCtx, input: &Input) -> GlobalResult<()>
 					server_id,
 					alloc: sig.alloc,
 				})
+				.output()
 				.await?;
 
 				if finished {
diff --git a/svc/pkg/job-run/src/workers/drain_all.rs b/svc/pkg/job-run/src/workers/drain_all.rs
index 4c9b384b34..a480c41b2c 100644
--- a/svc/pkg/job-run/src/workers/drain_all.rs
+++ b/svc/pkg/job-run/src/workers/drain_all.rs
@@ -3,13 +3,15 @@ use proto::backend::pkg::*;
 
 #[worker(name = "job-run-drain-all")]
 async fn worker(ctx: &OperationContext<job_run::msg::drain_all::Message>) -> GlobalResult<()> {
-	chirp_workflow::compat::dispatch_workflow(
+	chirp_workflow::compat::workflow(
 		ctx,
 		crate::workflows::drain_all::Input {
 			nomad_node_id: ctx.nomad_node_id.clone(),
 			drain_timeout: ctx.drain_timeout,
 		},
 	)
+	.await?
+	.dispatch()
 	.await?;
 
 	Ok(())
diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs
index 69fcfdd548..897ff8c07d 100644
--- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs
+++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_plan.rs
@@ -23,14 +23,11 @@ pub async fn handle(
 		})
 		.await?;
 	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadAllocPlan {
-				alloc: alloc.clone(),
-			},
-		)
+		ctx.signal(ds::workflows::server::NomadAllocPlan {
+			alloc: alloc.clone(),
+		})
+		.tag("nomad_dispatched_job_id", job_id)
+		.send()
 		.await?;
 	} else {
 		tracing::info!(%job_id, "disregarding event");
diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs
index 001b6f90ff..fe274d2d26 100644
--- a/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs
+++ b/svc/pkg/nomad/standalone/monitor/src/monitors/alloc_update.rs
@@ -23,14 +23,11 @@ pub async fn handle(
 		})
 		.await?;
 	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadAllocUpdate {
-				alloc: alloc.clone(),
-			},
-		)
+		ctx.signal(ds::workflows::server::NomadAllocUpdate {
+			alloc: alloc.clone(),
+		})
+		.tag("nomad_dispatched_job_id", job_id)
+		.send()
 		.await?;
 	} else {
 		tracing::info!(%job_id, "disregarding event");
diff --git a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs
index 6573681055..cafc1fae9e 100644
--- a/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs
+++ b/svc/pkg/nomad/standalone/monitor/src/monitors/eval_update.rs
@@ -41,13 +41,10 @@ pub async fn handle(
 		})
 		.await?;
 	} else if ds::util::is_nomad_ds(job_id) {
-		ctx.tagged_signal(
-			&json!({
-				"nomad_dispatched_job_id": job_id,
-			}),
-			ds::workflows::server::NomadEvalUpdate { eval: eval.clone() },
-		)
-		.await?;
+		ctx.signal(ds::workflows::server::NomadEvalUpdate { eval: eval.clone() })
+			.tag("nomad_dispatched_job_id", job_id)
+			.send()
+			.await?;
 	} else {
 		tracing::info!(%job_id, "disregarding event");
 	}
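
## Notes

The operation path in `ctx/common.rs` now races the operation body against `I::Operation::TIMEOUT`. Below is a minimal, self-contained sketch of that wrapper; `tokio::time::timeout` and the `OperationTimeout(0)` mapping mirror the patch, while the error enum, the operation body, and the durations are stand-ins:

```rust
// Requires the tokio crate (features: "time", "rt", "macros").
use std::time::Duration;

// Stand-in for chirp-workflow's error type; only the variant shape matches the patch.
#[derive(Debug)]
enum WorkflowError {
	OperationTimeout(usize),
}

// Stand-in operation body; the real call is I::Operation::run(&ctx, &input).
async fn run_op() -> Result<&'static str, ()> {
	tokio::time::sleep(Duration::from_millis(10)).await;
	Ok("done")
}

#[tokio::main]
async fn main() -> Result<(), WorkflowError> {
	// An elapsed timer surfaces as OperationTimeout(0); the retry logic in
	// WorkflowCtx later rewrites the 0 to the current error count.
	let out = tokio::time::timeout(Duration::from_millis(100), run_op())
		.await
		.map_err(|_| WorkflowError::OperationTimeout(0))?;
	println!("operation returned: {out:?}");
	Ok(())
}
```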
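With the error count threaded through `ActivityTimeout`/`OperationTimeout`, the retry deadline goes through `rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count)` instead of a flat `RETRY_TIMEOUT_MS` delay. The sketch below shows the general idea, capped exponential doubling keyed on the error count; the constant values and `Backoff`'s exact semantics (jitter, max-retry handling) are assumptions, not the real implementation:

```rust
// Hypothetical constants: RETRY_TIMEOUT_MS mirrors the name used in error.rs,
// but its value here is an assumption.
const RETRY_TIMEOUT_MS: u64 = 2_000;
const MAX_EXPONENT: u32 = 8; // mirrors the `8` passed to Backoff::new_at

/// Delay before the next retry: doubles with each recorded failure and stops
/// growing once the exponent reaches MAX_EXPONENT.
fn retry_delay_ms(error_count: usize) -> u64 {
	let exp = (error_count as u32).min(MAX_EXPONENT);
	RETRY_TIMEOUT_MS * 2u64.pow(exp)
}

fn main() {
	// error_count is what ActivityTimeout/OperationTimeout now carry.
	for error_count in 0..=4 {
		println!("retry #{error_count}: wait {} ms", retry_delay_ms(error_count));
	}
}
```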
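Several call sites also move from `tagged_signal(&json!({ ... }), sig)` to the builder style `ctx.signal(sig).tag(k, v).send()`. Here is a toy, self-contained version of that builder pattern; the names are illustrative, not chirp-workflow's real types:

```rust
use std::collections::HashMap;

// Hypothetical builder: tags accumulate as typed key/value pairs on the
// builder itself rather than being passed up front as an untyped json! blob.
struct SignalBuilder<T> {
	body: T,
	tags: HashMap<String, String>,
}

impl<T: std::fmt::Debug> SignalBuilder<T> {
	fn new(body: T) -> Self {
		Self { body, tags: HashMap::new() }
	}

	// Each .tag() call records one routing tag and returns the builder.
	fn tag(mut self, k: &str, v: impl ToString) -> Self {
		self.tags.insert(k.to_string(), v.to_string());
		self
	}

	// Stand-in for the async send in the real API.
	fn send(self) {
		println!("sending {:?} with tags {:?}", self.body, self.tags);
	}
}

#[derive(Debug)]
struct Destroy {
	override_kill_timeout_ms: Option<i64>,
}

fn main() {
	SignalBuilder::new(Destroy { override_kill_timeout_ms: None })
		.tag("server_id", "abc123")
		.send();
}
```

One upside of this shape over the `json!` variant: the tag-building step can no longer be skipped or malformed independently of the signal, since both travel through a single chained expression.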