Skip to content

Commit

Permalink
Merge pull request #8552 from pierresouchay/reload_cache_throttling_c…
Browse files Browse the repository at this point in the history
…onfig

Ensure that Cache options are reloaded when `consul reload` is performed
  • Loading branch information
dnephin authored Aug 28, 2020
2 parents 95094b8 + d5974b1 commit 72bf350
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 4 deletions.
6 changes: 6 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3764,6 +3764,12 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error {
return err
}

if a.cache.ReloadOptions(newCfg.Cache) {
a.logger.Info("Cache options have been updated")
} else {
a.logger.Debug("Cache options have not been modified")
}

// Update filtered metrics
metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes,
newCfg.Telemetry.BlockedPrefixes)
Expand Down
11 changes: 10 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"gopkg.in/square/go-jose.v2/jwt"
)

Expand Down Expand Up @@ -765,10 +766,18 @@ func TestCacheRateLimit(test *testing.T) {
test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
tt := currentTest
t.Parallel()
a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit))
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

cfg := a.config
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
cfg.Cache.EntryFetchMaxBurst = 1
a.reloadConfigInternal(cfg)
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
var wg sync.WaitGroup
stillProcessing := true

Expand Down
38 changes: 35 additions & 3 deletions agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,26 @@ type Options struct {
EntryFetchRate rate.Limit
}

// New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value.
func New(options Options) *Cache {
// Equal return true if both options are equivalent
func (o Options) Equal(other Options) bool {
return o.EntryFetchMaxBurst == other.EntryFetchMaxBurst && o.EntryFetchRate == other.EntryFetchRate
}

// applyDefaultValuesOnOptions set default values on options and returned updated value
func applyDefaultValuesOnOptions(options Options) Options {
if options.EntryFetchRate == 0.0 {
options.EntryFetchRate = DefaultEntryFetchRate
}
if options.EntryFetchMaxBurst == 0 {
options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
}
return options
}

// New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value.
func New(options Options) *Cache {
options = applyDefaultValuesOnOptions(options)
// Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise.
Expand Down Expand Up @@ -234,6 +244,28 @@ func (c *Cache) RegisterType(n string, typ Type) {
c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts}
}

// ReloadOptions updates the cache with the new options
// return true if Cache is updated, false if already up to date
func (c *Cache) ReloadOptions(options Options) bool {
options = applyDefaultValuesOnOptions(options)
modified := !options.Equal(c.options)
if modified {
c.entriesLock.RLock()
defer c.entriesLock.RUnlock()
for _, entry := range c.entries {
if c.options.EntryFetchRate != options.EntryFetchRate {
entry.FetchRateLimiter.SetLimit(options.EntryFetchRate)
}
if c.options.EntryFetchMaxBurst != options.EntryFetchMaxBurst {
entry.FetchRateLimiter.SetBurst(options.EntryFetchMaxBurst)
}
}
c.options.EntryFetchRate = options.EntryFetchRate
c.options.EntryFetchMaxBurst = options.EntryFetchMaxBurst
}
return modified
}

// Get loads the data for the given type and request. If data satisfying the
// minimum index is present in the cache, it is returned immediately. Otherwise,
// this will block until the data is available or the request timeout is
Expand Down
59 changes: 59 additions & 0 deletions agent/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
)

// Test a basic Get with no indexes (and therefore no blocking queries).
Expand Down Expand Up @@ -1220,6 +1221,64 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
typ.AssertExpectations(t)
}

// Test a get with an index set will wait until an index that is higher
// is set in the cache.
func TestCacheReload(t *testing.T) {
t.Parallel()

typ1 := TestType(t)
defer typ1.AssertExpectations(t)

c := New(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1})
c.RegisterType("t1", typ1)
typ1.Mock.On("Fetch", mock.Anything, mock.Anything).Return(FetchResult{Value: 42, Index: 42}, nil).Maybe()

require.False(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1}), "Value should not be reloaded")

_, meta, err := c.Get(context.Background(), "t1", TestRequest(t, RequestInfo{Key: "hello1", MinIndex: uint64(1)}))
require.NoError(t, err)
require.Equal(t, meta.Index, uint64(42))

testEntry := func(t *testing.T, doTest func(t *testing.T, entry cacheEntry)) {
c.entriesLock.Lock()
tEntry, ok := c.types["t1"]
require.True(t, ok)
keyName := makeEntryKey("t1", "", "", "hello1")
ok, entryValid, entry := c.getEntryLocked(tEntry, keyName, RequestInfo{})
require.True(t, ok)
require.True(t, entryValid)
doTest(t, entry)
c.entriesLock.Unlock()

}
testEntry(t, func(t *testing.T, entry cacheEntry) {
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1))
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
})

// Modify only rateLimit
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 1}))
testEntry(t, func(t *testing.T, entry cacheEntry) {
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
require.Equal(t, entry.FetchRateLimiter.Burst(), 1)
})

// Modify only Burst
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 5}))
testEntry(t, func(t *testing.T, entry cacheEntry) {
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100))
require.Equal(t, entry.FetchRateLimiter.Burst(), 5)
})

// Modify only Burst and Limit at the same time
require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1000), EntryFetchMaxBurst: 42}))

testEntry(t, func(t *testing.T, entry cacheEntry) {
require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1000))
require.Equal(t, entry.FetchRateLimiter.Burst(), 42)
})
}

// TestCacheThrottle checks the assumptions for the cache throttling. It sets
// up a cache with Options{EntryFetchRate: 10.0, EntryFetchMaxBurst: 1}, which
// allows for 10req/s, or one request every 100ms.
Expand Down

0 comments on commit 72bf350

Please sign in to comment.