diff --git a/agent/agent.go b/agent/agent.go index c86a937a506b..43198c5e8935 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -3749,6 +3749,12 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { return err } + if a.cache.ReloadOptions(newCfg.Cache) { + a.logger.Info("Cache options have been updated") + } else { + a.logger.Debug("Cache options have not been modified") + } + // Update filtered metrics metrics.UpdateFilter(newCfg.Telemetry.AllowedPrefixes, newCfg.Telemetry.BlockedPrefixes) diff --git a/agent/agent_test.go b/agent/agent_test.go index 350edb28d97c..d87f66187d34 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -42,6 +42,7 @@ import ( "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "gopkg.in/square/go-jose.v2/jwt" ) @@ -764,10 +765,18 @@ func TestCacheRateLimit(test *testing.T) { test.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) { tt := currentTest t.Parallel() - a := NewTestAgent(t, fmt.Sprintf("cache = { entry_fetch_rate = %v, entry_fetch_max_burst = 1 }", tt.rateLimit)) + a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }") defer a.Shutdown() testrpc.WaitForTestAgent(t, a.RPC, "dc1") + cfg := a.config + require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate) + require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst) + cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit) + cfg.Cache.EntryFetchMaxBurst = 1 + require.NoError(t, a.reloadConfigInternal(cfg)) + require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate) + require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst) var wg sync.WaitGroup stillProcessing := true diff --git a/agent/cache/cache.go b/agent/cache/cache.go index f0aa8d60cfaa..f7a1a0e8c7c8 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -143,16 +143,26 @@ type Options struct { EntryFetchRate rate.Limit } -// New creates a new 
cache with the given RPC client and reasonable defaults. -// Further settings can be tweaked on the returned value. -func New(options Options) *Cache { +// Equal returns true if both options are equivalent +func (o Options) Equal(other Options) bool { + return o.EntryFetchMaxBurst == other.EntryFetchMaxBurst && o.EntryFetchRate == other.EntryFetchRate +} + +// applyDefaultValuesOnOptions sets default values on options and returns the updated value +func applyDefaultValuesOnOptions(options Options) Options { if options.EntryFetchRate == 0.0 { options.EntryFetchRate = DefaultEntryFetchRate } if options.EntryFetchMaxBurst == 0 { options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst } + return options +} +// New creates a new cache with the given RPC client and reasonable defaults. +// Further settings can be tweaked on the returned value. +func New(options Options) *Cache { + options = applyDefaultValuesOnOptions(options) // Initialize the heap. The buffer of 1 is really important because // its possible for the expiry loop to trigger the heap to update // itself and it'd block forever otherwise. 
@@ -232,6 +242,28 @@ func (c *Cache) RegisterType(n string, typ Type) { c.types[n] = typeEntry{Name: n, Type: typ, Opts: &opts} } +// ReloadOptions updates the cache with the new options +// returns true if the Cache was updated, false if already up to date +func (c *Cache) ReloadOptions(options Options) bool { + options = applyDefaultValuesOnOptions(options) + modified := !options.Equal(c.options) + if modified { + c.entriesLock.RLock() + defer c.entriesLock.RUnlock() + for _, entry := range c.entries { + if c.options.EntryFetchRate != options.EntryFetchRate { + entry.FetchRateLimiter.SetLimit(options.EntryFetchRate) + } + if c.options.EntryFetchMaxBurst != options.EntryFetchMaxBurst { + entry.FetchRateLimiter.SetBurst(options.EntryFetchMaxBurst) + } + } + c.options.EntryFetchRate = options.EntryFetchRate + c.options.EntryFetchMaxBurst = options.EntryFetchMaxBurst + } + return modified +} + // Get loads the data for the given type and request. If data satisfying the // minimum index is present in the cache, it is returned immediately. Otherwise, // this will block until the data is available or the request timeout is diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 54794f4c3de0..c2442ea7c12d 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" ) // Test a basic Get with no indexes (and therefore no blocking queries). @@ -1220,6 +1221,64 @@ func TestCacheGet_nonBlockingType(t *testing.T) { typ.AssertExpectations(t) } +// TestCacheReload verifies that cache rate-limit options can be reloaded at +// runtime and are applied to existing cache entries. 
+func TestCacheReload(t *testing.T) { + t.Parallel() + + typ1 := TestType(t) + defer typ1.AssertExpectations(t) + + c := New(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1}) + c.RegisterType("t1", typ1) + typ1.Mock.On("Fetch", mock.Anything, mock.Anything).Return(FetchResult{Value: 42, Index: 42}, nil).Maybe() + + require.False(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1), EntryFetchMaxBurst: 1}), "Value should not be reloaded") + + _, meta, err := c.Get(context.Background(), "t1", TestRequest(t, RequestInfo{Key: "hello1", MinIndex: uint64(1)})) + require.NoError(t, err) + require.Equal(t, meta.Index, uint64(42)) + + testEntry := func(t *testing.T, doTest func(t *testing.T, entry cacheEntry)) { + c.entriesLock.Lock() + tEntry, ok := c.types["t1"] + require.True(t, ok) + keyName := makeEntryKey("t1", "", "", "hello1") + ok, entryValid, entry := c.getEntryLocked(tEntry, keyName, RequestInfo{}) + require.True(t, ok) + require.True(t, entryValid) + doTest(t, entry) + c.entriesLock.Unlock() + + } + testEntry(t, func(t *testing.T, entry cacheEntry) { + require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1)) + require.Equal(t, entry.FetchRateLimiter.Burst(), 1) + }) + + // Modify only rateLimit + require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 1})) + testEntry(t, func(t *testing.T, entry cacheEntry) { + require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100)) + require.Equal(t, entry.FetchRateLimiter.Burst(), 1) + }) + + // Modify only Burst + require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(100), EntryFetchMaxBurst: 5})) + testEntry(t, func(t *testing.T, entry cacheEntry) { + require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(100)) + require.Equal(t, entry.FetchRateLimiter.Burst(), 5) + }) + + // Modify only Burst and Limit at the same time + require.True(t, c.ReloadOptions(Options{EntryFetchRate: rate.Limit(1000), EntryFetchMaxBurst: 42})) + + testEntry(t, 
func(t *testing.T, entry cacheEntry) { + require.Equal(t, entry.FetchRateLimiter.Limit(), rate.Limit(1000)) + require.Equal(t, entry.FetchRateLimiter.Burst(), 42) + }) +} + // TestCacheThrottle checks the assumptions for the cache throttling. It sets // up a cache with Options{EntryFetchRate: 10.0, EntryFetchMaxBurst: 1}, which // allows for 10req/s, or one request every 100ms.