Skip to content

Commit

Permalink
Adding Exp and Const backoff for good measure. (#442)
Browse files Browse the repository at this point in the history
* Adding Exp and Const backoff for good measure.

Signed-off-by: Scott Nichols <snichols@vmware.com>

* vet wants the cancel called...

Signed-off-by: Scott Nichols <snichols@vmware.com>

* vet fix.

Signed-off-by: Scott Nichols <snichols@vmware.com>

* no need for the inner function

Signed-off-by: Scott Nichols <snichols@vmware.com>
  • Loading branch information
n3wscott authored Apr 7, 2020
1 parent 6c21831 commit 5be5fcc
Show file tree
Hide file tree
Showing 13 changed files with 540 additions and 261 deletions.
29 changes: 16 additions & 13 deletions v2/alias.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,15 @@ var (
WithTimeNow = client.WithTimeNow
WithTracePropagation = client.WithTracePropagation()

// Event Creation

NewEvent = event.New

// Results

ResultIs = protocol.ResultIs
ResultAs = protocol.ResultAs
NewResult = protocol.NewResult
ResultIs = protocol.ResultIs
ResultAs = protocol.ResultAs

// Receipt helpers

Expand All @@ -111,13 +116,8 @@ var (
IsACK = protocol.IsACK
IsNACK = protocol.IsNACK

// Event Creation

NewEvent = event.New
// HTTP Results

// Results

NewResult = protocol.NewResult
NewHTTPResult = http.NewResult
NewHTTPRetriesResult = http.NewRetriesResult

Expand All @@ -135,11 +135,14 @@ var (

// Context

ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff
WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured
ContextWithTarget = context.WithTarget
TargetFromContext = context.TargetFrom
ContextWithRetriesConstantBackoff = context.WithRetriesConstantBackoff
ContextWithRetriesLinearBackoff = context.WithRetriesLinearBackoff
ContextWithRetriesExponentialBackoff = context.WithRetriesExponentialBackoff

WithEncodingBinary = binding.WithForceBinary
WithEncodingStructured = binding.WithForceStructured

// Custom Types

Expand Down
1 change: 0 additions & 1 deletion v2/cmd/samples/http/requester-with-custom-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func _main(args []string, env envConfig) int {
InsecureSkipVerify: true,
}

//lint:ignore SA1019 TODO: remove use of deprecated method.
tlsConfig.BuildNameToCertificate()

seq := 0
Expand Down
51 changes: 51 additions & 0 deletions v2/cmd/samples/http/sender_retry/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package main

import (
"context"
"log"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
)

func main() {
ctx := cloudevents.ContextWithTarget(context.Background(), "http://localhost:8080/")

p, err := cloudevents.NewHTTP()
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}

c, err := cloudevents.NewClient(p, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}

// must send each event within 5 seconds for sleepy demo.

log.Println("--- Constant ---")
send10(cloudevents.ContextWithRetriesConstantBackoff(ctx, 10*time.Millisecond, 10), c)
log.Println("--- Linear ---")
send10(cloudevents.ContextWithRetriesLinearBackoff(ctx, 10*time.Millisecond, 10), c)
log.Println("--- Exponential ---")
send10(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), c)
}

func send10(ctx context.Context, c cloudevents.Client) {
for i := 0; i < 100; i++ {
e := cloudevents.NewEvent()
e.SetType("com.cloudevents.sample.sent")
e.SetSource("https://github.com/cloudevents/sdk-go/v2/cmd/samples/httpb/sender")
_ = e.SetData(cloudevents.ApplicationJSON, map[string]interface{}{
"id": i,
"message": "Hello, World!",
})

if result := c.Send(ctx, e); !cloudevents.IsACK(result) {
log.Printf("failed to send: %s", result.Error())
} else {
log.Printf("sent: %d", i)
}
time.Sleep(50 * time.Millisecond)
}
}
22 changes: 11 additions & 11 deletions v2/cmd/samples/http/sleepy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,19 @@ func gotEvent(event cloudevents.Event) error {
}

func _main(args []string, env envConfig) int {
p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
c, err := cloudevents.NewClient(p)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}
for {
p, err := cloudevents.NewHTTP(cloudevents.WithPort(env.Port), cloudevents.WithPath(env.Path))
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
c, err := cloudevents.NewClient(p)
if err != nil {
log.Printf("failed to create client, %v", err)
return 1
}

log.Printf("listening on :%d%s\n", env.Port, env.Path)
log.Printf("listening on :%d%s\n", env.Port, env.Path)

for {
ctx, cancel := context.WithCancel(context.TODO())

go func() {
Expand Down
29 changes: 27 additions & 2 deletions v2/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,41 @@ type retriesKeyType struct{}

var retriesKey = retriesKeyType{}

// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy.
// WithRetriesConstantBackoff returns back a new context with retries parameters using constant backoff strategy.
// MaxTries is the maximum number for retries and delay is the time interval between retries
func WithRetriesConstantBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context {
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyConstant,
Period: delay,
MaxTries: maxTries,
})
}

// WithRetriesLinearBackoff returns back a new context with retries parameters using linear backoff strategy.
// MaxTries is the maximum number for retries and delay*tries is the time interval between retries
func WithRetriesLinearBackoff(ctx context.Context, delay time.Duration, maxTries int) context.Context {
return context.WithValue(ctx, retriesKey, &RetryParams{
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyLinear,
Period: delay,
MaxTries: maxTries,
})
}

// WithRetriesExponentialBackoff returns back a new context with retries parameters using exponential backoff strategy.
// MaxTries is the maximum number for retries and period is the amount of time to wait, used as `period * 2^retries`.
func WithRetriesExponentialBackoff(ctx context.Context, period time.Duration, maxTries int) context.Context {
return WithRetryParams(ctx, &RetryParams{
Strategy: BackoffStrategyExponential,
Period: period,
MaxTries: maxTries,
})
}

// WithRetryParams returns back a new context with retries parameters.
func WithRetryParams(ctx context.Context, rp *RetryParams) context.Context {
return context.WithValue(ctx, retriesKey, rp)
}

// RetriesFrom looks in the given context and returns the retries parameters if found.
// Otherwise returns the default retries configuration (ie. no retries).
func RetriesFrom(ctx context.Context) *RetryParams {
Expand Down
58 changes: 50 additions & 8 deletions v2/context/retry.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package context

import "time"
import (
"context"
"errors"
"math"
"time"
)

type BackoffStrategy string

const (
BackoffStrategyNone = "none"
BackoffStrategyLinear = "linear"

// TODO
// BackoffStrategyExponential = "exponential"
BackoffStrategyNone = "none"
BackoffStrategyConstant = "constant"
BackoffStrategyLinear = "linear"
BackoffStrategyExponential = "exponential"
)

var DefaultRetryParams = RetryParams{Strategy: BackoffStrategyNone}
Expand All @@ -23,7 +27,45 @@ type RetryParams struct {
MaxTries int

// Period is
// - for linear strategy: the delay interval between retries
// - for exponential strategy: the factor applied after each retry
// - for none strategy: no delay
// - for constant strategy: the delay interval between retries
// - for linear strategy: interval between retries = Period * retries
// - for exponential strategy: interval between retries = Period * retries^2
Period time.Duration
}

// BackoffFor tries will return the time duration that should be used for this
// current try count.
// `tries` is assumed to be the number of times the caller has already retried.
func (r *RetryParams) BackoffFor(tries int) time.Duration {
switch r.Strategy {
case BackoffStrategyConstant:
return r.Period
case BackoffStrategyLinear:
return r.Period * time.Duration(tries)
case BackoffStrategyExponential:
exp := math.Exp2(float64(tries))
return r.Period * time.Duration(exp)
case BackoffStrategyNone:
fallthrough // default
default:
return r.Period
}
}

// Backoff is a blocking call to wait for the correct amount of time for the retry.
// `tries` is assumed to be the number of times the caller has already retried.
func (r *RetryParams) Backoff(ctx context.Context, tries int) error {
if tries > r.MaxTries {
return errors.New("too many retries")
}
ticker := time.NewTicker(r.BackoffFor(tries))
select {
case <-ctx.Done():
ticker.Stop()
return errors.New("context has been cancelled")
case <-ticker.C:
ticker.Stop()
}
return nil
}
114 changes: 114 additions & 0 deletions v2/context/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package context

import (
"context"
"testing"
"time"
)

func TestRetryParams_Backoff(t *testing.T) {
tests := map[string]struct {
rp *RetryParams
ctx context.Context
tries int
wantErr bool
}{
"none 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 1,
wantErr: true,
},
"const 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 5,
},
"linear 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 1,
},
"exponential 1": {
ctx: context.Background(),
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Nanosecond},
tries: 1,
},
"const timeout": {
ctx: func() context.Context {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
go func() {
time.Sleep(10 * time.Millisecond)
cancel()
}()
return ctx
}(),
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
wantErr: true,
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
if err := tc.rp.Backoff(tc.ctx, tc.tries); (err != nil) != tc.wantErr {
t.Errorf("Backoff() error = %v, wantErr %v", err, tc.wantErr)
}
})
}
}

func TestRetryParams_BackoffFor(t *testing.T) {
tests := map[string]struct {
rp *RetryParams
tries int
want time.Duration
}{
"none 1": {
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 1,
want: time.Duration(0),
},
"const 1": {
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 1 * time.Second,
},
"linear 1": {
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 1 * time.Second,
},
"exponential 1": {
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second},
tries: 1,
want: 2 * time.Second, // 1 == 2^1
},
"none 5": {
rp: &RetryParams{Strategy: BackoffStrategyNone},
tries: 5,
want: time.Duration(0),
},
"const 5": {
rp: &RetryParams{Strategy: BackoffStrategyConstant, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 1 * time.Second,
},
"linear 5": {
rp: &RetryParams{Strategy: BackoffStrategyLinear, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 5 * time.Second,
},
"exponential 5": {
rp: &RetryParams{Strategy: BackoffStrategyExponential, MaxTries: 10, Period: 1 * time.Second},
tries: 5,
want: 32 * time.Second, // 32 == 2^5
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
if got := tc.rp.BackoffFor(tc.tries); got != tc.want {
t.Errorf("BackoffFor() = %v, want %v", got, tc.want)
}
})
}
}
Loading

0 comments on commit 5be5fcc

Please sign in to comment.