feat(das): add backoff to retry jobs (#2103)
## Overview
Resolves #2027
walldiss authored Apr 27, 2023
1 parent fc691ea commit 71fc46d
Showing 4 changed files with 222 additions and 25 deletions.
59 changes: 59 additions & 0 deletions das/backoff.go
@@ -0,0 +1,59 @@
package das

import (
"time"
)

var (
// first retry attempt should happen after defaultBackoffInitialInterval
defaultBackoffInitialInterval = time.Minute
// each subsequent retry attempt will be delayed by the previous delay multiplied by defaultBackoffMultiplier
defaultBackoffMultiplier = 4
// after defaultBackoffMaxRetryCount attempts the retry backoff interval will stop growing
// and each further retry attempt will produce a WARN log
defaultBackoffMaxRetryCount = 4
)

// retryStrategy defines a backoff for retries.
type retryStrategy struct {
// attempt delays follow the durations stored in retryIntervals
retryIntervals []time.Duration
}

// newRetryStrategy creates and initializes a new retry backoff.
func newRetryStrategy(retryIntervals []time.Duration) retryStrategy {
return retryStrategy{retryIntervals: retryIntervals}
}

// nextRetry creates a retry attempt with a backoff delay based on the retry strategy.
// It takes the previous retry attempt and the time of the last attempt as inputs and returns the
// next retry attempt and a boolean indicating whether the retry limit has been exceeded.
func (s retryStrategy) nextRetry(lastRetry retryAttempt, lastAttempt time.Time,
) (retry retryAttempt, retriesExceeded bool) {
lastRetry.count++

if len(s.retryIntervals) == 0 {
return lastRetry, false
}

if lastRetry.count > len(s.retryIntervals) {
// retry count exceeded the backoff limit; keep using the last interval
lastRetry.after = lastAttempt.Add(s.retryIntervals[len(s.retryIntervals)-1])
return lastRetry, true
}

lastRetry.after = lastAttempt.Add(s.retryIntervals[lastRetry.count-1])
return lastRetry, false
}

// exponentialBackoff generates a slice of time.Duration values that grow exponentially by the
// given multiplier.
func exponentialBackoff(baseInterval time.Duration, multiplier, amount int) []time.Duration {
backoff := make([]time.Duration, 0, amount)
next := baseInterval
for i := 0; i < amount; i++ {
backoff = append(backoff, next)
next *= time.Duration(multiplier)
}
return backoff
}
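
Taken together, these helpers produce the schedule the defaults describe: 1m, 4m, 16m, 64m, after which the interval stops growing and every further attempt reports that the retry limit was exceeded. The sketch below is illustrative only: it assumes it is placed inside the das package so it can reach the unexported helpers above, and the ExampleRetrySchedule name is invented for the illustration.

```go
package das

import (
	"fmt"
	"time"
)

// ExampleRetrySchedule is a hypothetical in-package sketch (not part of this
// commit) showing how exponentialBackoff and retryStrategy compose.
func ExampleRetrySchedule() {
	// With the defaults this yields intervals of 1m, 4m, 16m and 64m.
	s := newRetryStrategy(exponentialBackoff(
		defaultBackoffInitialInterval,
		defaultBackoffMultiplier,
		defaultBackoffMaxRetryCount))

	now := time.Now()
	var attempt retryAttempt
	for i := 0; i < 6; i++ {
		var exceeded bool
		attempt, exceeded = s.nextRetry(attempt, now)
		fmt.Printf("attempt %d: wait %v, exceeded=%v\n",
			attempt.count, attempt.after.Sub(now), exceeded)
	}
	// Attempts 1-4 wait 1m, 4m, 16m and 64m; attempts 5 and 6 keep the 64m
	// delay and report exceeded=true.
}
```

The Test_exponentialBackoff case below asserts the same 1m/4m/16m/64m schedule for the defaults.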
108 changes: 108 additions & 0 deletions das/backoff_test.go
@@ -0,0 +1,108 @@
package das

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func Test_exponentialBackoff(t *testing.T) {
type args struct {
baseInterval time.Duration
factor int
amount int
}
tests := []struct {
name string
args args
want []time.Duration
}{
{
name: "defaults",
args: args{
baseInterval: time.Minute,
factor: 4,
amount: 4,
},
want: []time.Duration{
time.Minute,
4 * time.Minute,
16 * time.Minute,
64 * time.Minute,
}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t,
tt.want, exponentialBackoff(tt.args.baseInterval, tt.args.factor, tt.args.amount),
"exponentialBackoff(%v, %v, %v)", tt.args.baseInterval, tt.args.factor, tt.args.amount)
})
}
}

func Test_retryStrategy_nextRetry(t *testing.T) {
tNow := time.Now()
type args struct {
retry retryAttempt
lastAttempt time.Time
}
tests := []struct {
name string
backoff retryStrategy
args args
wantRetry retryAttempt
wantRetriesExceeded bool
}{
{
name: "empty_strategy",
backoff: newRetryStrategy(nil),
args: args{
retry: retryAttempt{count: 1},
lastAttempt: tNow,
},
wantRetry: retryAttempt{
count: 2,
},
wantRetriesExceeded: false,
},
{
name: "before_limit",
backoff: newRetryStrategy([]time.Duration{time.Second, time.Minute}),
args: args{
retry: retryAttempt{count: 1},
lastAttempt: tNow,
},
wantRetry: retryAttempt{
count: 2,
after: tNow.Add(time.Minute),
},
wantRetriesExceeded: false,
},
{
name: "after_limit",
backoff: newRetryStrategy([]time.Duration{time.Second, time.Minute}),
args: args{
retry: retryAttempt{count: 2},
lastAttempt: tNow,
},
wantRetry: retryAttempt{
count: 3,
after: tNow.Add(time.Minute),
},
wantRetriesExceeded: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := retryStrategy{
retryIntervals: tt.backoff.retryIntervals,
}
gotRetry, gotRetriesExceeded := s.nextRetry(tt.args.retry, tt.args.lastAttempt)
assert.Equalf(t, tt.wantRetry, gotRetry,
"nextRetry(%v, %v)", tt.args.retry, tt.args.lastAttempt)
assert.Equalf(t, tt.wantRetriesExceeded, gotRetriesExceeded,
"nextRetry(%v, %v)", tt.args.retry, tt.args.lastAttempt)
})
}
}
74 changes: 50 additions & 24 deletions das/state.go
@@ -3,6 +3,7 @@ package das
import (
"context"
"sync/atomic"
"time"

"github.com/celestiaorg/celestia-node/header"
)
@@ -16,11 +17,14 @@ type coordinatorState struct {

// keeps track of running workers
inProgress map[int]func() workerState

// retryStrategy implements retry backoff
retryStrategy retryStrategy
// stores heights of failed headers with their retry attempt state as value
failed map[uint64]int
failed map[uint64]retryAttempt
// inRetry stores (height -> attempt count) of failed headers that are currently being retried by
// workers
inRetry map[uint64]int
inRetry map[uint64]retryAttempt

// nextJobID is a unique identifier that will be used for creation of next job
nextJobID int
@@ -35,14 +39,26 @@ type coordinatorState struct {
catchUpDoneCh chan struct{}
}

// retryAttempt represents a retry attempt with a backoff delay.
type retryAttempt struct {
// count specifies the number of retry attempts made so far.
count int
// after specifies the time for the next retry attempt.
after time.Time
}

// newCoordinatorState initiates state for samplingCoordinator
func newCoordinatorState(params Parameters) coordinatorState {
return coordinatorState{
sampleFrom: params.SampleFrom,
samplingRange: params.SamplingRange,
inProgress: make(map[int]func() workerState),
failed: make(map[uint64]int),
inRetry: make(map[uint64]int),
retryStrategy: newRetryStrategy(exponentialBackoff(
defaultBackoffInitialInterval,
defaultBackoffMultiplier,
defaultBackoffMaxRetryCount)),
failed: make(map[uint64]retryAttempt),
inRetry: make(map[uint64]retryAttempt),
nextJobID: 0,
next: params.SampleFrom,
networkHead: params.SampleFrom,
@@ -54,10 +70,10 @@ func (s *coordinatorState) resumeFromCheckpoint(c checkpoint) {
s.next = c.SampleFrom
s.networkHead = c.NetworkHead

for h := range c.Failed {
// TODO(@walldiss): reset retry counter to allow retries after restart. Will be removed when retry
// backoff is implemented.
s.failed[h] = 0
for h, count := range c.Failed {
s.failed[h] = retryAttempt{
count: count,
}
}
}

@@ -77,14 +93,19 @@ func (s *coordinatorState) handleResult(res result) {

// update failed heights
for h := range res.failed {
failCount := 1
if res.job.jobType == retryJob {
// if job was already in retry and failed again, persist attempt count
failCount += s.inRetry[h]
// if job was already in retry and failed again, carry over attempt count
lastRetry, ok := s.inRetry[h]
if ok {
delete(s.inRetry, h)
}

s.failed[h] = failCount
nextRetry, retryExceeded := s.retryStrategy.nextRetry(lastRetry, time.Now())
if retryExceeded {
log.Warnw("header exceeded maximum amount of sampling attempts",
"height", h,
"attempts", nextRetry.count)
}
s.failed[h] = nextRetry
}
s.checkDone()
}
@@ -125,15 +146,15 @@ func (s *coordinatorState) recentJob(header *header.ExtendedHeader) job {
}
}

// nextJob will return next catchup or retry job according to priority (catchup > retry)
// nextJob will return next catchup or retry job according to priority (retry -> catchup)
func (s *coordinatorState) nextJob() (next job, found bool) {
// check for catchup job
if job, found := s.catchupJob(); found {
// check if any retry jobs are available
if job, found := s.retryJob(); found {
return job, found
}

// if caught up already, make a retry job
return s.retryJob()
// if no retry jobs, make a catchup job
return s.catchupJob()
}

// catchupJob creates a catchup job if catchup is not finished
@@ -153,15 +174,15 @@ func (s *coordinatorState) catchupJob() (next job, found bool) {

// retryJob creates a job to retry a previously failed header
func (s *coordinatorState) retryJob() (next job, found bool) {
for h, count := range s.failed {
// TODO(@walldiss): limit max amount of retries until retry backoff is implemented
if count > 3 {
for h, attempt := range s.failed {
if !attempt.canRetry() {
// height will be retried later
continue
}

// move header from failed into retry
delete(s.failed, h)
s.inRetry[h] = count
s.inRetry[h] = attempt
j := s.newJob(retryJob, h, h)
return j, true
}
@@ -217,8 +238,8 @@ func (s *coordinatorState) unsafeStats() SamplingStats {
}

// set lowestFailedOrInProgress to minimum failed - 1
for h, count := range s.failed {
failed[h] += count
for h, retry := range s.failed {
failed[h] += retry.count
if h < lowestFailedOrInProgress {
lowestFailedOrInProgress = h
}
@@ -263,3 +284,8 @@ func (s *coordinatorState) waitCatchUp(ctx context.Context) error {
}
return nil
}

// canRetry returns true if the time stored in the after field has passed.
func (r retryAttempt) canRetry() bool {
return r.after.Before(time.Now())
}
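
In the coordinator, a failed height now carries a retryAttempt instead of a bare counter, and retryJob skips any height whose backoff deadline has not passed yet. The following standalone sketch re-declares a minimal retryAttempt and uses made-up heights and deadlines purely to illustrate that selection pattern; it is not the coordinator code itself.

```go
package main

import (
	"fmt"
	"time"
)

// Minimal stand-in for the das retryAttempt type, re-declared here so the
// sketch compiles on its own.
type retryAttempt struct {
	count int
	after time.Time
}

func (r retryAttempt) canRetry() bool {
	return r.after.Before(time.Now())
}

func main() {
	// Heights that failed sampling, keyed by height, with assumed retry state.
	failed := map[uint64]retryAttempt{
		22: {count: 1, after: time.Now().Add(-time.Second)},     // deadline passed
		23: {count: 3, after: time.Now().Add(16 * time.Minute)}, // still backing off
	}

	for h, attempt := range failed {
		if !attempt.canRetry() {
			// height 23 stays in failed until its backoff deadline passes
			continue
		}
		// in the coordinator this is where the height would move into inRetry
		// and a retryJob would be created for it
		delete(failed, h)
		fmt.Println("retrying height", h, "after", attempt.count, "failed attempts")
	}
}
```

In state.go the same canRetry check gates the move from failed into inRetry, so a height that just failed waits out its full backoff interval before a new retry job is created for it.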
6 changes: 5 additions & 1 deletion das/state_test.go
@@ -47,7 +47,11 @@ func Test_coordinatorStats(t *testing.T) {
}
},
},
failed: map[uint64]int{22: 1, 23: 1, 24: 2},
failed: map[uint64]retryAttempt{
22: {count: 1},
23: {count: 1},
24: {count: 2},
},
nextJobID: 0,
next: 31,
networkHead: 100,