diff --git a/das/backoff.go b/das/backoff.go
new file mode 100644
index 0000000000..f92c04d1ef
--- /dev/null
+++ b/das/backoff.go
@@ -0,0 +1,59 @@
+package das
+
+import (
+	"time"
+)
+
+var (
+	// first retry attempt should happen after defaultBackoffInitialInterval
+	defaultBackoffInitialInterval = time.Minute
+	// each subsequent retry attempt is delayed by the previous delay multiplied by defaultBackoffMultiplier
+	defaultBackoffMultiplier = 4
+	// after defaultBackoffMaxRetryCount attempts the retry backoff interval stops growing
+	// and each further retry attempt produces a WARN log
+	defaultBackoffMaxRetryCount = 4
+)
+
+// retryStrategy defines a backoff for retries.
+type retryStrategy struct {
+	// attempt delays follow the durations stored in retryIntervals
+	retryIntervals []time.Duration
+}
+
+// newRetryStrategy creates and initializes a new retryStrategy with the given backoff intervals.
+func newRetryStrategy(retryIntervals []time.Duration) retryStrategy {
+	return retryStrategy{retryIntervals: retryIntervals}
+}
+
+// nextRetry creates a retry attempt with a backoff delay based on the retry strategy.
+// It takes the previous retry attempt and the time of the last attempt as inputs and returns the
+// next retry attempt and a boolean value indicating whether the retry limit has been exceeded.
+func (s retryStrategy) nextRetry(lastRetry retryAttempt, lastAttempt time.Time,
+) (retry retryAttempt, retriesExceeded bool) {
+	lastRetry.count++
+
+	if len(s.retryIntervals) == 0 {
+		return lastRetry, false
+	}
+
+	if lastRetry.count > len(s.retryIntervals) {
+		// retry count exceeded the backoff limit; keep using the last (largest) interval
+		lastRetry.after = lastAttempt.Add(s.retryIntervals[len(s.retryIntervals)-1])
+		return lastRetry, true
+	}
+
+	lastRetry.after = lastAttempt.Add(s.retryIntervals[lastRetry.count-1])
+	return lastRetry, false
+}
+
+// exponentialBackoff generates a slice of time.Duration values that grow exponentially by the given
+// multiplier.
+func exponentialBackoff(baseInterval time.Duration, multiplier, amount int) []time.Duration {
+	backoff := make([]time.Duration, 0, amount)
+	next := baseInterval
+	for i := 0; i < amount; i++ {
+		backoff = append(backoff, next)
+		next *= time.Duration(multiplier)
+	}
+	return backoff
+}
diff --git a/das/backoff_test.go b/das/backoff_test.go
new file mode 100644
index 0000000000..e032ec175a
--- /dev/null
+++ b/das/backoff_test.go
@@ -0,0 +1,108 @@
+package das
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_exponentialBackoff(t *testing.T) {
+	type args struct {
+		baseInterval time.Duration
+		factor       int
+		amount       int
+	}
+	tests := []struct {
+		name string
+		args args
+		want []time.Duration
+	}{
+		{
+			name: "defaults",
+			args: args{
+				baseInterval: time.Minute,
+				factor:       4,
+				amount:       4,
+			},
+			want: []time.Duration{
+				time.Minute,
+				4 * time.Minute,
+				16 * time.Minute,
+				64 * time.Minute,
+			}},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t,
+				tt.want, exponentialBackoff(tt.args.baseInterval, tt.args.factor, tt.args.amount),
+				"exponentialBackoff(%v, %v, %v)", tt.args.baseInterval, tt.args.factor, tt.args.amount)
+		})
+	}
+}
+
+func Test_retryStrategy_nextRetry(t *testing.T) {
+	tNow := time.Now()
+	type args struct {
+		retry       retryAttempt
+		lastAttempt time.Time
+	}
+	tests := []struct {
+		name                string
+		backoff             retryStrategy
+		args                args
+		wantRetry           retryAttempt
+		wantRetriesExceeded bool
+	}{
+		{
+			name:    "empty_strategy",
+			backoff: newRetryStrategy(nil),
+			args: args{
+				retry:       retryAttempt{count: 1},
+				lastAttempt: tNow,
+			},
+			wantRetry: retryAttempt{
+				count: 2,
+			},
+			wantRetriesExceeded: false,
+		},
+		{
+			name:    "before_limit",
+			backoff: newRetryStrategy([]time.Duration{time.Second, time.Minute}),
+			args: args{
+				retry:       retryAttempt{count: 1},
+				lastAttempt: tNow,
+			},
+			wantRetry: retryAttempt{
+				count: 2,
+				after: tNow.Add(time.Minute),
+			},
+			wantRetriesExceeded: false,
+		},
+		{
+			name:    "after_limit",
+			backoff: newRetryStrategy([]time.Duration{time.Second, time.Minute}),
+			args: args{
+				retry:       retryAttempt{count: 2},
+				lastAttempt: tNow,
+			},
+			wantRetry: retryAttempt{
+				count: 3,
+				after: tNow.Add(time.Minute),
+			},
+			wantRetriesExceeded: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			s := retryStrategy{
+				retryIntervals: tt.backoff.retryIntervals,
+			}
+			gotRetry, gotRetriesExceeded := s.nextRetry(tt.args.retry, tt.args.lastAttempt)
+			assert.Equalf(t, tt.wantRetry, gotRetry,
+				"nextRetry(%v, %v)", tt.args.retry, tt.args.lastAttempt)
+			assert.Equalf(t, tt.wantRetriesExceeded, gotRetriesExceeded,
+				"nextRetry(%v, %v)", tt.args.retry, tt.args.lastAttempt)
+		})
+	}
+}
diff --git a/das/state.go b/das/state.go
index 3fddb7ce17..12de2a0b01 100644
--- a/das/state.go
+++ b/das/state.go
@@ -3,6 +3,7 @@ package das
 import (
 	"context"
 	"sync/atomic"
+	"time"
 
 	"github.com/celestiaorg/celestia-node/header"
 )
@@ -16,11 +17,14 @@ type coordinatorState struct {
 
 	// keeps track of running workers
 	inProgress map[int]func() workerState
+
+	// retryStrategy implements retry backoff
+	retryStrategy retryStrategy
 	// stores heights of failed headers with amount of retry attempt as value
-	failed map[uint64]int
+	failed map[uint64]retryAttempt
 	// inRetry stores (height -> attempt count) of failed headers that are currently being retried by
 	// workers
-	inRetry map[uint64]int
+	inRetry map[uint64]retryAttempt
 
 	// nextJobID is a unique identifier that will be used for creation of next job
 	nextJobID int
@@ -35,14 +39,26 @@ type coordinatorState struct {
 	catchUpDoneCh chan struct{}
 }
 
+// retryAttempt represents a retry attempt with a backoff delay.
+type retryAttempt struct {
+	// count specifies the number of retry attempts made so far.
+	count int
+	// after specifies the time for the next retry attempt.
+	after time.Time
+}
+
 // newCoordinatorState initiates state for samplingCoordinator
 func newCoordinatorState(params Parameters) coordinatorState {
 	return coordinatorState{
 		sampleFrom:    params.SampleFrom,
 		samplingRange: params.SamplingRange,
 		inProgress:    make(map[int]func() workerState),
-		failed:        make(map[uint64]int),
-		inRetry:       make(map[uint64]int),
+		retryStrategy: newRetryStrategy(exponentialBackoff(
+			defaultBackoffInitialInterval,
+			defaultBackoffMultiplier,
+			defaultBackoffMaxRetryCount)),
+		failed:        make(map[uint64]retryAttempt),
+		inRetry:       make(map[uint64]retryAttempt),
 		nextJobID:     0,
 		next:          params.SampleFrom,
 		networkHead:   params.SampleFrom,
@@ -54,10 +70,10 @@ func (s *coordinatorState) resumeFromCheckpoint(c checkpoint) {
 	s.next = c.SampleFrom
 	s.networkHead = c.NetworkHead
 
-	for h := range c.Failed {
-		// TODO(@walldiss): reset retry counter to allow retries after restart. Will be removed when retry
-		// backoff is implemented.
-		s.failed[h] = 0
+	for h, count := range c.Failed {
+		s.failed[h] = retryAttempt{
+			count: count,
+		}
 	}
 }
 
@@ -77,14 +93,19 @@ func (s *coordinatorState) handleResult(res result) {
 
 	// update failed heights
 	for h := range res.failed {
-		failCount := 1
-		if res.job.jobType == retryJob {
-			// if job was already in retry and failed again, persist attempt count
-			failCount += s.inRetry[h]
+		// if the job was already in retry and failed again, carry over the attempt count
+		lastRetry, ok := s.inRetry[h]
+		if ok {
 			delete(s.inRetry, h)
 		}
 
-		s.failed[h] = failCount
+		nextRetry, retryExceeded := s.retryStrategy.nextRetry(lastRetry, time.Now())
+		if retryExceeded {
+			log.Warnw("header exceeded maximum amount of sampling attempts",
+				"height", h,
+				"attempts", nextRetry.count)
+		}
+		s.failed[h] = nextRetry
 	}
 	s.checkDone()
 }
@@ -125,15 +146,15 @@ func (s *coordinatorState) recentJob(header *header.ExtendedHeader) job {
 	}
 }
 
-// nextJob will return next catchup or retry job according to priority (catchup > retry)
+// nextJob will return the next retry or catchup job according to priority (retry -> catchup)
 func (s *coordinatorState) nextJob() (next job, found bool) {
-	// check for catchup job
-	if job, found := s.catchupJob(); found {
+	// check if any retry jobs are available
+	if job, found := s.retryJob(); found {
 		return job, found
 	}
 
-	// if caught up already, make a retry job
-	return s.retryJob()
+	// if no retry jobs, make a catchup job
+	return s.catchupJob()
 }
 
 // catchupJob creates a catchup job if catchup is not finished
@@ -153,15 +174,15 @@
 
 // retryJob creates a job to retry previously failed header
 func (s *coordinatorState) retryJob() (next job, found bool) {
-	for h, count := range s.failed {
-		// TODO(@walldiss): limit max amount of retries until retry backoff is implemented
-		if count > 3 {
+	for h, attempt := range s.failed {
+		if !attempt.canRetry() {
+			// height will be retried later
 			continue
 		}
 
 		// move header from failed into retry
 		delete(s.failed, h)
-		s.inRetry[h] = count
+		s.inRetry[h] = attempt
 		j := s.newJob(retryJob, h, h)
 		return j, true
 	}
@@ -217,8 +238,8 @@ func (s *coordinatorState) unsafeStats() SamplingStats {
 	}
 
 	// set lowestFailedOrInProgress to minimum failed - 1
-	for h, count := range s.failed {
-		failed[h] += count
+	for h, retry := range s.failed {
+		failed[h] += retry.count
 		if h < lowestFailedOrInProgress {
 			lowestFailedOrInProgress = h
 		}
@@ -263,3 +284,8 @@ func (s *coordinatorState) waitCatchUp(ctx context.Context) error {
 	}
 	return nil
 }
+
+// canRetry returns true if the time stored in the "after" field has already passed.
+func (r retryAttempt) canRetry() bool {
+	return r.after.Before(time.Now())
+}
diff --git a/das/state_test.go b/das/state_test.go
index 1bef1ba223..57425082eb 100644
--- a/das/state_test.go
+++ b/das/state_test.go
@@ -47,7 +47,11 @@ func Test_coordinatorStats(t *testing.T) {
 					}
 				},
 			},
-			failed:      map[uint64]int{22: 1, 23: 1, 24: 2},
+			failed: map[uint64]retryAttempt{
+				22: {count: 1},
+				23: {count: 1},
+				24: {count: 2},
+			},
 			nextJobID:   0,
 			next:        31,
 			networkHead: 100,
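For illustration only, here is a minimal sketch (not part of this diff) of how the pieces above compose. It assumes it lives inside package das, since retryStrategy, retryAttempt and exponentialBackoff are unexported; retryBackoffDemo is a hypothetical helper introduced purely for this example. With the defaults in backoff.go (1m initial interval, multiplier 4, 4 intervals), the delays are 1m, 4m, 16m and 64m, and the fifth consecutive failure reports retriesExceeded, which handleResult turns into a WARN log.

package das

import (
	"fmt"
	"time"
)

// retryBackoffDemo is a hypothetical, illustration-only helper showing how
// exponentialBackoff and retryStrategy.nextRetry interact for a single header
// that keeps failing. It is not part of this diff.
func retryBackoffDemo() {
	s := newRetryStrategy(exponentialBackoff(
		defaultBackoffInitialInterval,
		defaultBackoffMultiplier,
		defaultBackoffMaxRetryCount))

	attempt := retryAttempt{} // no retries recorded yet
	lastFailure := time.Now()

	for i := 0; i < 5; i++ {
		var exceeded bool
		attempt, exceeded = s.nextRetry(attempt, lastFailure)
		fmt.Println(attempt.count, attempt.after.Sub(lastFailure), exceeded)
	}
	// Expected output with the defaults above:
	// 1 1m0s false
	// 2 4m0s false
	// 3 16m0s false
	// 4 1h4m0s false
	// 5 1h4m0s true
}

Note that once the limit is exceeded, nextRetry keeps reusing the last (largest) interval rather than growing further, so a persistently failing height is still retried, just at the slowest cadence and with a warning on each further failure.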