From 6f7c94c6bf63f48b2674787a5f1e869197043a13 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Wed, 7 Aug 2024 00:24:45 +0000 Subject: [PATCH] fix(workflows): fix backfill (#1025) ## Changes --- lib/chirp-workflow/core/src/ctx/backfill.rs | 22 ++++++- lib/chirp-workflow/core/src/ctx/workflow.rs | 63 ++++++++++++------- lib/chirp-workflow/core/src/registry.rs | 8 ++- lib/chirp-workflow/core/src/util.rs | 19 ++++++ .../standalone/workflow-backfill/src/lib.rs | 23 +++++-- 5 files changed, 106 insertions(+), 29 deletions(-) diff --git a/lib/chirp-workflow/core/src/ctx/backfill.rs b/lib/chirp-workflow/core/src/ctx/backfill.rs index 8310667eb5..bf6fea2108 100644 --- a/lib/chirp-workflow/core/src/ctx/backfill.rs +++ b/lib/chirp-workflow/core/src/ctx/backfill.rs @@ -56,6 +56,20 @@ impl BackfillCtx { Ok(workflow_id) } + + pub fn existing_workflow(&mut self, workflow_id: Uuid, builder: F) -> GlobalResult<()> + where + F: Fn(&mut WorkflowBackfillCtx) -> GlobalResult<()>, + { + let mut wf_ctx = WorkflowBackfillCtx::new(""); + wf_ctx.workflow_id = workflow_id; + + builder(&mut wf_ctx)?; + + self.queries.extend(wf_ctx.queries); + + Ok(()) + } } pub struct WorkflowBackfillCtx { @@ -123,6 +137,12 @@ impl WorkflowBackfillCtx { .collect() } + pub fn set_location(&mut self, location: &[usize]) { + assert!(!location.is_empty(), "empty location"); + self.root_location = Box::from(&location[..location.len() - 1]); + self.location_idx = *location.last().unwrap(); + } + pub fn finalize(&mut self) { let wake_immediate = true; @@ -308,8 +328,6 @@ impl WorkflowBackfillCtx { self.queries.extend(swf_ctx.queries); - self.location_idx += 1; - Ok(()) } diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index c92a96f737..61907036b3 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -18,7 +18,7 @@ use crate::{ metrics, registry::RegistryHandle, signal::Signal, - util::{GlobalErrorExt, Location}, + util::{self, GlobalErrorExt, Location}, workflow::{Workflow, WorkflowInput}, }; @@ -167,6 +167,11 @@ impl WorkflowCtx { .collect() } + /// For debugging, pretty prints the current location + fn loc(&self) -> String { + util::format_location(&self.full_location()) + } + pub(crate) fn loop_location(&self) -> Option<&[usize]> { self.loop_location.as_deref() } @@ -429,7 +434,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::SubWorkflow(sub_workflow) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found sub workflow {}", + "expected {event} at {}, found sub workflow {}", + self.loc(), I::Workflow::NAME ))) .map_err(GlobalError::raw); @@ -437,7 +443,8 @@ impl WorkflowCtx { if sub_workflow.name != I::Workflow::NAME { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found sub_workflow {}", + "expected {event} at {}, found sub_workflow {}", + self.loc(), I::Workflow::NAME ))) .map_err(GlobalError::raw); @@ -660,7 +667,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::Activity(activity) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found activity {}", + "expected {event} at {}, found activity {}", + self.loc(), activity_id.name ))) .map_err(GlobalError::raw); @@ -668,9 +676,10 @@ impl WorkflowCtx { if activity.activity_id != activity_id { return Err(WorkflowError::HistoryDiverged(format!( - "expected activity {}#{:x}, found activity {}#{:x}", + "expected activity {}#{:x} at {}, found activity {}#{:x}", activity.activity_id.name, activity.activity_id.input_hash, + self.loc(), activity_id.name, activity_id.input_hash, ))) @@ -759,7 +768,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::SignalSend(signal) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal send {}", + "expected {event} at {}, found signal send {}", + self.loc(), T::NAME ))) .map_err(GlobalError::raw); @@ -767,7 +777,8 @@ impl WorkflowCtx { if signal.name != T::NAME { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal send {}", + "expected {event} at {}, found signal send {}", + self.loc(), T::NAME ))) .map_err(GlobalError::raw); @@ -823,7 +834,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::SignalSend(signal) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal send {}", + "expected {event} at {}, found signal send {}", + self.loc(), T::NAME ))) .map_err(GlobalError::raw); @@ -831,7 +843,8 @@ impl WorkflowCtx { if signal.name != T::NAME { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal send {}", + "expected {event} at {}, found signal send {}", + self.loc(), T::NAME ))) .map_err(GlobalError::raw); @@ -885,7 +898,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::Signal(signal) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal" + "expected {event} at {}, found signal", + self.loc(), ))) .map_err(GlobalError::raw); }; @@ -938,7 +952,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::Signal(signal) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal" + "expected {event} at {}, found signal", + self.loc(), ))) .map_err(GlobalError::raw); }; @@ -990,7 +1005,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::Signal(signal) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found signal" + "expected {event} at {}, found signal", + self.loc(), ))) .map_err(GlobalError::raw); }; @@ -1025,16 +1041,18 @@ impl WorkflowCtx { // Validate history is consistent let Event::MessageSend(msg) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found message send {}", - M::NAME + "expected {event} at {}, found message send {}", + self.loc(), + M::NAME, ))) .map_err(GlobalError::raw); }; if msg.name != M::NAME { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found message send {}", - M::NAME + "expected {event} at {}, found message send {}", + self.loc(), + M::NAME, ))) .map_err(GlobalError::raw); } @@ -1084,16 +1102,18 @@ impl WorkflowCtx { // Validate history is consistent let Event::MessageSend(msg) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found message send {}", - M::NAME + "expected {event} at {}, found message send {}", + self.loc(), + M::NAME, ))) .map_err(GlobalError::raw); }; if msg.name != M::NAME { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found message send {}", - M::NAME + "expected {event} at {}, found message send {}", + self.loc(), + M::NAME, ))) .map_err(GlobalError::raw); } @@ -1150,7 +1170,8 @@ impl WorkflowCtx { // Validate history is consistent let Event::Loop(loop_event) = event else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event}, found loop" + "expected {event} at {}, found loop", + self.loc(), ))) .map_err(GlobalError::raw); }; diff --git a/lib/chirp-workflow/core/src/registry.rs b/lib/chirp-workflow/core/src/registry.rs index 395c8418d6..6a1343ddbc 100644 --- a/lib/chirp-workflow/core/src/registry.rs +++ b/lib/chirp-workflow/core/src/registry.rs @@ -31,7 +31,9 @@ impl Registry { // Check for duplicates for (workflow_name, _) in ®istry.workflows { if self.workflows.contains_key(workflow_name.as_str()) { - return Err(WorkflowError::DuplicateRegisteredWorkflow(workflow_name.clone())); + return Err(WorkflowError::DuplicateRegisteredWorkflow( + workflow_name.clone(), + )); } } @@ -43,7 +45,9 @@ impl Registry { pub fn register_workflow(&mut self) -> WorkflowResult<()> { // Check for duplicates if self.workflows.contains_key(W::NAME) { - return Err(WorkflowError::DuplicateRegisteredWorkflow(W::NAME.to_string())); + return Err(WorkflowError::DuplicateRegisteredWorkflow( + W::NAME.to_string(), + )); } self.workflows.insert( diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs index 9a106be887..0a9561135c 100644 --- a/lib/chirp-workflow/core/src/util.rs +++ b/lib/chirp-workflow/core/src/util.rs @@ -80,3 +80,22 @@ pub(crate) fn new_conn( rivet_connection::Connection::new(client, pools.clone(), cache.clone()) } + +pub fn format_location(loc: &Location) -> String { + let mut s = "{".to_string(); + + let mut iter = loc.iter(); + + if let Some(x) = iter.next() { + s.push_str(&x.to_string()); + } + + for x in iter { + s.push_str(", "); + s.push_str(&x.to_string()); + } + + s.push_str("}"); + + s +} diff --git a/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs b/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs index a48678ad24..7efd250ee1 100644 --- a/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs +++ b/svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs @@ -330,10 +330,25 @@ pub async fn run_from_env() -> GlobalResult<()> { cluster::workflows::datacenter::CreateComplete {}, )?; - wf.signal( - "cluster-datacenter-scale", - cluster::workflows::datacenter::Scale {}, - )?; + // Scale + wf.sub_workflow(|swf| { + #[derive(Serialize, Hash)] + struct CalculateDiffInput { + datacenter_id: Uuid, + } + + swf.activity( + "calculate_diff", + CalculateDiffInput { + datacenter_id: dc.datacenter_id, + }, + json!({ + "actions": [], + }), + )?; + + Ok(()) + })?; Ok(()) })?;