Skip to content

Commit

Permalink
Merge pull request #26 from libp2p/feat/backoff
Browse files Browse the repository at this point in the history
Add Backoff Cache Discovery
  • Loading branch information
Stebalien authored Oct 30, 2019
2 parents 8427751 + da0575a commit 5b180a2
Show file tree
Hide file tree
Showing 8 changed files with 1,137 additions and 101 deletions.
206 changes: 206 additions & 0 deletions p2p/discovery/generic/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
package discovery

import (
"math"
"math/rand"
"time"
)

type BackoffFactory func() BackoffStrategy

// BackoffStrategy describes how backoff will be implemented. BackoffStratgies are stateful.
type BackoffStrategy interface {
// Delay calculates how long the next backoff duration should be, given the prior calls to Delay
Delay() time.Duration
// Reset clears the internal state of the BackoffStrategy
Reset()
}

// Jitter implementations taken roughly from https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/

// Jitter must return a duration between min and max. Min must be lower than, or equal to, max.
type Jitter func(duration, min, max time.Duration, rng *rand.Rand) time.Duration

// FullJitter returns a random number uniformly chose from the range [min, boundedDur].
// boundedDur is the duration bounded between min and max.
func FullJitter(duration, min, max time.Duration, rng *rand.Rand) time.Duration {
if duration <= min {
return min
}

normalizedDur := boundedDuration(duration, min, max) - min

return boundedDuration(time.Duration(rng.Int63n(int64(normalizedDur)))+min, min, max)
}

// NoJitter returns the duration bounded between min and max
func NoJitter(duration, min, max time.Duration, rng *rand.Rand) time.Duration {
return boundedDuration(duration, min, max)
}

type randomizedBackoff struct {
min time.Duration
max time.Duration
rng *rand.Rand
}

func (b *randomizedBackoff) BoundedDelay(duration time.Duration) time.Duration {
return boundedDuration(duration, b.min, b.max)
}

func boundedDuration(d, min, max time.Duration) time.Duration {
if d < min {
return min
}
if d > max {
return max
}
return d
}

type attemptBackoff struct {
attempt int
jitter Jitter
randomizedBackoff
}

func (b *attemptBackoff) Reset() {
b.attempt = 0
}

// NewFixedBackoff creates a BackoffFactory with a constant backoff duration
func NewFixedBackoff(delay time.Duration) BackoffFactory {
return func() BackoffStrategy {
return &fixedBackoff{delay: delay}
}
}

type fixedBackoff struct {
delay time.Duration
}

func (b *fixedBackoff) Delay() time.Duration {
return b.delay
}

func (b *fixedBackoff) Reset() {}

// NewPolynomialBackoff creates a BackoffFactory with backoff of the form c0*x^0, c1*x^1, ...cn*x^n where x is the attempt number
// jitter is the function for adding randomness around the backoff
// timeUnits are the units of time the polynomial is evaluated in
// polyCoefs is the array of polynomial coefficients from [c0, c1, ... cn]
func NewPolynomialBackoff(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, polyCoefs []float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &polynomialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
poly: polyCoefs,
}
}
}

type polynomialBackoff struct {
attemptBackoff
timeUnits time.Duration
poly []float64
}

func (b *polynomialBackoff) Delay() time.Duration {
var polySum float64
switch len(b.poly) {
case 0:
return 0
case 1:
polySum = b.poly[0]
default:
polySum = b.poly[0]
exp := 1
attempt := b.attempt
b.attempt++

for _, c := range b.poly[1:] {
exp *= attempt
polySum += float64(exp) * c
}
}
return b.jitter(time.Duration(float64(b.timeUnits)*polySum), b.min, b.max, b.rng)
}

// NewExponentialBackoff creates a BackoffFactory with backoff of the form base^x + offset where x is the attempt number
// jitter is the function for adding randomness around the backoff
// timeUnits are the units of time the base^x is evaluated in
func NewExponentialBackoff(min, max time.Duration, jitter Jitter,
timeUnits time.Duration, base float64, offset time.Duration, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialBackoff{
attemptBackoff: attemptBackoff{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
jitter: jitter,
},
timeUnits: timeUnits,
base: base,
offset: offset,
}
}
}

type exponentialBackoff struct {
attemptBackoff
timeUnits time.Duration
base float64
offset time.Duration
}

func (b *exponentialBackoff) Delay() time.Duration {
attempt := b.attempt
b.attempt++
return b.jitter(
time.Duration(math.Pow(b.base, float64(attempt))*float64(b.timeUnits))+b.offset, b.min, b.max, b.rng)
}

// NewExponentialDecorrelatedJitter creates a BackoffFactory with backoff of the roughly of the form base^x where x is the attempt number.
// Delays start at the minimum duration and after each attempt delay = rand(min, delay * base), bounded by the max
// See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for more information
func NewExponentialDecorrelatedJitter(min, max time.Duration, base float64, rng *rand.Rand) BackoffFactory {
return func() BackoffStrategy {
return &exponentialDecorrelatedJitter{
randomizedBackoff: randomizedBackoff{
min: min,
max: max,
rng: rng,
},
base: base,
}
}
}

type exponentialDecorrelatedJitter struct {
randomizedBackoff
base float64
lastDelay time.Duration
}

func (b *exponentialDecorrelatedJitter) Delay() time.Duration {
if b.lastDelay < b.min {
b.lastDelay = b.min
return b.lastDelay
}

nextMax := int64(float64(b.lastDelay) * b.base)
b.lastDelay = boundedDuration(time.Duration(b.rng.Int63n(nextMax-int64(b.min)))+b.min, b.min, b.max)
return b.lastDelay
}

func (b *exponentialDecorrelatedJitter) Reset() { b.lastDelay = 0 }
125 changes: 125 additions & 0 deletions p2p/discovery/generic/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package discovery

import (
"math/rand"
"testing"
"time"
)

func checkDelay(bkf BackoffStrategy, expected time.Duration, t *testing.T) {
t.Helper()
if calculated := bkf.Delay(); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}

func TestFixedBackoff(t *testing.T) {
startDelay := time.Second
delay := startDelay

bkf := NewFixedBackoff(delay)
delay *= 2
b1 := bkf()
delay *= 2
b2 := bkf()

if b1.Delay() != startDelay || b2.Delay() != startDelay {
t.Fatal("incorrect delay time")
}

if b1.Delay() != startDelay {
t.Fatal("backoff is stateful")
}

if b1.Reset(); b1.Delay() != startDelay {
t.Fatalf("Reset does something")
}
}

func TestPolynomialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewPolynomialBackoff(time.Second, time.Second*33, NoJitter, time.Second, []float64{0.5, 2, 3}, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Second || b2.Delay() != time.Second {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*5500, t)
checkDelay(b1, time.Millisecond*16500, t)
checkDelay(b1, time.Millisecond*33000, t)
checkDelay(b2, time.Millisecond*5500, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*5500, t)
}

func TestExponentialBackoff(t *testing.T) {
rng := rand.New(rand.NewSource(0))
bkf := NewExponentialBackoff(time.Millisecond*650, time.Second*7, NoJitter, time.Second, 1.5, -time.Millisecond*400, rng)
b1 := bkf()
b2 := bkf()

if b1.Delay() != time.Millisecond*650 || b2.Delay() != time.Millisecond*650 {
t.Fatal("incorrect delay time")
}

checkDelay(b1, time.Millisecond*1100, t)
checkDelay(b1, time.Millisecond*1850, t)
checkDelay(b1, time.Millisecond*2975, t)
checkDelay(b1, time.Microsecond*4662500, t)
checkDelay(b1, time.Second*7, t)
checkDelay(b2, time.Millisecond*1100, t)

b1.Reset()
b1.Delay()
checkDelay(b1, time.Millisecond*1100, t)
}

func minMaxJitterTest(jitter Jitter, t *testing.T) {
rng := rand.New(rand.NewSource(0))
if jitter(time.Nanosecond, time.Hour*10, time.Hour*20, rng) < time.Hour*10 {
t.Fatal("Min not working")
}
if jitter(time.Hour, time.Nanosecond, time.Nanosecond*10, rng) > time.Nanosecond*10 {
t.Fatal("Max not working")
}
}

func TestNoJitter(t *testing.T) {
minMaxJitterTest(NoJitter, t)
for i := 0; i < 10; i++ {
expected := time.Second * time.Duration(i)
if calculated := NoJitter(expected, time.Duration(0), time.Second*100, nil); calculated != expected {
t.Fatalf("expected %v, got %v", expected, calculated)
}
}
}

func TestFullJitter(t *testing.T) {
rng := rand.New(rand.NewSource(0))
minMaxJitterTest(FullJitter, t)
const numBuckets = 51
const multiplier = 10
const threshold = 20

histogram := make([]int, numBuckets)

for i := 0; i < (numBuckets-1)*multiplier; i++ {
started := time.Nanosecond * 50
calculated := FullJitter(started, 0, 100, rng)
histogram[calculated]++
}

for _, count := range histogram {
if count > threshold {
t.Fatal("jitter is not close to evenly spread")
}
}

if histogram[numBuckets-1] > 0 {
t.Fatal("jitter increased overall time")
}
}
Loading

0 comments on commit 5b180a2

Please sign in to comment.