From 8fe0b2db650fe4f0e3dd5ad91b66bdc82aafcca3 Mon Sep 17 00:00:00 2001 From: toga4 <81744248+toga4@users.noreply.github.com> Date: Wed, 18 May 2022 17:42:31 +0900 Subject: [PATCH] Change the backoff interfaces to allow limiting number of attempts --- README.md | 2 +- .../gax-go.v2/gaxbackoff/backoff.go | 45 ++++++++++++-- .../gax-go.v2/gaxbackoff/backoff_test.go | 35 +++++------ .../googleapis/gax-go.v2/gaxbackoff/go.mod | 2 + .../backoff.v2/lestrratbackoff/backoff.go | 8 +-- .../backoff.v2/lestrratbackoff/constant.go | 12 ++-- .../lestrratbackoff/constant_test.go | 34 ++++++----- .../backoff.v2/lestrratbackoff/exponential.go | 10 ++-- .../lestrratbackoff/exponential_test.go | 58 ++++++------------- .../backoff.v2/lestrratbackoff/go.mod | 2 + backoff.go | 10 ++-- examples/go.mod | 6 ++ transport.go | 40 +++++++------ transport_test.go | 27 ++++++--- 14 files changed, 170 insertions(+), 121 deletions(-) diff --git a/README.md b/README.md index 82df28e..398f49e 100644 --- a/README.md +++ b/README.md @@ -55,4 +55,4 @@ Some implementation for using 3rd-party backoff libraries are provided. See [ada ## Caveats -- No limit on the number of attempts or the length of retries. Instead, use `http.Client.Timeout` or `context.WithTimeout` to limit the length of retries. +- No limit on the length of retries. Instead, use `http.Client.Timeout` or `context.WithTimeout` to limit the length of retries. diff --git a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff.go b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff.go index f5ba251..22b9160 100644 --- a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff.go +++ b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff.go @@ -1,6 +1,8 @@ package gaxbackoff import ( + "context" + "sync" "time" "github.com/googleapis/gax-go/v2" @@ -15,10 +17,43 @@ type BackoffPolicy struct { var _ retryabletransport.BackoffPolicy = (*BackoffPolicy)(nil) -func (p *BackoffPolicy) New() retryabletransport.Backoff { - return &gax.Backoff{ - Initial: p.Initial, - Max: p.Max, - Multiplier: p.Multiplier, +type backoff struct { + ctx context.Context + first bool + backoff *gax.Backoff + mu sync.Mutex +} + +func (p *BackoffPolicy) New(ctx context.Context) retryabletransport.Backoff { + return &backoff{ + ctx: ctx, + first: true, + backoff: &gax.Backoff{ + Initial: p.Initial, + Max: p.Max, + Multiplier: p.Multiplier, + }, + } +} + +func (b *backoff) Continue() bool { + if b.isFirst() { + return true } + + c := time.After(b.backoff.Pause()) + select { + case <-b.ctx.Done(): + return false + case <-c: + return true + } +} + +func (b *backoff) isFirst() bool { + b.mu.Lock() + defer b.mu.Unlock() + f := b.first + b.first = false + return f } diff --git a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff_test.go b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff_test.go index 26c98ee..53bf4a4 100644 --- a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff_test.go +++ b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/backoff_test.go @@ -1,39 +1,36 @@ package gaxbackoff import ( - "math" + "context" + "log" "testing" "time" ) func TestBackoff(t *testing.T) { - initialBackoff := 250 * time.Millisecond - maxBackoff := 5 * time.Second + ctx := context.Background() + + initialBackoff := 20 * time.Millisecond + maxBackoff := 50 * time.Millisecond backoffMultiplier := 2.0 - backoffConfig := &BackoffPolicy{ + backoffPolicy := &BackoffPolicy{ Initial: initialBackoff, Max: maxBackoff, Multiplier: backoffMultiplier, } - b := backoffConfig.New() - - var val time.Duration - - for i := 0; i < 5; i++ { - max := initialBackoff * time.Duration(math.Pow(2, float64(i))) + ctx, cancel := context.WithTimeout(ctx, 110*time.Millisecond) + defer cancel() - val = b.Pause() - if val > max { - t.Errorf("expected %v to be less than %v", val, max) - } + b := backoffPolicy.New(ctx) + count := 0 + for b.Continue() { + log.Println(count) + count++ } - for i := 0; i < 100_000; i++ { - val = b.Pause() - if val > maxBackoff { - t.Errorf("expected %v to be less than %v", val, maxBackoff) - } + if count < 4 { + t.Errorf("expected count is greater than or equal to 4 but %v", count) } } diff --git a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/go.mod b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/go.mod index cf5c2cd..a9b3bf0 100644 --- a/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/go.mod +++ b/adapter/github.com/googleapis/gax-go.v2/gaxbackoff/go.mod @@ -6,3 +6,5 @@ require ( github.com/googleapis/gax-go/v2 v2.4.0 github.com/toga4/go-retryabletransport v0.2.0 ) + +replace github.com/toga4/go-retryabletransport => ../../../../.. diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/backoff.go b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/backoff.go index 55c2b02..adac356 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/backoff.go +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/backoff.go @@ -1,15 +1,13 @@ package lestrratbackoff import ( - "time" - "github.com/lestrrat-go/backoff/v2" ) type adapter struct { - backoff backoff.IntervalGenerator + controller backoff.Controller } -func (a *adapter) Pause() time.Duration { - return a.backoff.Next() +func (a *adapter) Continue() bool { + return backoff.Continue(a.controller) } diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant.go b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant.go index 92a7eec..1cc3169 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant.go +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant.go @@ -1,18 +1,20 @@ package lestrratbackoff import ( + "context" + "github.com/lestrrat-go/backoff/v2" "github.com/toga4/go-retryabletransport" ) type constantPolicy struct { - options []backoff.ConstantOption + policy backoff.Policy } -func NewConstantPolicy(options ...backoff.ConstantOption) retryabletransport.BackoffPolicy { - return &constantPolicy{options} +func NewConstantPolicy(options ...backoff.Option) retryabletransport.BackoffPolicy { + return &constantPolicy{backoff.Constant(options...)} } -func (p *constantPolicy) New() retryabletransport.Backoff { - return &adapter{backoff.NewConstantInterval(p.options...)} +func (p *constantPolicy) New(ctx context.Context) retryabletransport.Backoff { + return &adapter{p.policy.Start(ctx)} } diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant_test.go b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant_test.go index 32155bb..2c0de65 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant_test.go +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/constant_test.go @@ -1,6 +1,7 @@ package lestrratbackoff import ( + "context" "testing" "time" @@ -8,23 +9,30 @@ import ( ) func Test_ConstantPolicy(t *testing.T) { + ctx := context.Background() + backoffPolicy := NewConstantPolicy( - backoff.WithInterval(1000*time.Millisecond), - backoff.WithJitterFactor(0.1), + backoff.WithInterval(20*time.Millisecond), + backoff.WithMaxRetries(3), ) - min := 900 * time.Millisecond - max := 1100 * time.Millisecond + b := backoffPolicy.New(ctx) + + start := time.Now() - b := backoffPolicy.New() + count := 0 + for b.Continue() { + count++ + } - for i := 0; i < 100_000; i++ { - val := b.Pause() - if val < min { - t.Errorf("expected %v to be greater than %v", val, min) - } - if val > max { - t.Errorf("expected %v to be less than %v", val, max) - } + durationMillis := time.Now().Sub(start) + + if count != 4 { + t.Errorf("expected count is equal to 4 but %v", count) + } + min := (60 - 5) * time.Millisecond + max := (60 + 5) * time.Millisecond + if durationMillis < min || durationMillis > max { + t.Errorf("expected duration is between %v and %v but %v", min, max, durationMillis) } } diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential.go b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential.go index 51425e8..66e43ba 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential.go +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential.go @@ -1,18 +1,20 @@ package lestrratbackoff import ( + "context" + "github.com/lestrrat-go/backoff/v2" "github.com/toga4/go-retryabletransport" ) type exponentialPolicy struct { - options []backoff.ExponentialOption + policy backoff.Policy } func NewExponentialPolicy(options ...backoff.ExponentialOption) retryabletransport.BackoffPolicy { - return &exponentialPolicy{options} + return &exponentialPolicy{backoff.Exponential(options...)} } -func (p *exponentialPolicy) New() retryabletransport.Backoff { - return &adapter{backoff.NewExponentialInterval(p.options...)} +func (p *exponentialPolicy) New(ctx context.Context) retryabletransport.Backoff { + return &adapter{p.policy.Start(ctx)} } diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential_test.go b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential_test.go index 043360f..5f084ae 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential_test.go +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/exponential_test.go @@ -1,6 +1,7 @@ package lestrratbackoff import ( + "context" "testing" "time" @@ -8,53 +9,32 @@ import ( ) func Test_ExponentialPolicy(t *testing.T) { + ctx := context.Background() + backoffPolicy := NewExponentialPolicy( - backoff.WithMinInterval(100*time.Millisecond), - backoff.WithMaxInterval(1*time.Second), - backoff.WithJitterFactor(0.1), + backoff.WithMinInterval(20*time.Millisecond), + backoff.WithMaxInterval(50*time.Millisecond), backoff.WithMultiplier(2.0), + backoff.WithMaxRetries(3), ) - b := backoffPolicy.New() - - val := b.Pause() + b := backoffPolicy.New(ctx) - { - min := 90 * time.Millisecond - max := 110 * time.Millisecond + start := time.Now() - if val < min { - t.Errorf("expected %v to be greater than %v", val, min) - } - if val > max { - t.Errorf("expected %v to be less than %v", val, max) - } + count := 0 + for b.Continue() { + count++ } - for i := 0; i < 3; i++ { - interval := val * 2.0 - min := time.Duration(float64(interval) * 0.9) - max := time.Duration(float64(interval) * 1.1) - - val = b.Pause() - if val < min { - t.Errorf("expected %v to be greater than %v", val, min) - } - if val > max { - t.Errorf("expected %v to be less than %v", val, max) - } - } + durationMillis := time.Now().Sub(start) - min := 900 * time.Millisecond - max := 1100 * time.Millisecond - - for i := 0; i < 100_000; i++ { - val := b.Pause() - if val < min { - t.Errorf("expected %v to be greater than %v", val, min) - } - if val > max { - t.Errorf("expected %v to be less than %v", val, max) - } + if count != 4 { + t.Errorf("expected count is equal to 4 but %v", count) + } + min := (110 - 5) * time.Millisecond + max := (110 + 5) * time.Millisecond + if durationMillis < min || durationMillis > max { + t.Errorf("expected duration is between %v and %v but %v", min, max, durationMillis) } } diff --git a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/go.mod b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/go.mod index 636f299..b2bc903 100644 --- a/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/go.mod +++ b/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff/go.mod @@ -6,3 +6,5 @@ require ( github.com/lestrrat-go/backoff/v2 v2.0.8 github.com/toga4/go-retryabletransport v0.2.0 ) + +replace github.com/toga4/go-retryabletransport => ../../../../.. diff --git a/backoff.go b/backoff.go index e197d08..0d2a963 100644 --- a/backoff.go +++ b/backoff.go @@ -1,17 +1,19 @@ package retryabletransport import ( - "time" + "context" ) // Backoff is an interface that generates next backoff interval. type Backoff interface { - // Pause returns next backoff interval. - Pause() time.Duration + // Continue returns when to run the next backoff. The 1st call should return + // true immediately. The next and subsequent calls should return whether or + // not the next should be run after a backoff timeout. + Continue() bool } // BackoffPolicy is an interface that generates new Backoff instance. type BackoffPolicy interface { // New returns new Backoff instance. - New() Backoff + New(ctx context.Context) Backoff } diff --git a/examples/go.mod b/examples/go.mod index 4a17fc8..af06247 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -8,3 +8,9 @@ require ( github.com/toga4/go-retryabletransport/adapter/github.com/googleapis/gax-go.v2/gaxbackoff v0.2.1 github.com/toga4/go-retryabletransport/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff v0.2.1 ) + +replace ( + github.com/toga4/go-retryabletransport => ../ + github.com/toga4/go-retryabletransport/adapter/github.com/googleapis/gax-go.v2/gaxbackoff => ../adapter/github.com/googleapis/gax-go.v2/gaxbackoff + github.com/toga4/go-retryabletransport/adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff => ../adapter/github.com/lestrrat-go/backoff.v2/lestrratbackoff +) diff --git a/transport.go b/transport.go index 56ce8a1..a131539 100644 --- a/transport.go +++ b/transport.go @@ -5,7 +5,6 @@ import ( "io" "io/ioutil" "net/http" - "time" ) type ShouldRetryErrorFunc func(*http.Request, error) bool @@ -52,7 +51,7 @@ func New(backoffPolicy BackoffPolicy, options ...TransportOption) *Transport { } // RoundTrip implements http.RoundTripper -func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { +func (t *Transport) RoundTrip(req *http.Request) (res *http.Response, err error) { ctx := req.Context() // Since the request body cannot be read more than once, read the entire request body in case a retry is necessary. @@ -66,38 +65,41 @@ func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { buf = b } - backoff := t.backoffPolicy.New() + backoff := t.backoffPolicy.New(ctx) + + for backoff.Continue() { + + if res != nil { + // If this is the 2nd or subsequent run, and a response has been received on the previous call, + // discard and close the response body to reuse HTTP connections. + _, _ = io.Copy(ioutil.Discard, res.Body) + res.Body.Close() + } - for { req.Body = ioutil.NopCloser(bytes.NewReader(buf)) transport := t.transport if transport == nil { transport = http.DefaultTransport } - res, err := transport.RoundTrip(req) + res, err = transport.RoundTrip(req) if err != nil { if !t.shouldRetryError(req, err) { - return nil, err + return } } else { if !t.shouldRetryResponse(res) { - return res, nil + return } } + } - if res != nil { - // Discards response body to reuse HTTP connections. - _, _ = io.Copy(ioutil.Discard, res.Body) - res.Body.Close() - } - - wait := backoff.Pause() - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(wait): - } + // If context canceled or deadline exceeded while waiting to backoff timeout, return context error. + // Else returns the result of the last attempt. + // For example, when the maximum number of attempts has been reached. + if ctx.Err() != nil { + return nil, ctx.Err() } + return } diff --git a/transport_test.go b/transport_test.go index 1dd9c93..b7a5fcd 100644 --- a/transport_test.go +++ b/transport_test.go @@ -2,6 +2,7 @@ package retryabletransport import ( "bytes" + "context" "errors" "fmt" "io" @@ -18,7 +19,6 @@ import ( ) func TestNew(t *testing.T) { - t.Parallel() t.Run("with_all_options", func(t *testing.T) { backoffPolicy := &testBackoffPolicy{} @@ -68,18 +68,31 @@ func TestNew(t *testing.T) { } type testBackoffPolicy struct{} -type testBackoff struct{} +type testBackoff struct { + ctx context.Context + first bool +} -func (*testBackoffPolicy) New() Backoff { - return &testBackoff{} +func (*testBackoffPolicy) New(ctx context.Context) Backoff { + return &testBackoff{ctx, true} } -func (*testBackoff) Pause() time.Duration { - return 90 * time.Millisecond +func (b *testBackoff) Continue() bool { + if b.first { + b.first = false + return true + } + + c := time.After(90 * time.Millisecond) + select { + case <-b.ctx.Done(): + return false + case <-c: + return true + } } func TestRoundTrip(t *testing.T) { - t.Parallel() backoffPolicy := &testBackoffPolicy{}