Skip to content

Commit

Permalink
add basic rate limiting to consul api calls
Browse files Browse the repository at this point in the history
Rate limit all API calls to Consul to ~1/100ms to keep consul-template
from flooding consul-agent with requests. This takes the idea from #1066
and simplifies it by making it work this way by default instead of
making it a configurable option.

Closes #1066
  • Loading branch information
eikenb committed Sep 9, 2019
1 parent 47de449 commit 4e39954
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
8 changes: 4 additions & 4 deletions watch/dependencies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type TestDep struct {
}

func (d *TestDep) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)
data := "this is some data"
rm := &dep.ResponseMetadata{LastIndex: 1}
return data, rm, nil
Expand Down Expand Up @@ -42,7 +42,7 @@ type TestDepStale struct {

// Fetch is used to implement the dependency interface.
func (d *TestDepStale) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)

if opts == nil {
opts = &dep.QueryOptions{}
Expand Down Expand Up @@ -79,7 +79,7 @@ type TestDepFetchError struct {
}

func (d *TestDepFetchError) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)
return nil, nil, fmt.Errorf("failed to contact server")
}

Expand Down Expand Up @@ -129,7 +129,7 @@ type TestDepRetry struct {
}

func (d *TestDepRetry) Fetch(clients *dep.ClientSet, opts *dep.QueryOptions) (interface{}, *dep.ResponseMetadata, error) {
time.Sleep(10 * time.Millisecond)
time.Sleep(time.Millisecond)

d.Lock()
defer d.Unlock()
Expand Down
19 changes: 19 additions & 0 deletions watch/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package watch
import (
"fmt"
"log"
"math/rand"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -207,6 +208,8 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
default:
}

start := time.Now() // for rateLimiter below

data, rm, err := v.dependency.Fetch(v.clients, &dep.QueryOptions{
AllowStale: allowStale,
WaitTime: defaultWaitTime,
Expand Down Expand Up @@ -247,6 +250,10 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
allowStale = true
}

if dur := rateLimiter(start); dur > 1 {
time.Sleep(dur)
}

if rm.LastIndex == v.lastIndex {
log.Printf("[TRACE] (view) %s no new data (index was the same)", v.dependency)
continue
Expand Down Expand Up @@ -282,6 +289,18 @@ func (v *View) fetch(doneCh, successCh chan<- struct{}, errCh chan<- error) {
}
}

const minDelayBetweenUpdates = time.Millisecond * 100

// return a duration to sleep to limit the frequency of upstream calls
func rateLimiter(start time.Time) time.Duration {
remaining := minDelayBetweenUpdates - time.Since(start)
if remaining > 0 {
dither := time.Duration(rand.Int63n(20000000)) // 0-20ms
return remaining + dither
}
return 0
}

// stop halts polling of this view.
func (v *View) stop() {
v.dependency.Stop()
Expand Down
21 changes: 21 additions & 0 deletions watch/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,24 @@ func TestStop_stopsPolling(t *testing.T) {
// Successfully stopped
}
}

func TestRateLimiter(t *testing.T) {
// test for rate limiting delay working
elapsed := minDelayBetweenUpdates / 2 // simulate time passing
start := time.Now().Add(-elapsed) // add negative to subtract
dur := rateLimiter(start) // should close to elapsed
if !(dur > 0) {
t.Errorf("rate limiting duration should be > 0, found: %v", dur)
}
if dur > minDelayBetweenUpdates {
t.Errorf("rate limiting duration extected to be < %v, found %v",
minDelayBetweenUpdates, dur)
}
// test that you get 0 when enough time is past
elapsed = minDelayBetweenUpdates // simulate time passing
start = time.Now().Add(-elapsed) // add negative to subtract
dur = rateLimiter(start) // should be 0
if dur != 0 {
t.Errorf("rate limiting duration should be 0, found: %v", dur)
}
}

0 comments on commit 4e39954

Please sign in to comment.