Skip to content

Commit

Permalink
fix(workflows): fix backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 6, 2024
1 parent 2f51bb8 commit 3d70b26
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 29 deletions.
22 changes: 20 additions & 2 deletions lib/chirp-workflow/core/src/ctx/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ impl BackfillCtx {

Ok(workflow_id)
}

pub fn existing_workflow<F>(&mut self, workflow_id: Uuid, builder: F) -> GlobalResult<()>
where
F: Fn(&mut WorkflowBackfillCtx) -> GlobalResult<()>,
{
let mut wf_ctx = WorkflowBackfillCtx::new("");
wf_ctx.workflow_id = workflow_id;

builder(&mut wf_ctx)?;

self.queries.extend(wf_ctx.queries);

Ok(())
}
}

pub struct WorkflowBackfillCtx {
Expand Down Expand Up @@ -123,6 +137,12 @@ impl WorkflowBackfillCtx {
.collect()
}

pub fn set_location(&mut self, location: &[usize]) {
assert!(!location.is_empty(), "empty location");
self.root_location = Box::from(&location[..location.len() - 1]);
self.location_idx = *location.last().unwrap();
}

pub fn finalize(&mut self) {
let wake_immediate = true;

Expand Down Expand Up @@ -308,8 +328,6 @@ impl WorkflowBackfillCtx {

self.queries.extend(swf_ctx.queries);

self.location_idx += 1;

Ok(())
}

Expand Down
63 changes: 42 additions & 21 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
metrics,
registry::RegistryHandle,
signal::Signal,
util::{GlobalErrorExt, Location},
util::{self, GlobalErrorExt, Location},
workflow::{Workflow, WorkflowInput},
};

Expand Down Expand Up @@ -167,6 +167,11 @@ impl WorkflowCtx {
.collect()
}

/// For debugging, pretty prints the current location
fn loc(&self) -> String {
util::format_location(&self.full_location())
}

pub(crate) fn loop_location(&self) -> Option<&[usize]> {
self.loop_location.as_deref()
}
Expand Down Expand Up @@ -429,15 +434,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SubWorkflow(sub_workflow) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub workflow {}",
"expected {event} at {}, found sub workflow {}",
self.loc(),
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
};

if sub_workflow.name != I::Workflow::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub_workflow {}",
"expected {event} at {}, found sub_workflow {}",
self.loc(),
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -660,17 +667,19 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Activity(activity) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found activity {}",
"expected {event} at {}, found activity {}",
self.loc(),
activity_id.name
)))
.map_err(GlobalError::raw);
};

if activity.activity_id != activity_id {
return Err(WorkflowError::HistoryDiverged(format!(
"expected activity {}#{:x}, found activity {}#{:x}",
"expected activity {}#{:x} at {}, found activity {}#{:x}",
activity.activity_id.name,
activity.activity_id.input_hash,
self.loc(),
activity_id.name,
activity_id.input_hash,
)))
Expand Down Expand Up @@ -759,15 +768,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -823,15 +834,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -885,7 +898,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -938,7 +952,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -990,7 +1005,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -1025,16 +1041,18 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
}
Expand Down Expand Up @@ -1084,16 +1102,18 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
}
Expand Down Expand Up @@ -1150,7 +1170,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Loop(loop_event) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found loop"
"expected {event} at {}, found loop",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down
8 changes: 6 additions & 2 deletions lib/chirp-workflow/core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ impl Registry {
// Check for duplicates
for (workflow_name, _) in &registry.workflows {
if self.workflows.contains_key(workflow_name.as_str()) {
return Err(WorkflowError::DuplicateRegisteredWorkflow(workflow_name.clone()));
return Err(WorkflowError::DuplicateRegisteredWorkflow(
workflow_name.clone(),
));
}
}

Expand All @@ -43,7 +45,9 @@ impl Registry {
pub fn register_workflow<W: Workflow>(&mut self) -> WorkflowResult<()> {
// Check for duplicates
if self.workflows.contains_key(W::NAME) {
return Err(WorkflowError::DuplicateRegisteredWorkflow(W::NAME.to_string()));
return Err(WorkflowError::DuplicateRegisteredWorkflow(
W::NAME.to_string(),
));
}

self.workflows.insert(
Expand Down
19 changes: 19 additions & 0 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,22 @@ pub(crate) fn new_conn(

rivet_connection::Connection::new(client, pools.clone(), cache.clone())
}

pub fn format_location(loc: &Location) -> String {
let mut s = "{".to_string();

let mut iter = loc.iter();

if let Some(x) = iter.next() {
s.push_str(&x.to_string());
}

for x in iter {
s.push_str(", ");
s.push_str(&x.to_string());
}

s.push_str("}");

s
}
23 changes: 19 additions & 4 deletions svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,25 @@ pub async fn run_from_env() -> GlobalResult<()> {
cluster::workflows::datacenter::CreateComplete {},
)?;

wf.signal(
"cluster-datacenter-scale",
cluster::workflows::datacenter::Scale {},
)?;
// Scale
wf.sub_workflow(|swf| {
#[derive(Serialize, Hash)]
struct CalculateDiffInput {
datacenter_id: Uuid,
}

swf.activity(
"calculate_diff",
CalculateDiffInput {
datacenter_id: dc.datacenter_id,
},
json!({
"actions": [],
}),
)?;

Ok(())
})?;

Ok(())
})?;
Expand Down

0 comments on commit 3d70b26

Please sign in to comment.