calculate retries based off current time, not job start time
In a demo app, I was seeing a bunch of errors with this message:

> Retry policy returned invalid next retry before current time; using
> default retry policy instead

The only problem is, I _was_ using the default retry policy for this
worker and client. These log lines manifested because the
`DefaultClientRetryPolicy` is calculating the next attempt based on:

    job.AttemptedAt + backoffDuration

This isn't noticeable when the job is quick, but if, like my demo app,
your job takes tens of seconds or longer, it can outlast the backoff
duration. The next attempt is then scheduled in the past instead of in
the future, so the job runs again immediately rather than after the
backoff.

This commit changes the behavior to only look at the current time
(`time.Now().UTC()`) when determining when the next attempt should be,
without regard to when the last attempt began.
bgentry committed Nov 26, 2023
1 parent 2df95b2 commit f25a4ec
Showing 4 changed files with 22 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -13,6 +13,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 - **DATABASE MIGRATION**: Database schema v3 was introduced in v0.0.8 and contained an obvious flaw preventing it from running against existing tables. This migration was altered to execute the migration in multiple steps.
 
+### Changed
+
+- `DefaultClientRetryPolicy`: calculate the next attempt based on the current time instead of the time the prior attempt began.
+
 ## [0.0.8] - 2023-11-21
 
 ### Changed
4 changes: 3 additions & 1 deletion job_executor.go
@@ -309,10 +309,12 @@ func (e *jobExecutor) reportError(ctx context.Context) {
 	if nextRetryScheduledAt.IsZero() {
 		nextRetryScheduledAt = e.ClientRetryPolicy.NextRetry(e.JobRow)
 	}
-	if nextRetryScheduledAt.Before(time.Now()) {
+	now := time.Now()
+	if nextRetryScheduledAt.Before(now) {
 		e.Logger.WarnContext(ctx,
 			e.Name+": Retry policy returned invalid next retry before current time; using default retry policy instead",
 			slog.Time("next_retry_scheduled_at", nextRetryScheduledAt),
+			slog.Time("now", now),
 		)
 		nextRetryScheduledAt = (&DefaultClientRetryPolicy{}).NextRetry(e.JobRow)
 	}
15 changes: 12 additions & 3 deletions retry_policy.go
@@ -29,8 +29,9 @@ type ClientRetryPolicy interface {
 
 // River's default retry policy.
 type DefaultClientRetryPolicy struct {
-	rand   *rand.Rand
-	randMu sync.RWMutex
+	rand        *rand.Rand
+	randMu      sync.RWMutex
+	timeNowFunc func() time.Time
 }
 
 // NextRetry gets the next retry given for the given job, accounting for when it
@@ -52,7 +53,15 @@ func (p *DefaultClientRetryPolicy) NextRetry(job *rivertype.JobRow) time.Time {
 	// Note that we explicitly add 1 here, because the error hasn't been appended
 	// yet at the time this is called (that happens in the completer).
 	errorCount := len(job.Errors) + 1
-	return job.AttemptedAt.Add(timeutil.SecondsAsDuration(p.retrySeconds(errorCount)))
+	return p.timeNowUTC().Add(timeutil.SecondsAsDuration(p.retrySeconds(errorCount)))
 }
+
+func (p *DefaultClientRetryPolicy) timeNowUTC() time.Time {
+	if p.timeNowFunc != nil {
+		return p.timeNowFunc()
+	}
+
+	return time.Now().UTC()
+}
 
 // Lazily marshals a random source. Written this way instead of using a
5 changes: 3 additions & 2 deletions retry_policy_test.go
@@ -19,8 +19,9 @@ var _ ClientRetryPolicy = &DefaultClientRetryPolicy{}
 func TestDefaultClientRetryPolicy_NextRetry(t *testing.T) {
 	t.Parallel()
 
-	now := time.Now()
-	retryPolicy := &DefaultClientRetryPolicy{}
+	now := time.Now().UTC()
+	timeNowFunc := func() time.Time { return now }
+	retryPolicy := &DefaultClientRetryPolicy{timeNowFunc: timeNowFunc}
 
 	for attempt := 1; attempt < 10; attempt++ {
 		retrySecondsWithoutJitter := retryPolicy.retrySecondsWithoutJitter(attempt)