Skip to content

Commit

Permalink
clean expired cache entries periodically
Browse files Browse the repository at this point in the history
Signed-off-by: Rudrakh Panigrahi <rudrakh97@gmail.com>
  • Loading branch information
rudrakhp committed Nov 18, 2023
1 parent 89855df commit 2619e32
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 31 deletions.
8 changes: 8 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,18 @@ func (rt *Runtime) Serve(ctx context.Context) error {
rt.server = rt.server.WithUnixSocketPermission(rt.Params.UnixSocketPerm)
}

ctx, cancel := context.WithCancel(ctx)
rt.server, err = rt.server.Init(ctx)
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.")
cancel()
return err
}

if rt.Params.Watch {
if err := rt.startWatcher(ctx, rt.Params.Paths, rt.onReloadLogger); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to open watch.")
cancel()
return err
}
}
Expand All @@ -594,12 +597,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {
100*time.Millisecond,
time.Second*time.Duration(rt.Params.ReadyTimeout)); err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.")
cancel()
return err
}

loops, err := rt.server.Listeners()
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to create listeners.")
cancel()
return err
}

Expand Down Expand Up @@ -630,11 +635,14 @@ func (rt *Runtime) Serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
cancel()
return rt.gracefulServerShutdown(rt.server)
case <-signalc:
cancel()
return rt.gracefulServerShutdown(rt.server)
case err := <-errc:
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Listener failed.")
cancel()
os.Exit(1)
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b

opa.state.manager = manager
opa.state.queryCache.Clear()
opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig())
opa.state.interQueryBuiltinCache = cache.NewInterQueryCacheWithContext(ctx, manager.InterQueryBuiltinCacheConfig())
opa.config = bs

return nil
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func New() *Server {
// Init initializes the server. This function MUST be called before starting any loops
// from s.Listeners().
func (s *Server) Init(ctx context.Context) (*Server, error) {
s.initRouters()
s.initRouters(ctx)

txn, err := s.store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
Expand Down Expand Up @@ -706,7 +706,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err
return compressHandler, nil
}

func (s *Server) initRouters() {
func (s *Server) initRouters(ctx context.Context) {
mainRouter := s.router
if mainRouter == nil {
mainRouter = mux.NewRouter()
Expand All @@ -715,7 +715,7 @@ func (s *Server) initRouters() {
diagRouter := mux.NewRouter()

// authorizer, if configured, needs the iCache to be set up already
s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig())
s.interQueryBuiltinCache = iCache.NewInterQueryCacheWithContext(ctx, s.manager.InterQueryBuiltinCacheConfig())
s.manager.RegisterCacheTrigger(s.updateCacheConfig)

// Add authorization handler. This must come BEFORE authentication handler
Expand Down
124 changes: 110 additions & 14 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ package cache

import (
"container/list"
"context"
"fmt"
"math"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"

"github.com/open-policy-agent/opa/util"
)

const (
defaultMaxSizeBytes = int64(0) // unlimited
defaultMaxSizeBytes = int64(0) // unlimited
defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes
defaultStaleEntryEvictionPeriodSeconds = int64(0) // never
)

// Config represents the configuration of the inter-query cache.
Expand All @@ -25,15 +30,21 @@ type Config struct {

// InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize.
type InterQueryBuiltinCacheConfig struct {
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"`
StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"`
}

// ParseCachingConfig returns the config for the inter-query cache.
func ParseCachingConfig(raw []byte) (*Config, error) {
if raw == nil {
maxSize := new(int64)
*maxSize = defaultMaxSizeBytes
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil
}

var config Config
Expand All @@ -55,6 +66,26 @@ func (c *Config) validateAndInjectDefaults() error {
*maxSize = defaultMaxSizeBytes
c.InterQueryBuiltinCache.MaxSizeBytes = maxSize
}
if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil {
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold
} else {
threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
if threshold < 0 || threshold > 100 {
return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold)
}
}
if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil {
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period
} else {
period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
if period < 0 {
return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period)
}
}
return nil
}

Expand All @@ -68,23 +99,44 @@ type InterQueryCacheValue interface {
type InterQueryCache interface {
Get(key ast.Value) (value InterQueryCacheValue, found bool)
Insert(key ast.Value, value InterQueryCacheValue) int
InsertWithExpiry(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int
Delete(key ast.Value)
UpdateConfig(config *Config)
Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
}

// NewInterQueryCache returns a new inter-query cache.
func NewInterQueryCache(config *Config) InterQueryCache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
return newCache(config)
}

// NewInterQueryCacheWithContext returns a new inter-query cache with context.
func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache {
iqCache := newCache(config)

// Start routine to clean up stale values once every StaleEntryEvictionPeriodSeconds
cleanupPeriod := iqCache.staleEntryEvictionTimePeriodSeconds()
if cleanupPeriod > 0 {
ticker := time.NewTicker(time.Duration(cleanupPeriod) * time.Second)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
iqCache.cleanStaleValues()
case <-ctx.Done():
return
}
}
}()
}

return iqCache
}

type cacheItem struct {
value InterQueryCacheValue
expiresAt time.Time
keyElement *list.Element
}

Expand All @@ -96,11 +148,26 @@ type cache struct {
mtx sync.Mutex
}

func newCache(config *Config) *cache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
}
}

// Insert inserts a key k into the cache with value v.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) InsertWithExpiry(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.unsafeInsert(k, v)
return c.unsafeInsert(k, v, expiresAt)
}

// Insert inserts a key k into the cache with value v.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
// zero time value indicates no expiry
return c.InsertWithExpiry(k, v, time.Time{})
}

// Get returns the value in the cache for k.
Expand Down Expand Up @@ -137,10 +204,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
return c.unsafeClone(value)
}

func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
size := v.SizeInBytes()
limit := c.maxSizeBytes()

limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes())))
if limit > 0 {
if size > limit {
dropped++
Expand All @@ -159,6 +225,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)

c.items[k.String()] = cacheItem{
value: v,
expiresAt: expiresAt,
keyElement: c.l.PushBack(k),
}
c.usage += size
Expand Down Expand Up @@ -191,3 +258,32 @@ func (c *cache) maxSizeBytes() int64 {
}
return *c.config.InterQueryBuiltinCache.MaxSizeBytes
}

func (c *cache) forcedEvictionThresholdPercentage() int64 {
if c.config == nil {
return defaultForcedEvictionThresholdPercentage
}
return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
}

func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 {
if c.config == nil {
return defaultStaleEntryEvictionPeriodSeconds
}
return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
}

func (c *cache) cleanStaleValues() (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
for key := c.l.Front(); key != nil; {
nextKey := key.Next()
// if expiresAt is zero, the item doesn't have an expiry
if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) {
c.unsafeDelete(key.Value.(ast.Value))
dropped++
}
key = nextKey
}
return dropped
}
Loading

0 comments on commit 2619e32

Please sign in to comment.