Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(workflows): get workflows working again with new history management #1210

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions docs/libraries/workflow/GOTCHAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,27 +78,10 @@ futures_util::stream::iter(iter)
.await?;
```

If you plan on running more than one workflow step in each future, use a closure instead:
If you plan on running more than one workflow step in each future, use a `closure` stub.

```rust
let iter = actions.into_iter().map(|action| {
(
closure(|ctx| async move {
ctx.activity(MyActivityInput {
action,
}).await?;
}).await
)(ctx)
});

futures_util::stream::iter(iter)
.buffer_unordered(16)
.try_collect::<Vec<_>>()
.await?;
```

Note that the first example would also work with a branch, but its a bit overkill as it creates a new layer in
the internal location.
Note that the first example would also work with a `closure`, but its a bit overkill as it creates a new layer
in the internal location.

> **\*** Even if they did know about each other via atomics, there is no guarantee of consistency from
> `buffer_unordered`. Preemptively incrementing the location ensures consistency regardless of the order or
Expand Down
2 changes: 1 addition & 1 deletion infra/tf/k8s_infra/cockroachdb.tf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ resource "helm_release" "cockroachdb" {
namespace = kubernetes_namespace.cockroachdb[0].metadata.0.name
repository = "https://charts.cockroachdb.com/"
chart = "cockroachdb"
version = "11.1.5" # v23.1.9
version = "14.0.4" # v24.2.3
values = [yamlencode({
statefulset = {
replicas = local.service_cockroachdb.count
Expand Down
26 changes: 9 additions & 17 deletions lib/bolt/core/src/tasks/wf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,21 +450,17 @@ pub async fn print_history(

println!(
"{} {}",
style(workflow_name).yellow().bold(),
style(workflow_id).yellow()
style(workflow_name).blue().bold(),
style(workflow_id).blue()
);

print!(
" {} {}",
style("╰").yellow().dim(),
style("input").yellow()
);
print!(" {} {}", style("╰").blue().dim(), style("input").blue());

let input = serde_json::to_string(&input)?;
let input_trim = input.chars().take(50).collect::<String>();
print!(" {}", style(input_trim).yellow().dim());
print!(" {}", style(input_trim).blue().dim());
if input.len() > 50 {
print!(" {}", style("...").yellow().dim());
print!(" {}", style("...").blue().dim());
}

println!("\n");
Expand Down Expand Up @@ -648,19 +644,15 @@ pub async fn print_history(

// Print footer
if let Some(output) = output {
println!("{}", style("Workflow complete").yellow().bold());
println!("{}", style("Workflow complete").green().bold());

print!(
" {} {}",
style("╰").yellow().dim(),
style("output").yellow()
);
print!(" {} {}", style("╰").green().dim(), style("output").green());

let output = serde_json::to_string(&output)?;
let output_trim = output.chars().take(50).collect::<String>();
print!(" {}", style(output_trim).yellow().dim());
print!(" {}", style(output_trim).green().dim());
if output.len() > 50 {
print!(" {}", style("...").yellow().dim());
print!(" {}", style("...").green().dim());
}

println!("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where
.map_err(GlobalError::raw)?;
let mut branch = self
.ctx
.branch_inner(Arc::new(input_val), self.version, None)
.custom_branch(Arc::new(input_val), self.version)
.await
.map_err(GlobalError::raw)?;

Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
message::{MessageCtx, SubscriptionHandle},
},
db::{DatabaseHandle, DatabasePgNats},
message::Message,
message::{AsTags, Message},
operation::{Operation, OperationInput},
signal::Signal,
workflow::{Workflow, WorkflowInput},
Expand Down Expand Up @@ -91,7 +91,7 @@ where

pub async fn subscribe<M, B>(
ctx: &rivet_operation::OperationContext<B>,
tags: &serde_json::Value,
tags: impl AsTags,
) -> GlobalResult<SubscriptionHandle<M>>
where
M: Message,
Expand Down
14 changes: 4 additions & 10 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
},
db::DatabaseHandle,
error::WorkflowResult,
message::{Message, NatsMessage},
message::{AsTags, Message, NatsMessage},
operation::{Operation, OperationInput},
signal::Signal,
workflow::{Workflow, WorkflowInput},
Expand Down Expand Up @@ -117,10 +117,7 @@ impl ApiCtx {
builder::message::MessageBuilder::new(&self.msg_ctx, body)
}

pub async fn subscribe<M>(
&self,
tags: &serde_json::Value,
) -> GlobalResult<SubscriptionHandle<M>>
pub async fn subscribe<M>(&self, tags: impl AsTags) -> GlobalResult<SubscriptionHandle<M>>
where
M: Message,
{
Expand All @@ -130,10 +127,7 @@ impl ApiCtx {
.map_err(GlobalError::raw)
}

pub async fn tail_read<M>(
&self,
tags: serde_json::Value,
) -> GlobalResult<Option<NatsMessage<M>>>
pub async fn tail_read<M>(&self, tags: impl AsTags) -> GlobalResult<Option<NatsMessage<M>>>
where
M: Message,
{
Expand All @@ -145,7 +139,7 @@ impl ApiCtx {

pub async fn tail_anchor<M>(
&self,
tags: serde_json::Value,
tags: impl AsTags,
anchor: &TailAnchor,
) -> GlobalResult<TailAnchorResponse<M>>
where
Expand Down
4 changes: 0 additions & 4 deletions lib/chirp-workflow/core/src/ctx/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ impl WorkflowBackfillCtx {
branch
}

// TODO:
// pub fn set_location(&mut self, location: &Location) {
// }

pub fn finalize(&mut self) {
let wake_immediate = true;

Expand Down
4 changes: 4 additions & 0 deletions lib/chirp-workflow/core/src/ctx/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ impl<'a> ListenCtx<'a> {
}
}

pub(crate) fn reset(&mut self) {
self.used = false;
}

/// Checks for a signal to this workflow with any of the given signal names.
/// - Will error if called more than once.
pub async fn listen_any(
Expand Down
64 changes: 23 additions & 41 deletions lib/chirp-workflow/core/src/ctx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use uuid::Uuid;

use crate::{
error::{WorkflowError, WorkflowResult},
message::{redis_keys, Message, NatsMessage, NatsMessageWrapper},
message::{redis_keys, AsTags, Message, NatsMessage, NatsMessageWrapper},
utils,
};

Expand Down Expand Up @@ -56,7 +56,11 @@ impl MessageCtx {
/// service should need to wait or fail if a message does not publish
/// successfully.
#[tracing::instrument(err, skip_all, fields(message = M::NAME))]
pub async fn message<M>(&self, tags: serde_json::Value, message_body: M) -> WorkflowResult<()>
pub async fn message<M>(
&self,
tags: impl AsTags + 'static,
message_body: M,
) -> WorkflowResult<()>
where
M: Message,
{
Expand Down Expand Up @@ -87,15 +91,11 @@ impl MessageCtx {
/// messages at once so we put the messages in a queue instead of submitting
/// a large number of tasks to Tokio at once.
#[tracing::instrument(err, skip_all, fields(message = M::NAME))]
pub async fn message_wait<M>(
&self,
tags: serde_json::Value,
message_body: M,
) -> WorkflowResult<()>
pub async fn message_wait<M>(&self, tags: impl AsTags, message_body: M) -> WorkflowResult<()>
where
M: Message,
{
let tags_str = cjson::to_string(&tags).map_err(WorkflowError::SerializeMessageTags)?;
let tags_str = tags.as_cjson_tags()?;
let nats_subject = M::nats_subject();
let duration_since_epoch = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
Expand All @@ -114,30 +114,18 @@ impl MessageCtx {
let message = NatsMessageWrapper {
req_id: req_id,
ray_id: self.ray_id,
tags,
tags: tags.as_tags()?,
ts,
allow_recursive: false, // TODO:
body: &body_buf,
};
let message_buf = serde_json::to_vec(&message).map_err(WorkflowError::SerializeMessage)?;

// TODO: opts.dont_log_body
if true {
tracing::info!(
%nats_subject,
body_bytes = ?body_buf_len,
message_bytes = ?message_buf.len(),
"publish message"
);
} else {
tracing::info!(
%nats_subject,
?message_body,
body_bytes = ?body_buf_len,
message_bytes = ?message_buf.len(),
"publish message"
);
}
tracing::info!(
%nats_subject,
body_bytes = ?body_buf_len,
message_bytes = ?message_buf.len(),
"publish message"
);

// Write to Redis and NATS.
//
Expand Down Expand Up @@ -241,15 +229,12 @@ impl MessageCtx {
impl MessageCtx {
/// Listens for Chirp workflow messages globally on NATS.
#[tracing::instrument(level = "debug", err, skip_all)]
pub async fn subscribe<M>(
&self,
tags: &serde_json::Value,
) -> WorkflowResult<SubscriptionHandle<M>>
pub async fn subscribe<M>(&self, tags: impl AsTags) -> WorkflowResult<SubscriptionHandle<M>>
where
M: Message,
{
self.subscribe_opt::<M>(SubscribeOpts {
tags,
tags: tags.as_tags()?,
flush_nats: true,
})
.await
Expand All @@ -259,7 +244,7 @@ impl MessageCtx {
#[tracing::instrument(err, skip_all, fields(message = M::NAME))]
pub async fn subscribe_opt<M>(
&self,
opts: SubscribeOpts<'_>,
opts: SubscribeOpts,
) -> WorkflowResult<SubscriptionHandle<M>>
where
M: Message,
Expand Down Expand Up @@ -287,17 +272,14 @@ impl MessageCtx {

/// Reads the tail message of a stream without waiting for a message.
#[tracing::instrument(err, skip_all, fields(message = M::NAME))]
pub async fn tail_read<M>(
&self,
tags: serde_json::Value,
) -> WorkflowResult<Option<NatsMessage<M>>>
pub async fn tail_read<M>(&self, tags: impl AsTags) -> WorkflowResult<Option<NatsMessage<M>>>
where
M: Message,
{
let mut conn = self.redis_chirp_ephemeral.clone();

// Fetch message
let tags_str = cjson::to_string(&tags).map_err(WorkflowError::SerializeMessageTags)?;
let tags_str = tags.as_cjson_tags()?;
let tail_key = redis_keys::message_tail::<M>(&tags_str);
let message_buf = conn
.hget::<_, _, Option<Vec<u8>>>(&tail_key, redis_keys::message_tail::BODY)
Expand Down Expand Up @@ -341,7 +323,7 @@ impl MessageCtx {
#[tracing::instrument(err, skip_all, fields(message = M::NAME))]
pub async fn tail_anchor<M>(
&self,
tags: serde_json::Value,
tags: impl AsTags,
anchor: &TailAnchor,
) -> WorkflowResult<TailAnchorResponse<M>>
where
Expand Down Expand Up @@ -379,8 +361,8 @@ impl MessageCtx {
}

#[derive(Debug)]
pub struct SubscribeOpts<'a> {
pub tags: &'a serde_json::Value,
pub struct SubscribeOpts {
pub tags: serde_json::Value,
pub flush_nats: bool,
}

Expand Down
14 changes: 4 additions & 10 deletions lib/chirp-workflow/core/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
},
db::DatabaseHandle,
error::WorkflowResult,
message::{Message, NatsMessage},
message::{AsTags, Message, NatsMessage},
operation::{Operation, OperationInput},
signal::Signal,
workflow::{Workflow, WorkflowInput},
Expand Down Expand Up @@ -120,10 +120,7 @@ impl StandaloneCtx {
builder::message::MessageBuilder::new(&self.msg_ctx, body)
}

pub async fn subscribe<M>(
&self,
tags: &serde_json::Value,
) -> GlobalResult<SubscriptionHandle<M>>
pub async fn subscribe<M>(&self, tags: impl AsTags) -> GlobalResult<SubscriptionHandle<M>>
where
M: Message,
{
Expand All @@ -133,10 +130,7 @@ impl StandaloneCtx {
.map_err(GlobalError::raw)
}

pub async fn tail_read<M>(
&self,
tags: serde_json::Value,
) -> GlobalResult<Option<NatsMessage<M>>>
pub async fn tail_read<M>(&self, tags: impl AsTags) -> GlobalResult<Option<NatsMessage<M>>>
where
M: Message,
{
Expand All @@ -148,7 +142,7 @@ impl StandaloneCtx {

pub async fn tail_anchor<M>(
&self,
tags: serde_json::Value,
tags: impl AsTags,
anchor: &TailAnchor,
) -> GlobalResult<TailAnchorResponse<M>>
where
Expand Down
Loading
Loading