Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add backoff package and fix Consul CPU usage #635

Merged
merged 8 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package backoff

import (
"math/rand"
"sync/atomic"
"time"
)

// Default settings applied by New.
const (
// DefaultInterval is the Interval that New assigns to a fresh
// ExponentialBackoff.
DefaultInterval = time.Second
// DefaultMaxInterval is the Max that New assigns to a fresh
// ExponentialBackoff.
DefaultMaxInterval = time.Minute
)

// ExponentialBackoff provides jittered exponential durations for the purpose of
// avoiding flooding a service with requests.
type ExponentialBackoff struct {
// Interval is the value the current interval is set back to by Reset.
Interval time.Duration
// Max is the upper bound that NextBackoff clamps each computed duration to.
Max time.Duration

// currentInterval holds the most recently stored interval as a
// time.Duration; it is written by Reset and NextBackoff and read by next.
currentInterval atomic.Value
// cancel is the channel Wait selects on alongside its timer, letting Wait
// return before the backoff duration has elapsed.
cancel <-chan struct{}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mixing exported and non-exported fields can be OK, but then you also mix direct and constructor initialization for this type, and combined I think that's a recipe for trouble...


// New returns a pointer to an ExponentialBackoff configured with
// DefaultInterval and DefaultMaxInterval, with its current interval already
// reset. The cancel channel is optional; it is stored as given and consulted
// by Wait.
func New(cancel <-chan struct{}) *ExponentialBackoff {
	b := &ExponentialBackoff{
		cancel:   cancel,
		Max:      DefaultMaxInterval,
		Interval: DefaultInterval,
	}
	b.Reset()
	return b
}

// Reset restores the current interval to the configured Interval. Call it
// once a request has succeeded so that the next failure starts backing off
// from the beginning again.
func (b *ExponentialBackoff) Reset() {
	initial := b.Interval
	b.currentInterval.Store(initial)
}

// Wait advances the backoff via NextBackoff and then blocks until that
// duration has elapsed or the cancel channel yields a value or is closed,
// whichever happens first. A nil cancel channel never becomes ready, so in
// that case Wait always sleeps for the full duration.
func (b *ExponentialBackoff) Wait() {
	timer := time.NewTimer(b.NextBackoff())
	// Stop the timer on the way out so that an early cancel does not leave it
	// pending until it would have fired (time.After offers no way to do so).
	defer timer.Stop()

	select {
	case <-timer.C:
	case <-b.cancel:
	}
}

// NextBackoff computes the next jittered interval, limits it to Max, records
// the result as the current interval, and returns it.
func (b *ExponentialBackoff) NextBackoff() time.Duration {
	interval := b.next()
	if limit := b.Max; interval > limit {
		interval = limit
	}
	b.currentInterval.Store(interval)
	return interval
}

// next provides the exponential jittered backoff value: twice the current
// interval scaled by a random factor in [0.5, 1.5). See
// https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
// for rationale.
//
// If no interval has been stored yet (for example an ExponentialBackoff built
// as a struct literal without calling Reset), the configured Interval is used
// as the starting point instead of panicking on the type assertion.
func (b *ExponentialBackoff) next() time.Duration {
	current, ok := b.currentInterval.Load().(time.Duration)
	if !ok {
		// atomic.Value.Load returns nil before the first Store.
		current = b.Interval
	}
	doubled := float64(current * 2)
	jitter := rand.Float64() + 0.5
	return time.Duration(doubled * jitter)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this jitter bit is the real meat of the change. Rather than adding a new package, types, API, package docs, etc. — can you instead put this jitter calculation into the old exponential func, statelessly?

34 changes: 34 additions & 0 deletions backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package backoff

import (
"testing"
"time"
)

// TestNext checks that next stays within the jitter bounds: with a current
// interval of 12 the result is 24 scaled by a factor in [0.5, 1.5).
func TestNext(t *testing.T) {
	b := ExponentialBackoff{}
	b.currentInterval.Store(time.Duration(12))

	next := b.next()

	if next < 12 || next > 36 {
		// Report the observed value rather than the constant lower bound.
		t.Errorf("Expected next to be between 12 and 36, got %d", next)
	}
}

// TestNextBackoffMax verifies that NextBackoff clamps its result to Max and
// that the clamped value is also what gets stored as the current interval.
func TestNextBackoffMax(t *testing.T) {
	const limit = time.Duration(13)
	b := ExponentialBackoff{Max: limit}
	// Start above the limit so that any jittered doubling exceeds it.
	b.currentInterval.Store(time.Duration(14))

	if got := b.NextBackoff(); got != limit {
		t.Errorf("Expected next to be max, %d, but got %d", limit, got)
	}

	if stored := b.currentInterval.Load().(time.Duration); stored != limit {
		t.Errorf("Expected currentInterval to be max, %d, but got %d", limit, stored)
	}
}
4 changes: 4 additions & 0 deletions sd/consul/instancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

consul "github.com/hashicorp/consul/api"

"github.com/go-kit/kit/backoff"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/sd"
"github.com/go-kit/kit/sd/internal/instance"
Expand Down Expand Up @@ -59,6 +60,7 @@ func (s *Instancer) loop(lastIndex uint64) {
var (
instances []string
err error
backoff = backoff.New(s.quitc)
)
for {
instances, lastIndex, err = s.getInstances(lastIndex, s.quitc)
Expand All @@ -68,8 +70,10 @@ func (s *Instancer) loop(lastIndex uint64) {
case err != nil:
s.logger.Log("err", err)
s.cache.Update(sd.Event{Err: err})
backoff.Wait()
default:
s.cache.Update(sd.Event{Instances: instances})
backoff.Reset()
}
}
}
Expand Down
18 changes: 5 additions & 13 deletions util/conn/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"time"

"github.com/go-kit/kit/backoff"
"github.com/go-kit/kit/log"
)

Expand Down Expand Up @@ -86,7 +87,7 @@ func (m *Manager) loop() {
conn = dial(m.dialer, m.network, m.address, m.logger) // may block slightly
connc = make(chan net.Conn, 1)
reconnectc <-chan time.Time // initially nil
backoff = time.Second
backoff = backoff.New(nil)
)

// If the initial dial fails, we need to trigger a reconnect via the loop
Expand All @@ -103,12 +104,11 @@ func (m *Manager) loop() {
case conn = <-connc:
if conn == nil {
// didn't work
backoff = exponential(backoff) // wait longer
reconnectc = m.after(backoff) // try again
reconnectc = m.after(backoff.NextBackoff()) // try again
} else {
// worked!
backoff = time.Second // reset wait time
reconnectc = nil // no retry necessary
backoff.Reset() // reset wait time
reconnectc = nil // no retry necessary
}

case m.takec <- conn:
Expand All @@ -132,14 +132,6 @@ func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
return conn
}

// exponential returns twice the given duration, capped at one minute.
func exponential(d time.Duration) time.Duration {
	if doubled := 2 * d; doubled <= time.Minute {
		return doubled
	}
	return time.Minute
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking a 1-line change can be enough, something like this, feel free to make corrections...

// exponential doubles d, caps the doubled value at one minute, and then
// applies a random jitter factor in [0.5, 1.5). Because the jitter is applied
// after the cap, the result may be as large as 1.5 minutes.
func exponential(d time.Duration) time.Duration {
	d *= 2
	if d > time.Minute {
		d = time.Minute
	}
	// time.Duration and float64 cannot be multiplied directly; convert
	// explicitly in both directions.
	return time.Duration(float64(d) * (rand.Float64() + 0.5))
}


// ErrConnectionUnavailable is returned by the Manager's Write method when the
// manager cannot yield a good connection. It is a single package-level value
// created with errors.New.
var ErrConnectionUnavailable = errors.New("connection unavailable")