From a9f5592ef140989ec9c06e9d67053ab7480b3fde Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 2 Apr 2020 16:17:07 -0400 Subject: [PATCH 1/8] WIP: add support for HTTP retries Signed-off-by: Lionel Villard --- v2/alias.go | 8 +++- v2/context/context.go | 23 ++++++++++ v2/protocol/http/protocol.go | 70 ++++++++++++++++++++++++++++++ v2/protocol/http/retries_result.go | 46 ++++++++++++++++++++ v2/types/backoff.go | 14 ++++++ 5 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 v2/protocol/http/retries_result.go create mode 100644 v2/types/backoff.go diff --git a/v2/alias.go b/v2/alias.go index 435c755d4..d1da5ad8d 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -113,10 +113,13 @@ var ( // Event Creation - NewEvent = event.New - NewResult = protocol.NewResult + NewEvent = event.New + // Results + + NewResult = protocol.NewResult NewHTTPResult = http.NewResult + NewHTTPRetriesResult = http.NewRetriesResult // Message Creation @@ -134,6 +137,7 @@ var ( ContextWithTarget = context.WithTarget TargetFromContext = context.TargetFrom + ContextWithRetry = context.WithRetry WithEncodingBinary = binding.WithForceBinary WithEncodingStructured = binding.WithForceStructured diff --git a/v2/context/context.go b/v2/context/context.go index 0cf24f496..00ef3693f 100644 --- a/v2/context/context.go +++ b/v2/context/context.go @@ -3,6 +3,8 @@ package context import ( "context" "net/url" + + "github.com/cloudevents/sdk-go/v2/types" ) // Opaque key type used to store target @@ -50,3 +52,24 @@ func TopicFrom(ctx context.Context) string { } return "" } + +// Opaque key type used to store backoff parameters +type retryKeyType struct{} + +var retryKey = retryKeyType{} + +// WithRetry returns back a new context with the retry backoff parameters. +func WithRetry(ctx context.Context, backoff types.Backoff) context.Context { + return context.WithValue(ctx, retryKey, backoff) +} + +// RetryFrom looks in the given context and returns the backoff parameters if found, otherwise nil +func RetryFrom(ctx context.Context) *types.Backoff { + c := ctx.Value(retryKey) + if c != nil { + if s, ok := c.(types.Backoff); ok { + return &s + } + } + return nil +} diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 01fddca47..fdad72ba3 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/cloudevents/sdk-go/v2/types" + "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/binding" @@ -118,6 +120,16 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess if err = WriteRequest(ctx, m, req, p.transformers); err != nil { return nil, err } + + do := p.doOnce + if backoff := cecontext.RetryFrom(ctx); backoff != nil { + do = p.doWithRetry(backoff) + } + + return do(req) +} + +func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) { resp, err := p.Client.Do(req) if err != nil { return nil, protocol.NewReceipt(false, "%w", err) @@ -133,6 +145,64 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result) } +func (p *Protocol) doWithRetry(backoff *types.Backoff) func(*http.Request) (binding.Message, error) { + return func(req *http.Request) (binding.Message, error) { + retries := 0 + var results []protocol.Result + for { + resp, err := p.Client.Do(req) + + // Fast track common case. + if err == nil && resp.StatusCode/100 == 2 { + result := NewResult(resp.StatusCode, "%w", protocol.ResultACK) + + if retries != 0 { + result = NewRetriesResult(result, retries, results) + } + return NewMessage(resp.Header, resp.Body), result + } + + // Slow case. + retry := retries < backoff.Retry + + var result protocol.Result + if err != nil { + result = protocol.NewReceipt(false, "%w", err) + if retry && !err.(*url.Error).Timeout() { + // Do not retry if the error is not a timeout + retry = false + } + } else { + // No error, status code is not 2xx + result = NewResult(resp.StatusCode, "%w", protocol.ResultNACK) + + // Potentially retry when: + // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) + // - 425 Too Early + // - 429 Too Many Requests + // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) + // - 504 Gateway Timeout + + sc := resp.StatusCode + if retry && sc != 425 && sc != 429 && sc != 503 && sc != 504 { + // Permanent error + retry = false + } + } + + if !retry { + return NewMessage(resp.Header, resp.Body), NewRetriesResult(result, retries, results) + } + + results = append(results, result) + retries++ + + // Linear backoff. + time.Sleep(backoff.Delay) + } + } +} + func (p *Protocol) makeRequest(ctx context.Context) *http.Request { // TODO: support custom headers from context? req := &http.Request{ diff --git a/v2/protocol/http/retries_result.go b/v2/protocol/http/retries_result.go new file mode 100644 index 000000000..9e8b34a86 --- /dev/null +++ b/v2/protocol/http/retries_result.go @@ -0,0 +1,46 @@ +package http + +import ( + "fmt" + + "github.com/cloudevents/sdk-go/v2/protocol" +) + +// NewRetriesResult returns a http RetriesResult that should be used as +// a transport.Result without retries +func NewRetriesResult(result protocol.Result, retries int, results []protocol.Result) protocol.Result { + return &RetriesResult{ + Result: result, + Retries: retries, + Results: results, + } +} + +// Result wraps the fields required to make adjustments for http Responses. +type RetriesResult struct { + // The last result + protocol.Result + + // Retries is the number of times the request was tried + Retries int + + // + Results []protocol.Result +} + +// make sure RetriesResult implements error. +var _ error = (*RetriesResult)(nil) + +// Is returns if the target error is a RetriesResult type checking target. +func (e *RetriesResult) Is(target error) bool { + return protocol.ResultIs(e.Result, target) +} + +// Error returns the string that is formed by using the format string with the +// provided args. +func (e *RetriesResult) Error() string { + if e.Retries == 0 { + return e.Result.Error() + } + return fmt.Sprintf("%s (%dx)", e.Result.Error(), e.Retries) +} diff --git a/v2/types/backoff.go b/v2/types/backoff.go new file mode 100644 index 000000000..90b1d4300 --- /dev/null +++ b/v2/types/backoff.go @@ -0,0 +1,14 @@ +package types + +import "time" + +// Backoff holds parameters applied to retries +type Backoff struct { + + // Retry is the number of times to retry request before giving up + Retry int + + // Delay is the delay before retrying. + // For linear policy, backoff delay is the time interval between retries. + Delay time.Duration +} From 4ad540f1c576d44ab286fe4912a33c26750bcb48 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 3 Apr 2020 13:12:08 -0400 Subject: [PATCH 2/8] add retries duration. Cleaner API Signed-off-by: Lionel Villard --- v2/alias.go | 14 ++++---- v2/context/context.go | 28 +++++++++------ v2/protocol/http/protocol.go | 58 ++++++++++++++++-------------- v2/protocol/http/retries_result.go | 15 +++++--- v2/types/backoff.go | 20 ++++++++--- 5 files changed, 81 insertions(+), 54 deletions(-) diff --git a/v2/alias.go b/v2/alias.go index d1da5ad8d..eeed86554 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -117,8 +117,8 @@ var ( // Results - NewResult = protocol.NewResult - NewHTTPResult = http.NewResult + NewResult = protocol.NewResult + NewHTTPResult = http.NewResult NewHTTPRetriesResult = http.NewRetriesResult // Message Creation @@ -135,11 +135,11 @@ var ( // Context - ContextWithTarget = context.WithTarget - TargetFromContext = context.TargetFrom - ContextWithRetry = context.WithRetry - WithEncodingBinary = binding.WithForceBinary - WithEncodingStructured = binding.WithForceStructured + ContextWithTarget = context.WithTarget + TargetFromContext = context.TargetFrom + ContextWithLinearBackoff = context.WithLinearBackoff + WithEncodingBinary = binding.WithForceBinary + WithEncodingStructured = binding.WithForceStructured // Custom Types diff --git a/v2/context/context.go b/v2/context/context.go index 00ef3693f..be1cd2882 100644 --- a/v2/context/context.go +++ b/v2/context/context.go @@ -3,6 +3,7 @@ package context import ( "context" "net/url" + "time" "github.com/cloudevents/sdk-go/v2/types" ) @@ -53,23 +54,28 @@ func TopicFrom(ctx context.Context) string { return "" } -// Opaque key type used to store backoff parameters -type retryKeyType struct{} +// Opaque key type used to store linear backoff parameters +type linearBackoffKeyType struct{} -var retryKey = retryKeyType{} +var linearBackoffKey = linearBackoffKeyType{} -// WithRetry returns back a new context with the retry backoff parameters. -func WithRetry(ctx context.Context, backoff types.Backoff) context.Context { - return context.WithValue(ctx, retryKey, backoff) +// WithLinearBackoff returns back a new context with the linear backoff parameters. +// MaxTries is the maximum number for retries and delay is the time interval between retries +func WithLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { + return context.WithValue(ctx, linearBackoffKey, types.Backoff{ + Strategy: types.BackoffStrategyLinear, + Period: delay, + MaxTries: maxTries, + }) } -// RetryFrom looks in the given context and returns the backoff parameters if found, otherwise nil -func RetryFrom(ctx context.Context) *types.Backoff { - c := ctx.Value(retryKey) +// RetryFrom looks in the given context and returns the linear backoff parameters if found, otherwise 0,0 +func LinearBackoffFrom(ctx context.Context) (time.Duration, int) { + c := ctx.Value(linearBackoffKey) if c != nil { if s, ok := c.(types.Backoff); ok { - return &s + return s.Period, s.MaxTries } } - return nil + return time.Duration(0), 0 } diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index fdad72ba3..1ea6088d0 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -2,6 +2,7 @@ package http import ( "context" + "errors" "fmt" "io" "net" @@ -10,8 +11,6 @@ import ( "sync" "time" - "github.com/cloudevents/sdk-go/v2/types" - "github.com/cloudevents/sdk-go/v2/protocol" "github.com/cloudevents/sdk-go/v2/binding" @@ -122,8 +121,8 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess } do := p.doOnce - if backoff := cecontext.RetryFrom(ctx); backoff != nil { - do = p.doWithRetry(backoff) + if delay, tries := cecontext.LinearBackoffFrom(ctx); tries > 0 { + do = p.doWithLinearRetry(ctx, delay, tries) } return do(req) @@ -145,36 +144,35 @@ func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) { return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result) } -func (p *Protocol) doWithRetry(backoff *types.Backoff) func(*http.Request) (binding.Message, error) { +func (p *Protocol) doWithLinearRetry(ctx context.Context, delay time.Duration, tries int) func(*http.Request) (binding.Message, error) { return func(req *http.Request) (binding.Message, error) { retries := 0 + retriesDuration := time.Duration(0) var results []protocol.Result for { - resp, err := p.Client.Do(req) + msg, result := p.doOnce(req) // Fast track common case. - if err == nil && resp.StatusCode/100 == 2 { - result := NewResult(resp.StatusCode, "%w", protocol.ResultACK) - + if msg != nil && result.(*Result).StatusCode/100 == 2 { if retries != 0 { - result = NewRetriesResult(result, retries, results) + result = NewRetriesResult(result, retries, retriesDuration, results) } - return NewMessage(resp.Header, resp.Body), result + return msg, result } // Slow case. - retry := retries < backoff.Retry - - var result protocol.Result - if err != nil { - result = protocol.NewReceipt(false, "%w", err) - if retry && !err.(*url.Error).Timeout() { - // Do not retry if the error is not a timeout - retry = false + retry := retries < tries + + if msg == nil { + if retry { + var err *url.Error + if errors.As(result, &err) { // should always be true + // Do not retry if the error is not a timeout + retry = !err.Timeout() + } } } else { - // No error, status code is not 2xx - result = NewResult(resp.StatusCode, "%w", protocol.ResultNACK) + // Got a msg but status code is not 2xx // Potentially retry when: // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) @@ -183,7 +181,7 @@ func (p *Protocol) doWithRetry(backoff *types.Backoff) func(*http.Request) (bind // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) // - 504 Gateway Timeout - sc := resp.StatusCode + sc := result.(*Result).StatusCode if retry && sc != 425 && sc != 429 && sc != 503 && sc != 504 { // Permanent error retry = false @@ -191,14 +189,22 @@ func (p *Protocol) doWithRetry(backoff *types.Backoff) func(*http.Request) (bind } if !retry { - return NewMessage(resp.Header, resp.Body), NewRetriesResult(result, retries, results) + // return the last http response (or nil if none). + return msg, NewRetriesResult(result, retries, retriesDuration, results) } results = append(results, result) retries++ - - // Linear backoff. - time.Sleep(backoff.Delay) + retriesDuration += delay + + ticker := time.NewTicker(delay) + select { + case <-ctx.Done(): + ticker.Stop() + return nil, protocol.NewReceipt(false, "request has been cancelled") + case <-ticker.C: + ticker.Stop() + } } } } diff --git a/v2/protocol/http/retries_result.go b/v2/protocol/http/retries_result.go index 9e8b34a86..6f9b1b1af 100644 --- a/v2/protocol/http/retries_result.go +++ b/v2/protocol/http/retries_result.go @@ -2,17 +2,19 @@ package http import ( "fmt" + "time" "github.com/cloudevents/sdk-go/v2/protocol" ) // NewRetriesResult returns a http RetriesResult that should be used as // a transport.Result without retries -func NewRetriesResult(result protocol.Result, retries int, results []protocol.Result) protocol.Result { +func NewRetriesResult(result protocol.Result, retries int, retriesDuration time.Duration, results []protocol.Result) protocol.Result { return &RetriesResult{ - Result: result, - Retries: retries, - Results: results, + Result: result, + Retries: retries, + RetriesDuration: retriesDuration, + Results: results, } } @@ -24,7 +26,10 @@ type RetriesResult struct { // Retries is the number of times the request was tried Retries int - // + // RetriesDuration records the time spent retrying. Exclude the successful request (if any) + RetriesDuration time.Duration + + // Results of all failed requests. Exclude last result. Results []protocol.Result } diff --git a/v2/types/backoff.go b/v2/types/backoff.go index 90b1d4300..38ef634de 100644 --- a/v2/types/backoff.go +++ b/v2/types/backoff.go @@ -2,13 +2,23 @@ package types import "time" +type BackoffStrategy string + +const ( + BackoffStrategyLinear = "linear" + BackoffStrategyExponential = "exponential" +) + // Backoff holds parameters applied to retries type Backoff struct { + // Backoff strategy + Strategy BackoffStrategy - // Retry is the number of times to retry request before giving up - Retry int + // MaxTries is the maximum number of times to retry request before giving up + MaxTries int - // Delay is the delay before retrying. - // For linear policy, backoff delay is the time interval between retries. - Delay time.Duration + // Period is + // - for linear strategy: the delay interval between retries + // - for exponential strategy: the factor applied after each retry + Period time.Duration } From 6a4899952fb033d13507df784825b11e9df4c48a Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 3 Apr 2020 15:00:21 -0400 Subject: [PATCH 3/8] bare minimum http retry test Signed-off-by: Lionel Villard --- v2/protocol/http/protocol_test.go | 72 +++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 4 deletions(-) diff --git a/v2/protocol/http/protocol_test.go b/v2/protocol/http/protocol_test.go index 3fd682f19..d017e2312 100644 --- a/v2/protocol/http/protocol_test.go +++ b/v2/protocol/http/protocol_test.go @@ -2,15 +2,17 @@ package http import ( "context" + "net/http" + "testing" + "time" + "github.com/cloudevents/sdk-go/v2/binding" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" - - "net/http" - "testing" - "time" ) func TestNew(t *testing.T) { @@ -248,3 +250,65 @@ func ReceiveTest(t *testing.T, p *Protocol, ctx context.Context, want binding.Me require.IsType(t, want, got) } } + +func TestRequestWithRetries(t *testing.T) { + dummyEvent := event.New() + dummyMsg := binding.ToMessage(&dummyEvent) + ctx := cecontext.WithTarget(context.Background(), "http://test") + testCases := map[string]struct { + // roundTripperTest + statusCodes []int + + // Linear Backoff + delay time.Duration + retries int + + // Wants + wantResult protocol.Result + wantRequestCount int + }{ + "425, 200, 3 retries": { + statusCodes: []int{425, 200}, + delay: time.Nanosecond, + retries: 3, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 1, + RetriesDuration: time.Nanosecond, + Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, + }, + wantRequestCount: 2, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + roundTripper := roundTripperTest{statusCodes: tc.statusCodes} + + p, err := New(WithRoundTripper(&roundTripper)) + if err != nil { + t.Fail() + } + ctxWithRetries := cecontext.WithLinearBackoff(ctx, tc.delay, tc.retries) + _, got := p.Request(ctxWithRetries, dummyMsg) + + if roundTripper.requestCount != tc.wantRequestCount { + t.Errorf("expected %d requests, got %d", tc.wantRequestCount, roundTripper.requestCount) + } + + if diff := cmp.Diff(tc.wantResult, got); diff != "" { + t.Errorf("unexpected diff (-want, +got) = %v", diff) + } + }) + } +} + +type roundTripperTest struct { + statusCodes []int + requestCount int +} + +func (r *roundTripperTest) RoundTrip(*http.Request) (*http.Response, error) { + code := r.statusCodes[r.requestCount] + r.requestCount++ + return &http.Response{StatusCode: code}, nil +} From 7008e423cfa4534c98b927be5582c64580b833fb Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 3 Apr 2020 16:41:22 -0400 Subject: [PATCH 4/8] adding more tests Signed-off-by: Lionel Villard --- v2/protocol/http/options.go | 14 +++++- v2/protocol/http/protocol.go | 7 ++- v2/protocol/http/protocol_test.go | 81 +++++++++++++++++++++++++++++-- v2/protocol/result.go | 6 +++ 4 files changed, 99 insertions(+), 9 deletions(-) diff --git a/v2/protocol/http/options.go b/v2/protocol/http/options.go index 5ced0748b..1b580faf6 100644 --- a/v2/protocol/http/options.go +++ b/v2/protocol/http/options.go @@ -3,6 +3,7 @@ package http import ( "fmt" "net" + "net/http" nethttp "net/http" "net/url" "strings" @@ -175,7 +176,7 @@ func WithMiddleware(middleware Middleware) Option { } // WithRoundTripper sets the HTTP RoundTripper. -func WithRoundTripper(roundTripper nethttp.RoundTripper) Option { +func WithRoundTripper(roundTripper http.RoundTripper) Option { return func(t *Protocol) error { if t == nil { return fmt.Errorf("http round tripper option can not set nil protocol") @@ -184,3 +185,14 @@ func WithRoundTripper(roundTripper nethttp.RoundTripper) Option { return nil } } + +// WithClient sets the protocol client +func WithClient(client http.Client) Option { + return func(p *Protocol) error { + if p == nil { + return fmt.Errorf("client option can not set nil protocol") + } + p.Client = &client + return nil + } +} diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 1ea6088d0..3095d88d2 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -11,10 +11,9 @@ import ( "sync" "time" - "github.com/cloudevents/sdk-go/v2/protocol" - "github.com/cloudevents/sdk-go/v2/binding" cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/protocol" ) const ( @@ -167,8 +166,8 @@ func (p *Protocol) doWithLinearRetry(ctx context.Context, delay time.Duration, t if retry { var err *url.Error if errors.As(result, &err) { // should always be true - // Do not retry if the error is not a timeout - retry = !err.Timeout() + // Only retry when the error is a timeout + retry = err.Timeout() } } } else { diff --git a/v2/protocol/http/protocol_test.go b/v2/protocol/http/protocol_test.go index d017e2312..1f215aa54 100644 --- a/v2/protocol/http/protocol_test.go +++ b/v2/protocol/http/protocol_test.go @@ -2,6 +2,7 @@ package http import ( "context" + "errors" "net/http" "testing" "time" @@ -257,7 +258,7 @@ func TestRequestWithRetries(t *testing.T) { ctx := cecontext.WithTarget(context.Background(), "http://test") testCases := map[string]struct { // roundTripperTest - statusCodes []int + statusCodes []int // -1 = timeout // Linear Backoff delay time.Duration @@ -266,8 +267,29 @@ func TestRequestWithRetries(t *testing.T) { // Wants wantResult protocol.Result wantRequestCount int + + skipResults bool }{ - "425, 200, 3 retries": { + "no retries, ACK": { + statusCodes: []int{200}, + retries: 0, + wantResult: NewResult(200, "%w", protocol.ResultACK), + wantRequestCount: 1, + }, + "no retries, NACK": { + statusCodes: []int{404}, + retries: 0, + wantResult: NewResult(404, "%w", protocol.ResultNACK), + wantRequestCount: 1, + }, + "retries, no NACK": { + statusCodes: []int{200}, + delay: time.Nanosecond, + retries: 3, + wantResult: NewResult(200, "%w", protocol.ResultACK), + wantRequestCount: 1, + }, + "3 retries, 425, 200, ACK": { statusCodes: []int{425, 200}, delay: time.Nanosecond, retries: 3, @@ -279,12 +301,54 @@ func TestRequestWithRetries(t *testing.T) { }, wantRequestCount: 2, }, + "1 retry, 425, 429, 200, NACK": { + statusCodes: []int{425, 429, 200}, + delay: time.Nanosecond, + retries: 1, + wantResult: &RetriesResult{ + Result: NewResult(429, "%w", protocol.ResultNACK), + Retries: 1, + RetriesDuration: time.Nanosecond, + Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, + }, + wantRequestCount: 2, + }, + "10 retries, 425, 429, 503, 504, 200, ACK": { + statusCodes: []int{425, 429, 503, 504, 200}, + delay: time.Nanosecond, + retries: 10, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 4, + RetriesDuration: 4 * time.Nanosecond, + Results: []protocol.Result{ + NewResult(425, "%w", protocol.ResultNACK), + NewResult(429, "%w", protocol.ResultNACK), + NewResult(503, "%w", protocol.ResultNACK), + NewResult(504, "%w", protocol.ResultNACK), + }, + }, + wantRequestCount: 5, + }, + "retries, timeout, 200, ACK": { + delay: time.Nanosecond, + statusCodes: []int{-1, 200}, + retries: 5, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 1, + RetriesDuration: time.Nanosecond, + Results: nil, // skipping test as it contains internal http errors + }, + wantRequestCount: 2, + skipResults: true, + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { roundTripper := roundTripperTest{statusCodes: tc.statusCodes} - p, err := New(WithRoundTripper(&roundTripper)) + p, err := New(WithClient(http.Client{Timeout: time.Second}), WithRoundTripper(&roundTripper)) if err != nil { t.Fail() } @@ -295,6 +359,10 @@ func TestRequestWithRetries(t *testing.T) { t.Errorf("expected %d requests, got %d", tc.wantRequestCount, roundTripper.requestCount) } + if tc.skipResults { + got.(*RetriesResult).Results = nil + } + if diff := cmp.Diff(tc.wantResult, got); diff != "" { t.Errorf("unexpected diff (-want, +got) = %v", diff) } @@ -307,8 +375,13 @@ type roundTripperTest struct { requestCount int } -func (r *roundTripperTest) RoundTrip(*http.Request) (*http.Response, error) { +func (r *roundTripperTest) RoundTrip(req *http.Request) (*http.Response, error) { code := r.statusCodes[r.requestCount] r.requestCount++ + if code == -1 { + time.Sleep(2 * time.Second) + return nil, errors.New("timeout") + } + return &http.Response{StatusCode: code}, nil } diff --git a/v2/protocol/result.go b/v2/protocol/result.go index dd6873e55..f5583ff7c 100644 --- a/v2/protocol/result.go +++ b/v2/protocol/result.go @@ -91,3 +91,9 @@ func (e *Receipt) Is(target error) bool { func (e *Receipt) Error() string { return fmt.Sprintf(e.Format, e.Args...) } + +// Unwrap returns the wrapped error if exist or nil +func (e *Receipt) Unwrap() error { + err := fmt.Errorf(e.Format, e.Args...) + return errors.Unwrap(err) +} From f0e3a8e9033de3bd9b30b65286a31802684c7a54 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 3 Apr 2020 17:00:12 -0400 Subject: [PATCH 5/8] more tests Signed-off-by: Lionel Villard --- v2/protocol/http/retries_result_test.go | 96 +++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 v2/protocol/http/retries_result_test.go diff --git a/v2/protocol/http/retries_result_test.go b/v2/protocol/http/retries_result_test.go new file mode 100644 index 000000000..6e8f4b5d9 --- /dev/null +++ b/v2/protocol/http/retries_result_test.go @@ -0,0 +1,96 @@ +package http + +import ( + "errors" + "io" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + "github.com/cloudevents/sdk-go/v2/protocol" +) + +func TestRetriesNil_Is(t *testing.T) { + var err error + if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Nanosecond, nil)) { + t.Error("Did not expect error to be a NewRetriesResult") + } +} + +func TestRetriesError_Is(t *testing.T) { + err := errors.New("some other error") + if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Nanosecond, nil)) { + t.Error("Did not expect error to be a Result") + } +} + +func TestRetriesNew_Is(t *testing.T) { + err := NewRetriesResult(NewResult(200, "this is an example message, %s", "yep"), 0, time.Nanosecond, nil) + if !protocol.ResultIs(err, NewResult(200, "OK")) { + t.Error("Expected error to be a 200 level result") + } +} + +func TestRetriesNewOtherType_Is(t *testing.T) { + err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) + if protocol.ResultIs(err, NewResult(200, "OK")) { + t.Error("Expected error to be a [Normal, ExampleStatusFailed], filtered by eventtype failed") + } +} + +func TestRetriesNewWrappedErrors_Is(t *testing.T) { + err := NewRetriesResult(NewResult(500, "this is a wrapped error, %w", io.ErrUnexpectedEOF), 0, time.Nanosecond, nil) + if !protocol.ResultIs(err, io.ErrUnexpectedEOF) { + t.Error("Result expected to be a wrapped ErrUnexpectedEOF but was not") + } +} + +func TestRetriesNewOtherStatus_Is(t *testing.T) { + err := NewRetriesResult(NewResult(403, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) + if protocol.ResultIs(err, NewRetriesResult(NewResult(200, "OK"), 0, time.Nanosecond, nil)) { + t.Error("Did not expect event to be StatusCode=200") + } +} + +func TestRetriesNew_As(t *testing.T) { + err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 5, time.Nanosecond, nil) + + var event *RetriesResult + if !protocol.ResultAs(err, &event) { + t.Errorf("Expected error to be a NewRetriesResult, is not") + } + + if event.Retries != 5 { + t.Errorf("Mismatched retries") + } +} + +func TestRetriesNil_As(t *testing.T) { + var err error + + var event *RetriesResult + if protocol.ResultAs(err, &event) { + t.Error("Did not expect error to be a Result") + } +} + +func TestRetriesNew_Error(t *testing.T) { + err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) + + const want = "500: this is an example error, yep" + got := err.Error() + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected diff (-want, +got) = %v", diff) + } +} + +func TestRetriesNew_ErrorWithRetries(t *testing.T) { + err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 10, time.Nanosecond, nil) + + const want = "500: this is an example error, yep (10x)" + got := err.Error() + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected diff (-want, +got) = %v", diff) + } +} From 2f28c00082d20c3a56948ab73bd59d2901376c86 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 6 Apr 2020 09:35:34 -0400 Subject: [PATCH 6/8] move backoff under context Signed-off-by: Lionel Villard --- v2/{types => context}/backoff.go | 2 +- v2/context/context.go | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) rename v2/{types => context}/backoff.go (96%) diff --git a/v2/types/backoff.go b/v2/context/backoff.go similarity index 96% rename from v2/types/backoff.go rename to v2/context/backoff.go index 38ef634de..7f3e6047d 100644 --- a/v2/types/backoff.go +++ b/v2/context/backoff.go @@ -1,4 +1,4 @@ -package types +package context import "time" diff --git a/v2/context/context.go b/v2/context/context.go index be1cd2882..254bf9709 100644 --- a/v2/context/context.go +++ b/v2/context/context.go @@ -4,8 +4,6 @@ import ( "context" "net/url" "time" - - "github.com/cloudevents/sdk-go/v2/types" ) // Opaque key type used to store target @@ -62,8 +60,8 @@ var linearBackoffKey = linearBackoffKeyType{} // WithLinearBackoff returns back a new context with the linear backoff parameters. // MaxTries is the maximum number for retries and delay is the time interval between retries func WithLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { - return context.WithValue(ctx, linearBackoffKey, types.Backoff{ - Strategy: types.BackoffStrategyLinear, + return context.WithValue(ctx, linearBackoffKey, Backoff{ + Strategy: BackoffStrategyLinear, Period: delay, MaxTries: maxTries, }) @@ -73,7 +71,7 @@ func WithLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) c func LinearBackoffFrom(ctx context.Context) (time.Duration, int) { c := ctx.Value(linearBackoffKey) if c != nil { - if s, ok := c.(types.Backoff); ok { + if s, ok := c.(Backoff); ok { return s.Period, s.MaxTries } } From 6c37e728d987b8a87f7bebceac5055f5ee636840 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 6 Apr 2020 13:08:59 -0400 Subject: [PATCH 7/8] changes after code review Signed-off-by: Lionel Villard --- v2/alias.go | 10 +++--- v2/context/context.go | 25 ++++++------- v2/context/{backoff.go => retry.go} | 15 +++++--- v2/protocol/http/protocol.go | 56 ++++++++++++++--------------- v2/protocol/http/protocol_test.go | 2 +- 5 files changed, 56 insertions(+), 52 deletions(-) rename v2/context/{backoff.go => retry.go} (51%) diff --git a/v2/alias.go b/v2/alias.go index eeed86554..bb21b2474 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -135,11 +135,11 @@ var ( // Context - ContextWithTarget = context.WithTarget - TargetFromContext = context.TargetFrom - ContextWithLinearBackoff = context.WithLinearBackoff - WithEncodingBinary = binding.WithForceBinary - WithEncodingStructured = binding.WithForceStructured + ContextWithTarget = context.WithTarget + TargetFromContext = context.TargetFrom + ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff + WithEncodingBinary = binding.WithForceBinary + WithEncodingStructured = binding.WithForceStructured // Custom Types diff --git a/v2/context/context.go b/v2/context/context.go index 254bf9709..b2a6d9b0a 100644 --- a/v2/context/context.go +++ b/v2/context/context.go @@ -52,28 +52,29 @@ func TopicFrom(ctx context.Context) string { return "" } -// Opaque key type used to store linear backoff parameters -type linearBackoffKeyType struct{} +// Opaque key type used to store retry parameters +type retriesKeyType struct{} -var linearBackoffKey = linearBackoffKeyType{} +var retriesKey = retriesKeyType{} -// WithLinearBackoff returns back a new context with the linear backoff parameters. +// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy. // MaxTries is the maximum number for retries and delay is the time interval between retries -func WithLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { - return context.WithValue(ctx, linearBackoffKey, Backoff{ +func WithRetriesLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { + return context.WithValue(ctx, retriesKey, &RetryParams{ Strategy: BackoffStrategyLinear, Period: delay, MaxTries: maxTries, }) } -// RetryFrom looks in the given context and returns the linear backoff parameters if found, otherwise 0,0 -func LinearBackoffFrom(ctx context.Context) (time.Duration, int) { - c := ctx.Value(linearBackoffKey) +// RetriesFrom looks in the given context and returns the retries parameters if found. +// Otherwise returns the default retries configuration (ie. no retries). +func RetriesFrom(ctx context.Context) *RetryParams { + c := ctx.Value(retriesKey) if c != nil { - if s, ok := c.(Backoff); ok { - return s.Period, s.MaxTries + if s, ok := c.(*RetryParams); ok { + return s } } - return time.Duration(0), 0 + return &DefaultRetryParams } diff --git a/v2/context/backoff.go b/v2/context/retry.go similarity index 51% rename from v2/context/backoff.go rename to v2/context/retry.go index 7f3e6047d..1beb3baab 100644 --- a/v2/context/backoff.go +++ b/v2/context/retry.go @@ -5,13 +5,18 @@ import "time" type BackoffStrategy string const ( - BackoffStrategyLinear = "linear" - BackoffStrategyExponential = "exponential" + BackoffStrategyNone = "none" + BackoffStrategyLinear = "linear" + + // TODO + // BackoffStrategyExponential = "exponential" ) -// Backoff holds parameters applied to retries -type Backoff struct { - // Backoff strategy +var DefaultRetryParams = RetryParams{Strategy: BackoffStrategyNone} + +// RetryParams holds parameters applied to retries +type RetryParams struct { + // Strategy is the backoff strategy to applies between retries Strategy BackoffStrategy // MaxTries is the maximum number of times to retry request before giving up diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 3095d88d2..1213388e2 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -119,12 +119,7 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess return nil, err } - do := p.doOnce - if delay, tries := cecontext.LinearBackoffFrom(ctx); tries > 0 { - do = p.doWithLinearRetry(ctx, delay, tries) - } - - return do(req) + return p.doWithRetry(ctx, cecontext.RetriesFrom(ctx))(req) } func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) { @@ -143,7 +138,7 @@ func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) { return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result) } -func (p *Protocol) doWithLinearRetry(ctx context.Context, delay time.Duration, tries int) func(*http.Request) (binding.Message, error) { +func (p *Protocol) doWithRetry(ctx context.Context, retriesParams *cecontext.RetryParams) func(*http.Request) (binding.Message, error) { return func(req *http.Request) (binding.Message, error) { retries := 0 retriesDuration := time.Duration(0) @@ -159,44 +154,47 @@ func (p *Protocol) doWithLinearRetry(ctx context.Context, delay time.Duration, t return msg, result } - // Slow case. - retry := retries < tries + // Slow case: determine whether or not to retry + retry := retries < retriesParams.MaxTries - if msg == nil { - if retry { + if retry { + if msg == nil { var err *url.Error if errors.As(result, &err) { // should always be true // Only retry when the error is a timeout retry = err.Timeout() } - } - } else { - // Got a msg but status code is not 2xx - - // Potentially retry when: - // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) - // - 425 Too Early - // - 429 Too Many Requests - // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) - // - 504 Gateway Timeout - - sc := result.(*Result).StatusCode - if retry && sc != 425 && sc != 429 && sc != 503 && sc != 504 { - // Permanent error - retry = false + } else { + // Got a msg but status code is not 2xx + + // Potentially retry when: + // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) + // - 425 Too Early + // - 429 Too Many Requests + // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) + // - 504 Gateway Timeout + + sc := result.(*Result).StatusCode + if sc != 425 && sc != 429 && sc != 503 && sc != 504 { + // Permanent error + retry = false + } } } if !retry { // return the last http response (or nil if none). - return msg, NewRetriesResult(result, retries, retriesDuration, results) + if retries != 0 { + return msg, NewRetriesResult(result, retries, retriesDuration, results) + } + return msg, result } results = append(results, result) retries++ - retriesDuration += delay + retriesDuration += retriesParams.Period - ticker := time.NewTicker(delay) + ticker := time.NewTicker(retriesParams.Period) select { case <-ctx.Done(): ticker.Stop() diff --git a/v2/protocol/http/protocol_test.go b/v2/protocol/http/protocol_test.go index 1f215aa54..79511f79c 100644 --- a/v2/protocol/http/protocol_test.go +++ b/v2/protocol/http/protocol_test.go @@ -352,7 +352,7 @@ func TestRequestWithRetries(t *testing.T) { if err != nil { t.Fail() } - ctxWithRetries := cecontext.WithLinearBackoff(ctx, tc.delay, tc.retries) + ctxWithRetries := cecontext.WithRetriesLinearBackoff(ctx, tc.delay, tc.retries) _, got := p.Request(ctxWithRetries, dummyMsg) if roundTripper.requestCount != tc.wantRequestCount { From 1905c75a72c72b2d9b61dca7334c0e746b86edb6 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 6 Apr 2020 13:59:54 -0400 Subject: [PATCH 8/8] fix linting Signed-off-by: Lionel Villard --- v2/protocol/http/options.go | 5 ++--- v2/protocol/http/retries_result.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/v2/protocol/http/options.go b/v2/protocol/http/options.go index 1b580faf6..f33485fc8 100644 --- a/v2/protocol/http/options.go +++ b/v2/protocol/http/options.go @@ -3,7 +3,6 @@ package http import ( "fmt" "net" - "net/http" nethttp "net/http" "net/url" "strings" @@ -176,7 +175,7 @@ func WithMiddleware(middleware Middleware) Option { } // WithRoundTripper sets the HTTP RoundTripper. -func WithRoundTripper(roundTripper http.RoundTripper) Option { +func WithRoundTripper(roundTripper nethttp.RoundTripper) Option { return func(t *Protocol) error { if t == nil { return fmt.Errorf("http round tripper option can not set nil protocol") @@ -187,7 +186,7 @@ func WithRoundTripper(roundTripper http.RoundTripper) Option { } // WithClient sets the protocol client -func WithClient(client http.Client) Option { +func WithClient(client nethttp.Client) Option { return func(p *Protocol) error { if p == nil { return fmt.Errorf("client option can not set nil protocol") diff --git a/v2/protocol/http/retries_result.go b/v2/protocol/http/retries_result.go index 6f9b1b1af..3b4839221 100644 --- a/v2/protocol/http/retries_result.go +++ b/v2/protocol/http/retries_result.go @@ -18,7 +18,7 @@ func NewRetriesResult(result protocol.Result, retries int, retriesDuration time. } } -// Result wraps the fields required to make adjustments for http Responses. +// RetriesResult wraps the fields required to make adjustments for http Responses. type RetriesResult struct { // The last result protocol.Result