diff --git a/lib/chirp-workflow/core/src/compat.rs b/lib/chirp-workflow/core/src/compat.rs
index f413cddc8d..993a7daac9 100644
--- a/lib/chirp-workflow/core/src/compat.rs
+++ b/lib/chirp-workflow/core/src/compat.rs
@@ -37,7 +37,43 @@ where
 
 	db_from_ctx(ctx)
 		.await?
-		.dispatch_workflow(ctx.ray_id(), id, &name, input_val)
+		.dispatch_workflow(ctx.ray_id(), id, &name, None, input_val)
+		.await
+		.map_err(GlobalError::raw)?;
+
+	tracing::info!(%name, ?id, "workflow dispatched");
+
+	Ok(id)
+}
+
+pub async fn dispatch_tagged_workflow<I, B>(
+	ctx: &rivet_operation::OperationContext<B>,
+	tags: &serde_json::Value,
+	input: I,
+) -> GlobalResult<Uuid>
+where
+	I: WorkflowInput,
+	<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	B: Debug + Clone,
+{
+	if ctx.from_workflow {
+		bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
+	}
+
+	let name = I::Workflow::NAME;
+
+	tracing::debug!(%name, ?input, "dispatching workflow");
+
+	let id = Uuid::new_v4();
+
+	// Serialize input
+	let input_val = serde_json::to_value(input)
+		.map_err(WorkflowError::SerializeWorkflowOutput)
+		.map_err(GlobalError::raw)?;
+
+	db_from_ctx(ctx)
+		.await?
+		.dispatch_workflow(ctx.ray_id(), id, &name, Some(tags), input_val)
 		.await
 		.map_err(GlobalError::raw)?;
 
@@ -92,6 +128,26 @@ where
 	.await?
 }
 
+/// Dispatch a new workflow and wait for it to complete. Has a 60s timeout.
+pub async fn tagged_workflow<I, B>(
+	ctx: &rivet_operation::OperationContext<B>,
+	tags: &serde_json::Value,
+	input: I,
+) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
+where
+	I: WorkflowInput,
+	<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	B: Debug + Clone,
+{
+	let workflow_id = dispatch_tagged_workflow(ctx, tags, input).await?;
+
+	tokio::time::timeout(
+		WORKFLOW_TIMEOUT,
+		wait_for_workflow::<I::Workflow, _>(ctx, workflow_id),
+	)
+	.await?
+}
+
 pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(
 	ctx: &rivet_operation::OperationContext<B>,
 	workflow_id: Uuid,
@@ -119,6 +175,33 @@ pub async fn signal(
 	Ok(signal_id)
 }
 
+pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
+	ctx: &rivet_operation::OperationContext<B>,
+	tags: &serde_json::Value,
+	input: I,
+) -> GlobalResult<Uuid> {
+	if ctx.from_workflow {
+		bail!("cannot dispatch a signal from an operation within a workflow execution. trigger it from the workflow's body.");
+	}
+
+	tracing::debug!(name=%I::NAME, ?tags, "dispatching tagged signal");
+
+	let signal_id = Uuid::new_v4();
+
+	// Serialize input
+	let input_val = serde_json::to_value(input)
+		.map_err(WorkflowError::SerializeSignalBody)
+		.map_err(GlobalError::raw)?;
+
+	db_from_ctx(ctx)
+		.await?
+		.publish_tagged_signal(ctx.ray_id(), tags, signal_id, I::NAME, input_val)
+		.await
+		.map_err(GlobalError::raw)?;
+
+	Ok(signal_id)
+}
+
 pub async fn op<I, B>(
 	ctx: &rivet_operation::OperationContext<B>,
 	input: I,
diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs
index 3c084eb4ab..b8f2ec7377 100644
--- a/lib/chirp-workflow/core/src/ctx/api.rs
+++ b/lib/chirp-workflow/core/src/ctx/api.rs
@@ -74,7 +74,33 @@ impl ApiCtx {
 			.map_err(GlobalError::raw)?;
 
 		self.db
-			.dispatch_workflow(self.ray_id, id, &name, input_val)
+			.dispatch_workflow(self.ray_id, id, &name, None, input_val)
+			.await
+			.map_err(GlobalError::raw)?;
+
+		tracing::info!(%name, ?id, "workflow dispatched");
+
+		Ok(id)
+	}
+
+	pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		let name = I::Workflow::NAME;
+
+		tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
+
+		let id = Uuid::new_v4();
+
+		// Serialize input
+		let input_val = serde_json::to_value(input)
+			.map_err(WorkflowError::SerializeWorkflowOutput)
+			.map_err(GlobalError::raw)?;
+
+		self.db
+			.dispatch_workflow(self.ray_id, id, &name, Some(tags), input_val)
 			.await
 			.map_err(GlobalError::raw)?;
 
@@ -128,6 +154,25 @@ impl ApiCtx {
 		.await?
 	}
 
+	/// Dispatch a new workflow with tags and wait for it to complete. Has a 60s timeout.
+	pub async fn tagged_workflow<I>(
+		&self,
+		tags: &serde_json::Value,
+		input: I,
+	) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		let workflow_id = self.dispatch_tagged_workflow(tags, input).await?;
+
+		tokio::time::timeout(
+			WORKFLOW_TIMEOUT,
+			self.wait_for_workflow::<I::Workflow>(workflow_id),
+		)
+		.await?
+	}
+
 	pub async fn signal<T: Signal + Serialize>(
 		&self,
 		workflow_id: Uuid,
@@ -150,6 +195,28 @@ impl ApiCtx {
 		Ok(signal_id)
 	}
 
+	pub async fn tagged_signal<T: Signal + Serialize>(
+		&self,
+		tags: &serde_json::Value,
+		input: T,
+	) -> GlobalResult<Uuid> {
+		tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
+
+		let signal_id = Uuid::new_v4();
+
+		// Serialize input
+		let input_val = serde_json::to_value(input)
+			.map_err(WorkflowError::SerializeSignalBody)
+			.map_err(GlobalError::raw)?;
+
+		self.db
+			.publish_tagged_signal(self.ray_id, tags, signal_id, T::NAME, input_val)
+			.await
+			.map_err(GlobalError::raw)?;
+
+		Ok(signal_id)
+	}
+
 	pub async fn op<I>(
 		&self,
 		input: I,
diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs
index a61dedab1e..2bbb30e9ff 100644
--- a/lib/chirp-workflow/core/src/ctx/test.rs
+++ b/lib/chirp-workflow/core/src/ctx/test.rs
@@ -90,7 +90,33 @@ impl TestCtx {
 			.map_err(GlobalError::raw)?;
 
 		self.db
-			.dispatch_workflow(self.ray_id, id, &name, input_val)
+			.dispatch_workflow(self.ray_id, id, &name, None, input_val)
+			.await
+			.map_err(GlobalError::raw)?;
+
+		tracing::info!(%name, ?id, "workflow dispatched");
+
+		Ok(id)
+	}
+
+	pub async fn dispatch_tagged_workflow<I>(&self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		let name = I::Workflow::NAME;
+
+		tracing::debug!(%name, ?tags, ?input, "dispatching tagged workflow");
+
+		let id = Uuid::new_v4();
+
+		// Serialize input
+		let input_val = serde_json::to_value(input)
+			.map_err(WorkflowError::SerializeWorkflowOutput)
+			.map_err(GlobalError::raw)?;
+
+		self.db
+			.dispatch_workflow(self.ray_id, id, &name, Some(tags), input_val)
 			.await
 			.map_err(GlobalError::raw)?;
 
@@ -137,6 +163,20 @@ impl TestCtx {
 		Ok(output)
 	}
 
+	pub async fn tagged_workflow<I>(
+		&self,
+		tags: &serde_json::Value,
+		input: I,
+	) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		let workflow_id = self.dispatch_tagged_workflow(tags, input).await?;
+		let output = self.wait_for_workflow::<I::Workflow>(workflow_id).await?;
+		Ok(output)
+	}
+
 	pub async fn signal<T: Signal + Serialize>(
 		&self,
 		workflow_id: Uuid,
@@ -159,6 +199,28 @@ impl TestCtx {
 		Ok(signal_id)
 	}
 
+	pub async fn tagged_signal<T: Signal + Serialize>(
+		&self,
+		tags: &serde_json::Value,
+		input: T,
+	) -> GlobalResult<Uuid> {
+		tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
+
+		let signal_id = Uuid::new_v4();
+
+		// Serialize input
+		let input_val = serde_json::to_value(input)
+			.map_err(WorkflowError::SerializeSignalBody)
+			.map_err(GlobalError::raw)?;
+
+		self.db
+			.publish_tagged_signal(self.ray_id, tags, signal_id, T::NAME, input_val)
+			.await
+			.map_err(GlobalError::raw)?;
+
+		Ok(signal_id)
+	}
+
 	/// Waits for a workflow to be triggered with a superset of given input. Strictly for tests.
 	pub fn observe(&self, input: serde_json::Value) -> GlobalResult {
 		// Serialize input
diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs
index 3b6fea44a7..78d9006279 100644
--- a/lib/chirp-workflow/core/src/ctx/workflow.rs
+++ b/lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -151,13 +151,13 @@ impl WorkflowCtx {
 	}
 
 	// Purposefully infallible
-	pub(crate) async fn run_workflow(mut self) {
-		if let Err(err) = Self::run_workflow_inner(&mut self).await {
+	pub(crate) async fn run(mut self) {
+		if let Err(err) = Self::run_inner(&mut self).await {
 			tracing::error!(?err, "unhandled error");
 		}
 	}
 
-	async fn run_workflow_inner(&mut self) -> WorkflowResult<()> {
+	async fn run_inner(&mut self) -> WorkflowResult<()> {
 		tracing::info!(name=%self.name, id=%self.workflow_id, "running workflow");
 
 		// Lookup workflow
@@ -356,6 +356,23 @@ impl WorkflowCtx {
 impl WorkflowCtx {
 	/// Dispatch another workflow.
 	pub async fn dispatch_workflow<I>(&mut self, input: I) -> GlobalResult<Uuid>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		self.dispatch_workflow_inner(None, input).await
+	}
+
+	/// Dispatch another workflow with tags.
+	pub async fn dispatch_tagged_workflow<I>(&mut self, tags: &serde_json::Value, input: I) -> GlobalResult<Uuid>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		self.dispatch_workflow_inner(Some(tags), input).await
+	}
+
+	async fn dispatch_workflow_inner<I>(&mut self, tags: Option<&serde_json::Value>, input: I) -> GlobalResult<Uuid>
 	where
 		I: WorkflowInput,
 		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
@@ -386,7 +403,7 @@ impl WorkflowCtx {
 		else {
 			let name = I::Workflow::NAME;
 
-			tracing::debug!(%name, ?input, "dispatching workflow");
+			tracing::debug!(%name, ?tags, ?input, "dispatching workflow");
 
 			let sub_workflow_id = Uuid::new_v4();
 
@@ -402,6 +419,7 @@ impl WorkflowCtx {
 					self.full_location().as_ref(),
 					sub_workflow_id,
 					&name,
+					tags,
 					input_val,
 				)
 				.await
@@ -454,11 +472,37 @@ impl WorkflowCtx {
 		}
 	}
 
-	/// Runs a sub workflow in the same process as the current workflow and returns its response.
+	/// Runs a sub workflow in the same process as the current workflow (if possible) and returns its
+	/// response.
 	pub async fn workflow<I>(
 		&mut self,
 		input: I,
 	) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		self.workflow_inner(None, input).await
+	}
+
+	/// Runs a sub workflow with tags in the same process as the current workflow (if possible) and returns
+	/// its response.
+	pub async fn tagged_workflow<I>(
+		&mut self,
+		tags: &serde_json::Value,
+		input: I,
+	) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
+	where
+		I: WorkflowInput,
+		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
+	{
+		self.workflow_inner(Some(tags), input).await
+	}
+
+	async fn workflow_inner<I>(
+		&mut self,
+		tags: Option<&serde_json::Value>,
+		input: I,
+	) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output>
 	where
 		I: WorkflowInput,
 		<I as WorkflowInput>::Workflow: Workflow<Input = I>,
 	{
@@ -476,7 +520,7 @@ impl WorkflowCtx {
 		// worker in the registry, this will diverge in history because it will try to run the sub worker
 		// in-process during the replay
 		// If the workflow isn't in the current registry, dispatch the workflow instead
-		let sub_workflow_id = self.dispatch_workflow(input).await?;
+		let sub_workflow_id = self.dispatch_workflow_inner(tags, input).await?;
 		let output = self
 			.wait_for_workflow::<I::Workflow>(sub_workflow_id)
 			.await?;
@@ -631,6 +675,29 @@ impl WorkflowCtx {
 		Ok(signal_id)
 	}
 
+	/// Sends a tagged signal.
+	pub async fn tagged_signal<T: Signal + Serialize>(
+		&mut self,
+		tags: &serde_json::Value,
+		body: T,
+	) -> GlobalResult<Uuid> {
+		tracing::debug!(name=%T::NAME, ?tags, "dispatching tagged signal");
+
+		let signal_id = Uuid::new_v4();
+
+		// Serialize input
+		let input_val = serde_json::to_value(&body)
+			.map_err(WorkflowError::SerializeSignalBody)
+			.map_err(GlobalError::raw)?;
+
+		self.db
+			.publish_tagged_signal(self.ray_id, tags, signal_id, T::NAME, input_val)
+			.await
+			.map_err(GlobalError::raw)?;
+
+		Ok(signal_id)
+	}
+
 	/// Listens for a signal for a short time before setting the workflow to sleep. Once the signal is
 	/// received, the workflow will be woken up and continue.
 	pub async fn listen<T: Listen>(&mut self) -> GlobalResult<T> {
diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs
index 62cd1b9a9b..0e27439f8a 100644
--- a/lib/chirp-workflow/core/src/db/mod.rs
+++ b/lib/chirp-workflow/core/src/db/mod.rs
@@ -16,6 +16,7 @@ pub trait Database: Send {
 		ray_id: Uuid,
 		workflow_id: Uuid,
 		workflow_name: &str,
+		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()>;
 	async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>;
@@ -60,6 +61,14 @@ pub trait Database: Send {
 		signal_name: &str,
 		body: serde_json::Value,
 	) -> WorkflowResult<()>;
+	async fn publish_tagged_signal(
+		&self,
+		ray_id: Uuid,
+		tags: &serde_json::Value,
+		signal_id: Uuid,
+		signal_name: &str,
+		body: serde_json::Value,
+	) -> WorkflowResult<()>;
 	async fn pull_next_signal(
 		&self,
 		workflow_id: Uuid,
@@ -74,6 +83,7 @@ pub trait Database: Send {
 		location: &[usize],
 		sub_workflow_id: Uuid,
 		sub_workflow_name: &str,
+		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()>;
 
diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs
index 97164d9d93..53641c3801 100644
--- a/lib/chirp-workflow/core/src/db/postgres.rs
+++ b/lib/chirp-workflow/core/src/db/postgres.rs
@@ -61,20 +61,22 @@ impl Database for DatabasePostgres {
 		ray_id: Uuid,
 		workflow_id: Uuid,
 		workflow_name: &str,
+		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
 			INSERT INTO db_workflow.workflows (
-				workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate
+				workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate
 			)
-			VALUES ($1, $2, $3, $4, $5, true)
+			VALUES ($1, $2, $3, $4, $5, $6, true)
 			",
 		))
 		.bind(workflow_id)
 		.bind(workflow_name)
 		.bind(rivet_util::timestamp::now())
 		.bind(ray_id)
+		.bind(tags)
 		.bind(input)
 		.execute(&mut *self.conn().await?)
 		.await
@@ -108,7 +110,7 @@ impl Database for DatabasePostgres {
 			"
 			WITH
 				pull_workflows AS (
-					UPDATE db_workflow.workflows as w
+					UPDATE db_workflow.workflows AS w
 					-- Assign this node to this workflow
 					SET worker_instance_id = $1
 					WHERE
@@ -120,14 +122,25 @@ impl Database for DatabasePostgres {
 						worker_instance_id IS NULL AND
 						-- Check for wake condition
 						(
+							-- Immediate
 							wake_immediate OR
+							-- After deadline
 							wake_deadline_ts IS NOT NULL OR
+							-- Signal exists
 							(
 								SELECT true
 								FROM db_workflow.signals AS s
 								WHERE s.signal_name = ANY(wake_signals)
 								LIMIT 1
 							) OR
+							-- Tagged signal exists
+							(
+								SELECT true
+								FROM db_workflow.tagged_signals AS s
+								WHERE w.tags @> s.tags
+								LIMIT 1
+							) OR
+							-- Sub workflow completed
 							(
 								SELECT true
 								FROM db_workflow.workflows AS w2
@@ -405,31 +418,48 @@ impl Database for DatabasePostgres {
 		filter: &[&str],
 		location: &[usize],
 	) -> WorkflowResult<Option<SignalRow>> {
-		// TODO: RVT-3752
 		let signal = sqlx::query_as::<_, SignalRow>(indoc!(
 			"
 			WITH
+				-- Finds the oldest signal matching the signal name filter in either the normal signals table
+				-- or tagged signals table
 				next_signal AS (
-					DELETE FROM db_workflow.signals
+					SELECT false AS tagged, signal_id, create_ts, signal_name, body
+					FROM db_workflow.signals
 					WHERE
 						workflow_id = $1 AND
 						signal_name = ANY($2)
+					UNION ALL
+					SELECT true AS tagged, signal_id, create_ts, signal_name, body
+					FROM db_workflow.tagged_signals
+					WHERE
+						signal_name = ANY($2) AND
+						tags <@ (SELECT tags FROM db_workflow.workflows WHERE workflow_id = $1)
 					ORDER BY create_ts ASC
 					LIMIT 1
-					RETURNING workflow_id, signal_id, signal_name, body
 				),
-				clear_wake AS (
-					UPDATE db_workflow.workflows AS w
-					SET wake_signals = ARRAY[]
-					FROM next_signal AS s
-					WHERE w.workflow_id = s.workflow_id
+				-- If the next signal is not tagged, delete it with this statement
+				delete_signal AS (
+					DELETE FROM db_workflow.signals
+					WHERE signal_id = (
+						SELECT signal_id FROM next_signal WHERE tagged = false
+					)
 					RETURNING 1
 				),
+				-- If the next signal is tagged, delete it with this statement
+				delete_tagged_signal AS (
+					DELETE FROM db_workflow.tagged_signals
+					WHERE signal_id = (
+						SELECT signal_id FROM next_signal WHERE tagged = true
+					)
+					RETURNING 1
+				),
+				-- After deleting the signal, add it to the events table (i.e. acknowledge it)
 				insert_event AS (
-					INSERT INTO db_workflow.workflow_signal_events(
+					INSERT INTO db_workflow.workflow_signal_events (
 						workflow_id, location, signal_id, signal_name, body, ack_ts
 					)
-					SELECT workflow_id, $3 AS location, signal_id, signal_name, body, $4 AS ack_ts
+					SELECT $1 AS workflow_id, $3 AS location, signal_id, signal_name, body, $4 AS ack_ts
 					FROM next_signal
 					RETURNING 1
 				)
@@ -457,7 +487,7 @@ impl Database for DatabasePostgres {
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
-			INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, create_ts, ray_id)
+			INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, ray_id, create_ts)
 			VALUES ($1, $2, $3, $4, $5, $6)
 			",
 		))
@@ -474,6 +504,33 @@ impl Database for DatabasePostgres {
 		Ok(())
 	}
 
+	async fn publish_tagged_signal(
+		&self,
+		ray_id: Uuid,
+		tags: &serde_json::Value,
+		signal_id: Uuid,
+		signal_name: &str,
+		body: serde_json::Value,
+	) -> WorkflowResult<()> {
+		sqlx::query(indoc!(
+			"
+			INSERT INTO db_workflow.tagged_signals (signal_id, tags, signal_name, body, ray_id, create_ts)
+			VALUES ($1, $2, $3, $4, $5, $6)
+			",
+		))
+		.bind(signal_id)
+		.bind(tags)
+		.bind(signal_name)
+		.bind(body)
+		.bind(ray_id)
+		.bind(rivet_util::timestamp::now())
+		.execute(&mut *self.conn().await?)
+		.await
+		.map_err(WorkflowError::Sqlx)?;
+
+		Ok(())
+	}
+
 	async fn dispatch_sub_workflow(
 		&self,
 		ray_id: Uuid,
@@ -481,6 +538,7 @@ impl Database for DatabasePostgres {
 		location: &[usize],
 		sub_workflow_id: Uuid,
 		sub_workflow_name: &str,
+		tags: Option<&serde_json::Value>,
 		input: serde_json::Value,
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
@@ -488,16 +546,16 @@ impl Database for DatabasePostgres {
 			WITH
 				workflow AS (
 					INSERT INTO db_workflow.workflows (
-						workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate
+						workflow_id, workflow_name, create_ts, ray_id, tags, input, wake_immediate
 					)
-					VALUES ($7, $2, $3, $4, $5, true)
+					VALUES ($8, $2, $3, $4, $5, $6, true)
 					RETURNING 1
 				),
 				sub_workflow AS (
 					INSERT INTO db_workflow.workflow_sub_workflow_events(
 						workflow_id, location, sub_workflow_id, create_ts
 					)
-					VALUES($1, $6, $7, $3)
+					VALUES($1, $7, $8, $3)
 					RETURNING 1
 				)
 			SELECT 1
@@ -507,6 +565,7 @@ impl Database for DatabasePostgres {
 		.bind(sub_workflow_name)
 		.bind(rivet_util::timestamp::now())
 		.bind(ray_id)
+		.bind(tags)
 		.bind(input)
 		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
 		.bind(sub_workflow_id)
diff --git a/lib/chirp-workflow/core/src/error.rs b/lib/chirp-workflow/core/src/error.rs
index 06e7f7d83e..c41fa25e24 100644
--- a/lib/chirp-workflow/core/src/error.rs
+++ b/lib/chirp-workflow/core/src/error.rs
@@ -42,6 +42,12 @@ pub enum WorkflowError {
 	#[error("deserialize workflow input: {0}")]
 	DeserializeWorkflowOutput(serde_json::Error),
 
+	#[error("serialize workflow tags: {0}")]
+	SerializeWorkflowTags(serde_json::Error),
+
+	#[error("deserialize workflow tags: {0}")]
+	DeserializeWorkflowTags(serde_json::Error),
+
 	#[error("serialize activity input: {0}")]
 	SerializeActivityInput(serde_json::Error),
 
diff --git a/lib/chirp-workflow/core/src/prelude.rs b/lib/chirp-workflow/core/src/prelude.rs
index d84bfb68b6..4703fa6311 100644
--- a/lib/chirp-workflow/core/src/prelude.rs
+++ b/lib/chirp-workflow/core/src/prelude.rs
@@ -15,6 +15,7 @@ pub mod util {
 
 pub use crate::{
 	activity::Activity,
+	signal::{Listen, Signal},
 	ctx::*,
 	db,
 	error::{WorkflowError, WorkflowResult},
diff --git a/lib/chirp-workflow/core/src/signal.rs b/lib/chirp-workflow/core/src/signal.rs
index bd24a091d8..9db2376667 100644
--- a/lib/chirp-workflow/core/src/signal.rs
+++ b/lib/chirp-workflow/core/src/signal.rs
@@ -53,7 +53,7 @@ macro_rules! join_signal {
 		impl Listen for $join {
 			async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
 				let row = ctx.listen_any(&[$($signals::NAME),*]).await?;
-				Self::parse(&row.name, row.body)
+				Self::parse(&row.signal_name, row.body)
 			}
 
 			fn parse(name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
diff --git a/lib/chirp-workflow/core/src/worker.rs b/lib/chirp-workflow/core/src/worker.rs
index 94b9dadda4..3c10dd310c 100644
--- a/lib/chirp-workflow/core/src/worker.rs
+++ b/lib/chirp-workflow/core/src/worker.rs
@@ -84,7 +84,7 @@ impl Worker {
 					util::sleep_until_ts(wake_deadline_ts).await;
 				}
 
-				ctx.run_workflow().await;
+				ctx.run().await;
 			}
 			.in_current_span(),
 		);
diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs
index c8ca0488fe..316ea44cf0 100644
--- a/lib/chirp-workflow/macros/src/lib.rs
+++ b/lib/chirp-workflow/macros/src/lib.rs
@@ -282,11 +282,11 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {
 			const NAME: &'static str = #name;
 		}
 
-		#[::async_trait::async_trait]
+		#[async_trait::async_trait]
 		impl Listen for #struct_ident {
 			async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
 				let row = ctx.listen_any(&[Self::NAME]).await?;
-				Self::parse(&row.name, &row.body)
+				Self::parse(&row.signal_name, row.body)
 			}
 
 			fn parse(_name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
diff --git a/svc/pkg/foo/worker/src/workflows/test.rs b/svc/pkg/foo/worker/src/workflows/test.rs
index 07f56ac690..371eb5d76e 100644
--- a/svc/pkg/foo/worker/src/workflows/test.rs
+++ b/svc/pkg/foo/worker/src/workflows/test.rs
@@ -1,26 +1,28 @@
 use chirp_workflow::prelude::*;
+use serde_json::json;
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct TestInput {
 	pub x: i64,
 }
 
-type TestOutput = Result<TestOutputOk, TestOutputErr>;
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct TestOutputOk {
-	pub y: usize,
-}
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct TestOutputErr {
-	pub z: usize,
+pub struct TestOutput {
+	pub y: i64,
 }
 
 #[workflow(Test)]
 pub async fn test(ctx: &mut WorkflowCtx, input: &TestInput) -> GlobalResult<TestOutput> {
-	let a = ctx.activity(FooInput {}).await?;
+	ctx.activity(FooInput {}).await?;
+
+	let sig = ctx.listen::<FooBarSignal>().await?;
 
-	Ok(Ok(TestOutputOk { y: a.ids.len() }))
+	Ok(TestOutput { y: input.x + sig.x })
+}
+
+#[signal("foo-bar")]
+pub struct FooBarSignal {
+	pub x: i64,
 }
 
 #[derive(Debug, Serialize, Deserialize, Hash)]
diff --git a/svc/pkg/foo/worker/tests/test.rs b/svc/pkg/foo/worker/tests/test.rs
index 3da6bd3311..5a6f4a8898 100644
--- a/svc/pkg/foo/worker/tests/test.rs
+++ b/svc/pkg/foo/worker/tests/test.rs
@@ -1,9 +1,25 @@
 use chirp_workflow::prelude::*;
+use serde_json::json;
 
 #[workflow_test]
 async fn empty(ctx: TestCtx) {
+	let tags = json!({
+		"amog": "us",
+	});
+
+	let id = ctx
+		.dispatch_tagged_workflow(&tags, foo_worker::workflows::test::TestInput { x: -2 })
+		.await
+		.unwrap();
+
+	tokio::time::sleep(std::time::Duration::from_secs(12)).await;
+
+	ctx.tagged_signal(&tags, foo_worker::workflows::test::FooBarSignal { x: 400 })
+		.await
+		.unwrap();
+
 	let res = ctx
-		.workflow(foo_worker::workflows::test::TestInput { x: 12 })
+		.wait_for_workflow::<foo_worker::workflows::test::Test>(id)
 		.await
 		.unwrap();
 
diff --git a/svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.down.sql b/svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.down.sql
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.up.sql
new file mode 100644
index 0000000000..ab634e3b2c
--- /dev/null
+++ b/svc/pkg/workflow/db/workflow/migrations/20240626202744_add_tags.up.sql
@@ -0,0 +1,26 @@
+-- Add tags
+ALTER TABLE workflows
+ADD COLUMN tags JSONB;
+
+CREATE INDEX gin_workflows_tags
+ON workflows
+USING GIN (tags);
+
+
+-- Stores pending signals with tags
+CREATE TABLE tagged_signals (
+	signal_id UUID PRIMARY KEY,
+	tags JSONB NOT NULL,
+	signal_name TEXT NOT NULL,
+
+	create_ts INT NOT NULL,
+	ray_id UUID NOT NULL,
+
+	body JSONB NOT NULL,
+
+	INDEX (signal_name)
+);
+
+CREATE INDEX gin_tagged_signals_tags
+ON tagged_signals
+USING GIN (tags);
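
Usage sketch (not part of the patch): the snippet below shows how the tagged signal API added above could be driven from an existing chirp operation through the compat layer, reusing the tag object and `FooBarSignal` from the foo test. The `notify_test_workflow` function and the `chirp_workflow::compat` module path are assumptions for illustration; `tagged_signal`, `FooBarSignal`, and the containment-based routing (`tags <@ workflows.tags` in `pull_next_signal`) come from this diff.

```rust
use chirp_workflow::prelude::*;
use serde_json::json;

// Hypothetical caller: a plain operation context. compat::tagged_signal bails
// when `ctx.from_workflow` is set, so this must run outside a workflow body.
async fn notify_test_workflow(
	ctx: &rivet_operation::OperationContext<()>,
) -> GlobalResult<()> {
	// Tags are an arbitrary JSON object; they are matched against the new
	// `workflows.tags` column by JSONB containment, so a workflow whose tags
	// are a superset of this object can pull the signal.
	let tags = json!({ "amog": "us" });

	chirp_workflow::compat::tagged_signal(
		ctx,
		&tags,
		foo_worker::workflows::test::FooBarSignal { x: 400 },
	)
	.await?;

	Ok(())
}
```

The GIN indexes created by the migration (`gin_workflows_tags`, `gin_tagged_signals_tags`) support these containment lookups from both directions: the worker's pull query matches pending tagged signals against a workflow's tags (`w.tags @> s.tags`) when deciding whether to wake it, and `pull_next_signal` matches the workflow's tags against `tagged_signals.tags` when consuming one.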