From e1de25deba33d7de8ab1880eca1966ef9892cd09 Mon Sep 17 00:00:00 2001 From: Rudrakh Panigrahi Date: Sat, 18 Nov 2023 12:04:58 +0530 Subject: [PATCH] clean expired cache entries periodically --- runtime/runtime.go | 8 +++ sdk/opa.go | 2 +- server/server.go | 6 +- topdown/cache/cache.go | 126 ++++++++++++++++++++++++++++++---- topdown/cache/cache_test.go | 131 +++++++++++++++++++++++++++++++++++- topdown/http.go | 19 +++--- topdown/http_test.go | 6 +- 7 files changed, 269 insertions(+), 29 deletions(-) diff --git a/runtime/runtime.go b/runtime/runtime.go index 09d1ab1d8c4..80feaf6fe44 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -562,15 +562,18 @@ func (rt *Runtime) Serve(ctx context.Context) error { rt.server = rt.server.WithUnixSocketPermission(rt.Params.UnixSocketPerm) } + ctx, cancel := context.WithCancel(ctx) rt.server, err = rt.server.Init(ctx) if err != nil { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.") + cancel() return err } if rt.Params.Watch { if err := rt.startWatcher(ctx, rt.Params.Paths, rt.onReloadLogger); err != nil { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to open watch.") + cancel() return err } } @@ -594,12 +597,14 @@ func (rt *Runtime) Serve(ctx context.Context) error { 100*time.Millisecond, time.Second*time.Duration(rt.Params.ReadyTimeout)); err != nil { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.") + cancel() return err } loops, err := rt.server.Listeners() if err != nil { rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to create listeners.") + cancel() return err } @@ -630,11 +635,14 @@ func (rt *Runtime) Serve(ctx context.Context) error { for { select { case <-ctx.Done(): + cancel() return rt.gracefulServerShutdown(rt.server) case <-signalc: + cancel() return rt.gracefulServerShutdown(rt.server) case err := <-errc: rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Listener failed.") + cancel() os.Exit(1) } } diff --git a/sdk/opa.go b/sdk/opa.go index f7e2cccaa1f..93ad6138962 100644 --- a/sdk/opa.go +++ b/sdk/opa.go @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b opa.state.manager = manager opa.state.queryCache.Clear() - opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()) + opa.state.interQueryBuiltinCache = cache.NewInterQueryCacheWithContext(ctx, manager.InterQueryBuiltinCacheConfig()) opa.config = bs return nil diff --git a/server/server.go b/server/server.go index a2b652dd2ff..7e1ab78c5d6 100644 --- a/server/server.go +++ b/server/server.go @@ -165,7 +165,7 @@ func New() *Server { // Init initializes the server. This function MUST be called before starting any loops // from s.Listeners(). func (s *Server) Init(ctx context.Context) (*Server, error) { - s.initRouters() + s.initRouters(ctx) txn, err := s.store.NewTransaction(ctx, storage.WriteParams) if err != nil { @@ -706,7 +706,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err return compressHandler, nil } -func (s *Server) initRouters() { +func (s *Server) initRouters(ctx context.Context) { mainRouter := s.router if mainRouter == nil { mainRouter = mux.NewRouter() @@ -715,7 +715,7 @@ func (s *Server) initRouters() { diagRouter := mux.NewRouter() // authorizer, if configured, needs the iCache to be set up already - s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig()) + s.interQueryBuiltinCache = iCache.NewInterQueryCacheWithContext(ctx, s.manager.InterQueryBuiltinCacheConfig()) s.manager.RegisterCacheTrigger(s.updateCacheConfig) // Add authorization handler. This must come BEFORE authentication handler diff --git a/topdown/cache/cache.go b/topdown/cache/cache.go index f9d2bcff752..d58278c1f25 100644 --- a/topdown/cache/cache.go +++ b/topdown/cache/cache.go @@ -7,15 +7,20 @@ package cache import ( "container/list" + "context" + "fmt" + "math" "sync" + "time" "github.com/open-policy-agent/opa/ast" - "github.com/open-policy-agent/opa/util" ) const ( - defaultMaxSizeBytes = int64(0) // unlimited + defaultMaxSizeBytes = int64(0) // unlimited + defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes + defaultStaleEntryEvictionPeriodSeconds = int64(0) // never ) // Config represents the configuration of the inter-query cache. @@ -25,7 +30,9 @@ type Config struct { // InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize. type InterQueryBuiltinCacheConfig struct { - MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"` + MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"` + ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"` + StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"` } // ParseCachingConfig returns the config for the inter-query cache. @@ -33,7 +40,11 @@ func ParseCachingConfig(raw []byte) (*Config, error) { if raw == nil { maxSize := new(int64) *maxSize = defaultMaxSizeBytes - return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil } var config Config @@ -55,6 +66,26 @@ func (c *Config) validateAndInjectDefaults() error { *maxSize = defaultMaxSizeBytes c.InterQueryBuiltinCache.MaxSizeBytes = maxSize } + if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil { + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold + } else { + threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage + if threshold < 0 || threshold > 100 { + return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold) + } + } + if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil { + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period + } else { + period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds + if period < 0 { + return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period) + } + } return nil } @@ -68,6 +99,7 @@ type InterQueryCacheValue interface { type InterQueryCache interface { Get(key ast.Value) (value InterQueryCacheValue, found bool) Insert(key ast.Value, value InterQueryCacheValue) int + InsertWithExpiry(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int Delete(key ast.Value) UpdateConfig(config *Config) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error) @@ -75,16 +107,38 @@ type InterQueryCache interface { // NewInterQueryCache returns a new inter-query cache. func NewInterQueryCache(config *Config) InterQueryCache { - return &cache{ - items: map[string]cacheItem{}, - usage: 0, - config: config, - l: list.New(), + return newCache(config) +} + +// NewInterQueryCacheWithContext returns a new inter-query cache with context. +func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache { + iqCache := newCache(config) + + // Start routine to clean up stale values once every StaleEntryEvictionPeriodSeconds + cleanupPeriod := iqCache.staleEntryEvictionTimePeriodSeconds() + if cleanupPeriod > 0 { + ticker := time.NewTicker(time.Duration(cleanupPeriod) * time.Second) + go func() { + defer func() { + ticker.Stop() + }() + for { + select { + case <-ticker.C: + iqCache.cleanStaleValues() + case <-ctx.Done(): + return + } + } + }() } + + return iqCache } type cacheItem struct { value InterQueryCacheValue + expiresAt time.Time keyElement *list.Element } @@ -96,11 +150,26 @@ type cache struct { mtx sync.Mutex } +func newCache(config *Config) *cache { + return &cache{ + items: map[string]cacheItem{}, + usage: 0, + config: config, + l: list.New(), + } +} + // Insert inserts a key k into the cache with value v. -func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) { +func (c *cache) InsertWithExpiry(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) { c.mtx.Lock() defer c.mtx.Unlock() - return c.unsafeInsert(k, v) + return c.unsafeInsert(k, v, expiresAt) +} + +// Insert inserts a key k into the cache with value v. +func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) { + // zero time value indicates no expiry + return c.InsertWithExpiry(k, v, time.Time{}) } // Get returns the value in the cache for k. @@ -137,10 +206,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error) return c.unsafeClone(value) } -func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) { +func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) { size := v.SizeInBytes() - limit := c.maxSizeBytes() - + limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes()))) if limit > 0 { if size > limit { dropped++ @@ -159,6 +227,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) c.items[k.String()] = cacheItem{ value: v, + expiresAt: expiresAt, keyElement: c.l.PushBack(k), } c.usage += size @@ -191,3 +260,32 @@ func (c *cache) maxSizeBytes() int64 { } return *c.config.InterQueryBuiltinCache.MaxSizeBytes } + +func (c *cache) forcedEvictionThresholdPercentage() int64 { + if c.config == nil { + return defaultForcedEvictionThresholdPercentage + } + return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage +} + +func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 { + if c.config == nil { + return defaultStaleEntryEvictionPeriodSeconds + } + return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds +} + +func (c *cache) cleanStaleValues() (dropped int) { + c.mtx.Lock() + defer c.mtx.Unlock() + for key := c.l.Front(); key != nil; { + nextKey := key.Next() + // if expiresAt is zero, the item doesn't have an expiry + if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) { + c.unsafeDelete(key.Value.(ast.Value)) + dropped++ + } + key = nextKey + } + return dropped +} diff --git a/topdown/cache/cache_test.go b/topdown/cache/cache_test.go index 85375e93f0c..2717f5cef0d 100644 --- a/topdown/cache/cache_test.go +++ b/topdown/cache/cache_test.go @@ -5,9 +5,11 @@ package cache import ( + "context" "reflect" "sync" "testing" + "time" "github.com/open-policy-agent/opa/ast" ) @@ -15,7 +17,11 @@ import ( func TestParseCachingConfig(t *testing.T) { maxSize := new(int64) *maxSize = defaultMaxSizeBytes - expected := &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}} + period := new(int64) + *period = defaultStaleEntryEvictionPeriodSeconds + threshold := new(int64) + *threshold = defaultForcedEvictionThresholdPercentage + expected := &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, StaleEntryEvictionPeriodSeconds: period, ForcedEvictionThresholdPercentage: threshold}} tests := map[string]struct { input []byte @@ -277,6 +283,129 @@ func TestDelete(t *testing.T) { verifyCacheList(t, cache) } +func TestInsertWithExpiryAndEviction(t *testing.T) { + // 50 byte max size + // 1s stale cleanup period + // 80% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 50, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 80},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("force_evicted_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + + // Ensure stale entries clean up routine runs at least once + time.Sleep(2 * time.Second) + + // Entry deleted even though not expired because force evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("force_evicted_foo").Value); found { + t.Fatalf("Didn't expect cache entry for force_evicted_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up routine runs and deletes expired entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); found { + t.Fatalf("Didn't expect cache entry for expired_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up routine runs but doesn't delete the entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found %v", cacheValue, fetchedCacheValue) + } +} + +func TestInsertHighTTLWithStaleEntryCleanup(t *testing.T) { + // 40 byte max size + // 1s stale cleanup period + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 40, "stale_entry_eviction_period_seconds": 1, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found no entry", fetchedCacheValue) + } + + // Ensure stale entries clean up routine runs at least once + time.Sleep(2 * time.Second) + + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v, found %v", cacheValue, fetchedCacheValue) + } + + // Since expired_foo is deleted by stale cleanup routine, high_ttl_foo is not evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for high_ttl_foo, found %v", cacheValue, fetchedCacheValue) + } + // Stale clean up routine runs and deletes expired entry + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); found { + t.Fatalf("Didn't expect cache entry for expired_foo, found entry with value %v", fetchedCacheValue) + } +} + +func TestInsertHighTTLWithoutStaleEntryCleanup(t *testing.T) { + // 40 byte max size + // 0s stale cleanup period -> no cleanup + // 100% threshold to for FIFO eviction (eviction after 40 bytes) + in := `{"inter_query_builtin_cache": {"max_size_bytes": 40, "stale_entry_eviction_period_seconds": 0, "forced_eviction_threshold_percentage": 100},}` + + config, err := ParseCachingConfig([]byte(in)) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + cache := NewInterQueryCacheWithContext(context.Background(), config) + + cacheValue := newInterQueryCacheValue(ast.StringTerm("bar").Value, 20) + cache.InsertWithExpiry(ast.StringTerm("high_ttl_foo").Value, cacheValue, time.Now().Add(100*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for high_ttl_foo, found no entry", fetchedCacheValue) + } + cache.InsertWithExpiry(ast.StringTerm("expired_foo").Value, cacheValue, time.Now().Add(1*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for expired_foo, found no entry", fetchedCacheValue) + } + + cache.InsertWithExpiry(ast.StringTerm("foo").Value, cacheValue, time.Now().Add(10*time.Second)) + if fetchedCacheValue, found := cache.Get(ast.StringTerm("foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for foo, found no entry", fetchedCacheValue) + } + + // Since stale cleanup routine is disabled, high_ttl_foo is evicted when foo is inserted + if fetchedCacheValue, found := cache.Get(ast.StringTerm("high_ttl_foo").Value); found { + t.Fatalf("Didn't expect cache entry for high_ttl_foo, found entry with value %v", fetchedCacheValue) + } + // Stale clean up disabled so expired entry exists + if fetchedCacheValue, found := cache.Get(ast.StringTerm("expired_foo").Value); !found { + t.Fatalf("Expected cache entry with value %v for expired_foo, found no entry", fetchedCacheValue) + } +} + func TestUpdateConfig(t *testing.T) { config, err := ParseCachingConfig(nil) if err != nil { diff --git a/topdown/http.go b/topdown/http.go index bf5dbb55d30..d4d67d85ecd 100644 --- a/topdown/http.go +++ b/topdown/http.go @@ -888,7 +888,7 @@ func (c *interQueryCache) checkHTTPSendInterQueryCache() (ast.Value, error) { pcv = cachedRespData } - c.bctx.InterQueryBuiltinCache.Insert(c.key, pcv) + c.bctx.InterQueryBuiltinCache.InsertWithExpiry(c.key, pcv, cachedRespData.ExpiresAt) return cachedRespData.formatToAST(c.forceJSONDecode, c.forceYAMLDecode) } @@ -924,18 +924,19 @@ func insertIntoHTTPSendInterQueryCache(bctx BuiltinContext, key ast.Value, resp } var pcv cache.InterQueryCacheValue - + var pcvData *interQueryCacheData if cachingMode == defaultCachingMode { - pcv, err = newInterQueryCacheValue(bctx, resp, respBody, cacheParams) + pcv, pcvData, err = newInterQueryCacheValue(bctx, resp, respBody, cacheParams) } else { - pcv, err = newInterQueryCacheData(bctx, resp, respBody, cacheParams) + pcvData, err = newInterQueryCacheData(bctx, resp, respBody, cacheParams) + pcv = pcvData } if err != nil { return err } - requestCache.Insert(key, pcv) + requestCache.InsertWithExpiry(key, pcv, pcvData.ExpiresAt) return nil } @@ -1030,17 +1031,17 @@ type interQueryCacheValue struct { Data []byte } -func newInterQueryCacheValue(bctx BuiltinContext, resp *http.Response, respBody []byte, cacheParams *forceCacheParams) (*interQueryCacheValue, error) { +func newInterQueryCacheValue(bctx BuiltinContext, resp *http.Response, respBody []byte, cacheParams *forceCacheParams) (*interQueryCacheValue, *interQueryCacheData, error) { data, err := newInterQueryCacheData(bctx, resp, respBody, cacheParams) if err != nil { - return nil, err + return nil, nil, err } b, err := json.Marshal(data) if err != nil { - return nil, err + return nil, nil, err } - return &interQueryCacheValue{Data: b}, nil + return &interQueryCacheValue{Data: b}, data, nil } func (cb interQueryCacheValue) Clone() (cache.InterQueryCacheValue, error) { diff --git a/topdown/http_test.go b/topdown/http_test.go index 8f9c71b9e37..0a50eed7754 100644 --- a/topdown/http_test.go +++ b/topdown/http_test.go @@ -2159,7 +2159,7 @@ func TestNewInterQueryCacheValue(t *testing.T) { Body: io.NopCloser(bytes.NewBuffer(b)), } - result, err := newInterQueryCacheValue(BuiltinContext{}, response, b, &forceCacheParams{}) + result, _, err := newInterQueryCacheValue(BuiltinContext{}, response, b, &forceCacheParams{}) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -2992,6 +2992,10 @@ func (c *onlyOnceInterQueryCache) Insert(_ ast.Value, _ iCache.InterQueryCacheVa return 0 } +func (c *onlyOnceInterQueryCache) InsertWithExpiry(_ ast.Value, _ iCache.InterQueryCacheValue, _ time.Time) int { + return 0 +} + func (c *onlyOnceInterQueryCache) Delete(_ ast.Value) {} func (c *onlyOnceInterQueryCache) UpdateConfig(_ *iCache.Config) {}