Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rate_limit support to avoid sending too many requests to Consul Agent #1066

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,19 @@ consul {
# This option is also available via the environment variable CONSUL_TOKEN.
token = "abcd1234"

rate_limit {
# Enable rate limit with throttle max requests/sec to consul agent.
enabled = true

# Adds a random delay between consecutive calls
random_backoff = "33ms"

# When watching a resource, if the resource is fetched before that amount
# of time, do not try to fetch the resource again until this timeout occurs.
# Setting this value to 10 seconds means you will never be notified of changes
# faster than every 10 seconds between 2 updates.
min_delay_between_updates = "10s"
}
# This controls the retry behavior when an error is returned from Consul.
# Consul Template is highly fault tolerant, meaning it does not exit in the
# face of failure. Instead, it uses exponential back-off and retry functions
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func Parse(s string) (*Config, error) {
"auth",
"consul",
"consul.auth",
"consul.rate_limit",
"consul.retry",
"consul.ssl",
"consul.transport",
Expand Down
19 changes: 19 additions & 0 deletions config/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type ConsulConfig struct {
// Auth is the HTTP basic authentication for communicating with Consul.
Auth *AuthConfig `mapstructure:"auth"`

// RateLimit is the configuration for re-query on success.
RateLimit *RateLimitConfig `mapstructure:"rate_limit"`

// Retry is the configuration for specifying how to behave on failure.
Retry *RetryConfig `mapstructure:"retry"`

Expand All @@ -30,6 +33,7 @@ type ConsulConfig struct {
func DefaultConsulConfig() *ConsulConfig {
return &ConsulConfig{
Auth: DefaultAuthConfig(),
RateLimit: DefaultRateLimitConfig(),
Retry: DefaultRetryConfig(),
SSL: DefaultSSLConfig(),
Transport: DefaultTransportConfig(),
Expand All @@ -50,6 +54,10 @@ func (c *ConsulConfig) Copy() *ConsulConfig {
o.Auth = c.Auth.Copy()
}

if c.RateLimit != nil {
o.RateLimit = c.RateLimit.Copy()
}

if c.Retry != nil {
o.Retry = c.Retry.Copy()
}
Expand Down Expand Up @@ -93,6 +101,10 @@ func (c *ConsulConfig) Merge(o *ConsulConfig) *ConsulConfig {
r.Auth = r.Auth.Merge(o.Auth)
}

if o.RateLimit != nil {
r.RateLimit = r.RateLimit.Merge(o.RateLimit)
}

if o.Retry != nil {
r.Retry = r.Retry.Merge(o.Retry)
}
Expand Down Expand Up @@ -125,6 +137,11 @@ func (c *ConsulConfig) Finalize() {
}
c.Auth.Finalize()

if c.RateLimit == nil {
c.RateLimit = DefaultRateLimitConfig()
}
c.RateLimit.Finalize()

if c.Retry == nil {
c.Retry = DefaultRetryConfig()
}
Expand Down Expand Up @@ -157,13 +174,15 @@ func (c *ConsulConfig) GoString() string {
return fmt.Sprintf("&ConsulConfig{"+
"Address:%s, "+
"Auth:%#v, "+
"RateLimit:%#v,"+
"Retry:%#v, "+
"SSL:%#v, "+
"Token:%t, "+
"Transport:%#v"+
"}",
StringGoString(c.Address),
c.Auth,
c.RateLimit,
c.Retry,
c.SSL,
StringPresent(c.Token),
Expand Down
16 changes: 11 additions & 5 deletions config/consul_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ func TestConsulConfig_Copy(t *testing.T) {
{
"same_enabled",
&ConsulConfig{
Address: String("1.2.3.4"),
Auth: &AuthConfig{Enabled: Bool(true)},
Retry: &RetryConfig{Enabled: Bool(true)},
SSL: &SSLConfig{Enabled: Bool(true)},
Token: String("abcd1234"),
Address: String("1.2.3.4"),
Auth: &AuthConfig{Enabled: Bool(true)},
RateLimit: &RateLimitConfig{Enabled: Bool(true)},
Retry: &RetryConfig{Enabled: Bool(true)},
SSL: &SSLConfig{Enabled: Bool(true)},
Token: String("abcd1234"),
Transport: &TransportConfig{
DialKeepAlive: TimeDuration(20 * time.Second),
},
Expand Down Expand Up @@ -248,6 +249,11 @@ func TestConsulConfig_Finalize(t *testing.T) {
Username: String(""),
Password: String(""),
},
RateLimit: &RateLimitConfig{
MinDelayBetweenUpdates: TimeDuration(DefaultMinDelayBetweenUpdates),
RandomBackoff: TimeDuration(DefaultRandomBackoff),
Enabled: Bool(true),
},
Retry: &RetryConfig{
Backoff: TimeDuration(DefaultRetryBackoff),
MaxBackoff: TimeDuration(DefaultRetryMaxBackoff),
Expand Down
136 changes: 136 additions & 0 deletions config/ratelimit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package config

import (
"fmt"
"math/rand"
"time"
)

const (
// DefaultRandomBackoff is the default max delay added between successful calls.
DefaultRandomBackoff = 33 * time.Millisecond
// DefaultMinDelayBetweenUpdates is the default delay between successful.
DefaultMinDelayBetweenUpdates = 100 * time.Millisecond
)

// RateLimitFunc is the signature of a function to sleep between calls
type RateLimitFunc func(time.Duration) (bool, time.Duration)

// RateLimitConfig is a shared configuration
type RateLimitConfig struct {

// Minimum Delay between 2 consecutive HTTP calls
RandomBackoff *time.Duration `mapstructure:"random_backoff"`

// Minimum Delay of 2 calls (includes download)
MinDelayBetweenUpdates *time.Duration `mapstructure:"min_delay_between_updates"`

// Enabled signals if this retry is enabled.
Enabled *bool
}

// DefaultRateLimitConfig returns a configuration that is populated with the
// default values.
func DefaultRateLimitConfig() *RateLimitConfig {
return &RateLimitConfig{}
}

// Copy returns a deep copy of this configuration.
func (c *RateLimitConfig) Copy() *RateLimitConfig {
if c == nil {
return nil
}

var o RateLimitConfig

o.RandomBackoff = c.RandomBackoff

o.MinDelayBetweenUpdates = c.MinDelayBetweenUpdates

o.Enabled = c.Enabled

return &o
}

// Merge combines all values in this configuration with the values in the other
// configuration, with values in the other configuration taking precedence.
// Maps and slices are merged, most other values are overwritten. Complex
// structs define their own merge functionality.
func (c *RateLimitConfig) Merge(o *RateLimitConfig) *RateLimitConfig {
if c == nil {
if o == nil {
return nil
}
return o.Copy()
}

if o == nil {
return c.Copy()
}

r := c.Copy()

if o.RandomBackoff != nil {
r.RandomBackoff = o.RandomBackoff
}
if o.MinDelayBetweenUpdates != nil {
r.MinDelayBetweenUpdates = o.MinDelayBetweenUpdates
}

if o.Enabled != nil {
r.Enabled = o.Enabled
}

return r
}

// RateLimitFunc returns the RateLimit function associated with this configuration.
func (c *RateLimitConfig) RateLimitFunc() RateLimitFunc {
return func(lastCallDuration time.Duration) (bool, time.Duration) {
if !BoolVal(c.Enabled) {
return false, 0
}
remaining := *c.MinDelayBetweenUpdates - lastCallDuration
if remaining < 0 {
remaining = 0
}
if c.RandomBackoff.Nanoseconds() < 1 {
return remaining > 0, remaining
}
random := time.Duration(rand.Int63n(int64(c.RandomBackoff.Nanoseconds())))
return true, (remaining + random)
}
}

// Finalize ensures there no nil pointers.
func (c *RateLimitConfig) Finalize() {

if c.RandomBackoff == nil {
c.RandomBackoff = TimeDuration(DefaultRandomBackoff)
}

if c.MinDelayBetweenUpdates == nil {
c.MinDelayBetweenUpdates = TimeDuration(DefaultMinDelayBetweenUpdates)
}

if c.Enabled == nil {
c.Enabled = Bool(true)
}
}

// GoString defines the printable version of this struct.
func (c *RateLimitConfig) GoString() string {
if c == nil {
return "(*RateLimitConfig)(nil)"
}

return fmt.Sprintf("&RateLimitConfig{"+
"RandomBackoff:%s, "+
"MinDelayBetweenUpdates:%s, "+
"Enabled:%s"+
"}",
TimeDurationGoString(c.RandomBackoff),
TimeDurationGoString(c.MinDelayBetweenUpdates),
BoolGoString(c.Enabled),
)
}
Loading