Skip to content

Commit

Permalink
fix(workflows): implement backoff for timeouts
Browse files: browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Sep 3, 2024
1 parent 3a27238 commit 2c260a9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 20 deletions.
4 changes: 2 additions & 2 deletions infra/tf/grafana/grafana_dashboards/chirp-workflow.json
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@
"uid": "prometheus"
},
"editorMode": "code",
"expr": "sum by (error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
"expr": "sum by (workflow_name, error_code) (rivet_chirp_workflow_dead{workflow_name=~\"[[workflow_name]]\"})",
"instant": false,
"legendFormat": "{{error_code}}",
"legendFormat": "(workflow_name) {{error_code}}",
"range": true,
"refId": "A"
}
Expand Down
2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl ActivityCtx {

let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
.await
.map_err(|_| WorkflowError::OperationTimeout)?
.map_err(|_| WorkflowError::OperationTimeout(0))?
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw);

Expand Down
20 changes: 14 additions & 6 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl WorkflowCtx {

let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input))
.await
.map_err(|_| WorkflowError::ActivityTimeout);
.map_err(|_| WorkflowError::ActivityTimeout(0));

let dt = start_instant.elapsed().as_secs_f64();

Expand Down Expand Up @@ -709,15 +709,24 @@ impl WorkflowCtx {
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(err)
} else {
// Add error count to the error
// Add error count to the error for backoff calculation
WorkflowError::ActivityFailure(err, error_count)
}
}
WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
WorkflowError::ActivityTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
} else {
err
// Add error count to the error for backoff calculation
WorkflowError::ActivityTimeout(error_count)
}
}
WorkflowError::OperationTimeout(_) => {
if error_count + 1 >= I::Activity::MAX_RETRIES {
WorkflowError::ActivityMaxFailuresReached(GlobalError::raw(err))
} else {
// Add error count to the error for backoff calculation
WorkflowError::OperationTimeout(error_count)
}
}
_ => err,
Expand Down Expand Up @@ -1275,8 +1284,7 @@ impl WorkflowCtx {
pub async fn sleep<T: DurationToMillis>(&mut self, duration: T) -> GlobalResult<()> {
let ts = rivet_util::timestamp::now() as u64 + duration.to_millis()?;

self.sleep_until(ts as i64)
.await
self.sleep_until(ts as i64).await
}

pub async fn sleep_until<T: TsToMillis>(&mut self, time: T) -> GlobalResult<()> {
Expand Down
20 changes: 9 additions & 11 deletions lib/chirp-workflow/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ pub enum WorkflowError {
Pools(#[from] rivet_pools::Error),

#[error("activity timed out")]
ActivityTimeout,
ActivityTimeout(usize),

#[error("operation timed out")]
OperationTimeout,
OperationTimeout(usize),

#[error("duplicate registered workflow: {0}")]
DuplicateRegisteredWorkflow(String),
Expand All @@ -140,7 +140,9 @@ impl WorkflowError {
/// Returns the next deadline for a workflow to be woken up again based on the error.
pub(crate) fn deadline_ts(&self) -> Option<i64> {
match self {
WorkflowError::ActivityFailure(_, error_count) => {
WorkflowError::ActivityFailure(_, error_count)
| WorkflowError::ActivityTimeout(error_count)
| WorkflowError::OperationTimeout(error_count) => {
// NOTE: Max retry is handled in `WorkflowCtx::activity`
let mut backoff =
rivet_util::Backoff::new_at(8, None, RETRY_TIMEOUT_MS, 500, *error_count);
Expand All @@ -157,10 +159,6 @@ impl WorkflowError {

Some(deadline_ts)
}
// TODO: Add backoff
WorkflowError::ActivityTimeout | WorkflowError::OperationTimeout => {
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT_MS as i64)
}
WorkflowError::Sleep(ts) => Some(*ts),
_ => None,
}
Expand All @@ -169,8 +167,8 @@ impl WorkflowError {
pub fn is_recoverable(&self) -> bool {
match self {
WorkflowError::ActivityFailure(_, _)
| WorkflowError::ActivityTimeout
| WorkflowError::OperationTimeout
| WorkflowError::ActivityTimeout(_)
| WorkflowError::OperationTimeout(_)
| WorkflowError::NoSignalFound(_)
| WorkflowError::SubWorkflowIncomplete(_)
| WorkflowError::Sleep(_) => true,
Expand All @@ -181,8 +179,8 @@ impl WorkflowError {
pub(crate) fn is_retryable(&self) -> bool {
match self {
WorkflowError::ActivityFailure(_, _)
| WorkflowError::ActivityTimeout
| WorkflowError::OperationTimeout => true,
| WorkflowError::ActivityTimeout(_)
| WorkflowError::OperationTimeout(_) => true,
_ => false,
}
}
Expand Down

0 comments on commit 2c260a9

Please sign in to comment.