Skip to content

Commit

Permalink
clean expired cache entries periodically
Browse files Browse the repository at this point in the history
Signed-off-by: Rudrakh Panigrahi <rudrakh97@gmail.com>
  • Loading branch information
rudrakhp committed Nov 8, 2023
1 parent 89855df commit 3a9499d
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 37 deletions.
96 changes: 86 additions & 10 deletions topdown/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@ package cache

import (
"container/list"
"fmt"
"math"
"sync"
"time"

"github.com/open-policy-agent/opa/ast"

"github.com/open-policy-agent/opa/util"
)

const (
defaultMaxSizeBytes = int64(0) // unlimited
defaultMaxSizeBytes = int64(0) // unlimited
defaultForcedEvictionThresholdPercentage = int64(100) // trigger at max_size_bytes
defaultStaleEntryEvictionPeriodSeconds = int64(0) // never
)

// Config represents the configuration of the inter-query cache.
Expand All @@ -25,15 +30,21 @@ type Config struct {

// InterQueryBuiltinCacheConfig represents the configuration of the inter-query cache that built-in functions can utilize.
type InterQueryBuiltinCacheConfig struct {
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
MaxSizeBytes *int64 `json:"max_size_bytes,omitempty"`
ForcedEvictionThresholdPercentage *int64 `json:"forced_eviction_threshold_percentage,omitempty"`
StaleEntryEvictionPeriodSeconds *int64 `json:"stale_entry_eviction_period_seconds,omitempty"`
}

// ParseCachingConfig returns the config for the inter-query cache.
func ParseCachingConfig(raw []byte) (*Config, error) {
if raw == nil {
maxSize := new(int64)
*maxSize = defaultMaxSizeBytes
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize}}, nil
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
return &Config{InterQueryBuiltinCache: InterQueryBuiltinCacheConfig{MaxSizeBytes: maxSize, ForcedEvictionThresholdPercentage: threshold, StaleEntryEvictionPeriodSeconds: period}}, nil
}

var config Config
Expand All @@ -55,6 +66,26 @@ func (c *Config) validateAndInjectDefaults() error {
*maxSize = defaultMaxSizeBytes
c.InterQueryBuiltinCache.MaxSizeBytes = maxSize
}
if c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage == nil {
threshold := new(int64)
*threshold = defaultForcedEvictionThresholdPercentage
c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage = threshold
} else {
threshold := *c.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
if threshold < 0 || threshold > 100 {
return fmt.Errorf("invalid forced_eviction_threshold_percentage %v", threshold)
}
}
if c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds == nil {
period := new(int64)
*period = defaultStaleEntryEvictionPeriodSeconds
c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds = period
} else {
period := *c.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
if period < 0 {
return fmt.Errorf("invalid stale_entry_eviction_period_seconds %v", period)
}
}
return nil
}

Expand All @@ -67,24 +98,40 @@ type InterQueryCacheValue interface {
// InterQueryCache defines the interface for the inter-query cache.
type InterQueryCache interface {
Get(key ast.Value) (value InterQueryCacheValue, found bool)
Insert(key ast.Value, value InterQueryCacheValue) int
Insert(key ast.Value, value InterQueryCacheValue, expiresAt time.Time) int
Delete(key ast.Value)
UpdateConfig(config *Config)
Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
}

// NewInterQueryCache returns a new inter-query cache.
func NewInterQueryCache(config *Config) InterQueryCache {
return &cache{
iqCache := &cache{
items: map[string]cacheItem{},
usage: 0,
config: config,
l: list.New(),
}

// Start routine to clean up stale values once every StaleEntryEvictionPeriodSeconds
cleanupPeriod := iqCache.staleEntryEvictionTimePeriodSeconds()
if cleanupPeriod > 0 {
ticker := time.NewTicker(time.Duration(cleanupPeriod) * time.Second)
go func() {
defer ticker.Stop()
for {
<-ticker.C
iqCache.cleanStaleValues()
}
}()
}

return iqCache
}

type cacheItem struct {
value InterQueryCacheValue
expiresAt time.Time
keyElement *list.Element
}

Expand All @@ -97,10 +144,10 @@ type cache struct {
}

// Insert inserts a key k into the cache with value v.
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) Insert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.unsafeInsert(k, v)
return c.unsafeInsert(k, v, expiresAt)
}

// Get returns the value in the cache for k.
Expand Down Expand Up @@ -137,10 +184,9 @@ func (c *cache) Clone(value InterQueryCacheValue) (InterQueryCacheValue, error)
return c.unsafeClone(value)
}

func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int) {
func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue, expiresAt time.Time) (dropped int) {
size := v.SizeInBytes()
limit := c.maxSizeBytes()

limit := int64(math.Ceil(float64(c.forcedEvictionThresholdPercentage())/100.0) * (float64(c.maxSizeBytes())))
if limit > 0 {
if size > limit {
dropped++
Expand All @@ -159,6 +205,7 @@ func (c *cache) unsafeInsert(k ast.Value, v InterQueryCacheValue) (dropped int)

c.items[k.String()] = cacheItem{
value: v,
expiresAt: expiresAt,
keyElement: c.l.PushBack(k),
}
c.usage += size
Expand Down Expand Up @@ -191,3 +238,32 @@ func (c *cache) maxSizeBytes() int64 {
}
return *c.config.InterQueryBuiltinCache.MaxSizeBytes
}

func (c *cache) forcedEvictionThresholdPercentage() int64 {
if c.config == nil {
return defaultForcedEvictionThresholdPercentage
}
return *c.config.InterQueryBuiltinCache.ForcedEvictionThresholdPercentage
}

func (c *cache) staleEntryEvictionTimePeriodSeconds() int64 {
if c.config == nil {
return defaultStaleEntryEvictionPeriodSeconds
}
return *c.config.InterQueryBuiltinCache.StaleEntryEvictionPeriodSeconds
}

func (c *cache) cleanStaleValues() (dropped int) {
c.mtx.Lock()
defer c.mtx.Unlock()
for key := c.l.Front(); key != nil; {
nextKey := key.Next()
// if expiresAt is zero, the item doesn't have an expiry
if ea := c.items[(key.Value.(ast.Value)).String()].expiresAt; !ea.IsZero() && ea.Before(time.Now()) {
c.unsafeDelete(key.Value.(ast.Value))
dropped++
}
key = nextKey
}
return dropped
}
Loading

0 comments on commit 3a9499d

Please sign in to comment.