diff --git a/v2/alias.go b/v2/alias.go index bb21b2474..92fee454f 100644 --- a/v2/alias.go +++ b/v2/alias.go @@ -96,10 +96,15 @@ var ( WithTimeNow = client.WithTimeNow WithTracePropagation = client.WithTracePropagation() + // Event Creation + + NewEvent = event.New + // Results - ResultIs = protocol.ResultIs - ResultAs = protocol.ResultAs + NewResult = protocol.NewResult + ResultIs = protocol.ResultIs + ResultAs = protocol.ResultAs // Receipt helpers @@ -111,13 +116,8 @@ var ( IsACK = protocol.IsACK IsNACK = protocol.IsNACK - // Event Creation - - NewEvent = event.New + // HTTP Results - // Results - - NewResult = protocol.NewResult NewHTTPResult = http.NewResult NewHTTPRetriesResult = http.NewRetriesResult @@ -135,11 +135,14 @@ var ( // Context - ContextWithTarget = context.WithTarget - TargetFromContext = context.TargetFrom - ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff - WithEncodingBinary = binding.WithForceBinary - WithEncodingStructured = binding.WithForceStructured + ContextWithTarget = context.WithTarget + TargetFromContext = context.TargetFrom + ContextWithRetriesConstantBackoff = context.WithRetriesConstantBackoff + ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff + ContextWithRetriesExponentialBackoff = context.WithRetriesExponentialBackoff + + WithEncodingBinary = binding.WithForceBinary + WithEncodingStructured = binding.WithForceStructured // Custom Types diff --git a/v2/cmd/samples/http/requester-with-custom-client/main.go b/v2/cmd/samples/http/requester-with-custom-client/main.go index 64be91e2e..32d38265e 100644 --- a/v2/cmd/samples/http/requester-with-custom-client/main.go +++ b/v2/cmd/samples/http/requester-with-custom-client/main.go @@ -70,7 +70,6 @@ func _main(args []string, env envConfig) int { InsecureSkipVerify: true, } - //lint:ignore SA1019 TODO: remove use of deprecated method. tlsConfig.BuildNameToCertificate() seq := 0 diff --git a/v2/cmd/samples/http/sender_retry/main.go b/v2/cmd/samples/http/sender_retry/main.go new file mode 100644 index 000000000..96d9ce53b --- /dev/null +++ b/v2/cmd/samples/http/sender_retry/main.go @@ -0,0 +1,51 @@ +package main + +import ( + "context" + "log" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" +) + +func main() { + ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/") + + p, err := cloudevents.NewHTTP() + if err != nil { + log.Fatalf("failed to create protocol: %s", err.Error()) + } + + c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs()) + if err != nil { + log.Fatalf("failed to create client, %v", err) + } + + // must send each event within 5 seconds for sleepy demo. + + log.Println("--- Constant ---") + send10(cloudevents.ContextWithRetriesConstantBackoff(ctx, 10*time.Millisecond, 10), c) + log.Println("--- Linear ---") + send10(cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 10), c) + log.Println("--- Exponential ---") + send10(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), c) +} + +func send10(ctx context.Context, c cloudevents.Client) { + for i := 0; i < 100; i++ { + e := cloudevents.NewEvent() + e.SetType("com.cloudevents.sample.sent") + e.SetSource("https://github.com/cloudevents/sdk-go/v2/cmd/samples/httpb/sender") + _ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{ + "id": i, + "message": "Hello, World!", + }) + + if result := c.Send(ctx, e); !cloudevents.IsACK(result) { + log.Printf("failed to send: %s", result.Error()) + } else { + log.Printf("sent: %d", i) + } + time.Sleep(50 * time.Millisecond) + } +} diff --git a/v2/cmd/samples/http/sleepy/main.go b/v2/cmd/samples/http/sleepy/main.go index 60a2203ba..3113e0015 100644 --- a/v2/cmd/samples/http/sleepy/main.go +++ b/v2/cmd/samples/http/sleepy/main.go @@ -43,19 +43,19 @@ func gotEvent(event cloudevents.Event) error { } func _main(args []string, env envConfig) int { - p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path)) - if err != nil { - log.Fatalf("failed to create protocol: %s", err.Error()) - } - c, err := cloudevents.NewClient(p) - if err != nil { - log.Printf("failed to create client, %v", err) - return 1 - } + for { + p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path)) + if err != nil { + log.Fatalf("failed to create protocol: %s", err.Error()) + } + c, err := cloudevents.NewClient(p) + if err != nil { + log.Printf("failed to create client, %v", err) + return 1 + } - log.Printf("listening on :%d%s\n", env.Port, env.Path) + log.Printf("listening on :%d%s\n", env.Port, env.Path) - for { ctx, cancel := context.WithCancel(context.TODO()) go func() { diff --git a/v2/context/context.go b/v2/context/context.go index b2a6d9b0a..f9843dd61 100644 --- a/v2/context/context.go +++ b/v2/context/context.go @@ -57,16 +57,41 @@ type retriesKeyType struct{} var retriesKey = retriesKeyType{} -// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy. +// WithRetriesConstantBackoff returns back a new context with retries parameters using constant backoff strategy. // MaxTries is the maximum number for retries and delay is the time interval between retries +func WithRetriesConstantBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { + return WithRetryParams(ctx, &RetryParams{ + Strategy: BackoffStrategyConstant, + Period: delay, + MaxTries: maxTries, + }) +} + +// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy. +// MaxTries is the maximum number for retries and delay*tries is the time interval between retries func WithRetriesLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context { - return context.WithValue(ctx, retriesKey, &RetryParams{ + return WithRetryParams(ctx, &RetryParams{ Strategy: BackoffStrategyLinear, Period: delay, MaxTries: maxTries, }) } +// WithRetriesExponentialBackoff returns back a new context with retries parameters using exponential backoff strategy. +// MaxTries is the maximum number for retries and period is the amount of time to wait, used as `period * 2^retries`. +func WithRetriesExponentialBackoff(ctx context.Context, period time.Duration, maxTries int) context.Context { + return WithRetryParams(ctx, &RetryParams{ + Strategy: BackoffStrategyExponential, + Period: period, + MaxTries: maxTries, + }) +} + +// WithRetryParams returns back a new context with retries parameters. +func WithRetryParams(ctx context.Context, rp *RetryParams) context.Context { + return context.WithValue(ctx, retriesKey, rp) +} + // RetriesFrom looks in the given context and returns the retries parameters if found. // Otherwise returns the default retries configuration (ie. no retries). func RetriesFrom(ctx context.Context) *RetryParams { diff --git a/v2/context/retry.go b/v2/context/retry.go index 1beb3baab..f590d4662 100644 --- a/v2/context/retry.go +++ b/v2/context/retry.go @@ -1,15 +1,19 @@ package context -import "time" +import ( + "context" + "errors" + "math" + "time" +) type BackoffStrategy string const ( - BackoffStrategyNone = "none" - BackoffStrategyLinear = "linear" - - // TODO - // BackoffStrategyExponential = "exponential" + BackoffStrategyNone = "none" + BackoffStrategyConstant = "constant" + BackoffStrategyLinear = "linear" + BackoffStrategyExponential = "exponential" ) var DefaultRetryParams = RetryParams{Strategy: BackoffStrategyNone} @@ -23,7 +27,45 @@ type RetryParams struct { MaxTries int // Period is - // - for linear strategy: the delay interval between retries - // - for exponential strategy: the factor applied after each retry + // - for none strategy: no delay + // - for constant strategy: the delay interval between retries + // - for linear strategy: interval between retries = Period * retries + // - for exponential strategy: interval between retries = Period * retries^2 Period time.Duration } + +// BackoffFor tries will return the time duration that should be used for this +// current try count. +// `tries` is assumed to be the number of times the caller has already retried. +func (r *RetryParams) BackoffFor(tries int) time.Duration { + switch r.Strategy { + case BackoffStrategyConstant: + return r.Period + case BackoffStrategyLinear: + return r.Period * time.Duration(tries) + case BackoffStrategyExponential: + exp := math.Exp2(float64(tries)) + return r.Period * time.Duration(exp) + case BackoffStrategyNone: + fallthrough // default + default: + return r.Period + } +} + +// Backoff is a blocking call to wait for the correct amount of time for the retry. +// `tries` is assumed to be the number of times the caller has already retried. +func (r *RetryParams) Backoff(ctx context.Context, tries int) error { + if tries > r.MaxTries { + return errors.New("too many retries") + } + ticker := time.NewTicker(r.BackoffFor(tries)) + select { + case <-ctx.Done(): + ticker.Stop() + return errors.New("context has been cancelled") + case <-ticker.C: + ticker.Stop() + } + return nil +} diff --git a/v2/context/retry_test.go b/v2/context/retry_test.go new file mode 100644 index 000000000..8da46ac15 --- /dev/null +++ b/v2/context/retry_test.go @@ -0,0 +1,114 @@ +package context + +import ( + "context" + "testing" + "time" +) + +func TestRetryParams_Backoff(t *testing.T) { + tests := map[string]struct { + rp *RetryParams + ctx context.Context + tries int + wantErr bool + }{ + "none 1": { + ctx: context.Background(), + rp: &RetryParams{Strategy: BackoffStrategyNone}, + tries: 1, + wantErr: true, + }, + "const 1": { + ctx: context.Background(), + rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Nanosecond}, + tries: 5, + }, + "linear 1": { + ctx: context.Background(), + rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Nanosecond}, + tries: 1, + }, + "exponential 1": { + ctx: context.Background(), + rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Nanosecond}, + tries: 1, + }, + "const timeout": { + ctx: func() context.Context { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + return ctx + }(), + rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second}, + tries: 5, + wantErr: true, + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + if err := tc.rp.Backoff(tc.ctx, tc.tries); (err != nil) != tc.wantErr { + t.Errorf("Backoff() error = %v, wantErr %v", err, tc.wantErr) + } + }) + } +} + +func TestRetryParams_BackoffFor(t *testing.T) { + tests := map[string]struct { + rp *RetryParams + tries int + want time.Duration + }{ + "none 1": { + rp: &RetryParams{Strategy: BackoffStrategyNone}, + tries: 1, + want: time.Duration(0), + }, + "const 1": { + rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second}, + tries: 1, + want: 1 * time.Second, + }, + "linear 1": { + rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second}, + tries: 1, + want: 1 * time.Second, + }, + "exponential 1": { + rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second}, + tries: 1, + want: 2 * time.Second, // 1 == 2^1 + }, + "none 5": { + rp: &RetryParams{Strategy: BackoffStrategyNone}, + tries: 5, + want: time.Duration(0), + }, + "const 5": { + rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second}, + tries: 5, + want: 1 * time.Second, + }, + "linear 5": { + rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second}, + tries: 5, + want: 5 * time.Second, + }, + "exponential 5": { + rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second}, + tries: 5, + want: 32 * time.Second, // 32 == 2^5 + }, + } + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + if got := tc.rp.BackoffFor(tc.tries); got != tc.want { + t.Errorf("BackoffFor() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/v2/protocol/http/protocol.go b/v2/protocol/http/protocol.go index 1213388e2..4b1c13283 100644 --- a/v2/protocol/http/protocol.go +++ b/v2/protocol/http/protocol.go @@ -2,7 +2,6 @@ package http import ( "context" - "errors" "fmt" "io" "net" @@ -119,91 +118,7 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess return nil, err } - return p.doWithRetry(ctx, cecontext.RetriesFrom(ctx))(req) -} - -func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) { - resp, err := p.Client.Do(req) - if err != nil { - return nil, protocol.NewReceipt(false, "%w", err) - } - - var result protocol.Result - if resp.StatusCode/100 == 2 { - result = protocol.ResultACK - } else { - result = protocol.ResultNACK - } - - return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result) -} - -func (p *Protocol) doWithRetry(ctx context.Context, retriesParams *cecontext.RetryParams) func(*http.Request) (binding.Message, error) { - return func(req *http.Request) (binding.Message, error) { - retries := 0 - retriesDuration := time.Duration(0) - var results []protocol.Result - for { - msg, result := p.doOnce(req) - - // Fast track common case. - if msg != nil && result.(*Result).StatusCode/100 == 2 { - if retries != 0 { - result = NewRetriesResult(result, retries, retriesDuration, results) - } - return msg, result - } - - // Slow case: determine whether or not to retry - retry := retries < retriesParams.MaxTries - - if retry { - if msg == nil { - var err *url.Error - if errors.As(result, &err) { // should always be true - // Only retry when the error is a timeout - retry = err.Timeout() - } - } else { - // Got a msg but status code is not 2xx - - // Potentially retry when: - // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) - // - 425 Too Early - // - 429 Too Many Requests - // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) - // - 504 Gateway Timeout - - sc := result.(*Result).StatusCode - if sc != 425 && sc != 429 && sc != 503 && sc != 504 { - // Permanent error - retry = false - } - } - } - - if !retry { - // return the last http response (or nil if none). - if retries != 0 { - return msg, NewRetriesResult(result, retries, retriesDuration, results) - } - return msg, result - } - - results = append(results, result) - retries++ - retriesDuration += retriesParams.Period - - ticker := time.NewTicker(retriesParams.Period) - select { - case <-ctx.Done(): - ticker.Stop() - return nil, protocol.NewReceipt(false, "request has been cancelled") - case <-ticker.C: - ticker.Stop() - } - } - } + return p.do(ctx, req) } func (p *Protocol) makeRequest(ctx context.Context) *http.Request { diff --git a/v2/protocol/http/protocol_retry.go b/v2/protocol/http/protocol_retry.go new file mode 100644 index 000000000..d3444b7cb --- /dev/null +++ b/v2/protocol/http/protocol_retry.go @@ -0,0 +1,107 @@ +package http + +import ( + "context" + "errors" + "go.uber.org/zap" + "net/http" + "net/url" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/protocol" +) + +func (p *Protocol) do(ctx context.Context, req *http.Request) (binding.Message, error) { + params := cecontext.RetriesFrom(ctx) + + switch params.Strategy { + case cecontext.BackoffStrategyConstant, cecontext.BackoffStrategyLinear, cecontext.BackoffStrategyExponential: + return p.doWithRetry(ctx, params, req) + case cecontext.BackoffStrategyNone: + fallthrough + default: + return p.doOnce(req) + } +} + +func (p *Protocol) doOnce(req *http.Request) (binding.Message, protocol.Result) { + resp, err := p.Client.Do(req) + if err != nil { + return nil, protocol.NewReceipt(false, "%w", err) + } + + var result protocol.Result + if resp.StatusCode/100 == 2 { + result = protocol.ResultACK + } else { + result = protocol.ResultNACK + } + + return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result) +} + +func (p *Protocol) doWithRetry(ctx context.Context, params *cecontext.RetryParams, req *http.Request) (binding.Message, error) { + then := time.Now() + retry := 0 + results := make([]protocol.Result, 0) + + for { + msg, result := p.doOnce(req) + + // Fast track common case. + if protocol.IsACK(result) { + return msg, NewRetriesResult(result, retry, then, results) + } + + // Try again? + // + // Make sure the error was something we should retry. + + { + var uErr *url.Error + if errors.As(result, &uErr) { + goto DoBackoff + } + } + + { + var httpResult *Result + if errors.As(result, &httpResult) { + // Potentially retry when: + // - 404 Not Found + // - 413 Payload Too Large with Retry-After (NOT SUPPORTED) + // - 425 Too Early + // - 429 Too Many Requests + // - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After) + // - 504 Gateway Timeout + + sc := httpResult.StatusCode + if sc == 404 || sc == 425 || sc == 429 || sc == 503 || sc == 504 { + // retry! + goto DoBackoff + } else { + // Permanent error + cecontext.LoggerFrom(ctx).Debugw("status code not retryable, will not try again", + zap.Error(httpResult), + zap.Int("statusCode", sc)) + return msg, NewRetriesResult(result, retry, then, results) + } + } + } + + DoBackoff: + // Wait for the correct amount of backoff time. + + // total tries = retry + 1 + if err := params.Backoff(ctx, retry+1); err != nil { + // do not try again. + cecontext.LoggerFrom(ctx).Debugw("backoff error, will not try again", zap.Error(err)) + return msg, NewRetriesResult(result, retry, then, results) + } + + retry++ + results = append(results, result) + } +} diff --git a/v2/protocol/http/protocol_retry_test.go b/v2/protocol/http/protocol_retry_test.go new file mode 100644 index 000000000..08bd50a55 --- /dev/null +++ b/v2/protocol/http/protocol_retry_test.go @@ -0,0 +1,140 @@ +package http + +import ( + "context" + "github.com/google/go-cmp/cmp/cmpopts" + "net/http" + "testing" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + cecontext "github.com/cloudevents/sdk-go/v2/context" + "github.com/cloudevents/sdk-go/v2/event" + "github.com/cloudevents/sdk-go/v2/protocol" + "github.com/google/go-cmp/cmp" +) + +func TestRequestWithRetries_linear(t *testing.T) { + dummyEvent := event.New() + dummyMsg := binding.ToMessage(&dummyEvent) + ctx := cecontext.WithTarget(context.Background(), "http://test") + testCases := map[string]struct { + // roundTripperTest + statusCodes []int // -1 = timeout + + // Linear Backoff + delay time.Duration + retries int + + // Wants + wantResult protocol.Result + wantRequestCount int + + skipResults bool + }{ + "no retries, ACK": { + statusCodes: []int{200}, + retries: 0, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 0, + }, + wantRequestCount: 1, + }, + "no retries, NACK": { + statusCodes: []int{404}, + retries: 0, + wantResult: &RetriesResult{ + Result: NewResult(404, "%w", protocol.ResultNACK), + Retries: 0, + }, + wantRequestCount: 1, + }, + "retries, no NACK": { + statusCodes: []int{200}, + delay: time.Nanosecond, + retries: 3, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + }, + wantRequestCount: 1, + }, + "3 retries, 425, 200, ACK": { + statusCodes: []int{425, 200}, + delay: time.Nanosecond, + retries: 3, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 1, + Duration: time.Nanosecond, + Attempts: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, + }, + wantRequestCount: 2, + }, + "1 retry, 425, 429, 200, NACK": { + statusCodes: []int{425, 429, 200}, + delay: time.Nanosecond, + retries: 1, + wantResult: &RetriesResult{ + Result: NewResult(429, "%w", protocol.ResultNACK), + Retries: 1, + Duration: time.Nanosecond, + Attempts: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, + }, + wantRequestCount: 2, + }, + "10 retries, 425, 429, 503, 504, 200, ACK": { + statusCodes: []int{425, 429, 503, 504, 200}, + delay: time.Nanosecond, + retries: 10, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 4, + Attempts: []protocol.Result{ + NewResult(425, "%w", protocol.ResultNACK), + NewResult(429, "%w", protocol.ResultNACK), + NewResult(503, "%w", protocol.ResultNACK), + NewResult(504, "%w", protocol.ResultNACK), + }, + }, + wantRequestCount: 5, + }, + "retries, timeout, 200, ACK": { + delay: time.Nanosecond, + statusCodes: []int{-1, 200}, + retries: 5, + wantResult: &RetriesResult{ + Result: NewResult(200, "%w", protocol.ResultACK), + Retries: 1, + Duration: time.Nanosecond, + Attempts: nil, // skipping test as it contains internal http errors + }, + wantRequestCount: 2, + skipResults: true, + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + roundTripper := roundTripperTest{statusCodes: tc.statusCodes} + + p, err := New(WithClient(http.Client{Timeout: time.Second}), WithRoundTripper(&roundTripper)) + if err != nil { + t.Fatalf("no protocol") + } + ctxWithRetries := cecontext.WithRetriesLinearBackoff(ctx, tc.delay, tc.retries) + _, got := p.Request(ctxWithRetries, dummyMsg) + + if roundTripper.requestCount != tc.wantRequestCount { + t.Errorf("expected %d requests, got %d", tc.wantRequestCount, roundTripper.requestCount) + } + + if tc.skipResults { + got.(*RetriesResult).Attempts = nil + } + + if diff := cmp.Diff(tc.wantResult, got, cmpopts.IgnoreFields(RetriesResult{}, "Duration")); diff != "" { + t.Errorf("unexpected diff (-want, +got) = %v", diff) + } + }) + } +} diff --git a/v2/protocol/http/protocol_test.go b/v2/protocol/http/protocol_test.go index 79511f79c..39674c1fc 100644 --- a/v2/protocol/http/protocol_test.go +++ b/v2/protocol/http/protocol_test.go @@ -8,8 +8,6 @@ import ( "time" "github.com/cloudevents/sdk-go/v2/binding" - cecontext "github.com/cloudevents/sdk-go/v2/context" - "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -252,124 +250,6 @@ func ReceiveTest(t *testing.T, p *Protocol, ctx context.Context, want binding.Me } } -func TestRequestWithRetries(t *testing.T) { - dummyEvent := event.New() - dummyMsg := binding.ToMessage(&dummyEvent) - ctx := cecontext.WithTarget(context.Background(), "http://test") - testCases := map[string]struct { - // roundTripperTest - statusCodes []int // -1 = timeout - - // Linear Backoff - delay time.Duration - retries int - - // Wants - wantResult protocol.Result - wantRequestCount int - - skipResults bool - }{ - "no retries, ACK": { - statusCodes: []int{200}, - retries: 0, - wantResult: NewResult(200, "%w", protocol.ResultACK), - wantRequestCount: 1, - }, - "no retries, NACK": { - statusCodes: []int{404}, - retries: 0, - wantResult: NewResult(404, "%w", protocol.ResultNACK), - wantRequestCount: 1, - }, - "retries, no NACK": { - statusCodes: []int{200}, - delay: time.Nanosecond, - retries: 3, - wantResult: NewResult(200, "%w", protocol.ResultACK), - wantRequestCount: 1, - }, - "3 retries, 425, 200, ACK": { - statusCodes: []int{425, 200}, - delay: time.Nanosecond, - retries: 3, - wantResult: &RetriesResult{ - Result: NewResult(200, "%w", protocol.ResultACK), - Retries: 1, - RetriesDuration: time.Nanosecond, - Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, - }, - wantRequestCount: 2, - }, - "1 retry, 425, 429, 200, NACK": { - statusCodes: []int{425, 429, 200}, - delay: time.Nanosecond, - retries: 1, - wantResult: &RetriesResult{ - Result: NewResult(429, "%w", protocol.ResultNACK), - Retries: 1, - RetriesDuration: time.Nanosecond, - Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)}, - }, - wantRequestCount: 2, - }, - "10 retries, 425, 429, 503, 504, 200, ACK": { - statusCodes: []int{425, 429, 503, 504, 200}, - delay: time.Nanosecond, - retries: 10, - wantResult: &RetriesResult{ - Result: NewResult(200, "%w", protocol.ResultACK), - Retries: 4, - RetriesDuration: 4 * time.Nanosecond, - Results: []protocol.Result{ - NewResult(425, "%w", protocol.ResultNACK), - NewResult(429, "%w", protocol.ResultNACK), - NewResult(503, "%w", protocol.ResultNACK), - NewResult(504, "%w", protocol.ResultNACK), - }, - }, - wantRequestCount: 5, - }, - "retries, timeout, 200, ACK": { - delay: time.Nanosecond, - statusCodes: []int{-1, 200}, - retries: 5, - wantResult: &RetriesResult{ - Result: NewResult(200, "%w", protocol.ResultACK), - Retries: 1, - RetriesDuration: time.Nanosecond, - Results: nil, // skipping test as it contains internal http errors - }, - wantRequestCount: 2, - skipResults: true, - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - roundTripper := roundTripperTest{statusCodes: tc.statusCodes} - - p, err := New(WithClient(http.Client{Timeout: time.Second}), WithRoundTripper(&roundTripper)) - if err != nil { - t.Fail() - } - ctxWithRetries := cecontext.WithRetriesLinearBackoff(ctx, tc.delay, tc.retries) - _, got := p.Request(ctxWithRetries, dummyMsg) - - if roundTripper.requestCount != tc.wantRequestCount { - t.Errorf("expected %d requests, got %d", tc.wantRequestCount, roundTripper.requestCount) - } - - if tc.skipResults { - got.(*RetriesResult).Results = nil - } - - if diff := cmp.Diff(tc.wantResult, got); diff != "" { - t.Errorf("unexpected diff (-want, +got) = %v", diff) - } - }) - } -} - type roundTripperTest struct { statusCodes []int requestCount int diff --git a/v2/protocol/http/retries_result.go b/v2/protocol/http/retries_result.go index 3b4839221..0f25f7059 100644 --- a/v2/protocol/http/retries_result.go +++ b/v2/protocol/http/retries_result.go @@ -9,13 +9,16 @@ import ( // NewRetriesResult returns a http RetriesResult that should be used as // a transport.Result without retries -func NewRetriesResult(result protocol.Result, retries int, retriesDuration time.Duration, results []protocol.Result) protocol.Result { - return &RetriesResult{ - Result: result, - Retries: retries, - RetriesDuration: retriesDuration, - Results: results, +func NewRetriesResult(result protocol.Result, retries int, startTime time.Time, attempts []protocol.Result) protocol.Result { + rr := &RetriesResult{ + Result: result, + Retries: retries, + Duration: time.Since(startTime), } + if len(attempts) > 0 { + rr.Attempts = attempts + } + return rr } // RetriesResult wraps the fields required to make adjustments for http Responses. @@ -26,11 +29,11 @@ type RetriesResult struct { // Retries is the number of times the request was tried Retries int - // RetriesDuration records the time spent retrying. Exclude the successful request (if any) - RetriesDuration time.Duration + // Duration records the time spent retrying. Exclude the successful request (if any) + Duration time.Duration - // Results of all failed requests. Exclude last result. - Results []protocol.Result + // Attempts of all failed requests. Exclude last result. + Attempts []protocol.Result } // make sure RetriesResult implements error. diff --git a/v2/protocol/http/retries_result_test.go b/v2/protocol/http/retries_result_test.go index 6e8f4b5d9..79d23e1a7 100644 --- a/v2/protocol/http/retries_result_test.go +++ b/v2/protocol/http/retries_result_test.go @@ -13,48 +13,48 @@ import ( func TestRetriesNil_Is(t *testing.T) { var err error - if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Nanosecond, nil)) { + if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Now(), nil)) { t.Error("Did not expect error to be a NewRetriesResult") } } func TestRetriesError_Is(t *testing.T) { err := errors.New("some other error") - if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Nanosecond, nil)) { + if protocol.ResultIs(err, NewRetriesResult(nil, 0, time.Now(), nil)) { t.Error("Did not expect error to be a Result") } } func TestRetriesNew_Is(t *testing.T) { - err := NewRetriesResult(NewResult(200, "this is an example message, %s", "yep"), 0, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(200, "this is an example message, %s", "yep"), 0, time.Now(), nil) if !protocol.ResultIs(err, NewResult(200, "OK")) { t.Error("Expected error to be a 200 level result") } } func TestRetriesNewOtherType_Is(t *testing.T) { - err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 0, time.Now(), nil) if protocol.ResultIs(err, NewResult(200, "OK")) { t.Error("Expected error to be a [Normal, ExampleStatusFailed], filtered by eventtype failed") } } func TestRetriesNewWrappedErrors_Is(t *testing.T) { - err := NewRetriesResult(NewResult(500, "this is a wrapped error, %w", io.ErrUnexpectedEOF), 0, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(500, "this is a wrapped error, %w", io.ErrUnexpectedEOF), 0, time.Now(), nil) if !protocol.ResultIs(err, io.ErrUnexpectedEOF) { t.Error("Result expected to be a wrapped ErrUnexpectedEOF but was not") } } func TestRetriesNewOtherStatus_Is(t *testing.T) { - err := NewRetriesResult(NewResult(403, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) - if protocol.ResultIs(err, NewRetriesResult(NewResult(200, "OK"), 0, time.Nanosecond, nil)) { + err := NewRetriesResult(NewResult(403, "this is an example error, %s", "yep"), 0, time.Now(), nil) + if protocol.ResultIs(err, NewRetriesResult(NewResult(200, "OK"), 0, time.Now(), nil)) { t.Error("Did not expect event to be StatusCode=200") } } func TestRetriesNew_As(t *testing.T) { - err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 5, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(404, "this is an example error, %s", "yep"), 5, time.Now(), nil) var event *RetriesResult if !protocol.ResultAs(err, &event) { @@ -76,7 +76,7 @@ func TestRetriesNil_As(t *testing.T) { } func TestRetriesNew_Error(t *testing.T) { - err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 0, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 0, time.Now(), nil) const want = "500: this is an example error, yep" got := err.Error() @@ -86,7 +86,7 @@ func TestRetriesNew_Error(t *testing.T) { } func TestRetriesNew_ErrorWithRetries(t *testing.T) { - err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 10, time.Nanosecond, nil) + err := NewRetriesResult(NewResult(500, "this is an example error, %s", "yep"), 10, time.Now(), nil) const want = "500: this is an example error, yep (10x)" got := err.Error()