From d524e3c6d05b0c89c04c16a65cfa79cdede39f69 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 10 Oct 2024 02:22:22 +0000 Subject: [PATCH] fix(workflows): get workflows working again with new history management --- docs/libraries/workflow/GOTCHAS.md | 23 +- infra/tf/k8s_infra/cockroachdb.tf | 2 +- lib/bolt/core/src/tasks/wf/mod.rs | 26 +- .../core/src/builder/workflow/sub_workflow.rs | 2 +- lib/chirp-workflow/core/src/compat.rs | 4 +- lib/chirp-workflow/core/src/ctx/api.rs | 14 +- lib/chirp-workflow/core/src/ctx/backfill.rs | 4 - lib/chirp-workflow/core/src/ctx/listen.rs | 4 + lib/chirp-workflow/core/src/ctx/message.rs | 64 ++-- lib/chirp-workflow/core/src/ctx/standalone.rs | 14 +- lib/chirp-workflow/core/src/ctx/test.rs | 16 +- .../core/src/ctx/versioned_workflow.rs | 14 +- lib/chirp-workflow/core/src/ctx/workflow.rs | 143 +++++--- lib/chirp-workflow/core/src/db/mod.rs | 2 +- lib/chirp-workflow/core/src/db/pg_nats.rs | 323 ++++++++++-------- lib/chirp-workflow/core/src/error.rs | 22 +- lib/chirp-workflow/core/src/history/cursor.rs | 304 +++++++++++------ lib/chirp-workflow/core/src/history/event.rs | 18 +- .../core/src/history/location.rs | 12 + .../core/src/history/removed.rs | 2 +- lib/chirp-workflow/core/src/message.rs | 53 ++- lib/chirp-workflow/core/src/utils.rs | 6 + lib/util/core/src/serde.rs | 187 +++++----- .../admin/src/route/clusters/datacenters.rs | 7 +- svc/api/admin/src/route/clusters/mod.rs | 4 +- svc/api/servers/src/route/servers.rs | 12 +- .../cluster/src/workflows/datacenter/mod.rs | 5 +- .../standalone/workflow-worker/src/lib.rs | 2 +- .../20241007201625_versioning.up.sql | 26 +- .../20241007222710_versioning_pt2.up.sql | 20 ++ 30 files changed, 784 insertions(+), 551 deletions(-) diff --git a/docs/libraries/workflow/GOTCHAS.md b/docs/libraries/workflow/GOTCHAS.md index 768c5caf00..3891d1f2de 100644 --- a/docs/libraries/workflow/GOTCHAS.md +++ b/docs/libraries/workflow/GOTCHAS.md @@ -78,27 +78,10 @@ futures_util::stream::iter(iter) .await?; ``` -If you plan on running more than one workflow step in each future, use a closure instead: +If you plan on running more than one workflow step in each future, use a `closure` stub. -```rust -let iter = actions.into_iter().map(|action| { - ( - closure(|ctx| async move { - ctx.activity(MyActivityInput { - action, - }).await?; - }).await - )(ctx) -}); - -futures_util::stream::iter(iter) - .buffer_unordered(16) - .try_collect::<Vec<_>>() - .await?; -``` - -Note that the first example would also work with a branch, but its a bit overkill as it creates a new layer in -the internal location. +Note that the first example would also work with a `closure`, but it's a bit overkill as it creates a new layer +in the internal location. > **\*** Even if they did know about each other via atomics, there is no guarantee of consistency from > `buffer_unordered`. 
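For reference, a sketch of the `closure` stub pattern the new wording points to, adapted from the example this hunk removes (`MyActivityInput`/`MyOtherActivityInput` and the exact shape of the `closure` helper are illustrative assumptions, not part of the patch):

```rust
let iter = actions.into_iter().map(|action| {
	closure(|ctx| async move {
		ctx.activity(MyActivityInput { action }).await?;
		ctx.activity(MyOtherActivityInput {}).await?;

		Ok(())
	})(ctx)
});

futures_util::stream::iter(iter)
	.buffer_unordered(16)
	.try_collect::<Vec<_>>()
	.await?;
```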
Preemptively incrementing the location ensures consistency regardless of the order or diff --git a/infra/tf/k8s_infra/cockroachdb.tf b/infra/tf/k8s_infra/cockroachdb.tf index 8445883525..af1175acdb 100644 --- a/infra/tf/k8s_infra/cockroachdb.tf +++ b/infra/tf/k8s_infra/cockroachdb.tf @@ -33,7 +33,7 @@ resource "helm_release" "cockroachdb" { namespace = kubernetes_namespace.cockroachdb[0].metadata.0.name repository = "https://charts.cockroachdb.com/" chart = "cockroachdb" - version = "11.1.5" # v23.1.9 + version = "14.0.4" # v24.2.3 values = [yamlencode({ statefulset = { replicas = local.service_cockroachdb.count diff --git a/lib/bolt/core/src/tasks/wf/mod.rs b/lib/bolt/core/src/tasks/wf/mod.rs index 3ac96bdb9a..1ebcfe8ee4 100644 --- a/lib/bolt/core/src/tasks/wf/mod.rs +++ b/lib/bolt/core/src/tasks/wf/mod.rs @@ -450,21 +450,17 @@ pub async fn print_history( println!( "{} {}", - style(workflow_name).yellow().bold(), - style(workflow_id).yellow() + style(workflow_name).blue().bold(), + style(workflow_id).blue() ); - print!( - " {} {}", - style("╰").yellow().dim(), - style("input").yellow() - ); + print!(" {} {}", style("╰").blue().dim(), style("input").blue()); let input = serde_json::to_string(&input)?; let input_trim = input.chars().take(50).collect::(); - print!(" {}", style(input_trim).yellow().dim()); + print!(" {}", style(input_trim).blue().dim()); if input.len() > 50 { - print!(" {}", style("...").yellow().dim()); + print!(" {}", style("...").blue().dim()); } println!("\n"); @@ -648,19 +644,15 @@ pub async fn print_history( // Print footer if let Some(output) = output { - println!("{}", style("Workflow complete").yellow().bold()); + println!("{}", style("Workflow complete").green().bold()); - print!( - " {} {}", - style("╰").yellow().dim(), - style("output").yellow() - ); + print!(" {} {}", style("╰").green().dim(), style("output").green()); let output = serde_json::to_string(&output)?; let output_trim = output.chars().take(50).collect::(); - print!(" {}", style(output_trim).yellow().dim()); + print!(" {}", style(output_trim).green().dim()); if output.len() > 50 { - print!(" {}", style("...").yellow().dim()); + print!(" {}", style("...").green().dim()); } println!(""); diff --git a/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs b/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs index 2d3cbe5cbf..6ef80e141e 100644 --- a/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs +++ b/lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs @@ -111,7 +111,7 @@ where .map_err(GlobalError::raw)?; let mut branch = self .ctx - .branch_inner(Arc::new(input_val), self.version, None) + .custom_branch(Arc::new(input_val), self.version) .await .map_err(GlobalError::raw)?; diff --git a/lib/chirp-workflow/core/src/compat.rs b/lib/chirp-workflow/core/src/compat.rs index f99faf4b18..7e88b14621 100644 --- a/lib/chirp-workflow/core/src/compat.rs +++ b/lib/chirp-workflow/core/src/compat.rs @@ -13,7 +13,7 @@ use crate::{ message::{MessageCtx, SubscriptionHandle}, }, db::{DatabaseHandle, DatabasePgNats}, - message::Message, + message::{AsTags, Message}, operation::{Operation, OperationInput}, signal::Signal, workflow::{Workflow, WorkflowInput}, @@ -91,7 +91,7 @@ where pub async fn subscribe( ctx: &rivet_operation::OperationContext, - tags: &serde_json::Value, + tags: impl AsTags, ) -> GlobalResult> where M: Message, diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs index 362134abfd..842a2e1689 100644 --- 
a/lib/chirp-workflow/core/src/ctx/api.rs +++ b/lib/chirp-workflow/core/src/ctx/api.rs @@ -11,7 +11,7 @@ use crate::{ }, db::DatabaseHandle, error::WorkflowResult, - message::{Message, NatsMessage}, + message::{AsTags, Message, NatsMessage}, operation::{Operation, OperationInput}, signal::Signal, workflow::{Workflow, WorkflowInput}, @@ -117,10 +117,7 @@ impl ApiCtx { builder::message::MessageBuilder::new(&self.msg_ctx, body) } - pub async fn subscribe( - &self, - tags: &serde_json::Value, - ) -> GlobalResult> + pub async fn subscribe(&self, tags: impl AsTags) -> GlobalResult> where M: Message, { @@ -130,10 +127,7 @@ impl ApiCtx { .map_err(GlobalError::raw) } - pub async fn tail_read( - &self, - tags: serde_json::Value, - ) -> GlobalResult>> + pub async fn tail_read(&self, tags: impl AsTags) -> GlobalResult>> where M: Message, { @@ -145,7 +139,7 @@ impl ApiCtx { pub async fn tail_anchor( &self, - tags: serde_json::Value, + tags: impl AsTags, anchor: &TailAnchor, ) -> GlobalResult> where diff --git a/lib/chirp-workflow/core/src/ctx/backfill.rs b/lib/chirp-workflow/core/src/ctx/backfill.rs index a1953b837a..34d971118e 100644 --- a/lib/chirp-workflow/core/src/ctx/backfill.rs +++ b/lib/chirp-workflow/core/src/ctx/backfill.rs @@ -123,10 +123,6 @@ impl WorkflowBackfillCtx { branch } - // TODO: - // pub fn set_location(&mut self, location: &Location) { - // } - pub fn finalize(&mut self) { let wake_immediate = true; diff --git a/lib/chirp-workflow/core/src/ctx/listen.rs b/lib/chirp-workflow/core/src/ctx/listen.rs index dd825d6514..63a2c10ae9 100644 --- a/lib/chirp-workflow/core/src/ctx/listen.rs +++ b/lib/chirp-workflow/core/src/ctx/listen.rs @@ -22,6 +22,10 @@ impl<'a> ListenCtx<'a> { } } + pub(crate) fn reset(&mut self) { + self.used = false; + } + /// Checks for a signal to this workflow with any of the given signal names. /// - Will error if called more than once. pub async fn listen_any( diff --git a/lib/chirp-workflow/core/src/ctx/message.rs b/lib/chirp-workflow/core/src/ctx/message.rs index e234d440b5..73f673c78d 100644 --- a/lib/chirp-workflow/core/src/ctx/message.rs +++ b/lib/chirp-workflow/core/src/ctx/message.rs @@ -13,7 +13,7 @@ use uuid::Uuid; use crate::{ error::{WorkflowError, WorkflowResult}, - message::{redis_keys, Message, NatsMessage, NatsMessageWrapper}, + message::{redis_keys, AsTags, Message, NatsMessage, NatsMessageWrapper}, utils, }; @@ -56,7 +56,11 @@ impl MessageCtx { /// service should need to wait or fail if a message does not publish /// successfully. #[tracing::instrument(err, skip_all, fields(message = M::NAME))] - pub async fn message(&self, tags: serde_json::Value, message_body: M) -> WorkflowResult<()> + pub async fn message( + &self, + tags: impl AsTags + 'static, + message_body: M, + ) -> WorkflowResult<()> where M: Message, { @@ -87,15 +91,11 @@ impl MessageCtx { /// messages at once so we put the messages in a queue instead of submitting /// a large number of tasks to Tokio at once. 
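// Illustrative usage of the two publish paths above (a sketch, not part of this patch;
// `MyMessage` and `user_id` are assumed stand-ins):
//
//   msg_ctx.message(("user_id", user_id), MyMessage { /* ... */ }).await?;      // queued, returns once buffered
//   msg_ctx.message_wait(("user_id", user_id), MyMessage { /* ... */ }).await?; // awaits the Redis/NATS write
//
// Both take anything implementing the new `AsTags` trait, e.g. a `(key, value)` tuple
// or a `serde_json::Value` object.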
#[tracing::instrument(err, skip_all, fields(message = M::NAME))] - pub async fn message_wait( - &self, - tags: serde_json::Value, - message_body: M, - ) -> WorkflowResult<()> + pub async fn message_wait(&self, tags: impl AsTags, message_body: M) -> WorkflowResult<()> where M: Message, { - let tags_str = cjson::to_string(&tags).map_err(WorkflowError::SerializeMessageTags)?; + let tags_str = tags.as_cjson_tags()?; let nats_subject = M::nats_subject(); let duration_since_epoch = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) @@ -114,30 +114,18 @@ impl MessageCtx { let message = NatsMessageWrapper { req_id: req_id, ray_id: self.ray_id, - tags, + tags: tags.as_tags()?, ts, - allow_recursive: false, // TODO: body: &body_buf, }; let message_buf = serde_json::to_vec(&message).map_err(WorkflowError::SerializeMessage)?; - // TODO: opts.dont_log_body - if true { - tracing::info!( - %nats_subject, - body_bytes = ?body_buf_len, - message_bytes = ?message_buf.len(), - "publish message" - ); - } else { - tracing::info!( - %nats_subject, - ?message_body, - body_bytes = ?body_buf_len, - message_bytes = ?message_buf.len(), - "publish message" - ); - } + tracing::info!( + %nats_subject, + body_bytes = ?body_buf_len, + message_bytes = ?message_buf.len(), + "publish message" + ); // Write to Redis and NATS. // @@ -241,15 +229,12 @@ impl MessageCtx { impl MessageCtx { /// Listens for Chirp workflow messages globally on NATS. #[tracing::instrument(level = "debug", err, skip_all)] - pub async fn subscribe( - &self, - tags: &serde_json::Value, - ) -> WorkflowResult> + pub async fn subscribe(&self, tags: impl AsTags) -> WorkflowResult> where M: Message, { self.subscribe_opt::(SubscribeOpts { - tags, + tags: tags.as_tags()?, flush_nats: true, }) .await @@ -259,7 +244,7 @@ impl MessageCtx { #[tracing::instrument(err, skip_all, fields(message = M::NAME))] pub async fn subscribe_opt( &self, - opts: SubscribeOpts<'_>, + opts: SubscribeOpts, ) -> WorkflowResult> where M: Message, @@ -287,17 +272,14 @@ impl MessageCtx { /// Reads the tail message of a stream without waiting for a message. 
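// A sketch of how the tail read might be called (illustrative; `MyMessage` stands in
// for a real message type):
//
//   let last: Option<NatsMessage<MyMessage>> = msg_ctx.tail_read(("user_id", user_id)).await?;
//
// `None` means no tail message is currently stored in Redis for these tags.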
#[tracing::instrument(err, skip_all, fields(message = M::NAME))] - pub async fn tail_read( - &self, - tags: serde_json::Value, - ) -> WorkflowResult>> + pub async fn tail_read(&self, tags: impl AsTags) -> WorkflowResult>> where M: Message, { let mut conn = self.redis_chirp_ephemeral.clone(); // Fetch message - let tags_str = cjson::to_string(&tags).map_err(WorkflowError::SerializeMessageTags)?; + let tags_str = tags.as_cjson_tags()?; let tail_key = redis_keys::message_tail::(&tags_str); let message_buf = conn .hget::<_, _, Option>>(&tail_key, redis_keys::message_tail::BODY) @@ -341,7 +323,7 @@ impl MessageCtx { #[tracing::instrument(err, skip_all, fields(message = M::NAME))] pub async fn tail_anchor( &self, - tags: serde_json::Value, + tags: impl AsTags, anchor: &TailAnchor, ) -> WorkflowResult> where @@ -379,8 +361,8 @@ impl MessageCtx { } #[derive(Debug)] -pub struct SubscribeOpts<'a> { - pub tags: &'a serde_json::Value, +pub struct SubscribeOpts { + pub tags: serde_json::Value, pub flush_nats: bool, } diff --git a/lib/chirp-workflow/core/src/ctx/standalone.rs b/lib/chirp-workflow/core/src/ctx/standalone.rs index 9debc9ceff..65ed00b63a 100644 --- a/lib/chirp-workflow/core/src/ctx/standalone.rs +++ b/lib/chirp-workflow/core/src/ctx/standalone.rs @@ -12,7 +12,7 @@ use crate::{ }, db::DatabaseHandle, error::WorkflowResult, - message::{Message, NatsMessage}, + message::{AsTags, Message, NatsMessage}, operation::{Operation, OperationInput}, signal::Signal, workflow::{Workflow, WorkflowInput}, @@ -120,10 +120,7 @@ impl StandaloneCtx { builder::message::MessageBuilder::new(&self.msg_ctx, body) } - pub async fn subscribe( - &self, - tags: &serde_json::Value, - ) -> GlobalResult> + pub async fn subscribe(&self, tags: impl AsTags) -> GlobalResult> where M: Message, { @@ -133,10 +130,7 @@ impl StandaloneCtx { .map_err(GlobalError::raw) } - pub async fn tail_read( - &self, - tags: serde_json::Value, - ) -> GlobalResult>> + pub async fn tail_read(&self, tags: impl AsTags) -> GlobalResult>> where M: Message, { @@ -148,7 +142,7 @@ impl StandaloneCtx { pub async fn tail_anchor( &self, - tags: serde_json::Value, + tags: impl AsTags, anchor: &TailAnchor, ) -> GlobalResult> where diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index 2fe831774f..223865c535 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -13,7 +13,7 @@ use crate::{ }, db::{DatabaseHandle, DatabasePgNats}, error::WorkflowError, - message::{Message, NatsMessage}, + message::{AsTags, Message, NatsMessage}, operation::{Operation, OperationInput}, signal::Signal, utils, @@ -143,10 +143,7 @@ impl TestCtx { builder::message::MessageBuilder::new(&self.msg_ctx, body) } - pub async fn subscribe( - &self, - tags: &serde_json::Value, - ) -> GlobalResult> + pub async fn subscribe(&self, tags: impl AsTags) -> GlobalResult> where M: Message, { @@ -156,10 +153,7 @@ impl TestCtx { .map_err(GlobalError::raw) } - pub async fn tail_read( - &self, - tags: serde_json::Value, - ) -> GlobalResult>> + pub async fn tail_read(&self, tags: impl AsTags) -> GlobalResult>> where M: Message, { @@ -169,9 +163,9 @@ impl TestCtx { .map_err(GlobalError::raw) } - pub async fn tail_anchor( + pub async fn tail_anchor( &self, - tags: serde_json::Value, + tags: impl AsTags, anchor: &TailAnchor, ) -> GlobalResult> where diff --git a/lib/chirp-workflow/core/src/ctx/versioned_workflow.rs b/lib/chirp-workflow/core/src/ctx/versioned_workflow.rs index 9ed8104e46..31c4f0f641 100644 --- 
a/lib/chirp-workflow/core/src/ctx/versioned_workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/versioned_workflow.rs @@ -24,7 +24,7 @@ macro_rules! wrap { let old_version = $self.inner.version(); - $self.inner.set_version($self.version); + $self.inner.set_version($self.version()); let res = $code; $self.inner.set_version(old_version); @@ -47,6 +47,10 @@ impl<'a> VersionedWorkflowCtx<'a> { self.inner } + pub fn version(&self) -> usize { + self.inner.version() + self.version - 1 + } + /// Creates a workflow ctx reference with a given version. pub fn v(&'a mut self, version: usize) -> VersionedWorkflowCtx<'a> { VersionedWorkflowCtx { @@ -71,7 +75,7 @@ impl<'a> VersionedWorkflowCtx<'a> { I: WorkflowInput, <I as WorkflowInput>::Workflow: Workflow<Input = I>, { - builder::sub_workflow::SubWorkflowBuilder::new(self.inner, self.version, input) + builder::sub_workflow::SubWorkflowBuilder::new(self.inner, self.version(), input) } /// Run activity. Will replay on failure. @@ -94,7 +98,7 @@ impl<'a> VersionedWorkflowCtx<'a> { /// Creates a signal builder. pub fn signal(&mut self, body: T) -> builder::signal::SignalBuilder { - builder::signal::SignalBuilder::new(self.inner, self.version, body) + builder::signal::SignalBuilder::new(self.inner, self.version(), body) } /// Listens for a signal for a short time before setting the workflow to sleep. Once the signal is @@ -118,7 +122,7 @@ impl<'a> VersionedWorkflowCtx<'a> { where M: Message, { - builder::message::MessageBuilder::new(self.inner, self.version, body) + builder::message::MessageBuilder::new(self.inner, self.version(), body) } /// Runs workflow steps in a loop. **Ensure that there are no side effects caused by the code in this @@ -149,7 +153,7 @@ impl<'a> VersionedWorkflowCtx<'a> { } } -// impl<'a> Deref for VersionedWorkflowCtx<'a> { +// impl<'a> std::ops::Deref for VersionedWorkflowCtx<'a> { // type Target = WorkflowCtx; // fn deref(&self) -> &Self::Target { diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 65d18dee81..6006bb63ea 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -18,7 +18,7 @@ use crate::{ history::{ cursor::{Cursor, HistoryResult}, event::{EventId, SleepState}, - location::Location, + location::{Coordinate, Location}, removed::Removed, History, }, @@ -120,8 +120,10 @@ impl WorkflowCtx { ) -> WorkflowResult<()> { if version < self.version { Err(WorkflowError::HistoryDiverged(format!( - "version of {step} is less than that of the current context (v{} < v{})", - version, self.version + "version of {step} at {} is less than that of the current context (v{} < v{})", + self.cursor.current_location(), + version, + self.version, ))) } else { Ok(()) } } @@ -356,39 +358,39 @@ impl WorkflowCtx { /// - **Not to be used directly by workflow users. For implementation uses only.** /// - **Remember to validate history after this branch is used.** pub(crate) async fn branch(&mut self) -> WorkflowResult<WorkflowCtx> { - self.branch_inner(self.input.clone(), self.version, None) - .await + self.custom_branch(self.input.clone(), self.version).await } - pub(crate) async fn branch_inner( + pub(crate) async fn custom_branch( &mut self, input: Arc<Box<serde_json::value::RawValue>>, version: usize, - // We allow providing a custom location in the case of loops, which take the spot of the branch event. 
- custom_location: Option, ) -> WorkflowResult { - let location = if let Some(location) = custom_location { - location - } else { - let history_res = self.cursor.compare_branch(version)?; - let location = self.cursor.current_location_for(&history_res); + let history_res = self.cursor.compare_branch(version)?; + let location = self.cursor.current_location_for(&history_res); - // Validate history is consistent - if !matches!(history_res, HistoryResult::Event(_)) { - self.db - .commit_workflow_branch_event( - self.workflow_id, - &location, - self.version, - self.loop_location.as_ref(), - ) - .await?; - } + // Validate history is consistent + if !matches!(history_res, HistoryResult::Event(_)) { + self.db + .commit_workflow_branch_event( + self.workflow_id, + &location, + self.version, + self.loop_location.as_ref(), + ) + .await?; + } - location - }; + Ok(self.branch_inner(input, version, location)) + } - Ok(WorkflowCtx { + pub(crate) fn branch_inner( + &mut self, + input: Arc>, + version: usize, + location: Location, + ) -> WorkflowCtx { + WorkflowCtx { workflow_id: self.workflow_id, name: self.name.clone(), create_ts: self.create_ts, @@ -408,7 +410,7 @@ impl WorkflowCtx { loop_location: self.loop_location.clone(), msg_ctx: self.msg_ctx.clone(), - }) + } } /// Like `branch` but it does not add another layer of depth. @@ -632,6 +634,7 @@ impl WorkflowCtx { loop { interval.tick().await; + ctx.reset(); match T::listen(&mut ctx).await { Ok(res) => break res, @@ -686,6 +689,7 @@ impl WorkflowCtx { loop { interval.tick().await; + ctx.reset(); match listener.listen(&mut ctx).await { Ok(res) => break res, @@ -730,6 +734,7 @@ impl WorkflowCtx { // // Listen for new message // else { // let mut ctx = ListenCtx::new(self); + // ctx.reset(); // match T::listen(&mut ctx).await { // Ok(res) => Some(res), @@ -766,24 +771,29 @@ impl WorkflowCtx { let loop_location = self.cursor.current_location_for(&history_res); // Loop existed before - let (iteration, output) = if let HistoryResult::Event(loop_event) = history_res { + let (mut iteration, output) = if let HistoryResult::Event(loop_event) = history_res { let output = loop_event.parse_output().map_err(GlobalError::raw)?; (loop_event.iteration, output) } else { + // Insert event before loop is run so the history is consistent + self.db + .upsert_workflow_loop_event( + self.workflow_id, + &loop_location, + self.version, + 0, + None, + self.loop_location(), + ) + .await?; + (0, None) }; - let mut loop_branch = self - .branch_inner( - self.input.clone(), - self.version, - Some(loop_location.clone()), - ) - .await - .map_err(GlobalError::raw)?; - // Shift by iteration count - loop_branch.cursor.set_idx(iteration); + // Create a branch but no branch event (loop event takes its place) + let mut loop_branch = + self.branch_inner(self.input.clone(), self.version, loop_location.clone()); // Loop complete let output = if let Some(output) = output { @@ -796,26 +806,46 @@ impl WorkflowCtx { tracing::info!(name=%self.name, id=%self.workflow_id, "running loop"); loop { - // HACK: We have to temporarily set the loop location to the current loop so that the branch - // event created in `WorkflowCtx::branch` has the correct loop location. 
- let old_loop_location = loop_branch.loop_location.replace(loop_location.clone()); - let mut iteration_branch = loop_branch.branch().await.map_err(GlobalError::raw)?; - - // Set back to previous loop location - loop_branch.loop_location = old_loop_location; + // Create a new branch for each iteration of the loop at location {...loop location, iteration idx} + let mut iteration_branch = loop_branch.branch_inner( + self.input.clone(), + self.version, + loop_branch + .cursor + .root() + .join(Coordinate::simple(iteration + 1)), + ); // Set branch loop location to the current loop iteration_branch.loop_location = Some(loop_location.clone()); + // Insert event if iteration is not a replay + if !loop_branch + .cursor + .compare_loop_branch(iteration) + .map_err(GlobalError::raw)? + { + self.db + .commit_workflow_branch_event( + self.workflow_id, + iteration_branch.cursor.root(), + self.version, + Some(&loop_location), + ) + .await?; + } + // Run loop match cb(&mut iteration_branch).await? { Loop::Continue => { + iteration += 1; + self.db - .upsert_loop( + .upsert_workflow_loop_event( self.workflow_id, &loop_location, self.version, - loop_branch.cursor.iter_idx(), + iteration, None, self.loop_location(), ) @@ -826,21 +856,20 @@ impl WorkflowCtx { .cursor .check_clear() .map_err(GlobalError::raw)?; - - // Move to next event - self.cursor.update(iteration_branch.cursor().root()); } Loop::Break(res) => { + iteration += 1; + let output_val = serde_json::value::to_raw_value(&res) .map_err(WorkflowError::SerializeLoopOutput) .map_err(GlobalError::raw)?; self.db - .upsert_loop( + .upsert_workflow_loop_event( self.workflow_id, &loop_location, self.version, - loop_branch.cursor.iter_idx(), + iteration, Some(&output_val), self.loop_location(), ) @@ -852,15 +881,15 @@ impl WorkflowCtx { .check_clear() .map_err(GlobalError::raw)?; - // Move to next event - self.cursor.update(iteration_branch.cursor().root()); - break res; } } } }; + // Move to next event + self.cursor.update(&loop_location); + Ok(output) } @@ -1045,6 +1074,7 @@ impl WorkflowCtx { loop { interval.tick().await; + ctx.reset(); match T::listen(&mut ctx).await { // Retry @@ -1077,6 +1107,7 @@ impl WorkflowCtx { loop { interval.tick().await; + ctx.reset(); match T::listen(&mut ctx).await { Ok(res) => break Some(res), diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 2e2fded5a2..ad4f58a945 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -179,7 +179,7 @@ pub trait Database: Send { ) -> WorkflowResult<()>; /// Updates a loop event in history and forgets all history items in the previous iteration. - async fn upsert_loop( + async fn upsert_workflow_loop_event( &self, workflow_id: Uuid, location: &Location, diff --git a/lib/chirp-workflow/core/src/db/pg_nats.rs b/lib/chirp-workflow/core/src/db/pg_nats.rs index 840c0afa4d..82ece16b5b 100644 --- a/lib/chirp-workflow/core/src/db/pg_nats.rs +++ b/lib/chirp-workflow/core/src/db/pg_nats.rs @@ -23,7 +23,7 @@ use crate::{ /// Max amount of workflows pulled from the database with each call to `pull_workflows`. const MAX_PULLED_WORKFLOWS: i64 = 50; // Base retry for query retry backoff -const QUERY_RETRY_MS: usize = 750; +const QUERY_RETRY_MS: usize = 500; // Time in between transaction retries const TXN_RETRY: Duration = Duration::from_millis(100); /// Maximum times a query ran by this database adapter is retried. 
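// Illustrative note on the retry constants above (the exact backoff shape produced by
// rivet_util::Backoff's parameters is an assumption here, not spelled out in this patch):
// transient query failures are retried starting from the 500ms base (QUERY_RETRY_MS),
// while conflicting transactions are retried on a fixed 100ms cadence (TXN_RETRY).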
@@ -83,7 +83,7 @@ impl DatabasePgNats { Fut: std::future::Future> + 'a, T: 'a, { - let mut backoff = rivet_util::Backoff::new(3, None, QUERY_RETRY_MS, 50); + let mut backoff = rivet_util::Backoff::new(4, None, QUERY_RETRY_MS, 50); let mut i = 0; loop { @@ -331,39 +331,41 @@ impl Database for DatabasePgNats { .map(|row| row.workflow_id) .collect::>(); - // TODO: Convert into union query // Fetch all events for all fetched workflows let events = sqlx::query_as::<_, AmalgamEventRow>(indoc!( " -- Activity events SELECT - ev.workflow_id, - ev.location, - ev.location2, + workflow_id, + location, + location2, + version, 0 AS event_type, -- EventType - ev.activity_name AS name, + activity_name AS name, NULL AS auxiliary_id, - ev.input_hash AS hash, - ev.output AS output, - ev.create_ts AS create_ts, - COUNT(err.*) AS error_count, + input_hash AS hash, + output AS output, + create_ts AS create_ts, + ( + SELECT COUNT(*) + FROM db_workflow.workflow_activity_errors AS err + WHERE + ev.workflow_id = err.workflow_id AND + ev.location2 = err.location2 + ) AS error_count, NULL AS iteration, NULL AS deadline_ts, NULL AS state, NULL AS inner_event_type FROM db_workflow.workflow_activity_events AS ev - LEFT JOIN db_workflow.workflow_activity_errors AS err - ON - ev.workflow_id = err.workflow_id AND - ev.location2_hash = err.location2_hash WHERE ev.workflow_id = ANY($1) AND forgotten = FALSE - GROUP BY ev.workflow_id, ev.location2 UNION ALL -- Signal listen events SELECT workflow_id, location, location2, + version, 1 AS event_type, -- EventType signal_name AS name, NULL AS auxiliary_id, @@ -383,6 +385,7 @@ impl Database for DatabasePgNats { workflow_id, location, location2, + version, 2 AS event_type, -- EventType signal_name AS name, signal_id AS auxiliary_id, @@ -402,6 +405,7 @@ impl Database for DatabasePgNats { workflow_id, location, location2, + version, 3 AS event_type, -- EventType message_name AS name, NULL AS auxiliary_id, @@ -421,6 +425,7 @@ impl Database for DatabasePgNats { sw.workflow_id, sw.location, sw.location2, + version, 4 AS event_type, -- pg_nats::types::EventType w.workflow_name AS name, sw.sub_workflow_id AS auxiliary_id, @@ -442,6 +447,7 @@ impl Database for DatabasePgNats { workflow_id, location, location2, + version, 5 AS event_type, -- pg_nats::types::EventType NULL AS name, NULL AS auxiliary_id, @@ -461,6 +467,7 @@ impl Database for DatabasePgNats { workflow_id, location, location2, + version, 6 AS event_type, -- pg_nats::types::EventType NULL AS name, NULL AS auxiliary_id, @@ -474,11 +481,13 @@ impl Database for DatabasePgNats { NULL AS inner_event_type FROM db_workflow.workflow_sleep_events WHERE workflow_id = ANY($1) AND forgotten = FALSE + UNION ALL -- Branch events SELECT workflow_id, ARRAY[] AS location, location AS location2, + version, 7 AS event_type, -- pg_nats::types::EventType NULL AS name, NULL AS auxiliary_id, @@ -490,13 +499,15 @@ impl Database for DatabasePgNats { NULL AS deadline_ts, NULL AS state, NULL AS inner_event_type - FROM db_workflow.workflow_sleep_events + FROM db_workflow.workflow_branch_events WHERE workflow_id = ANY($1) AND forgotten = FALSE + UNION ALL -- Removed events SELECT workflow_id, ARRAY[] AS location, location AS location2, + 1 AS version, -- Default 8 AS event_type, -- pg_nats::types::EventType event_name AS name, NULL AS auxiliary_id, @@ -508,13 +519,15 @@ impl Database for DatabasePgNats { NULL AS deadline_ts, NULL AS state, event_type AS inner_event_type - FROM db_workflow.workflow_sleep_events + FROM db_workflow.workflow_removed_events 
WHERE workflow_id = ANY($1) AND forgotten = FALSE + UNION ALL -- Version check events SELECT workflow_id, ARRAY[] AS location, location AS location2, + 1 AS version, -- Default 9 AS event_type, -- pg_nats::types::EventType NULL AS name, NULL AS auxiliary_id, @@ -526,10 +539,9 @@ impl Database for DatabasePgNats { NULL AS deadline_ts, NULL AS state, NULL AS inner_event_type - FROM db_workflow.workflow_sleep_events + FROM db_workflow.workflow_version_check_events WHERE workflow_id = ANY($1) AND forgotten = FALSE - -- We don't order by location2 because it is a JSONB type (probably inefficient) - ORDER BY workflow_id ASC; + ORDER BY workflow_id ASC ", )) .bind(&workflow_ids) @@ -669,7 +681,7 @@ impl Database for DatabasePgNats { loop_location2 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (workflow_id, location2) DO UPDATE + ON CONFLICT (workflow_id, location2_hash) DO UPDATE SET output = excluded.output ", )) @@ -704,7 +716,7 @@ impl Database for DatabasePgNats { loop_location2 ) VALUES ($1, $2, $3, $4, $5, $7, $8) - ON CONFLICT (workflow_id, location2) DO NOTHING + ON CONFLICT (workflow_id, location2_hash) DO NOTHING RETURNING 1 ), err AS ( @@ -1020,7 +1032,7 @@ impl Database for DatabasePgNats { INSERT INTO db_workflow.workflows ( workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate ) - VALUES ($8, $2, $3, $4, $5, $6, true) + VALUES ($9, $2, $3, $4, $5, $6, true) RETURNING 1 ), sub_workflow AS ( @@ -1115,7 +1127,7 @@ impl Database for DatabasePgNats { Ok(()) } - async fn upsert_loop( + async fn upsert_workflow_loop_event( &self, workflow_id: Uuid, location: &Location, @@ -1133,15 +1145,16 @@ impl Database for DatabasePgNats { INSERT INTO db_workflow.workflow_loop_events ( workflow_id, location2, + version, iteration, output, loop_location2 ) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (workflow_id, location2) DO UPDATE + ON CONFLICT (workflow_id, location2_hash) DO UPDATE SET - iteration = $3, - output = $4 + iteration = $4, + output = $5 RETURNING 1 ", )) @@ -1149,104 +1162,107 @@ impl Database for DatabasePgNats { .bind(&location) .bind(version as i64) .bind(iteration as i64) - .bind(sqlx::types::Json(output)) + .bind(output.map(sqlx::types::Json)) .bind(loop_location) .execute(&mut *tx) .await .map_err(WorkflowError::Sqlx)?; - sqlx::query(indoc!( - " - WITH - forget_activity_events AS ( - UPDATE db_workflow.workflow_activity_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_signal_events AS ( - UPDATE db_workflow.workflow_signal_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_sub_workflow_events AS ( - UPDATE db_workflow.workflow_sub_workflow_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_signal_send_events AS ( - UPDATE db_workflow.workflow_signal_send_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_message_send_events AS ( - UPDATE db_workflow.workflow_message_send_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_loop_events AS ( - UPDATE db_workflow.workflow_loop_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location2_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - 
forget_branch_events AS ( - UPDATE db_workflow.workflow_branch_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_removed_events AS ( - UPDATE db_workflow.workflow_removed_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ), - forget_version_check_events AS ( - UPDATE db_workflow.workflow_version_check_events - SET forgotten = TRUE - WHERE - workflow_id = $1 AND - loop_location_hash = $2 AND - forgotten = FALSE - RETURNING 1 - ) - SELECT 1 - ", - )) - .bind(workflow_id) - .bind(hash_location(&location)?) - .execute(&mut *tx) - .await - .map_err(WorkflowError::Sqlx)?; + // 0-th iteration is the initial insertion + if iteration != 0 { + sqlx::query(indoc!( + " + WITH + forget_activity_events AS ( + UPDATE db_workflow.workflow_activity_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_signal_events AS ( + UPDATE db_workflow.workflow_signal_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_sub_workflow_events AS ( + UPDATE db_workflow.workflow_sub_workflow_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_signal_send_events AS ( + UPDATE db_workflow.workflow_signal_send_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_message_send_events AS ( + UPDATE db_workflow.workflow_message_send_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_loop_events AS ( + UPDATE db_workflow.workflow_loop_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_branch_events AS ( + UPDATE db_workflow.workflow_branch_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_removed_events AS ( + UPDATE db_workflow.workflow_removed_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), + forget_version_check_events AS ( + UPDATE db_workflow.workflow_version_check_events + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ) + SELECT 1 + ", + )) + .bind(workflow_id) + .bind(hash_location(&location)?) + .execute(&mut *tx) + .await + .map_err(WorkflowError::Sqlx)?; + } tx.commit().await.map_err(WorkflowError::Sqlx)?; @@ -1329,7 +1345,7 @@ impl Database for DatabasePgNats { INSERT INTO db_workflow.workflow_branch_events( workflow_id, location, version, loop_location ) - VALUES($1, $2, $3) + VALUES($1, $2, $3, $4) RETURNING 1 ", )) @@ -1474,7 +1490,7 @@ mod types { } /// Stores data for all event types in one. 
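// For orientation, the event_type discriminants written by the UNION query above map as:
// 0 = activity, 1 = signal listen, 2 = signal send, 3 = message send, 4 = sub workflow,
// 5 = loop, 6 = sleep, 7 = branch, 8 = removed, 9 = version check. Each AmalgamEventRow
// carries the superset of these columns and is narrowed back into the matching event
// variant when pulled workflow histories are rebuilt.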
- #[derive(sqlx::FromRow)] + #[derive(Debug, sqlx::FromRow)] pub struct AmalgamEventRow { workflow_id: Uuid, location: Vec, @@ -1503,10 +1519,10 @@ mod types { .as_ref() .map(|x| x.tail().cloned().expect("empty location")) .unwrap_or_else(|| { - Coordinate::new(Box::new([ + Coordinate::simple( // NOTE: Add 1 because we switched from 0-based to 1-based *value.location.last().expect("empty location") as usize + 1, - ])) + ) }); let event_type = value @@ -1650,13 +1666,17 @@ mod types { } // Implements sqlx postgres types for `Location` - impl sqlx::Type for Location { - fn type_info() -> sqlx::postgres::PgTypeInfo { - ::array_type_info() + impl sqlx::Type for Location + where + DB: sqlx::Database, + serde_json::Value: sqlx::Type, + { + fn type_info() -> DB::TypeInfo { + >::type_info() } - fn compatible(ty: &sqlx::postgres::PgTypeInfo) -> bool { - ::array_compatible(ty) + fn compatible(ty: &DB::TypeInfo) -> bool { + >::compatible(ty) } } @@ -1675,16 +1695,17 @@ mod types { impl sqlx::Decode<'_, sqlx::Postgres> for Location { fn decode(value: sqlx::postgres::PgValueRef) -> Result { let value = - >> as sqlx::Decode>::decode( + ]>> as sqlx::Decode>::decode( value, )?; - Ok(value.0.into_iter().collect()) + Ok(IntoIterator::into_iter(value.0).collect()) } } + // TODO: Implement serde serialize and deserialize for `Location` /// Convert location to json as `number[][]`. - fn serialize_location(location: &Location) -> serde_json::Value { + pub fn serialize_location(location: &Location) -> serde_json::Value { serde_json::Value::Array( location .as_ref() @@ -1750,7 +1771,7 @@ mod types { .iter() .take(event_row.location.len().saturating_sub(1)) // NOTE: Add 1 because we switched from 0-based to 1-based - .map(|x| Coordinate::new(Box::new([(*x) as usize + 1]))) + .map(|x| Coordinate::simple((*x) as usize + 1)) .collect() }); @@ -1771,6 +1792,40 @@ mod types { for events in events_by_location.values_mut() { // Events are already mostly sorted themselves so this should be fairly cheap events.sort_by_key(|event| event.coordinate().clone()); + + // NOTE: The following is only for the purpose of backwards compatibility for workflows + // that were created before the branch event was formally added. + + // This if statement handles the side effect of inserting a large amount of useless + // `Empty` placeholders in loops. Because the direct children of a loop are only ever + // branches, we skip inserting `Empty` placeholders if there are only branches in the + // events list + if !events.iter().all(|e| matches!(e.data, EventData::Branch)) { + // Check for missing indexes and insert a `Empty` placeholder event for each missing + // spot. + let mut last_coord = Coordinate::simple(0); + *events = events + .drain(..) 
+ .into_iter() + .flat_map(|event| { + let last = last_coord.head(); + let curr = event.coordinate.head(); + assert!(last <= curr, "invalid history"); + + let offset = (curr - last).saturating_sub(1); + + last_coord = event.coordinate.clone(); + + (1..=offset) + .map(move |i| Event { + coordinate: Coordinate::simple(last + i), + version: 0, + data: EventData::Empty, + }) + .chain(std::iter::once(event)) + }) + .collect(); + } } PulledWorkflow { diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs index 9fc7141d95..5483a2a277 100644 --- a/lib/chirp-workflow/core/src/error.rs +++ b/lib/chirp-workflow/core/src/error.rs @@ -69,20 +69,26 @@ pub enum WorkflowError { #[error("serialize message body: {0}")] SerializeMessageBody(serde_json::Error), + #[error("deserialize message body: {0}")] + DeserializeMessageBody(serde_json::Error), + #[error("serialize message: {0}")] SerializeMessage(serde_json::Error), - #[error("decode message body: {0}")] - DeserializeMessageBody(serde_json::Error), - - #[error("decode message: {0}")] + #[error("deserialize message: {0}")] DeserializeMessage(serde_json::Error), - #[error("serialize message tags: {0:?}")] - SerializeMessageTags(cjson::Error), + #[error("cjson serialize tags: {0:?}")] + CjsonSerializeTags(cjson::Error), + + #[error("serialize tags: {0:?}")] + SerializeTags(serde_json::Error), + + #[error("deserialize tags: {0}")] + DeserializeTags(serde_json::Error), - #[error("decode message tags: {0}")] - DeserializeMessageTags(serde_json::Error), + #[error("tags must be a json object")] + InvalidTags, #[error("serialize loop output: {0}")] SerializeLoopOutput(serde_json::Error), diff --git a/lib/chirp-workflow/core/src/history/cursor.rs b/lib/chirp-workflow/core/src/history/cursor.rs index 0004010742..9686bfe322 100644 --- a/lib/chirp-workflow/core/src/history/cursor.rs +++ b/lib/chirp-workflow/core/src/history/cursor.rs @@ -1,3 +1,5 @@ +use std::cmp::Ordering; + use super::{ event::{ ActivityEvent, Event, EventData, EventId, EventType, LoopEvent, MessageSendEvent, @@ -30,18 +32,10 @@ impl Cursor { // This is the only place a coordinate of `0` can exist. It is used as a left-most bound for // coordinates; no coordinates can come before 0. - prev_coord: Coordinate::new(Box::new([0])), + prev_coord: Coordinate::simple(0), } } - pub(crate) fn iter_idx(&self) -> usize { - self.iter_idx - } - - pub(crate) fn set_idx(&mut self, iter_idx: usize) { - self.iter_idx = iter_idx; - } - pub fn current_coord(&self) -> Coordinate { self.coord_at(self.iter_idx) } @@ -68,7 +62,7 @@ impl Cursor { idx + 1 }; - Coordinate::new(Box::new([int])) + Coordinate::simple(int) } } @@ -77,11 +71,7 @@ impl Cursor { } pub fn current_location(&self) -> Location { - self.root_location - .iter() - .cloned() - .chain(std::iter::once(self.current_coord())) - .collect() + self.root_location.join(self.current_coord()) } /// Returns the current location based on the history result of a comparison. 
The returned location @@ -91,56 +81,62 @@ impl Cursor { let coord = match history_res { HistoryResult::Event(_) | HistoryResult::New => curr, - // Pick a location between the previous and current location based on coordinate and version + // Pick a location between the previous and current locations based on cardinality HistoryResult::Insertion => { - // The difference between these two is `historical_prev` will always come from history, - // whereas `prev` might be the last returned value from this function (not in history) - let historical_prev = if self.iter_idx == 0 { - Coordinate::new(Box::new([0])) - } else { - self.coord_at(self.iter_idx - 1) - }; let prev = &self.prev_coord; - if &historical_prev == prev { - // 1.1 vs 1.1.1 (cardinality) - if prev.cardinality() >= curr.cardinality() { - // prev + .1 + match prev.cardinality().cmp(&curr.cardinality()) { + // 1.1 vs 1.1.1 + Ordering::Less => { + // prev + .0.1 (2.3 -> 2.3.0.1) prev.into_iter() .cloned() + .chain(std::iter::once(0)) .chain(std::iter::once(1)) .collect::() - } else { - // prev + .0.1 + } + // 1.1 vs 1.2 + Ordering::Equal => { + // prev + .1 (8 -> 8.1) prev.into_iter() .cloned() - .chain(std::iter::once(0)) .chain(std::iter::once(1)) .collect::() } - } else { - // Increment tail (1.2 -> 1.3) - prev.with_tail(prev.tail() + 1) + // 1.3.1 vs 1.4 + Ordering::Greater => { + // Increment tail (1.2 -> 1.3) + prev.with_tail(prev.tail() + 1) + } } } }; - self.root_location - .iter() - .cloned() - .chain(std::iter::once(coord)) - .collect() + self.root_location.join(coord) } pub fn current_event(&self) -> Option<&Event> { if let Some(branch) = self.events.get(&self.root_location) { - branch.get(self.iter_idx) + let event = branch.get(self.iter_idx); + + // Empty events are considered `None` + if let Some(Event { + data: EventData::Empty, + .. + }) = &event + { + None + } else { + event + } } else { None } } pub(crate) fn inc(&mut self) { + self.prev_coord = self.current_coord(); + self.iter_idx += 1; } @@ -154,7 +150,7 @@ impl Cursor { // the next event. Otherwise, `Cursor::current_location_for` returned an inserted version which does // not constitute incrementing the cursor (as it only acts on history). 
if tail == &self.current_coord() { - self.inc(); + self.iter_idx += 1; } self.prev_coord = tail.clone(); @@ -170,8 +166,9 @@ impl Cursor { if self.iter_idx < branch.len() { let latent = branch.len() - self.iter_idx; return Err(WorkflowError::LatentHistoryFound(format!( - "expected {latent} more event{}", - if latent == 1 { "s" } else { "" } + "expected {latent} more event{} in root at {}", + if latent == 1 { "" } else { "s" }, + self.root_location, ))); }; @@ -185,22 +182,26 @@ impl Cursor { event_id: &EventId, ) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of activity {} before {event} at {} (invalid due to versions: v{} < v{})", - event_id.name, + "expected {} v{} at {}, found activity {:?} v{}", + event.data, + event.version, self.current_location(), + event_id.name, version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::Activity(activity) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found activity {}", + "expected {} at {}, found activity {:?}", + event.data, self.current_location(), event_id.name ))); @@ -208,7 +209,7 @@ impl Cursor { if &activity.event_id != event_id { return Err(WorkflowError::HistoryDiverged(format!( - "expected activity {}#{:x} at {}, found activity {}#{:x}", + "expected activity {:?}#{:x} at {}, found activity {:?}#{:x}", activity.event_id.name, activity.event_id.input_hash, self.current_location(), @@ -230,22 +231,26 @@ impl Cursor { msg_name: &str, ) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of message send {} before {event} at {} (invalid due to versions: v{} < v{})", - msg_name, + "expected {} v{} at {}, found message send {:?} v{}", + event.data, + event.version, self.current_location(), + msg_name, version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::MessageSend(msg) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found message send {}", + "expected {} at {}, found message send {:?}", + event.data, self.current_location(), msg_name, ))); @@ -253,7 +258,8 @@ impl Cursor { if msg.name != msg_name { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found message send {}", + "expected {} at {}, found message send {:?}", + event.data, self.current_location(), msg_name, ))); @@ -272,22 +278,26 @@ impl Cursor { signal_name: &str, ) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of signal send {} before {event} at {} (invalid due to versions: v{} < v{})", - signal_name, + "expected {} v{} at {}, found signal send {:?} v{}", + event.data, + event.version, self.current_location(), + signal_name, version, - event.version, ))); - } else if 
event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::SignalSend(signal) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found signal send {}", + "expected {} at {}, found signal send {:?}", + event.data, self.current_location(), signal_name, ))); @@ -295,7 +305,8 @@ impl Cursor { if signal.name != signal_name { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found signal send {}", + "expected {} at {}, found signal send {:?}", + event.data, self.current_location(), signal_name, ))); @@ -314,22 +325,26 @@ impl Cursor { sub_workflow_name: &str, ) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of sub workflow {} before {event} at {} (invalid due to versions: v{} < v{})", - sub_workflow_name, + "expected {} v{} at {}, found sub workflow {:?} v{}", + event.data, + event.version, self.current_location(), + sub_workflow_name, version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::SubWorkflow(sub_workflow) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found sub workflow {}", + "expected {} at {}, found sub workflow {:?}", + event.data, self.current_location(), sub_workflow_name, ))); @@ -337,7 +352,8 @@ impl Cursor { if sub_workflow.name != sub_workflow_name { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found sub_workflow {}", + "expected {} at {}, found sub_workflow {:?}", + event.data, self.current_location(), sub_workflow_name, ))); @@ -352,21 +368,25 @@ impl Cursor { /// Returns `Some` if the current event is a replay. pub fn compare_signal(&self, version: usize) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of signal before {event} at {} (invalid due to versions: v{} < v{})", + "expected {} v{} at {}, found signal v{}", + event.data, + event.version, self.current_location(), version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::Signal(signal) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found signal", + "expected {} at {}, found signal", + event.data, self.current_location(), ))); }; @@ -380,21 +400,25 @@ impl Cursor { /// Returns `Some` if the current event is a replay. 
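// The version handling below mirrors every other compare_* helper in this file: a code
// version newer than the recorded event yields HistoryResult::Insertion, an older one is
// a history divergence, and only equal versions fall through to validate the event type.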
pub fn compare_loop(&self, version: usize) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of loop before {event} at {} (invalid due to versions: v{} < v{})", + "expected {} v{} at {}, found loop v{}", + event.data, + event.version, self.current_location(), version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::Loop(loop_event) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found loop", + "expected {} at {}, found loop", + event.data, self.current_location(), ))); }; @@ -408,21 +432,25 @@ impl Cursor { /// Returns `Some` if the current event is a replay. pub fn compare_sleep(&self, version: usize) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of sleep before {event} at {} (invalid due to versions: v{} < v{})", + "expected {} v{} at {}, found sleep v{}", + event.data, + event.version, self.current_location(), version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::Sleep(sleep) = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found sleep", + "expected {} at {}, found sleep", + event.data, self.current_location(), ))); }; @@ -436,21 +464,25 @@ impl Cursor { /// Returns `true` if the current event is a replay. pub fn compare_branch(&self, version: usize) -> WorkflowResult> { if let Some(event) = self.current_event() { - if event.version < version { + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { return Err(WorkflowError::HistoryDiverged(format!( - "attempted insertion of branch before {event} at {} (invalid due to versions: v{} < v{})", + "expected {} v{} at {}, found branch v{}", + event.data, + event.version, self.current_location(), version, - event.version, ))); - } else if event.version > version { - return Ok(HistoryResult::Insertion); } // Validate history is consistent let EventData::Branch = &event.data else { return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found branch", + "expected {} at {}, found branch", + event.data, self.current_location(), ))); }; @@ -461,12 +493,37 @@ impl Cursor { } } + /// Returns `true` if the current event is a replay. + /// Because loops have a sparse history with potentially 0 events (after forgetting), they create branches + /// at specific locations instead of using `current_location_for`. This means the cursor cannot use + /// `current_event` to compare the history and instead we just find the correct event via coordinate. 
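// Illustrative history layout for the sparse case described above (a sketch derived
// from the loop-handling code in ctx/workflow.rs): a loop event at location {x} owns
// one branch per iteration at {x, i}, with i = iteration + 1:
//
//   {3}     loop event (iteration counter, output once broken)
//   {3, 1}  branch holding iteration 0's events (possibly already forgotten)
//   {3, 2}  branch holding iteration 1's events
//
// Hence this helper looks up the branch by coordinate under the loop's root instead
// of relying on the cursor's running index.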
+ pub fn compare_loop_branch(&self, iteration: usize) -> WorkflowResult { + let empty_vec = Vec::new(); + let branch = self.events.get(&self.root_location).unwrap_or(&empty_vec); + let coordinate = Coordinate::simple(iteration + 1); + + if let Some(event) = branch.iter().find(|x| x.coordinate == coordinate) { + // Validate history is consistent + let EventData::Branch = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found branch", + event.data, + self.current_location(), + ))); + }; + + Ok(true) + } else { + Ok(false) + } + } + /// Returns `true` if the current event is a replay. pub fn compare_removed(&self) -> WorkflowResult { if let Some(event) = self.current_event() { // Validate history is consistent let valid = if let EventData::Removed(removed) = &event.data { - removed.name.as_deref() == T::name() && removed.event_type == T::event_type() + removed.event_type == T::event_type() && removed.name.as_deref() == T::name() } else { match T::event_type() { EventType::Activity => { @@ -506,12 +563,24 @@ impl Cursor { }; if !valid { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {event} at {}, found removed {}", - self.current_location(), - T::event_type(), - ))); - }; + let msg = if let Some(name) = T::name() { + format!( + "expected {} at {}, found removed {} {name:?}", + event.data, + self.current_location(), + T::event_type(), + ) + } else { + format!( + "expected {} at {}, found removed {}", + event.data, + self.current_location(), + T::event_type(), + ) + }; + + return Err(WorkflowError::HistoryDiverged(msg)); + } Ok(true) } else { @@ -560,6 +629,31 @@ mod tests { }; } + #[test] + fn coord_with_sparse_events() { + let events = [( + loc![], + vec![ + Event { + coordinate: coord![2, 1], + version: 1, + data: EventData::VersionCheck, + }, + Event { + coordinate: coord![4], + version: 1, + data: EventData::VersionCheck, + }, + ], + )] + .into_iter() + .collect(); + let mut cursor = Cursor::new(Arc::new(events), Location::empty()); + + assert_eq!(coord![2, 1], cursor.coord_at(0)); + assert_eq!(coord![5], cursor.coord_at(2)); + } + /// Before 1 is 0.1 #[test] fn insert_before_first() { diff --git a/lib/chirp-workflow/core/src/history/event.rs b/lib/chirp-workflow/core/src/history/event.rs index 992e057595..385910e344 100644 --- a/lib/chirp-workflow/core/src/history/event.rs +++ b/lib/chirp-workflow/core/src/history/event.rs @@ -27,6 +27,12 @@ impl Event { } } +impl std::fmt::Display for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} v{} @ {}", self.data, self.version, self.coordinate) + } +} + impl Deref for Event { type Target = EventData; @@ -46,13 +52,16 @@ pub enum EventData { Sleep(SleepEvent), Removed(RemovedEvent), VersionCheck, - // Used as a placeholder for branching locations Branch, + + /// NOTE: Strictly used as a placeholder for backfilling. When using this, the coordinate of the `Event` + /// must still be valid. 
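// For example (mirroring the backfill logic in pg_nats.rs): events recorded at
// coordinates 2.1 and 4 under one root get padded so positional indexing still works:
//
//   [Empty @ 1, VersionCheck @ 2.1, Empty @ 3, VersionCheck @ 4]
//
// The cursor then treats an Empty event as `None` when reading the current event.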
+ Empty, } -impl std::fmt::Display for Event { +impl std::fmt::Display for EventData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match &self.data { + match &self { EventData::Activity(activity) => write!(f, "activity {:?}", activity.event_id.name), EventData::Signal(signal) => write!(f, "signal {:?}", signal.name), EventData::SignalSend(signal_send) => write!(f, "signal send {:?}", signal_send.name), @@ -66,13 +75,14 @@ impl std::fmt::Display for Event { EventData::Sleep(_) => write!(f, "sleep"), EventData::Removed(removed) => { if let Some(name) = &removed.name { - write!(f, "removed {} {name}", removed.event_type) + write!(f, "removed {} {name:?}", removed.event_type) } else { write!(f, "removed {}", removed.event_type) } } EventData::VersionCheck => write!(f, "version check"), EventData::Branch => write!(f, "branch"), + EventData::Empty => write!(f, "empty"), } } } diff --git a/lib/chirp-workflow/core/src/history/location.rs b/lib/chirp-workflow/core/src/history/location.rs index 5bde81e80a..435e6a8d8c 100644 --- a/lib/chirp-workflow/core/src/history/location.rs +++ b/lib/chirp-workflow/core/src/history/location.rs @@ -24,6 +24,14 @@ impl Location { pub fn tail(&self) -> Option<&Coordinate> { self.0.last() } + + pub fn join(&self, coordinate: Coordinate) -> Location { + self.0 + .iter() + .cloned() + .chain(std::iter::once(coordinate)) + .collect() + } } impl std::fmt::Display for Location { @@ -81,6 +89,10 @@ impl Coordinate { Coordinate(raw) } + pub fn simple(int: usize) -> Self { + Coordinate(Box::new([int])) + } + pub fn with_tail(&self, tail: usize) -> Self { self.0 .iter() diff --git a/lib/chirp-workflow/core/src/history/removed.rs b/lib/chirp-workflow/core/src/history/removed.rs index 7827611528..d6fe5372fe 100644 --- a/lib/chirp-workflow/core/src/history/removed.rs +++ b/lib/chirp-workflow/core/src/history/removed.rs @@ -69,7 +69,7 @@ pub struct Sleep; impl Removed for Sleep { fn event_type() -> EventType { - EventType::Activity + EventType::Sleep } } diff --git a/lib/chirp-workflow/core/src/message.rs b/lib/chirp-workflow/core/src/message.rs index abcd5ba03e..458c6eb687 100644 --- a/lib/chirp-workflow/core/src/message.rs +++ b/lib/chirp-workflow/core/src/message.rs @@ -1,4 +1,4 @@ -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; use uuid::Uuid; @@ -16,6 +16,56 @@ pub trait Message: Debug + Send + Sync + Serialize + DeserializeOwned + 'static } } +pub trait AsTags: Send + Sync { + fn as_tags(&self) -> WorkflowResult; + fn as_cjson_tags(&self) -> WorkflowResult; +} + +impl AsTags for (T, U) { + fn as_tags(&self) -> WorkflowResult { + let (k, v) = self; + Ok(serde_json::Value::Object( + IntoIterator::into_iter([( + k.to_string(), + serde_json::to_value(&v).map_err(WorkflowError::SerializeTags)?, + )]) + .collect(), + )) + } + + fn as_cjson_tags(&self) -> WorkflowResult { + cjson::to_string(&self.as_tags()?).map_err(WorkflowError::CjsonSerializeTags) + } +} + +impl AsTags for serde_json::Value { + fn as_tags(&self) -> WorkflowResult { + match self { + serde_json::Value::Object(_) => Ok(self.clone()), + _ => Err(WorkflowError::InvalidTags), + } + } + + fn as_cjson_tags(&self) -> WorkflowResult { + match self { + serde_json::Value::Object(_) => { + cjson::to_string(&self).map_err(WorkflowError::CjsonSerializeTags) + } + _ => Err(WorkflowError::InvalidTags), + } + } +} + +impl AsTags for &T { + fn as_tags(&self) -> WorkflowResult { + (*self).as_tags() + } + + fn as_cjson_tags(&self) -> 
WorkflowResult { + (*self).as_cjson_tags() + } +} + /// A message received from a NATS subscription. #[derive(Debug)] pub struct NatsMessage @@ -103,7 +153,6 @@ pub(crate) struct NatsMessageWrapper<'a> { pub(crate) ts: i64, #[serde(borrow)] pub(crate) body: &'a serde_json::value::RawValue, - pub(crate) allow_recursive: bool, } pub mod redis_keys { diff --git a/lib/chirp-workflow/core/src/utils.rs b/lib/chirp-workflow/core/src/utils.rs index dfdd9f771f..52a30c1bcd 100644 --- a/lib/chirp-workflow/core/src/utils.rs +++ b/lib/chirp-workflow/core/src/utils.rs @@ -88,6 +88,12 @@ pub mod time { } } + impl DurationToMillis for i32 { + fn to_millis(self) -> GlobalResult { + self.try_into().map_err(Into::into) + } + } + impl DurationToMillis for u64 { fn to_millis(self) -> GlobalResult { Ok(self) diff --git a/lib/util/core/src/serde.rs b/lib/util/core/src/serde.rs index a121437076..ceb265440e 100644 --- a/lib/util/core/src/serde.rs +++ b/lib/util/core/src/serde.rs @@ -7,110 +7,113 @@ use std::{ }; use indexmap::IndexMap; -use serde::{de::{self, DeserializeOwned}, Deserialize, Deserializer, Serialize, Serializer}; +use serde::{ + de::{self, DeserializeOwned}, + Deserialize, Deserializer, Serialize, Serializer, +}; use serde_json::{value::RawValue, Number}; use thiserror::Error; /// Like serde_json::Value but with no nesting types (arrays, objects). #[derive(Debug, Clone, PartialEq, Eq)] pub enum SimpleValue { - Null, - Bool(bool), - Number(Number), - String(String), + Null, + Bool(bool), + Number(Number), + String(String), } #[derive(Debug, Error)] pub enum DeserializeError { - #[error("arrays and objects are not supported")] - OnlySimple, + #[error("arrays and objects are not supported")] + OnlySimple, } impl<'de> Deserialize<'de> for SimpleValue { - fn deserialize(deserializer: D) -> Result - where - D: Deserializer<'de>, - { - struct SimpleValueVisitor; - - impl<'de> de::Visitor<'de> for SimpleValueVisitor { - type Value = SimpleValue; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a simple value (null, bool, number, or string)") - } - - fn visit_bool(self, value: bool) -> Result { - Ok(SimpleValue::Bool(value)) - } - - fn visit_i64(self, value: i64) -> Result { - Ok(SimpleValue::Number(value.into())) - } - - fn visit_u64(self, value: u64) -> Result { - Ok(SimpleValue::Number(value.into())) - } - - fn visit_f64(self, value: f64) -> Result { - Ok(Number::from_f64(value).map_or(SimpleValue::Null, SimpleValue::Number)) - } - - fn visit_str(self, value: &str) -> Result { - Ok(SimpleValue::String(value.to_owned())) - } - - fn visit_string(self, value: String) -> Result { - Ok(SimpleValue::String(value)) - } - - fn visit_none(self) -> Result { - Ok(SimpleValue::Null) - } - - fn visit_unit(self) -> Result { - Ok(SimpleValue::Null) - } - - fn visit_some(self, deserializer: D) -> Result - where - D: Deserializer<'de>, - { - Deserialize::deserialize(deserializer) - } - - fn visit_map(self, _visitor: V) -> Result - where - V: de::MapAccess<'de>, - { - Err(de::Error::custom(DeserializeError::OnlySimple)) - } - - fn visit_seq(self, _visitor: V) -> Result - where - V: de::SeqAccess<'de>, - { - Err(de::Error::custom(DeserializeError::OnlySimple)) - } - } - - deserializer.deserialize_any(SimpleValueVisitor) - } + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct SimpleValueVisitor; + + impl<'de> de::Visitor<'de> for SimpleValueVisitor { + type Value = SimpleValue; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> 
fmt::Result { + formatter.write_str("a simple value (null, bool, number, or string)") + } + + fn visit_bool(self, value: bool) -> Result { + Ok(SimpleValue::Bool(value)) + } + + fn visit_i64(self, value: i64) -> Result { + Ok(SimpleValue::Number(value.into())) + } + + fn visit_u64(self, value: u64) -> Result { + Ok(SimpleValue::Number(value.into())) + } + + fn visit_f64(self, value: f64) -> Result { + Ok(Number::from_f64(value).map_or(SimpleValue::Null, SimpleValue::Number)) + } + + fn visit_str(self, value: &str) -> Result { + Ok(SimpleValue::String(value.to_owned())) + } + + fn visit_string(self, value: String) -> Result { + Ok(SimpleValue::String(value)) + } + + fn visit_none(self) -> Result { + Ok(SimpleValue::Null) + } + + fn visit_unit(self) -> Result { + Ok(SimpleValue::Null) + } + + fn visit_some(self, deserializer: D) -> Result + where + D: Deserializer<'de>, + { + Deserialize::deserialize(deserializer) + } + + fn visit_map(self, _visitor: V) -> Result + where + V: de::MapAccess<'de>, + { + Err(de::Error::custom(DeserializeError::OnlySimple)) + } + + fn visit_seq(self, _visitor: V) -> Result + where + V: de::SeqAccess<'de>, + { + Err(de::Error::custom(DeserializeError::OnlySimple)) + } + } + + deserializer.deserialize_any(SimpleValueVisitor) + } } impl Serialize for SimpleValue { - #[inline] - fn serialize(&self, serializer: S) -> Result - where - S: ::serde::Serializer, - { - match self { - SimpleValue::Null => serializer.serialize_unit(), - SimpleValue::Bool(b) => serializer.serialize_bool(*b), - SimpleValue::Number(n) => n.serialize(serializer), - SimpleValue::String(s) => serializer.serialize_str(s), - } - } + #[inline] + fn serialize(&self, serializer: S) -> Result + where + S: ::serde::Serializer, + { + match self { + SimpleValue::Null => serializer.serialize_unit(), + SimpleValue::Bool(b) => serializer.serialize_bool(*b), + SimpleValue::Number(n) => n.serialize(serializer), + SimpleValue::String(s) => serializer.serialize_str(s), + } + } } /// Used in workflow activity inputs/outputs. 
Using this over BTreeMap is preferred because this does not @@ -345,15 +348,15 @@ where } impl sqlx::postgres::PgHasArrayType for Raw { - fn array_type_info() -> sqlx::postgres::PgTypeInfo { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { // JSONB array - sqlx::postgres::PgTypeInfo::with_name("_jsonb") + sqlx::postgres::PgTypeInfo::with_name("_jsonb") } } impl sqlx::postgres::PgHasArrayType for &Raw { - fn array_type_info() -> sqlx::postgres::PgTypeInfo { + fn array_type_info() -> sqlx::postgres::PgTypeInfo { // JSONB array - sqlx::postgres::PgTypeInfo::with_name("_jsonb") + sqlx::postgres::PgTypeInfo::with_name("_jsonb") } } diff --git a/svc/api/admin/src/route/clusters/datacenters.rs b/svc/api/admin/src/route/clusters/datacenters.rs index 830ef68816..97fbe5c3ee 100644 --- a/svc/api/admin/src/route/clusters/datacenters.rs +++ b/svc/api/admin/src/route/clusters/datacenters.rs @@ -90,9 +90,10 @@ pub async fn create( ]; let mut sub = ctx - .subscribe::(&json!({ - "datacenter_id": datacenter_id, - })) + .subscribe::(( + "datacenter_id", + datacenter_id, + )) .await?; ctx.signal(cluster::workflows::cluster::DatacenterCreate { diff --git a/svc/api/admin/src/route/clusters/mod.rs b/svc/api/admin/src/route/clusters/mod.rs index 0932c586cc..7217db6f76 100644 --- a/svc/api/admin/src/route/clusters/mod.rs +++ b/svc/api/admin/src/route/clusters/mod.rs @@ -35,9 +35,7 @@ pub async fn create( let cluster_id = Uuid::new_v4(); let mut sub = ctx - .subscribe::(&json!({ - "cluster_id": cluster_id, - })) + .subscribe::(("cluster_id", cluster_id)) .await?; ctx.workflow(cluster::workflows::cluster::Input { diff --git a/svc/api/servers/src/route/servers.rs b/svc/api/servers/src/route/servers.rs index ab11d56ec4..8f46f0a9bd 100644 --- a/svc/api/servers/src/route/servers.rs +++ b/svc/api/servers/src/route/servers.rs @@ -77,14 +77,10 @@ pub async fn create( let server_id = Uuid::new_v4(); let mut create_sub = ctx - .subscribe::(&json!({ - "server_id": server_id, - })) + .subscribe::(("server_id", server_id)) .await?; let mut fail_sub = ctx - .subscribe::(&json!({ - "server_id": server_id, - })) + .subscribe::(("server_id", server_id)) .await?; ctx.workflow(ds::workflows::server::Input { @@ -197,9 +193,7 @@ pub async fn destroy( ); let mut sub = ctx - .subscribe::(&json!({ - "server_id": server_id, - })) + .subscribe::(("server_id", server_id)) .await?; ctx.signal(ds::workflows::server::Destroy { diff --git a/svc/pkg/cluster/src/workflows/datacenter/mod.rs b/svc/pkg/cluster/src/workflows/datacenter/mod.rs index df20f7abe7..e7255b1a4c 100644 --- a/svc/pkg/cluster/src/workflows/datacenter/mod.rs +++ b/svc/pkg/cluster/src/workflows/datacenter/mod.rs @@ -157,13 +157,14 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()> provider, provider_datacenter_id, provider_api_token, + pools, pools2, build_delivery_method, prebakes_enabled, create_ts ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 + $1, $2, $3, $4, $5, $6, $7, '', $8, $9, $10, $11 ) ", input.datacenter_id, @@ -189,7 +190,7 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> GlobalResult<()> state, expire_ts ) - VALUES ($1, $2) + VALUES ($1, $2, 0) ", input.datacenter_id, TlsState::Creating as i64, diff --git a/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs b/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs index 51dccc6f10..1fc7d34ff3 100644 --- a/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs +++ b/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs @@ -17,7 +17,7 @@ pub 
async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { .merge(pegboard::registry()?)?; let db = db::DatabasePgNats::from_pools(pools.crdb()?, pools.nats()?); - let worker = Worker::new(reg.handle(), db.clone()); + let worker = Worker::new(reg.handle(), db); // Start worker worker.wake_start(pools).await?; diff --git a/svc/pkg/workflow/db/workflow/migrations/20241007201625_versioning.up.sql b/svc/pkg/workflow/db/workflow/migrations/20241007201625_versioning.up.sql index fb72185518..8376081466 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20241007201625_versioning.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20241007201625_versioning.up.sql @@ -15,7 +15,7 @@ ALTER TABLE workflow_activity_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_activity_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_activity_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -23,6 +23,7 @@ DROP INDEX workflow_activity_events@idx_workflow_activity_events_loop_location2; CREATE INDEX ON workflow_activity_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Signals ALTER TABLE workflow_signal_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -36,7 +37,7 @@ ALTER TABLE workflow_signal_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_signal_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_signal_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -44,6 +45,7 @@ DROP INDEX workflow_signal_events@idx_workflow_signal_events_loop_location2; CREATE INDEX ON workflow_signal_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Signal send ALTER TABLE workflow_signal_send_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -57,7 +59,7 @@ ALTER TABLE workflow_signal_send_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_signal_send_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_signal_send_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -65,6 +67,7 @@ DROP INDEX workflow_signal_send_events@idx_workflow_signal_send_events_loop_loca CREATE INDEX ON workflow_signal_send_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Message send ALTER TABLE workflow_message_send_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -78,7 +81,7 @@ ALTER TABLE workflow_message_send_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 
'md5')) STORED, - ADD CONSTRAINT workflow_message_send_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_message_send_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -86,6 +89,7 @@ DROP INDEX workflow_message_send_events@idx_workflow_message_send_events_loop_lo CREATE INDEX ON workflow_message_send_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Sub workflows ALTER TABLE workflow_sub_workflow_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -99,7 +103,7 @@ ALTER TABLE workflow_sub_workflow_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_sub_workflow_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_sub_workflow_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -107,6 +111,7 @@ DROP INDEX workflow_sub_workflow_events@idx_workflow_sub_workflow_events_loop_lo CREATE INDEX ON workflow_sub_workflow_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Loops ALTER TABLE workflow_loop_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -120,7 +125,7 @@ ALTER TABLE workflow_loop_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_loop_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_loop_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -128,6 +133,7 @@ DROP INDEX workflow_loop_events@idx_workflow_loop_events_loop_location2; CREATE INDEX ON workflow_loop_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + -- Sleep ALTER TABLE workflow_sleep_events ADD COLUMN version INT NOT NULL DEFAULT 1, @@ -141,7 +147,7 @@ ALTER TABLE workflow_sleep_events ADD COLUMN _uuid UUID NOT NULL DEFAULT gen_random_uuid(), -- Required to create indexes (JSONB indexes are not supported) ADD COLUMN location2_hash BYTES GENERATED ALWAYS AS (digest(location2::TEXT, 'md5')) STORED, - ADD CONSTRAINT workflow_sleep_events_location_unique UNIQUE (workflow_id, location2), + ADD CONSTRAINT workflow_sleep_events_location_unique UNIQUE (workflow_id, location2_hash), ADD COLUMN loop_location2_hash BYTES GENERATED ALWAYS AS (digest(loop_location2::TEXT, 'md5')) STORED; -- Update idx @@ -149,6 +155,7 @@ DROP INDEX workflow_sleep_events@idx_workflow_sleep_events_loop_location; CREATE INDEX ON workflow_sleep_events (workflow_id, loop_location2_hash) WHERE forgotten = FALSE; + ALTER TABLE workflow_activity_errors -- Deprecated ALTER COLUMN location SET DEFAULT '{}', @@ -160,10 +167,12 @@ ALTER TABLE workflow_activity_errors -- Update idx CREATE INDEX ON workflow_activity_errors (workflow_id, location2_hash); + -- Branches CREATE TABLE workflow_branch_events ( workflow_id UUID NOT NULL REFERENCES workflows, location JSONB NOT NULL, + version INT NOT NULL DEFAULT 1, loop_location JSONB, forgotten BOOLEAN 
NOT NULL DEFAULT FALSE, @@ -178,6 +187,7 @@ CREATE TABLE workflow_branch_events ( CREATE INDEX ON workflow_branch_events (workflow_id, loop_location_hash) WHERE forgotten = FALSE; + -- Removed CREATE TABLE workflow_removed_events ( workflow_id UUID NOT NULL REFERENCES workflows, @@ -199,6 +209,7 @@ CREATE TABLE workflow_removed_events ( CREATE INDEX ON workflow_removed_events (workflow_id, loop_location_hash) WHERE forgotten = FALSE; + -- Version checks CREATE TABLE workflow_version_check_events ( workflow_id UUID NOT NULL REFERENCES workflows, @@ -216,4 +227,3 @@ CREATE TABLE workflow_version_check_events ( CREATE INDEX ON workflow_version_check_events (workflow_id, loop_location_hash) WHERE forgotten = FALSE; - diff --git a/svc/pkg/workflow/db/workflow/migrations/20241007222710_versioning_pt2.up.sql b/svc/pkg/workflow/db/workflow/migrations/20241007222710_versioning_pt2.up.sql index a3684c11b9..497aa71648 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20241007222710_versioning_pt2.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20241007222710_versioning_pt2.up.sql @@ -28,3 +28,23 @@ ALTER TABLE workflow_loop_events ALTER TABLE workflow_sleep_events DROP CONSTRAINT workflow_sleep_events_pkey, ADD PRIMARY KEY (_uuid); + + +-- Backfill location2 for activities and loops (they cause problems with ON CONFLICT) +UPDATE db_workflow.workflow_activity_events AS a +SET location2 = ( + SELECT jsonb_agg(jsonb_build_array(x + 1)) + FROM unnest(a.location) AS x +) +FROM db_workflow.workflows AS w +WHERE + a.workflow_id = w.workflow_id AND + w.output IS NULL AND + a.forgotten = false; + +UPDATE workflow_loop_events +SET location2 = ( + SELECT jsonb_agg(jsonb_build_array(x + 1)) + FROM unnest(location) AS x +) +WHERE forgotten = FALSE;
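
For intuition, the `jsonb_agg(jsonb_build_array(x + 1))` backfill maps each zero-based integer in the legacy `location` array to a one-based, single-integer coordinate in `location2`. A standalone Rust sketch of the same mapping (the helper name is hypothetical):

```rust
use serde_json::{json, Value};

// Mirrors `jsonb_agg(jsonb_build_array(x + 1))` over `unnest(location)`:
// each zero-based index becomes a one-based, single-element coordinate array.
fn location_to_location2(location: &[i64]) -> Value {
    Value::Array(location.iter().map(|x| json!([x + 1])).collect())
}

fn main() {
    // A legacy location [0, 3, 1] backfills to [[1], [4], [2]].
    assert_eq!(location_to_location2(&[0, 3, 1]), json!([[1], [4], [2]]));
}
```

This matches the one-based coordinate scheme used by the cursor (`Coordinate::simple(iteration + 1)` above).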