Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workflows): fix backfill #1025

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions lib/chirp-workflow/core/src/ctx/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ impl BackfillCtx {

Ok(workflow_id)
}

pub fn existing_workflow<F>(&mut self, workflow_id: Uuid, builder: F) -> GlobalResult<()>
where
F: Fn(&mut WorkflowBackfillCtx) -> GlobalResult<()>,
{
let mut wf_ctx = WorkflowBackfillCtx::new("");
wf_ctx.workflow_id = workflow_id;

builder(&mut wf_ctx)?;

self.queries.extend(wf_ctx.queries);

Ok(())
}
}

pub struct WorkflowBackfillCtx {
Expand Down Expand Up @@ -123,6 +137,12 @@ impl WorkflowBackfillCtx {
.collect()
}

pub fn set_location(&mut self, location: &[usize]) {
assert!(!location.is_empty(), "empty location");
self.root_location = Box::from(&location[..location.len() - 1]);
self.location_idx = *location.last().unwrap();
}

pub fn finalize(&mut self) {
let wake_immediate = true;

Expand Down Expand Up @@ -308,8 +328,6 @@ impl WorkflowBackfillCtx {

self.queries.extend(swf_ctx.queries);

self.location_idx += 1;

Ok(())
}

Expand Down
63 changes: 42 additions & 21 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
metrics,
registry::RegistryHandle,
signal::Signal,
util::{GlobalErrorExt, Location},
util::{self, GlobalErrorExt, Location},
workflow::{Workflow, WorkflowInput},
};

Expand Down Expand Up @@ -167,6 +167,11 @@ impl WorkflowCtx {
.collect()
}

/// For debugging, pretty prints the current location
fn loc(&self) -> String {
util::format_location(&self.full_location())
}

pub(crate) fn loop_location(&self) -> Option<&[usize]> {
self.loop_location.as_deref()
}
Expand Down Expand Up @@ -429,15 +434,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SubWorkflow(sub_workflow) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub workflow {}",
"expected {event} at {}, found sub workflow {}",
self.loc(),
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
};

if sub_workflow.name != I::Workflow::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found sub_workflow {}",
"expected {event} at {}, found sub_workflow {}",
self.loc(),
I::Workflow::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -660,17 +667,19 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Activity(activity) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found activity {}",
"expected {event} at {}, found activity {}",
self.loc(),
activity_id.name
)))
.map_err(GlobalError::raw);
};

if activity.activity_id != activity_id {
return Err(WorkflowError::HistoryDiverged(format!(
"expected activity {}#{:x}, found activity {}#{:x}",
"expected activity {}#{:x} at {}, found activity {}#{:x}",
activity.activity_id.name,
activity.activity_id.input_hash,
self.loc(),
activity_id.name,
activity_id.input_hash,
)))
Expand Down Expand Up @@ -759,15 +768,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -823,15 +834,17 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::SignalSend(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
};

if signal.name != T::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal send {}",
"expected {event} at {}, found signal send {}",
self.loc(),
T::NAME
)))
.map_err(GlobalError::raw);
Expand Down Expand Up @@ -885,7 +898,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -938,7 +952,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -990,7 +1005,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Signal(signal) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found signal"
"expected {event} at {}, found signal",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down Expand Up @@ -1025,16 +1041,18 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
}
Expand Down Expand Up @@ -1084,16 +1102,18 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::MessageSend(msg) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
};

if msg.name != M::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found message send {}",
M::NAME
"expected {event} at {}, found message send {}",
self.loc(),
M::NAME,
)))
.map_err(GlobalError::raw);
}
Expand Down Expand Up @@ -1150,7 +1170,8 @@ impl WorkflowCtx {
// Validate history is consistent
let Event::Loop(loop_event) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event}, found loop"
"expected {event} at {}, found loop",
self.loc(),
)))
.map_err(GlobalError::raw);
};
Expand Down
8 changes: 6 additions & 2 deletions lib/chirp-workflow/core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ impl Registry {
// Check for duplicates
for (workflow_name, _) in &registry.workflows {
if self.workflows.contains_key(workflow_name.as_str()) {
return Err(WorkflowError::DuplicateRegisteredWorkflow(workflow_name.clone()));
return Err(WorkflowError::DuplicateRegisteredWorkflow(
workflow_name.clone(),
));
}
}

Expand All @@ -43,7 +45,9 @@ impl Registry {
pub fn register_workflow<W: Workflow>(&mut self) -> WorkflowResult<()> {
// Check for duplicates
if self.workflows.contains_key(W::NAME) {
return Err(WorkflowError::DuplicateRegisteredWorkflow(W::NAME.to_string()));
return Err(WorkflowError::DuplicateRegisteredWorkflow(
W::NAME.to_string(),
));
}

self.workflows.insert(
Expand Down
19 changes: 19 additions & 0 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,22 @@ pub(crate) fn new_conn(

rivet_connection::Connection::new(client, pools.clone(), cache.clone())
}

pub fn format_location(loc: &Location) -> String {
let mut s = "{".to_string();

let mut iter = loc.iter();

if let Some(x) = iter.next() {
s.push_str(&x.to_string());
}

for x in iter {
s.push_str(", ");
s.push_str(&x.to_string());
}

s.push_str("}");

s
}
23 changes: 19 additions & 4 deletions svc/pkg/cluster/standalone/workflow-backfill/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,10 +330,25 @@ pub async fn run_from_env() -> GlobalResult<()> {
cluster::workflows::datacenter::CreateComplete {},
)?;

wf.signal(
"cluster-datacenter-scale",
cluster::workflows::datacenter::Scale {},
)?;
// Scale
wf.sub_workflow(|swf| {
#[derive(Serialize, Hash)]
struct CalculateDiffInput {
datacenter_id: Uuid,
}

swf.activity(
"calculate_diff",
CalculateDiffInput {
datacenter_id: dc.datacenter_id,
},
json!({
"actions": [],
}),
)?;

Ok(())
})?;

Ok(())
})?;
Expand Down
Loading