Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for HTTP retries #436

Merged
merged 8 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ var (

// Event Creation

NewEvent = event.New
NewResult = protocol.NewResult
NewEvent = event.New

NewHTTPResult = http.NewResult
// Results

NewResult = protocol.NewResult
NewHTTPResult = http.NewResult
NewHTTPRetriesResult = http.NewRetriesResult

// Message Creation

Expand All @@ -132,10 +135,11 @@ var (

// Context

ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured
ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff
WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured

// Custom Types

Expand Down
28 changes: 28 additions & 0 deletions v2/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package context
import (
"context"
"net/url"
"time"
)

// Opaque key type used to store target
Expand Down Expand Up @@ -50,3 +51,30 @@ func TopicFrom(ctx context.Context) string {
}
return ""
}

// Opaque key type used to store retry parameters
type retriesKeyType struct{}

var retriesKey = retriesKeyType{}

// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy.
// MaxTries is the maximum number for retries and delay is the time interval between retries
func WithRetriesLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context {
return context.WithValue(ctx, retriesKey, &RetryParams{
Strategy: BackoffStrategyLinear,
Period: delay,
MaxTries: maxTries,
})
}

// RetriesFrom looks in the given context and returns the retries parameters if found.
// Otherwise returns the default retries configuration (ie. no retries).
func RetriesFrom(ctx context.Context) *RetryParams {
c := ctx.Value(retriesKey)
if c != nil {
if s, ok := c.(*RetryParams); ok {
return s
}
}
return &DefaultRetryParams
}
29 changes: 29 additions & 0 deletions v2/context/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package context

import "time"

type BackoffStrategy string

const (
BackoffStrategyNone = "none"
BackoffStrategyLinear = "linear"

// TODO
// BackoffStrategyExponential = "exponential"
)

var DefaultRetryParams = RetryParams{Strategy: BackoffStrategyNone}

// RetryParams holds parameters applied to retries
type RetryParams struct {
// Strategy is the backoff strategy to applies between retries
Strategy BackoffStrategy

// MaxTries is the maximum number of times to retry request before giving up
MaxTries int

// Period is
// - for linear strategy: the delay interval between retries
// - for exponential strategy: the factor applied after each retry
Period time.Duration
}
11 changes: 11 additions & 0 deletions v2/protocol/http/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,14 @@ func WithRoundTripper(roundTripper nethttp.RoundTripper) Option {
return nil
}
}

// WithClient sets the protocol client
func WithClient(client nethttp.Client) Option {
return func(p *Protocol) error {
if p == nil {
return fmt.Errorf("client option can not set nil protocol")
}
p.Client = &client
return nil
}
}
77 changes: 75 additions & 2 deletions v2/protocol/http/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http

import (
"context"
"errors"
"fmt"
"io"
"net"
Expand All @@ -10,10 +11,9 @@ import (
"sync"
"time"

"github.com/cloudevents/sdk-go/v2/protocol"

"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/protocol"
)

const (
Expand Down Expand Up @@ -118,6 +118,11 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess
if err = WriteRequest(ctx, m, req, p.transformers); err != nil {
return nil, err
}

return p.doWithRetry(ctx, cecontext.RetriesFrom(ctx))(req)
}

func (p *Protocol) doOnce(req *http.Request) (binding.Message, error) {
resp, err := p.Client.Do(req)
if err != nil {
return nil, protocol.NewReceipt(false, "%w", err)
Expand All @@ -133,6 +138,74 @@ func (p *Protocol) Request(ctx context.Context, m binding.Message) (binding.Mess
return NewMessage(resp.Header, resp.Body), NewResult(resp.StatusCode, "%w", result)
}

func (p *Protocol) doWithRetry(ctx context.Context, retriesParams *cecontext.RetryParams) func(*http.Request) (binding.Message, error) {
return func(req *http.Request) (binding.Message, error) {
retries := 0
retriesDuration := time.Duration(0)
var results []protocol.Result
for {
msg, result := p.doOnce(req)

// Fast track common case.
if msg != nil && result.(*Result).StatusCode/100 == 2 {
if retries != 0 {
result = NewRetriesResult(result, retries, retriesDuration, results)
}
return msg, result
}

// Slow case: determine whether or not to retry
retry := retries < retriesParams.MaxTries

if retry {
if msg == nil {
var err *url.Error
if errors.As(result, &err) { // should always be true
// Only retry when the error is a timeout
retry = err.Timeout()
}
} else {
// Got a msg but status code is not 2xx

// Potentially retry when:
// - 413 Payload Too Large with Retry-After (NOT SUPPORTED)
// - 425 Too Early
// - 429 Too Many Requests
// - 503 Service Unavailable (with or without Retry-After) (IGNORE Retry-After)
// - 504 Gateway Timeout

sc := result.(*Result).StatusCode
if sc != 425 && sc != 429 && sc != 503 && sc != 504 {
// Permanent error
retry = false
}
}
}

if !retry {
// return the last http response (or nil if none).
if retries != 0 {
return msg, NewRetriesResult(result, retries, retriesDuration, results)
}
return msg, result
}

results = append(results, result)
retries++
retriesDuration += retriesParams.Period

ticker := time.NewTicker(retriesParams.Period)
select {
case <-ctx.Done():
ticker.Stop()
return nil, protocol.NewReceipt(false, "request has been cancelled")
case <-ticker.C:
ticker.Stop()
}
}
}
}

func (p *Protocol) makeRequest(ctx context.Context) *http.Request {
// TODO: support custom headers from context?
req := &http.Request{
Expand Down
145 changes: 141 additions & 4 deletions v2/protocol/http/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package http

import (
"context"
"errors"
"net/http"
"testing"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
cecontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"

"net/http"
"testing"
"time"
)

func TestNew(t *testing.T) {
Expand Down Expand Up @@ -248,3 +251,137 @@ func ReceiveTest(t *testing.T, p *Protocol, ctx context.Context, want binding.Me
require.IsType(t, want, got)
}
}

func TestRequestWithRetries(t *testing.T) {
dummyEvent := event.New()
dummyMsg := binding.ToMessage(&dummyEvent)
ctx := cecontext.WithTarget(context.Background(), "http://test")
testCases := map[string]struct {
// roundTripperTest
statusCodes []int // -1 = timeout

// Linear Backoff
delay time.Duration
retries int

// Wants
wantResult protocol.Result
wantRequestCount int

skipResults bool
}{
"no retries, ACK": {
statusCodes: []int{200},
retries: 0,
wantResult: NewResult(200, "%w", protocol.ResultACK),
wantRequestCount: 1,
},
"no retries, NACK": {
statusCodes: []int{404},
retries: 0,
wantResult: NewResult(404, "%w", protocol.ResultNACK),
wantRequestCount: 1,
},
"retries, no NACK": {
statusCodes: []int{200},
delay: time.Nanosecond,
retries: 3,
wantResult: NewResult(200, "%w", protocol.ResultACK),
wantRequestCount: 1,
},
"3 retries, 425, 200, ACK": {
statusCodes: []int{425, 200},
delay: time.Nanosecond,
retries: 3,
wantResult: &RetriesResult{
Result: NewResult(200, "%w", protocol.ResultACK),
Retries: 1,
RetriesDuration: time.Nanosecond,
Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)},
},
wantRequestCount: 2,
},
"1 retry, 425, 429, 200, NACK": {
statusCodes: []int{425, 429, 200},
delay: time.Nanosecond,
retries: 1,
wantResult: &RetriesResult{
Result: NewResult(429, "%w", protocol.ResultNACK),
Retries: 1,
RetriesDuration: time.Nanosecond,
Results: []protocol.Result{NewResult(425, "%w", protocol.ResultNACK)},
},
wantRequestCount: 2,
},
"10 retries, 425, 429, 503, 504, 200, ACK": {
statusCodes: []int{425, 429, 503, 504, 200},
delay: time.Nanosecond,
retries: 10,
wantResult: &RetriesResult{
Result: NewResult(200, "%w", protocol.ResultACK),
Retries: 4,
RetriesDuration: 4 * time.Nanosecond,
Results: []protocol.Result{
NewResult(425, "%w", protocol.ResultNACK),
NewResult(429, "%w", protocol.ResultNACK),
NewResult(503, "%w", protocol.ResultNACK),
NewResult(504, "%w", protocol.ResultNACK),
},
},
wantRequestCount: 5,
},
"retries, timeout, 200, ACK": {
delay: time.Nanosecond,
statusCodes: []int{-1, 200},
retries: 5,
wantResult: &RetriesResult{
Result: NewResult(200, "%w", protocol.ResultACK),
Retries: 1,
RetriesDuration: time.Nanosecond,
Results: nil, // skipping test as it contains internal http errors
},
wantRequestCount: 2,
skipResults: true,
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
roundTripper := roundTripperTest{statusCodes: tc.statusCodes}

p, err := New(WithClient(http.Client{Timeout: time.Second}), WithRoundTripper(&roundTripper))
if err != nil {
t.Fail()
}
ctxWithRetries := cecontext.WithRetriesLinearBackoff(ctx, tc.delay, tc.retries)
_, got := p.Request(ctxWithRetries, dummyMsg)

if roundTripper.requestCount != tc.wantRequestCount {
t.Errorf("expected %d requests, got %d", tc.wantRequestCount, roundTripper.requestCount)
}

if tc.skipResults {
got.(*RetriesResult).Results = nil
}

if diff := cmp.Diff(tc.wantResult, got); diff != "" {
t.Errorf("unexpected diff (-want, +got) = %v", diff)
}
})
}
}

type roundTripperTest struct {
statusCodes []int
requestCount int
}

func (r *roundTripperTest) RoundTrip(req *http.Request) (*http.Response, error) {
code := r.statusCodes[r.requestCount]
r.requestCount++
if code == -1 {
time.Sleep(2 * time.Second)
return nil, errors.New("timeout")
}

return &http.Response{StatusCode: code}, nil
}
Loading