diff --git a/backend.go b/backend.go index 489d0b2..ffb5483 100644 --- a/backend.go +++ b/backend.go @@ -14,6 +14,8 @@ type backend[K comparable, V any] interface { Set(key K, v V) // Delete the value for key. Delete(key K) + // DeleteIf deletes all values that match the predicate. + DeleteIf(predicate func(key K, value V) bool) // Purge all values. Purge() } @@ -37,6 +39,14 @@ func (m mapBackend[K, V]) Delete(key K) { delete(m, key) } +func (m mapBackend[K, V]) DeleteIf(predicate func(key K, value V) bool) { + for k, v := range m { + if predicate(k, v) { + delete(m, k) + } + } +} + func (m mapBackend[K, V]) Purge() { // This form is optimized by the Go-compiler; it calls faster internal mapclear() instead of looping, and avoids // allocating new memory. diff --git a/cache.go b/cache.go index 29858d0..16f070e 100644 --- a/cache.go +++ b/cache.go @@ -3,6 +3,7 @@ package sc import ( "context" "errors" + "runtime" "sync" "time" ) @@ -58,14 +59,55 @@ func New[K comparable, V any](replaceFn replaceFunc[K, V], freshFor, ttl time.Du return nil, errors.New("unknown cache backend") } - return &Cache[K, V]{ - values: b, - calls: make(map[K]*call[V]), - fn: replaceFn, - freshFor: freshFor, - ttl: ttl, - strictCoalescing: config.enableStrictCoalescing, - }, nil + c := &Cache[K, V]{ + cache: &cache[K, V]{ + values: b, + calls: make(map[K]*call[V]), + fn: replaceFn, + freshFor: freshFor, + ttl: ttl, + strictCoalescing: config.enableStrictCoalescing, + }, + } + + if config.cleanupInterval > 0 { + closer := make(chan struct{}) + c.cl = newCleaner(c.cache, config.cleanupInterval, closer) + runtime.SetFinalizer(c, stopCleaner[K, V]) + } + + return c, nil +} + +type cleaner[K comparable, V any] struct { + ticker *time.Ticker + closer chan struct{} + c *cache[K, V] +} + +func newCleaner[K comparable, V any](c *cache[K, V], interval time.Duration, closer chan struct{}) *cleaner[K, V] { + cl := &cleaner[K, V]{ + ticker: time.NewTicker(interval), + closer: closer, + c: c, + } + go cl.run() + return cl +} + +func (c *cleaner[K, V]) run() { + for { + select { + case <-c.ticker.C: + c.c.cleanup() + case <-c.closer: + return + } + } +} + +func stopCleaner[K comparable, V any](c *Cache[K, V]) { + c.cl.closer <- struct{}{} } // Cache represents a single cache instance. @@ -75,6 +117,14 @@ func New[K comparable, V any](replaceFn replaceFunc[K, V], freshFor, ttl time.Du // Notice that Cache doesn't have Set(key K, value V) method - this is intentional. Users are expected to delegate // the cache replacement logic to Cache by simply calling Get. type Cache[K comparable, V any] struct { + *cache[K, V] + cl *cleaner[K, V] +} + +// cache is the actual cache instance. +// See https://github.com/patrickmn/go-cache/blob/46f407853014144407b6c2ec7ccc76bf67958d93/cache.go#L1115 +// for the reason Cache and cache is separate. +type cache[K comparable, V any] struct { values backend[K, value[V]] calls map[K]*call[V] mu sync.Mutex // mu protects values and calls @@ -89,7 +139,7 @@ type Cache[K comparable, V any] struct { // May return a stale item (older than freshFor, but younger than ttl) while a single goroutine is launched // in the background to update the cache. // Returns an error as it is if replaceFn returns an error. -func (c *Cache[K, V]) Get(ctx context.Context, key K) (V, error) { +func (c *cache[K, V]) Get(ctx context.Context, key K) (V, error) { // Record time as soon as Get is called *before acquiring the lock* - this maximizes the reuse of values t0 := time.Now() c.mu.Lock() @@ -145,7 +195,7 @@ retry: // Forget instructs the cache to forget about the key. // Corresponding item will be deleted, ongoing cache replacement results (if any) will not be added to the cache, // and any future Get calls will immediately retrieve a new item. -func (c *Cache[K, V]) Forget(key K) { +func (c *cache[K, V]) Forget(key K) { c.mu.Lock() if ca, ok := c.calls[key]; ok { ca.forgotten = true @@ -158,7 +208,7 @@ func (c *Cache[K, V]) Forget(key K) { // Purge instructs the cache to delete all values, and Forget about all ongoing calls. // Note that frequently calling Purge will worsen the cache performance. // If you only need to Forget about a specific key, use Forget instead. -func (c *Cache[K, V]) Purge() { +func (c *cache[K, V]) Purge() { c.mu.Lock() for _, cl := range c.calls { cl.forgotten = true @@ -168,7 +218,7 @@ func (c *Cache[K, V]) Purge() { c.mu.Unlock() } -func (c *Cache[K, V]) set(ctx context.Context, cl *call[V], key K) { +func (c *cache[K, V]) set(ctx context.Context, cl *call[V], key K) { // Record time *just before* fn() is called - this maximizes the reuse of values cl.val.t = time.Now() cl.val.v, cl.err = c.fn(ctx, key) @@ -184,3 +234,13 @@ func (c *Cache[K, V]) set(ctx context.Context, cl *call[V], key K) { c.mu.Unlock() cl.wg.Done() } + +// cleanup cleans up expired items from the cache, freeing memory. +func (c *cache[K, V]) cleanup() { + c.mu.Lock() + now := time.Now() // Record time after acquiring the lock to maximize freeing of expired items + c.values.DeleteIf(func(key K, value value[V]) bool { + return value.isExpired(now, c.ttl) + }) + c.mu.Unlock() +} diff --git a/cache_test.go b/cache_test.go index c16a638..3a18d03 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,6 +3,7 @@ package sc import ( "context" "errors" + "runtime" "strconv" "sync" "sync/atomic" @@ -910,3 +911,64 @@ func TestCache_ZeroTimeCache(t *testing.T) { }) } } + +// TestCleaningCache tests caches with cleaner option, which will clean up expired items on a regular interval. +func TestCleaningCache(t *testing.T) { + t.Parallel() + + for _, c := range allCaches(10) { + c := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + + var cnt int64 + replaceFn := func(ctx context.Context, key string) (string, error) { + atomic.AddInt64(&cnt, 1) + return "value-" + key, nil + } + cache, err := New(replaceFn, 700*time.Millisecond, 1000*time.Millisecond, append(c.cacheOpts, WithCleanupInterval(300*time.Millisecond))...) + assert.NoError(t, err) + + // t=0ms, cache the value + v, err := cache.Get(context.Background(), "k1") + assert.NoError(t, err) + assert.Equal(t, "value-k1", v) + assert.EqualValues(t, 1, atomic.LoadInt64(&cnt)) + + time.Sleep(400 * time.Millisecond) + // t=400ms, value is still cached and fresh + v, err = cache.Get(context.Background(), "k1") + assert.NoError(t, err) + assert.Equal(t, "value-k1", v) + assert.EqualValues(t, 1, atomic.LoadInt64(&cnt)) + + time.Sleep(1 * time.Second) + // t=1400ms, expired value is automatically removed from the cache, freeing memory + // although, this has no effect if viewed from the public interface of Cache + v, err = cache.Get(context.Background(), "k1") + assert.NoError(t, err) + assert.Equal(t, "value-k1", v) + assert.EqualValues(t, 2, atomic.LoadInt64(&cnt)) + }) + } +} + +// TestCleaningCacheFinalizer tests that cache finalizers to stop cleaner is working. +// Since there's not really a good way of ensuring call to the finalizer, this just increases the test coverage. +func TestCleaningCacheFinalizer(t *testing.T) { + t.Parallel() + + for _, c := range allCaches(10) { + c := c + t.Run(c.name, func(t *testing.T) { + t.Parallel() + + replaceFn := func(_ context.Context, _ struct{}) (string, error) { return "", nil } + c, err := New(replaceFn, time.Hour, time.Hour, append(c.cacheOpts, WithCleanupInterval(time.Second))...) + assert.NoError(t, err) + + _, _ = c.Get(context.Background(), struct{}{}) + runtime.GC() // finalizer is called and cleaner is stopped + }) + } +} diff --git a/config.go b/config.go index a2354f2..0185889 100644 --- a/config.go +++ b/config.go @@ -1,5 +1,9 @@ package sc +import ( + "time" +) + // CacheOption represents a single cache option. // See other package-level functions which return CacheOption for more details. type CacheOption func(c *cacheConfig) @@ -8,6 +12,7 @@ type cacheConfig struct { enableStrictCoalescing bool backend cacheBackendType capacity int + cleanupInterval time.Duration } type cacheBackendType int @@ -23,6 +28,7 @@ func defaultConfig() cacheConfig { enableStrictCoalescing: false, backend: cacheBackendMap, capacity: 0, + cleanupInterval: 0, } } @@ -72,3 +78,15 @@ func EnableStrictCoalescing() CacheOption { c.enableStrictCoalescing = true } } + +// WithCleanupInterval specifies cleanup interval of expired items. +// +// Note that by default, a cache will be initialized without a cleaner. +// Try tuning your cache size (and using non-map backend) before using this option. +// Using cleanup interval on a cache with many items may decrease the through-put, +// since the cleaner has to take a lock to iterate through all items. +func WithCleanupInterval(interval time.Duration) CacheOption { + return func(c *cacheConfig) { + c.cleanupInterval = interval + } +} diff --git a/go.mod b/go.mod index 07a7db6..39016a8 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,7 @@ module github.com/motoki317/sc go 1.18 -require ( - github.com/motoki317/lru v0.0.3 - github.com/stretchr/testify v1.7.1 -) +require github.com/stretchr/testify v1.7.1 require ( github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index ce1c778..623ff53 100644 --- a/go.sum +++ b/go.sum @@ -6,8 +6,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/motoki317/lru v0.0.3 h1:+WETs/rojxz4y4KVAQM1syZLVQilL/BD1Lxb2uSSQ1A= -github.com/motoki317/lru v0.0.3/go.mod h1:KWwfc+XCJ4hn+RM4f1+017dyzXWYkHWj6nouX3bU6PM= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= diff --git a/lru/lru.go b/lru/lru.go index 0601466..c510098 100644 --- a/lru/lru.go +++ b/lru/lru.go @@ -93,6 +93,15 @@ func (c *Cache[K, V]) Delete(key K) bool { return true } +// DeleteIf deletes all elements that match the predicate. +func (c *Cache[K, V]) DeleteIf(predicate func(key K, value V) bool) { + for k, v := range c.items { + if predicate(k, v.Value.value) { + c.deleteElement(v) + } + } +} + // DeleteOldest deletes the oldest item from the cache. func (c *Cache[K, V]) DeleteOldest() (key K, value V, ok bool) { if e := c.ll.Back(); e != nil { diff --git a/lru/lru_test.go b/lru/lru_test.go index 675f68e..ce39f16 100644 --- a/lru/lru_test.go +++ b/lru/lru_test.go @@ -36,7 +36,7 @@ func TestCapacity(t *testing.T) { } } -func TestGet(t *testing.T) { +func TestCache_Get(t *testing.T) { t.Run("missing", func(t *testing.T) { lru := lru.New[int, int]() @@ -56,7 +56,7 @@ func TestGet(t *testing.T) { }) } -func TestPeek(t *testing.T) { +func TestCache_Peek(t *testing.T) { t.Run("missing", func(t *testing.T) { lru := lru.New[int, int]() @@ -75,7 +75,7 @@ func TestPeek(t *testing.T) { }) } -func TestSet(t *testing.T) { +func TestCache_Set(t *testing.T) { t.Run("missing", func(t *testing.T) { lru := lru.New[int, int]() @@ -97,7 +97,7 @@ func TestSet(t *testing.T) { }) } -func TestDelete(t *testing.T) { +func TestCache_Delete(t *testing.T) { t.Run("missing", func(t *testing.T) { lru := lru.New[int, int]() @@ -118,7 +118,30 @@ func TestDelete(t *testing.T) { }) } -func TestDeleteOldest(t *testing.T) { +func TestCache_DeleteIf(t *testing.T) { + lru := lru.New[int, int]() + + lru.Set(1, 10) + lru.Set(2, 10) + lru.Set(3, 10) + lru.Set(4, 10) + + lru.DeleteIf(func(key int, value int) bool { + return key%2 == 0 + }) + + require.Equal(t, 2, lru.Len()) + _, ok := lru.Peek(1) + require.True(t, ok) + _, ok = lru.Peek(2) + require.False(t, ok) + _, ok = lru.Peek(3) + require.True(t, ok) + _, ok = lru.Peek(4) + require.False(t, ok) +} + +func TestCache_DeleteOldest(t *testing.T) { t.Run("missing", func(t *testing.T) { lru := lru.New[int, int]() @@ -145,7 +168,7 @@ func TestDeleteOldest(t *testing.T) { }) } -func TestFlush(t *testing.T) { +func TestCache_Flush(t *testing.T) { lru := lru.New[int, int]() key, value := 1, 100 diff --git a/sc_test.go b/sc_test.go index 080a0f8..bcaaa3b 100644 --- a/sc_test.go +++ b/sc_test.go @@ -50,7 +50,7 @@ func strictCaches(cap int) []testCase { return Map(nonStrictCaches(cap), func(t testCase, _ int) testCase { return testCase{ name: "strict " + t.name, - cacheOpts: append(append([]CacheOption{}, t.cacheOpts...), EnableStrictCoalescing()), + cacheOpts: append(t.cacheOpts, EnableStrictCoalescing()), } }) } diff --git a/stats.go b/stats.go index f6d0bdd..f3c8c71 100644 --- a/stats.go +++ b/stats.go @@ -40,7 +40,7 @@ func (s Stats) HitRatio() float64 { // Stats returns cache metrics. // It is useful for monitoring performance and tuning your cache size/type. -func (c *Cache[K, V]) Stats() Stats { +func (c *cache[K, V]) Stats() Stats { c.mu.Lock() defer c.mu.Unlock() return c.stats diff --git a/tq/2q.go b/tq/2q.go index 3d71de2..6cb83fd 100644 --- a/tq/2q.go +++ b/tq/2q.go @@ -1,7 +1,7 @@ package tq import ( - "github.com/motoki317/lru" + "github.com/motoki317/sc/lru" ) // Cache is a fixed size 2Q cache. @@ -119,6 +119,13 @@ func (c *Cache[K, V]) ensureSpace(recentEvict bool) { c.frequent.DeleteOldest() } +// DeleteIf deletes all elements that match the predicate. +func (c *Cache[K, V]) DeleteIf(predicate func(key K, value V) bool) { + c.frequent.DeleteIf(predicate) + c.recent.DeleteIf(predicate) + // does not add to recentEvict, but that is okay for sc's use-case +} + // Delete removes the provided key from the cache. func (c *Cache[K, V]) Delete(key K) { if c.frequent.Delete(key) {