Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflows): add metrics #1008

Merged
merged 1 commit into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,121 changes: 1,121 additions & 0 deletions infra/tf/grafana/grafana_dashboards/chirp-workflow.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions lib/chirp-workflow/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ formatted-error = { path = "../../formatted-error" }
futures-util = "0.3"
global-error = { path = "../../global-error" }
indoc = "2.0.5"
lazy_static = "1.4"
prost = "0.12.4"
prost-types = "0.12.4"
rand = "0.8.5"
Expand Down
5 changes: 5 additions & 0 deletions lib/chirp-workflow/core/src/ctx/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ impl<'a> ListenCtx<'a> {
return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
};

let recv_lag = (rivet_util::timestamp::now() as f64 - signal.create_ts as f64) / 1000.;
crate::metrics::SIGNAL_RECV_LAG
.with_label_values(&[&self.ctx.name(), &signal.signal_name])
.observe(recv_lag);

tracing::info!(
workflow_name=%self.ctx.name(),
workflow_id=%self.ctx.workflow_id(),
Expand Down
11 changes: 11 additions & 0 deletions lib/chirp-workflow/core/src/ctx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ impl MessageCtx {
let message = if let Some(message_buf) = message_buf {
let message = ReceivedMessage::<M>::deserialize(message_buf.as_slice())?;
tracing::info!(?message, "immediate read tail message");

let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.;
crate::metrics::MESSAGE_RECV_LAG
.with_label_values(&[M::NAME])
.observe(recv_lag);

Some(message)
} else {
tracing::info!("no tail message to read");
Expand Down Expand Up @@ -520,6 +526,11 @@ where
let message = ReceivedMessage::<M>::deserialize(&nats_message.payload[..])?;
tracing::info!(?message, "received message");

let recv_lag = (rivet_util::timestamp::now() as f64 - message.ts as f64) / 1000.;
crate::metrics::MESSAGE_RECV_LAG
.with_label_values(&[M::NAME])
.observe(recv_lag);

return Ok(message);
}

Expand Down
48 changes: 40 additions & 8 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Instant};

use global_error::{GlobalError, GlobalResult};
use serde::{de::DeserializeOwned, Serialize};
Expand All @@ -15,9 +15,10 @@ use crate::{
executable::{closure, AsyncResult, Executable},
listen::{CustomListener, Listen},
message::Message,
metrics,
registry::RegistryHandle,
signal::Signal,
util::Location,
util::{GlobalErrorExt, Location},
workflow::{Workflow, WorkflowInput},
};

Expand Down Expand Up @@ -211,7 +212,7 @@ impl WorkflowCtx {
// Retry the workflow if its recoverable
let deadline_ts = if let Some(deadline_ts) = err.backoff() {
Some(deadline_ts)
} else if err.is_recoverable() {
} else if err.is_retryable() {
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
} else {
None
Expand All @@ -225,11 +226,14 @@ impl WorkflowCtx {
// finish. This workflow will be retried when the sub workflow completes
let wake_sub_workflow = err.sub_workflow();

if deadline_ts.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some()
{
if err.is_recoverable() && !err.is_retryable() {
tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping");
} else {
tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");

metrics::WORKFLOW_ERRORS
.with_label_values(&[&self.name, err.to_string().as_str()])
.inc();
}

let err_str = err.to_string();
Expand Down Expand Up @@ -288,10 +292,14 @@ impl WorkflowCtx {
A::NAME,
);

let start_instant = Instant::now();

let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
.await
.map_err(|_| WorkflowError::ActivityTimeout);

let dt = start_instant.elapsed().as_secs_f64();

match res {
Ok(Ok(output)) => {
tracing::debug!("activity success");
Expand All @@ -313,45 +321,69 @@ impl WorkflowCtx {
)
.await?;

metrics::ACTIVITY_DURATION
.with_label_values(&[&self.name, A::NAME, ""])
.observe(dt);

Ok(output)
}
Ok(Err(err)) => {
tracing::debug!(?err, "activity error");

// Write error (failed state)
let err_str = err.to_string();
let input_val =
serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?;

// Write error (failed state)
self.db
.commit_workflow_activity_event(
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
Err(&err_str),
self.loop_location(),
)
.await?;

if !err.is_workflow_recoverable() {
metrics::ACTIVITY_ERRORS
.with_label_values(&[&self.name, A::NAME, &err_str])
.inc();
}
metrics::ACTIVITY_DURATION
.with_label_values(&[&self.name, A::NAME, &err_str])
.observe(dt);

Err(WorkflowError::ActivityFailure(err, 0))
}
Err(err) => {
tracing::debug!("activity timeout");

let err_str = err.to_string();
let input_val =
serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?;

self.db
.commit_workflow_activity_event(
self.workflow_id,
self.full_location().as_ref(),
activity_id,
create_ts,
input_val,
Err(&err.to_string()),
Err(&err_str),
self.loop_location(),
)
.await?;

metrics::ACTIVITY_ERRORS
.with_label_values(&[&self.name, A::NAME, &err_str])
.inc();
metrics::ACTIVITY_DURATION
.with_label_values(&[&self.name, A::NAME, &err_str])
.observe(dt);

Err(err)
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ pub struct SignalRow {
pub signal_id: Uuid,
pub signal_name: String,
pub body: serde_json::Value,
pub create_ts: i64,
}

#[derive(sqlx::FromRow)]
Expand Down
Loading
Loading