diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs
index fbd8bc3336..8778b871b0 100644
--- a/lib/chirp-workflow/core/src/ctx/workflow.rs
+++ b/lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -258,6 +258,8 @@ impl WorkflowCtx {
 		activity_id: &ActivityId,
 		create_ts: i64,
 	) -> WorkflowResult<A::Output> {
+		tracing::debug!(id=%self.workflow_id, activity_name=%A::NAME, "running activity");
+
 		let ctx = ActivityCtx::new(
 			self.workflow_id,
 			self.db.clone(),
@@ -372,11 +374,19 @@ impl WorkflowCtx {
 		let id = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::SubWorkflow(sub_workflow) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found sub workflow {}",
+					I::Workflow::NAME
+				)))
+				.map_err(GlobalError::raw);
 			};
 
-			if sub_workflow.sub_workflow_name != I::Workflow::NAME {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+			if sub_workflow.name != I::Workflow::NAME {
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found sub workflow {}",
+					I::Workflow::NAME
+				)))
+				.map_err(GlobalError::raw);
 			}
 
 			tracing::debug!(
@@ -592,15 +602,25 @@ impl WorkflowCtx {
 		let output = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::Activity(activity) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found activity {}",
+					activity_id.name
+				)))
+				.map_err(GlobalError::raw);
 			};
 
 			if activity.activity_id != activity_id {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found activity {}",
+					activity_id.name
+				)))
+				.map_err(GlobalError::raw);
 			}
 
 			// Activity succeeded
 			if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? {
+				tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity");
+
 				output
 			}
 			// Activity failed, retry
@@ -677,9 +697,21 @@ impl WorkflowCtx {
 		let signal_id = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::SignalSend(signal) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal send {}",
+					T::NAME
+				)))
+				.map_err(GlobalError::raw);
 			};
 
+			if signal.name != T::NAME {
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal send {}",
+					T::NAME
+				)))
+				.map_err(GlobalError::raw);
+			}
+			tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying signal dispatch");
 
 			signal.signal_id
@@ -728,9 +760,21 @@ impl WorkflowCtx {
 		let signal_id = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::SignalSend(signal) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal send {}",
+					T::NAME
+				)))
+				.map_err(GlobalError::raw);
 			};
 
+			if signal.name != T::NAME {
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal send {}",
+					T::NAME
+				)))
+				.map_err(GlobalError::raw);
+			}
+			tracing::debug!(id=%self.workflow_id, signal_name=%signal.name, signal_id=%signal.signal_id, "replaying tagged signal dispatch");
 
 			signal.signal_id
@@ -777,7 +821,10 @@ impl WorkflowCtx {
 		let signal = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::Signal(signal) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal"
+				)))
+				.map_err(GlobalError::raw);
 			};
 
 			tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, "replaying signal");
@@ -827,7 +874,10 @@ impl WorkflowCtx {
 		let signal = if let Some(event) = event {
 			// Validate history is consistent
 			let Event::Signal(signal) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal"
+				)))
+				.map_err(GlobalError::raw);
 			};
 
 			tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, "replaying signal");
@@ -876,7 +926,10 @@ impl WorkflowCtx {
 			// Validate history is consistent
 			let Event::Signal(signal) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found signal"
+				)))
+				.map_err(GlobalError::raw);
 			};
 
 			Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?)
@@ -908,9 +961,21 @@ impl WorkflowCtx {
 		if let Some(event) = event {
 			// Validate history is consistent
 			let Event::MessageSend(msg) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found message send {}",
+					M::NAME
+				)))
+				.map_err(GlobalError::raw);
 			};
 
+			if msg.name != M::NAME {
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found message send {}",
+					M::NAME
+				)))
+				.map_err(GlobalError::raw);
+			}
+			tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch");
 		}
 
 		// Send message
@@ -954,9 +1019,21 @@ impl WorkflowCtx {
 		if let Some(event) = event {
 			// Validate history is consistent
 			let Event::MessageSend(msg) = event else {
-				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found message send {}",
+					M::NAME
+				)))
+				.map_err(GlobalError::raw);
 			};
 
+			if msg.name != M::NAME {
+				return Err(WorkflowError::HistoryDiverged(format!(
+					"expected {event}, found message send {}",
+					M::NAME
+				)))
+				.map_err(GlobalError::raw);
+			}
+			tracing::debug!(id=%self.workflow_id, msg_name=%msg.name, "replaying message dispatch");
 		}
 
 		// Send message
diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs
index 767ca78154..7b11c724e8 100644
--- a/lib/chirp-workflow/core/src/db/postgres.rs
+++ b/lib/chirp-workflow/core/src/db/postgres.rs
@@ -10,6 +10,8 @@ use super::{
 };
 use crate::{activity::ActivityId, util, WorkflowError, WorkflowResult};
 
+const MAX_QUERY_RETRIES: usize = 16;
+
 pub struct DatabasePostgres {
 	pool: PgPool,
 }
@@ -52,6 +54,35 @@ impl DatabasePostgres {
 			self.pool.acquire().await.map_err(WorkflowError::Sqlx)
 		}
 	}
+
+	/// Executes queries and explicitly handles retry errors.
+	async fn query<'a, F, Fut, T>(&self, mut cb: F) -> WorkflowResult<T>
+	where
+		F: FnMut() -> Fut,
+		Fut: std::future::Future<Output = WorkflowResult<T>> + 'a,
+		T: 'a,
+	{
+		for _ in 0..MAX_QUERY_RETRIES {
+			match cb().await {
+				Err(WorkflowError::Sqlx(err)) => {
+					if let Some(db_err) = err.as_database_error() {
+						if db_err
+							.message()
+							.contains("TransactionRetryWithProtoRefreshError")
+						{
+							tracing::info!(message=%db_err.message(), "transaction retry");
+							continue;
+						}
+					}
+
+					return Err(WorkflowError::Sqlx(err));
+				}
+				x => return x,
+			}
+		}
+
+		Err(WorkflowError::MaxSqlRetries)
+	}
 }
 
 #[async_trait::async_trait]
@@ -64,23 +95,26 @@ impl Database for DatabasePostgres {
 		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			INSERT INTO db_workflow.workflows (
 				workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate
 			)
 			VALUES ($1, $2, $3, $4, $5, $6, true)
 			",
-		))
-		.bind(workflow_id)
-		.bind(workflow_name)
-		.bind(rivet_util::timestamp::now())
-		.bind(ray_id)
-		.bind(&tags)
-		.bind(input)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(workflow_id)
+			.bind(workflow_name)
+			.bind(rivet_util::timestamp::now())
+			.bind(ray_id)
+			.bind(&tags)
+			.bind(&input)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -302,18 +336,21 @@ impl Database for DatabasePostgres {
 		workflow_id: Uuid,
 		output: &serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			UPDATE db_workflow.workflows
 			SET output = $2
 			WHERE workflow_id = $1
 			",
-		))
-		.bind(workflow_id)
-		.bind(output)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(workflow_id)
+			.bind(output)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -328,8 +365,9 @@ impl Database for DatabasePostgres {
 		error: &str,
 	) -> WorkflowResult<()> {
 		// TODO(RVT-3762): Should this compare `wake_deadline_ts` before setting it?
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			UPDATE db_workflow.workflows
 			SET
 				worker_instance_id = NULL,
@@ -340,16 +378,18 @@ impl Database for DatabasePostgres {
 				error = $6
 			WHERE workflow_id = $1
 			",
-		))
-		.bind(workflow_id)
-		.bind(immediate)
-		.bind(deadline_ts)
-		.bind(wake_signals)
-		.bind(wake_sub_workflow_id)
-		.bind(error)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(workflow_id)
+			.bind(immediate)
+			.bind(deadline_ts)
+			.bind(wake_signals)
+			.bind(wake_sub_workflow_id)
+			.bind(error)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -361,18 +401,21 @@ impl Database for DatabasePostgres {
 		workflow_id: Uuid,
 		tags: &serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			UPDATE db_workflow.workflows
 			SET tags = $2
 			WHERE workflow_id = $1
 			",
-		))
-		.bind(workflow_id)
-		.bind(tags)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(workflow_id)
+			.bind(tags)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -388,8 +431,9 @@ impl Database for DatabasePostgres {
 	) -> WorkflowResult<()> {
 		match res {
 			Ok(output) => {
-				sqlx::query(indoc!(
-					"
+				self.query(|| async {
+					sqlx::query(indoc!(
+						"
 					INSERT INTO db_workflow.workflow_activity_events (
 						workflow_id, location, activity_name, input_hash, input, output, create_ts
 					)
@@ -397,22 +441,25 @@ impl Database for DatabasePostgres {
 					ON CONFLICT (workflow_id, location) DO UPDATE
 					SET output = excluded.output
 					",
-				))
-				.bind(workflow_id)
-				.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-				.bind(&activity_id.name)
-				.bind(activity_id.input_hash.to_le_bytes())
-				.bind(input)
-				.bind(output)
-				.bind(rivet_util::timestamp::now())
-				.bind(create_ts)
-				.execute(&mut *self.conn().await?)
-				.await
-				.map_err(WorkflowError::Sqlx)?;
+					))
+					.bind(workflow_id)
+					.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+					.bind(&activity_id.name)
+					.bind(activity_id.input_hash.to_le_bytes())
+					.bind(&input)
+					.bind(&output)
+					.bind(rivet_util::timestamp::now())
+					.bind(create_ts)
+					.execute(&mut *self.conn().await?)
+					.await
+					.map_err(WorkflowError::Sqlx)
+				})
+				.await?;
 			}
 			Err(err) => {
-				sqlx::query(indoc!(
-					"
+				self.query(|| async {
+					sqlx::query(indoc!(
+						"
 					WITH
 						event AS (
 							INSERT INTO db_workflow.workflow_activity_events (
@@ -431,18 +478,20 @@ impl Database for DatabasePostgres {
 					)
 					SELECT 1
 					",
-				))
-				.bind(workflow_id)
-				.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-				.bind(&activity_id.name)
-				.bind(activity_id.input_hash.to_le_bytes())
-				.bind(input)
-				.bind(err)
-				.bind(create_ts)
-				.bind(rivet_util::timestamp::now())
-				.execute(&mut *self.conn().await?)
-				.await
-				.map_err(WorkflowError::Sqlx)?;
+					))
+					.bind(workflow_id)
+					.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+					.bind(&activity_id.name)
+					.bind(activity_id.input_hash.to_le_bytes())
+					.bind(&input)
+					.bind(err)
+					.bind(create_ts)
+					.bind(rivet_util::timestamp::now())
+					.execute(&mut *self.conn().await?)
+					.await
+					.map_err(WorkflowError::Sqlx)
+				})
+				.await?;
 			}
 		}
 
 		Ok(())
@@ -455,8 +504,10 @@ impl Database for DatabasePostgres {
 		filter: &[&str],
 		location: &[usize],
 	) -> WorkflowResult<Option<SignalRow>> {
-		let signal = sqlx::query_as::<_, SignalRow>(indoc!(
-			"
+		let signal = self
+			.query(|| async {
+				sqlx::query_as::<_, SignalRow>(indoc!(
+					"
 			WITH
 				-- Finds the oldest signal matching the signal name filter in either the normal signals table
 				-- or tagged signals table
@@ -506,14 +557,16 @@ impl Database for DatabasePostgres {
 			)
 			SELECT * FROM next_signal
 			",
-		))
-		.bind(workflow_id)
-		.bind(filter)
-		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-		.bind(rivet_util::timestamp::now())
-		.fetch_optional(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+				))
+				.bind(workflow_id)
+				.bind(filter)
+				.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+				.bind(rivet_util::timestamp::now())
+				.fetch_optional(&mut *self.conn().await?)
+				.await
+				.map_err(WorkflowError::Sqlx)
+			})
+			.await?;
 
 		Ok(signal)
 	}
@@ -526,21 +579,24 @@ impl Database for DatabasePostgres {
 		signal_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts)
 			VALUES ($1, $2, $3, $4, $5, $6)
 			",
-		))
-		.bind(signal_id)
-		.bind(workflow_id)
-		.bind(signal_name)
-		.bind(body)
-		.bind(ray_id)
-		.bind(rivet_util::timestamp::now())
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(signal_id)
+			.bind(workflow_id)
+			.bind(signal_name)
+			.bind(&body)
+			.bind(ray_id)
+			.bind(rivet_util::timestamp::now())
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -553,21 +609,24 @@ impl Database for DatabasePostgres {
 		signal_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
 			INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts)
 			VALUES ($1, $2, $3, $4, $5, $6)
 			",
-		))
-		.bind(signal_id)
-		.bind(tags)
-		.bind(signal_name)
-		.bind(body)
-		.bind(ray_id)
-		.bind(rivet_util::timestamp::now())
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+			))
+			.bind(signal_id)
+			.bind(tags)
+			.bind(signal_name)
+			.bind(&body)
+			.bind(ray_id)
+			.bind(rivet_util::timestamp::now())
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -582,35 +641,38 @@ impl Database for DatabasePostgres {
 		signal_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
-			WITH
-				signal AS (
-					INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts)
-					VALUES ($1, $2, $3, $4, $5, $6)
-					RETURNING 1
-				),
-				send_event AS (
-					INSERT INTO db_workflow.workflow_signal_send_events(
-						workflow_id, location, signal_id, signal_name, body
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
+				WITH
+					signal AS (
+						INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts)
+						VALUES ($1, $2, $3, $4, $5, $6)
+						RETURNING 1
+					),
+					send_event AS (
+						INSERT INTO db_workflow.workflow_signal_send_events(
+							workflow_id, location, signal_id, signal_name, body
+						)
+						VALUES($7, $8, $1, $3, $4)
+						RETURNING 1
 					)
-					VALUES($7, $8, $1, $3, $4)
-					RETURNING 1
-				)
-			SELECT 1
-			",
-		))
-		.bind(signal_id)
-		.bind(to_workflow_id)
-		.bind(signal_name)
-		.bind(body)
-		.bind(ray_id)
-		.bind(rivet_util::timestamp::now())
-		.bind(from_workflow_id)
-		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+				SELECT 1
+				",
+			))
+			.bind(signal_id)
+			.bind(to_workflow_id)
+			.bind(signal_name)
+			.bind(&body)
+			.bind(ray_id)
+			.bind(rivet_util::timestamp::now())
+			.bind(from_workflow_id)
+			.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -625,35 +687,38 @@ impl Database for DatabasePostgres {
 		signal_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
-			WITH
-				signal AS (
-					INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts)
-					VALUES ($1, $2, $3, $4, $5, $6)
-					RETURNING 1
-				),
-				send_event AS (
-					INSERT INTO db_workflow.workflow_signal_send_events(
-						workflow_id, location, signal_id, signal_name, body
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
+				WITH
+					signal AS (
+						INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts)
+						VALUES ($1, $2, $3, $4, $5, $6)
+						RETURNING 1
+					),
+					send_event AS (
+						INSERT INTO db_workflow.workflow_signal_send_events(
+							workflow_id, location, signal_id, signal_name, body
+						)
+						VALUES($7, $8, $1, $3, $4)
+						RETURNING 1
 					)
-					VALUES($7, $8, $1, $3, $4)
-					RETURNING 1
-				)
-			SELECT 1
-			",
-		))
-		.bind(signal_id)
-		.bind(tags)
-		.bind(signal_name)
-		.bind(body)
-		.bind(ray_id)
-		.bind(rivet_util::timestamp::now())
-		.bind(from_workflow_id)
-		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+				SELECT 1
+				",
+			))
+			.bind(signal_id)
+			.bind(tags)
+			.bind(signal_name)
+			.bind(&body)
+			.bind(ray_id)
+			.bind(rivet_util::timestamp::now())
+			.bind(from_workflow_id)
+			.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -668,37 +733,40 @@ impl Database for DatabasePostgres {
 		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
-			WITH
-				workflow AS (
-					INSERT INTO db_workflow.workflows (
-						workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate
-					)
-					VALUES ($8, $2, $3, $4, $5, $6, true)
-					RETURNING 1
-				),
-				sub_workflow AS (
-					INSERT INTO db_workflow.workflow_sub_workflow_events(
-						workflow_id, location, sub_workflow_id, create_ts
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
+				WITH
+					workflow AS (
+						INSERT INTO db_workflow.workflows (
+							workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate
+						)
+						VALUES ($8, $2, $3, $4, $5, $6, true)
+						RETURNING 1
+					),
+					sub_workflow AS (
+						INSERT INTO db_workflow.workflow_sub_workflow_events(
+							workflow_id, location, sub_workflow_id, create_ts
+						)
+						VALUES($1, $7, $8, $3)
+						RETURNING 1
 					)
-					VALUES($1, $7, $8, $3)
-					RETURNING 1
-				)
-			SELECT 1
-			",
-		))
-		.bind(workflow_id)
-		.bind(sub_workflow_name)
-		.bind(rivet_util::timestamp::now())
-		.bind(ray_id)
-		.bind(tags)
-		.bind(input)
-		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-		.bind(sub_workflow_id)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+				SELECT 1
+				",
+			))
+			.bind(workflow_id)
+			.bind(sub_workflow_name)
+			.bind(rivet_util::timestamp::now())
+			.bind(ray_id)
+			.bind(tags)
+			.bind(&input)
+			.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+			.bind(sub_workflow_id)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
@@ -736,23 +804,26 @@ impl Database for DatabasePostgres {
 		message_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()> {
-		sqlx::query(indoc!(
-			"
-			INSERT INTO db_workflow.workflow_message_send_events(
-				workflow_id, location, tags, message_name, body
-			)
-			VALUES($1, $2, $3, $4, $5)
-			RETURNING 1
-			",
-		))
-		.bind(from_workflow_id)
-		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
-		.bind(tags)
-		.bind(message_name)
-		.bind(body)
-		.execute(&mut *self.conn().await?)
-		.await
-		.map_err(WorkflowError::Sqlx)?;
+		self.query(|| async {
+			sqlx::query(indoc!(
+				"
+				INSERT INTO db_workflow.workflow_message_send_events(
+					workflow_id, location, tags, message_name, body
+				)
+				VALUES($1, $2, $3, $4, $5)
+				RETURNING 1
+				",
+			))
+			.bind(from_workflow_id)
+			.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
+			.bind(tags)
+			.bind(message_name)
+			.bind(&body)
+			.execute(&mut *self.conn().await?)
+			.await
+			.map_err(WorkflowError::Sqlx)
+		})
+		.await?;
 
 		Ok(())
 	}
diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs
index 92420a1094..4983d96169 100644
--- a/lib/chirp-workflow/core/src/error.rs
+++ b/lib/chirp-workflow/core/src/error.rs
@@ -27,8 +27,8 @@ pub enum WorkflowError {
 	#[error("workflow not found")]
 	WorkflowNotFound,
 
-	#[error("history diverged")]
-	HistoryDiverged,
+	#[error("history diverged: {0}")]
+	HistoryDiverged(String),
 
 	#[error("serialize workflow input: {0}")]
 	SerializeWorkflowInput(serde_json::Error),
@@ -105,6 +105,9 @@ pub enum WorkflowError {
 	#[error("sql: {0}")]
 	Sqlx(sqlx::Error),
 
+	#[error("max sql retries")]
+	MaxSqlRetries,
+
 	#[error("pools: {0}")]
 	Pools(#[from] rivet_pools::Error),
 
diff --git a/lib/chirp-workflow/core/src/event.rs b/lib/chirp-workflow/core/src/event.rs
index 8d5c9e8a8d..ec894efd53 100644
--- a/lib/chirp-workflow/core/src/event.rs
+++ b/lib/chirp-workflow/core/src/event.rs
@@ -20,6 +20,19 @@ pub enum Event {
 	Branch,
 }
 
+impl std::fmt::Display for Event {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		match self {
+			Event::Activity(activity) => write!(f, "activity {:?}", activity.activity_id.name),
+			Event::Signal(signal) => write!(f, "signal {:?}", signal.name),
+			Event::SignalSend(signal_send) => write!(f, "signal send {:?}", signal_send.name),
+			Event::MessageSend(message_send) => write!(f, "message send {:?}", message_send.name),
+			Event::SubWorkflow(sub_workflow) => write!(f, "sub workflow {:?}", sub_workflow.name),
+			Event::Branch => write!(f, "branch"),
+		}
+	}
+}
+
 #[derive(Debug)]
 pub struct ActivityEvent {
 	pub activity_id: ActivityId,
@@ -108,7 +121,7 @@ impl TryFrom<MessageSendEventRow> for MessageSendEvent {
 #[derive(Debug)]
 pub struct SubWorkflowEvent {
 	pub sub_workflow_id: Uuid,
-	pub sub_workflow_name: String,
+	pub name: String,
 }
 
 impl TryFrom<SubWorkflowEventRow> for SubWorkflowEvent {
@@ -117,7 +130,7 @@ impl TryFrom<SubWorkflowEventRow> for SubWorkflowEvent {
 	fn try_from(value: SubWorkflowEventRow) -> WorkflowResult<Self> {
 		Ok(SubWorkflowEvent {
 			sub_workflow_id: value.sub_workflow_id,
-			sub_workflow_name: value.sub_workflow_name,
+			name: value.sub_workflow_name,
 		})
 	}
 }