Skip to content

Commit

Permalink
Add backoff package
Browse files Browse the repository at this point in the history
Justification for jitter and growth factor:
https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/.

Add backoff to the Consul instancer loop.

Fixes #627.
  • Loading branch information
Nico Tonozzi committed Dec 3, 2017
1 parent 53f10af commit 924501a
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 13 deletions.
70 changes: 70 additions & 0 deletions backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package backoff

import (
"math/rand"
"sync/atomic"
"time"
)

const (
DefaultInterval = time.Second
DefaultMaxInterval = time.Minute
)

// ExponentialBackoff provides jittered exponential durations for the purpose of
// avoiding flodding a service with requests.
type ExponentialBackoff struct {
Interval time.Duration
Max time.Duration

currentInterval atomic.Value
cancel <-chan struct{}
}

// New creates a new ExpontentialBackoff instance with the default values, and
// an optional cancel channel.
func New(cancel <-chan struct{}) *ExponentialBackoff {
backoff := ExponentialBackoff{
Interval: DefaultInterval,
Max: DefaultMaxInterval,
cancel: cancel,
}
backoff.Reset()
return &backoff
}

// Reset should be called after a request succeeds.
func (b *ExponentialBackoff) Reset() {
b.currentInterval.Store(b.Interval)
}

// Wait increases the backoff and blocks until the duration is over or the
// cancel channel is filled.
func (b *ExponentialBackoff) Wait() {
d := b.NextBackoff()
select {
case <-time.After(d):
case <-b.cancel:
}
}

// NextBackoff updates the time interval and returns the updated value.
func (b *ExponentialBackoff) NextBackoff() time.Duration {
d := b.next()
if d > b.Max {
d = b.Max
}

b.currentInterval.Store(d)
return d
}

// next provides the exponential jittered backoff value. See
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
// for rationale.
func (b *ExponentialBackoff) next() time.Duration {
current := b.currentInterval.Load().(time.Duration)
d := float64(current * 2)
jitter := rand.Float64() + 0.5
return time.Duration(d * jitter)
}
34 changes: 34 additions & 0 deletions backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package backoff

import (
"testing"
"time"
)

func TestNext(t *testing.T) {
b := ExponentialBackoff{}
b.currentInterval.Store(time.Duration(12))

next := b.next()

if next < 12 || next > 36 {
t.Errorf("Expected next to be between 12 and 36, got %d", 12)
}
}

func TestNextBackoffMax(t *testing.T) {
max := time.Duration(13)
b := ExponentialBackoff{
Max: max,
}
b.currentInterval.Store(time.Duration(14))
next := b.NextBackoff()
if next != max {
t.Errorf("Expected next to be max, %d, but got %d", max, next)
}

current := b.currentInterval.Load().(time.Duration)
if current != max {
t.Errorf("Expected currentInterval to be max, %d, but got %d", max, current)
}
}
4 changes: 4 additions & 0 deletions sd/consul/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

consul "github.com/hashicorp/consul/api"

"github.com/go-kit/kit/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/internal/instance"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (s *Instancer) loop(lastIndex uint64) {
var (
instances []string
err error
backoff = backoff.New(s.quitc)
)
for {
instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
Expand All @@ -68,8 +70,10 @@ func (s *Instancer) loop(lastIndex uint64) {
case err != nil:
s.logger.Log("err", err)
s.cache.Update(sd.Event{Err: err})
backoff.Wait()
default:
s.cache.Update(sd.Event{Instances: instances})
backoff.Reset()
}
}
}
Expand Down
18 changes: 5 additions & 13 deletions util/conn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"time"

"github.com/go-kit/kit/backoff"
"github.com/go-kit/kit/log"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func (m *Manager) loop() {
conn = dial(m.dialer, m.network, m.address, m.logger) // may block slightly
connc = make(chan net.Conn, 1)
reconnectc <-chan time.Time // initially nil
backoff = time.Second
backoff = backoff.New(nil)
)

// If the initial dial fails, we need to trigger a reconnect via the loop
Expand All @@ -103,12 +104,11 @@ func (m *Manager) loop() {
case conn = <-connc:
if conn == nil {
// didn't work
backoff = exponential(backoff) // wait longer
reconnectc = m.after(backoff) // try again
reconnectc = m.after(backoff.NextBackoff()) // try again
} else {
// worked!
backoff = time.Second // reset wait time
reconnectc = nil // no retry necessary
backoff.Reset() // reset wait time
reconnectc = nil // no retry necessary
}

case m.takec <- conn:
Expand All @@ -132,14 +132,6 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
return conn
}

func exponential(d time.Duration) time.Duration {
d *= 2
if d > time.Minute {
d = time.Minute
}
return d
}

// ErrConnectionUnavailable is returned by the Manager's Write method when the
// manager cannot yield a good connection.
var ErrConnectionUnavailable = errors.New("connection unavailable")

0 comments on commit 924501a

Please sign in to comment.