Skip to content

Commit

Permalink
fix(workflows): improve sleep accuracy
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Aug 30, 2024
1 parent 19e877b commit 33da306
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl WorkflowCtx {
// Run workflow
match (workflow.run)(self).await {
Ok(output) => {
tracing::info!(name=%self.name, id=%self.workflow_id, "workflow success");
tracing::info!(name=%self.name, id=%self.workflow_id, "workflow completed");

let mut retries = 0;
let mut interval = tokio::time::interval(DB_ACTION_RETRY);
Expand Down Expand Up @@ -1273,13 +1273,17 @@ impl WorkflowCtx {
}

pub async fn sleep<T: DurationToMillis>(&mut self, duration: T) -> GlobalResult<()> {
let ts = rivet_util::timestamp::now() as u64 + duration.to_millis()?;
let now = rivet_util::timestamp::now();
let ts = now + i64::try_from(duration.to_millis()?)?;

self.sleep_until(ts as i64)
.await
self.sleep_inner(now, ts).await
}

pub async fn sleep_until<T: TsToMillis>(&mut self, ts: T) -> GlobalResult<()> {
self.sleep_inner(rivet_util::timestamp::now(), ts).await
}

pub async fn sleep_until<T: TsToMillis>(&mut self, time: T) -> GlobalResult<()> {
async fn sleep_inner<T: TsToMillis>(&mut self, now: i64, ts: T) -> GlobalResult<()> {
let event = self.relevant_history().nth(self.location_idx);

// Slept before
Expand All @@ -1299,7 +1303,7 @@ impl WorkflowCtx {
}
// Sleep
else {
let deadline_ts = time.to_millis()?;
let deadline_ts = ts.to_millis()?;

self.db
.commit_workflow_sleep_event(
Expand All @@ -1313,12 +1317,12 @@ impl WorkflowCtx {
(deadline_ts, false)
};

let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now());
let duration = deadline_ts.saturating_sub(now);

// No-op
if duration < 0 {
if !replay {
tracing::warn!("tried to sleep for a negative duration");
tracing::warn!(name=%self.name, id=%self.workflow_id, %duration, "tried to sleep for a negative duration");
}
} else if duration < worker::TICK_INTERVAL.as_millis() as i64 + 1 {
tracing::info!(name=%self.name, id=%self.workflow_id, %deadline_ts, "sleeping in memory");
Expand Down

0 comments on commit 33da306

Please sign in to comment.