Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding Exp and Const backoff for good measure. #442

Merged
merged 4 commits into from
Apr 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ var (
WithTimeNow = client.WithTimeNow
WithTracePropagation = client.WithTracePropagation()

// Event Creation

NewEvent = event.New

// Results

ResultIs = protocol.ResultIs
ResultAs = protocol.ResultAs
NewResult = protocol.NewResult
ResultIs = protocol.ResultIs
ResultAs = protocol.ResultAs

// Receipt helpers

Expand All @@ -111,13 +116,8 @@ var (
IsACK = protocol.IsACK
IsNACK = protocol.IsNACK

// Event Creation

NewEvent = event.New
// HTTP Results

// Results

NewResult = protocol.NewResult
NewHTTPResult = http.NewResult
NewHTTPRetriesResult = http.NewRetriesResult

Expand All @@ -135,11 +135,14 @@ var (

// Context

ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff
WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured
ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
ContextWithRetriesConstantBackoff = context.WithRetriesConstantBackoff
ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff
ContextWithRetriesExponentialBackoff = context.WithRetriesExponentialBackoff
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved

WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured

// Custom Types

Expand Down
1 change: 0 additions & 1 deletion v2/cmd/samples/http/requester-with-custom-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func _main(args []string, env envConfig) int {
InsecureSkipVerify: true,
}

//lint:ignore SA1019 TODO: remove use of deprecated method.
tlsConfig.BuildNameToCertificate()

seq := 0
Expand Down
51 changes: 51 additions & 0 deletions v2/cmd/samples/http/sender_retry/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"log"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")

p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// must send each event within 5 seconds for sleepy demo.

log.Println("--- Constant ---")
send10(cloudevents.ContextWithRetriesConstantBackoff(ctx, 10*time.Millisecond, 10), c)
log.Println("--- Linear ---")
send10(cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 10), c)
log.Println("--- Exponential ---")
send10(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), c)
}

func send10(ctx context.Context, c cloudevents.Client) {
for i := 0; i < 100; i++ {
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/v2/cmd/samples/httpb/sender")
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})

if result := c.Send(ctx, e); !cloudevents.IsACK(result) {
log.Printf("failed to send: %s", result.Error())
} else {
log.Printf("sent: %d", i)
}
time.Sleep(50 * time.Millisecond)
}
}
22 changes: 11 additions & 11 deletions v2/cmd/samples/http/sleepy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ func gotEvent(event cloudevents.Event) error {
}

func _main(args []string, env envConfig) int {
p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
c, err := cloudevents.NewClient(p)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}
for {
p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
c, err := cloudevents.NewClient(p)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}

log.Printf("listening on :%d%s\n", env.Port, env.Path)
log.Printf("listening on :%d%s\n", env.Port, env.Path)

for {
ctx, cancel := context.WithCancel(context.TODO())

go func() {
Expand Down
29 changes: 27 additions & 2 deletions v2/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,41 @@ type retriesKeyType struct{}

var retriesKey = retriesKeyType{}

// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy.
// WithRetriesConstantBackoff returns back a new context with retries parameters using constant backoff strategy.
// MaxTries is the maximum number for retries and delay is the time interval between retries
func WithRetriesConstantBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context {
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyConstant,
Period: delay,
MaxTries: maxTries,
})
}

// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy.
// MaxTries is the maximum number for retries and delay*tries is the time interval between retries
func WithRetriesLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context {
return context.WithValue(ctx, retriesKey, &RetryParams{
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyLinear,
Period: delay,
MaxTries: maxTries,
})
}

// WithRetriesExponentialBackoff returns back a new context with retries parameters using exponential backoff strategy.
// MaxTries is the maximum number for retries and period is the amount of time to wait, used as `period * 2^retries`.
func WithRetriesExponentialBackoff(ctx context.Context, period time.Duration, maxTries int) context.Context {
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyExponential,
Period: period,
MaxTries: maxTries,
})
}

// WithRetryParams returns back a new context with retries parameters.
func WithRetryParams(ctx context.Context, rp *RetryParams) context.Context {
return context.WithValue(ctx, retriesKey, rp)
}

// RetriesFrom looks in the given context and returns the retries parameters if found.
// Otherwise returns the default retries configuration (ie. no retries).
func RetriesFrom(ctx context.Context) *RetryParams {
Expand Down
58 changes: 50 additions & 8 deletions v2/context/retry.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package context

import "time"
import (
"context"
"errors"
"math"
"time"
)

type BackoffStrategy string

const (
BackoffStrategyNone = "none"
BackoffStrategyLinear = "linear"

// TODO
// BackoffStrategyExponential = "exponential"
BackoffStrategyNone = "none"
BackoffStrategyConstant = "constant"
BackoffStrategyLinear = "linear"
BackoffStrategyExponential = "exponential"
)

var DefaultRetryParams = RetryParams{Strategy: BackoffStrategyNone}
Expand All @@ -23,7 +27,45 @@ type RetryParams struct {
MaxTries int

// Period is
// - for linear strategy: the delay interval between retries
// - for exponential strategy: the factor applied after each retry
// - for none strategy: no delay
// - for constant strategy: the delay interval between retries
// - for linear strategy: interval between retries = Period * retries
// - for exponential strategy: interval between retries = Period * retries^2
Period time.Duration
}

// BackoffFor tries will return the time duration that should be used for this
// current try count.
// `tries` is assumed to be the number of times the caller has already retried.
func (r *RetryParams) BackoffFor(tries int) time.Duration {
switch r.Strategy {
case BackoffStrategyConstant:
return r.Period
case BackoffStrategyLinear:
return r.Period * time.Duration(tries)
case BackoffStrategyExponential:
exp := math.Exp2(float64(tries))
return r.Period * time.Duration(exp)
case BackoffStrategyNone:
fallthrough // default
default:
return r.Period
}
}

// Backoff is a blocking call to wait for the correct amount of time for the retry.
// `tries` is assumed to be the number of times the caller has already retried.
func (r *RetryParams) Backoff(ctx context.Context, tries int) error {
if tries > r.MaxTries {
return errors.New("too many retries")
}
ticker := time.NewTicker(r.BackoffFor(tries))
select {
case <-ctx.Done():
ticker.Stop()
return errors.New("context has been cancelled")
case <-ticker.C:
ticker.Stop()
}
return nil
}
114 changes: 114 additions & 0 deletions v2/context/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package context

import (
"context"
"testing"
"time"
)

func TestRetryParams_Backoff(t *testing.T) {
tests := map[string]struct {
rp *RetryParams
ctx context.Context
tries int
wantErr bool
}{
"none 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 1,
wantErr: true,
},
"const 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 5,
},
"linear 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 1,
},
"exponential 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 1,
},
"const timeout": {
ctx: func() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
return ctx
}(),
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
wantErr: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
if err := tc.rp.Backoff(tc.ctx, tc.tries); (err != nil) != tc.wantErr {
t.Errorf("Backoff() error = %v, wantErr %v", err, tc.wantErr)
}
})
}
}

func TestRetryParams_BackoffFor(t *testing.T) {
tests := map[string]struct {
rp *RetryParams
tries int
want time.Duration
}{
"none 1": {
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 1,
want: time.Duration(0),
},
"const 1": {
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 1 * time.Second,
},
"linear 1": {
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 1 * time.Second,
},
"exponential 1": {
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 2 * time.Second, // 1 == 2^1
},
"none 5": {
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 5,
want: time.Duration(0),
},
"const 5": {
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 1 * time.Second,
},
"linear 5": {
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 5 * time.Second,
},
"exponential 5": {
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 32 * time.Second, // 32 == 2^5
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
if got := tc.rp.BackoffFor(tc.tries); got != tc.want {
t.Errorf("BackoffFor() = %v, want %v", got, tc.want)
}
})
}
}
Loading