Skip to content

Commit

Permalink
Added rate limiting for agent rpc calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtkiewicz committed Aug 10, 2017
1 parent d672924 commit 28c43ef
Show file tree
Hide file tree
Showing 12 changed files with 534 additions and 1 deletion.
9 changes: 9 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,15 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
base.RPCAdvertise = base.RPCAddr
}

// Rate limiting for RPC calls
if a.config.RPCRate > 0 {
base.RPCRate = a.config.RPCRate
}

if a.config.RPCMaxBurst > 0 {
base.RPCMaxBurst = a.config.RPCMaxBurst
}

// set the src address for outgoing rpc connections
// Use port 0 so that outgoing connections use a random port.
if !ipaddr.IsAny(base.RPCAddr.IP) {
Expand Down
19 changes: 19 additions & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-sockaddr/template"
"github.com/mitchellh/mapstructure"
"golang.org/x/time/rate"
)

// Ports is used to simplify the configuration by
Expand Down Expand Up @@ -785,6 +786,14 @@ type Config struct {
SessionTTLMin time.Duration `mapstructure:"-"`
SessionTTLMinRaw string `mapstructure:"session_ttl_min"`

// Rate limiter controls how frequently rpc calls are allowed to happen.
// In any large enough time interval, rate limiter limits the rate to RPCRate tokens per second,
// with a maximum burst size of RPCMaxBurst events.
// As a special case, if RPCRate == Inf (the infinite rate), RPCMaxBurst is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
RPCRate rate.Limit `mapstructure:"rpc_rate"`
RPCMaxBurst int `mapstructure:"rpc_max_burst"`

// deprecated fields
// keep them exported since otherwise the error messages don't show up
DeprecatedAtlasInfrastructure string `mapstructure:"atlas_infrastructure" json:"-"`
Expand Down Expand Up @@ -972,6 +981,9 @@ func DefaultConfig() *Config {
RetryInterval: 30 * time.Second,
RetryIntervalWan: 30 * time.Second,

RPCRate: rate.Inf,
RPCMaxBurst: 1000,

TLSMinVersion: "tls10",

EncryptVerifyIncoming: Bool(true),
Expand Down Expand Up @@ -2048,6 +2060,13 @@ func MergeConfig(a, b *Config) *Config {
result.SessionTTLMinRaw = b.SessionTTLMinRaw
}

if b.RPCRate > 0 {
result.RPCRate = b.RPCRate
}
if b.RPCMaxBurst > 0 {
result.RPCMaxBurst = b.RPCMaxBurst
}

result.HTTPConfig.BlockEndpoints = append(a.HTTPConfig.BlockEndpoints,
b.HTTPConfig.BlockEndpoints...)
if len(b.HTTPConfig.ResponseHeaders) > 0 {
Expand Down
8 changes: 8 additions & 0 deletions agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123},
},
{
in: `{"rpc_rate": 100}`,
c: &Config{RPCRate: 100},
},
{
in: `{"rpc_max_burst": 50}`,
c: &Config{RPCMaxBurst: 50},
},
{
in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"},
Expand Down
12 changes: 12 additions & 0 deletions agent/consul/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -70,6 +72,8 @@ type Client struct {
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex

rpcLimiter *rate.Limiter
}

// NewClient is used to construct a new Consul client from the
Expand Down Expand Up @@ -341,6 +345,14 @@ func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
return structs.ErrNoServers
}

metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)

// Check rate
if !c.rpcLimiter.Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}

// Forward to remote Consul
if err := c.connPool.RPC(c.config.Datacenter, server.Addr, server.Version, method, server.UseTLS, args, reply); err != nil {
c.routers.NotifyFailedServer(server)
Expand Down
12 changes: 12 additions & 0 deletions agent/consul/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -293,6 +294,14 @@ type Config struct {
// place, and a small jitter is applied to avoid a thundering herd.
RPCHoldTimeout time.Duration

// Rate limiter controls how frequently rpc calls are allowed to happen.
// In any large enough time interval, rate limiter limits the rate to RPCRate tokens per second,
// with a maximum burst size of RPCMaxBurst events.
// As a special case, if RPCRate == Inf (the infinite rate), RPCMaxBurst is ignored.
// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
RPCRate rate.Limit
RPCMaxBurst int

// AutopilotConfig is used to apply the initial autopilot config when
// bootstrapping.
AutopilotConfig *structs.AutopilotConfig
Expand Down Expand Up @@ -376,6 +385,9 @@ func DefaultConfig() *Config {
// than enough when running in the high performance mode.
RPCHoldTimeout: 7 * time.Second,

RPCRate: rate.Inf,
RPCMaxBurst: 1000,

TLSMinVersion: "tls10",

AutopilotConfig: &structs.AutopilotConfig{
Expand Down
1 change: 1 addition & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
ErrNoDCPath = fmt.Errorf("No path to datacenter")
ErrNoServers = fmt.Errorf("No known Consul servers")
ErrNotReadyForConsistentReads = fmt.Errorf("Not ready to serve consistent reads")
ErrRPCRateExceeded = fmt.Errorf("RPC rate limit exceeded")
)

type MessageType uint8
Expand Down
27 changes: 27 additions & 0 deletions vendor/golang.org/x/time/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions vendor/golang.org/x/time/PATENTS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 28c43ef

Please sign in to comment.