Skip to content

Commit

Permalink
clean expired cache entries periodically
Browse files Browse the repository at this point in the history
Signed-off-by: Rudrakh Panigrahi <rudrakh97@gmail.com>
  • Loading branch information
rudrakhp committed Dec 23, 2023
1 parent a1a2ae3 commit 4595908
Show file tree
Hide file tree
Showing 7 changed files with 344 additions and 37 deletions.
2 changes: 2 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,8 @@ func (rt *Runtime) Serve(ctx context.Context) error {
})
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()
rt.server, err = rt.server.Init(ctx)
if err != nil {
rt.logger.WithFields(map[string]interface{}{"err": err}).Error("Unable to initialize server.")
Expand Down
2 changes: 1 addition & 1 deletion sdk/opa.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (opa *OPA) configure(ctx context.Context, bs []byte, ready chan struct{}, b

opa.state.manager = manager
opa.state.queryCache.Clear()
opa.state.interQueryBuiltinCache = cache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig())
opa.state.interQueryBuiltinCache = cache.NewInterQueryCacheWithContext(ctx, manager.InterQueryBuiltinCacheConfig())
opa.config = bs

return nil
Expand Down
6 changes: 3 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func New() *Server {
// Init initializes the server. This function MUST be called before starting any loops
// from s.Listeners().
func (s *Server) Init(ctx context.Context) (*Server, error) {
s.initRouters()
s.initRouters(ctx)

txn, err := s.store.NewTransaction(ctx, storage.WriteParams)
if err != nil {
Expand Down Expand Up @@ -755,7 +755,7 @@ func (s *Server) initHandlerCompression(handler http.Handler) (http.Handler, err
return compressHandler, nil
}

func (s *Server) initRouters() {
func (s *Server) initRouters(ctx context.Context) {
mainRouter := s.router
if mainRouter == nil {
mainRouter = mux.NewRouter()
Expand All @@ -764,7 +764,7 @@ func (s *Server) initRouters() {
diagRouter := mux.NewRouter()

// authorizer, if configured, needs the iCache to be set up already
s.interQueryBuiltinCache = iCache.NewInterQueryCache(s.manager.InterQueryBuiltinCacheConfig())
s.interQueryBuiltinCache = iCache.NewInterQueryCacheWithContext(ctx, s.manager.InterQueryBuiltinCacheConfig())
s.manager.RegisterCacheTrigger(s.updateCacheConfig)

// Add authorization handler. This must come BEFORE authentication handler
Expand Down
140 changes: 125 additions & 15 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ package cache

import (
"container/list"
"context"
"fmt"
"math"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"

"github.com/open-policy-agent/opa/util"
)

const (
defaultMaxSizeBytes = int64(0) // unlimited
defaultMaxSizeBytes = int64(0) // unlimited
defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes
defaultStaleEntryEvictionPeriodSeconds = int64(0) // never
)

// Config represents the configuration of the inter-query cache.
Expand All @@ -24,16 +29,25 @@ type Config struct {
}

// InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize.
// MaxSizeBytes - max capacity of cache in bytes
// ForcedEvictionThresholdPercentage - capacity usage in percentage after which forced FIFO eviction starts
// StaleEntryEvictionPeriodSeconds - time period between end of previous and start of new stale entry eviction routine
type InterQueryBuiltinCacheConfig struct {
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"`
StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"`
}

// ParseCachingConfig returns the config for the inter-query cache.
func ParseCachingConfig(raw []byte) (*Config, error) {
if raw == nil {
maxSize := new(int64)
*maxSize = defaultMaxSizeBytes
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil
}

var config Config
Expand All @@ -55,6 +69,26 @@ func (c *Config) validateAndInjectDefaults() error {
*maxSize = defaultMaxSizeBytes
c.InterQueryBuiltinCache.MaxSizeBytes = maxSize
}
if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil {
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold
} else {
threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
if threshold < 0 || threshold > 100 {
return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold)
}
}
if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil {
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period
} else {
period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
if period < 0 {
return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period)
}
}
return nil
}

Expand All @@ -68,23 +102,55 @@ type InterQueryCacheValue interface {
type InterQueryCache interface {
Get(key ast.Value) (value InterQueryCacheValue, found bool)
Insert(key ast.Value, value InterQueryCacheValue) int
InsertWithExpiry(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int
Delete(key ast.Value)
UpdateConfig(config *Config)
Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
}

// NewInterQueryCache returns a new inter-query cache.
// The cache uses a FIFO eviction policy when it reaches the forced eviction threshold.
// Parameters:
//
// config - to configure the InterQueryCache
func NewInterQueryCache(config *Config) InterQueryCache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
return newCache(config)
}

// NewInterQueryCacheWithContext returns a new inter-query cache with context.
// The cache uses a combination of FIFO eviction policy when it reaches the forced eviction threshold
// and a periodic cleanup routine to remove stale entries that exceed their expiration time, if specified.
// If configured with a zero stale_entry_eviction_period_seconds value, the stale entry cleanup routine is disabled.
//
// Parameters:
//
// ctx - used to control lifecycle of the stale entry cleanup routine
// config - to configure the InterQueryCache
func NewInterQueryCacheWithContext(ctx context.Context, config *Config) InterQueryCache {
iqCache := newCache(config)
if iqCache.staleEntryEvictionTimePeriodSeconds() > 0 {
cleanupTicker := time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
go func() {
defer cleanupTicker.Stop()
for {
select {
case <-cleanupTicker.C:
cleanupTicker.Stop()
iqCache.cleanStaleValues()
cleanupTicker = time.NewTicker(time.Duration(iqCache.staleEntryEvictionTimePeriodSeconds()) * time.Second)
case <-ctx.Done():
return
}
}
}()
}

return iqCache
}

type cacheItem struct {
value InterQueryCacheValue
expiresAt time.Time
keyElement *list.Element
}

Expand All @@ -96,11 +162,26 @@ type cache struct {
mtx sync.Mutex
}

// Insert inserts a key k into the cache with value v.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func newCache(config *Config) *cache {
return &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
}
}

// InsertWithExpiry inserts a key k into the cache with value v with an expiration time expiresAt.
// A zero time value for expiresAt indicates no expiry
func (c *cache) InsertWithExpiry(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.unsafeInsert(k, v)
return c.unsafeInsert(k, v, expiresAt)
}

// Insert inserts a key k into the cache with value v with no expiration time.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
return c.InsertWithExpiry(k, v, time.Time{})
}

// Get returns the value in the cache for k.
Expand Down Expand Up @@ -137,10 +218,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
return c.unsafeClone(value)
}

func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
size := v.SizeInBytes()
limit := c.maxSizeBytes()

limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes())))
if limit > 0 {
if size > limit {
dropped++
Expand All @@ -159,6 +239,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)

c.items[k.String()] = cacheItem{
value: v,
expiresAt: expiresAt,
keyElement: c.l.PushBack(k),
}
c.usage += size
Expand Down Expand Up @@ -191,3 +272,32 @@ func (c *cache) maxSizeBytes() int64 {
}
return *c.config.InterQueryBuiltinCache.MaxSizeBytes
}

func (c *cache) forcedEvictionThresholdPercentage() int64 {
if c.config == nil {
return defaultForcedEvictionThresholdPercentage
}
return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
}

func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 {
if c.config == nil {
return defaultStaleEntryEvictionPeriodSeconds
}
return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
}

func (c *cache) cleanStaleValues() (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
for key := c.l.Front(); key != nil; {
nextKey := key.Next()
// if expiresAt is zero, the item doesn't have an expiry
if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) {
c.unsafeDelete(key.Value.(ast.Value))
dropped++
}
key = nextKey
}
return dropped
}
Loading

0 comments on commit 4595908

Please sign in to comment.