From 49e47089eae16a1ce68cc8ecf2b58cda011d00a8 Mon Sep 17 00:00:00 2001 From: Michael Hurwitz Date: Thu, 30 May 2024 10:13:37 -0400 Subject: [PATCH 1/3] Adding OnEvict callback for when items are removed One or more callbacks can be registered to fire when items are removed from the cache. This is done synchronously with any Delete, Set, or MSet calls, as well as when the reaper thread finds expired items to remove. To keep the cost low, the count of registered callbacks is checked before any action is taken. For existing LazyLRU users, this will add the cost of one `atomic.LoadInt32` to these operations. As compared to the locks that already exist, this is negligible. --- lazylru.go | 101 ++++++++++++++++++++++++++++++++++++++++-------- lazylru_test.go | 63 ++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+), 17 deletions(-) diff --git a/lazylru.go b/lazylru.go index 807e533..151e6d7 100644 --- a/lazylru.go +++ b/lazylru.go @@ -10,6 +10,14 @@ import ( heap "github.com/TriggerMail/lazylru/containers/heap" ) +// EvictCB is a callback function that will be executed when items are removed +// from the cache via eviction due to max size or because the TTL has been +// exceeded. These functions will not be called with a lock and will not block +// future reaping. Be sure any callback registered can complete the number of +// expected calls (based on your expire/eviction rates) or you may create a +// backlog of goroutines. +type EvictCB[K comparable, V any] func(K, V) + // LazyLRU is an LRU cache that only reshuffles values if it is somewhat full. // This is a cache implementation that uses a hash table for lookups and a // priority queue to approximate LRU. Approximate because the usage is not @@ -19,16 +27,18 @@ import ( // undersized and churning a lot, this implementation will perform worse than an // LRU that updates on every read. 
type LazyLRU[K comparable, V any] struct { - doneCh chan int - index map[K]*item[K, V] - items itemPQ[K, V] - maxItems int - itemIx uint64 - ttl time.Duration - stats Stats - lock sync.RWMutex - isRunning bool - isClosing bool + onEvict []EvictCB[K, V] + doneCh chan int + index map[K]*item[K, V] + items itemPQ[K, V] + maxItems int + itemIx uint64 + ttl time.Duration + stats Stats + lock sync.RWMutex + isRunning bool + isClosing bool + numEvictDb int32 // faster to check than locking and checking the length of onEvict } // New creates a LazyLRU[string, interface{} with the given capacity and default @@ -74,6 +84,45 @@ func NewT[K comparable, V any](maxItems int, ttl time.Duration) *LazyLRU[K, V] { return lru } +// OnEvict registers a callback that will be executed when items are removed +// from the cache via eviction due to max size or because the TTL has been +// exceeded. These functions will not be called with a lock and will not block +// future reaping. Be sure any callback registered can complete the number of +// expected calls (based on your expire/eviction rates) or you may create a +// backlog of goroutines. +// +// If a Set or MSet operation causes an eviction, this function will be called +// synchronously to that Set or MSet call. 
+func (lru *LazyLRU[K, V]) OnEvict(cb EvictCB[K, V]) { + lru.lock.Lock() + lru.onEvict = append(lru.onEvict, cb) + lru.numEvictDb++ + lru.lock.Unlock() +} + +func (lru *LazyLRU[K, V]) execOnEvict(deathList []*item[K, V]) { + if len(deathList) == 0 { + return + } + if atomic.LoadInt32(&(lru.numEvictDb)) == 0 { + return + } + + var callbacks []EvictCB[K, V] + lru.lock.RLock() + callbacks = lru.onEvict + lru.lock.RUnlock() + if len(callbacks) == 0 { + return // this should never happen + } + + for _, item := range deathList { + for _, cb := range callbacks { + cb(item.key, item.value) + } + } +} + // IsRunning indicates whether the background reaper is active func (lru *LazyLRU[K, V]) IsRunning() bool { lru.lock.RLock() @@ -140,6 +189,7 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) { } cycles := uint32(0) + var aggDeathList []*item[K, V] for { cycles++ // grab a read lock while we are looking for items to kill @@ -161,6 +211,7 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) { for i := start; i < end; i++ { if lru.items[i].expiration.Before(timestamp) { deathList = append(deathList, lru.items[i]) + aggDeathList = append(aggDeathList, lru.items[i]) } } lru.lock.RUnlock() @@ -189,6 +240,9 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) { lru.lock.Unlock() } atomic.AddUint32(&lru.stats.ReaperCycles, cycles) + if len(aggDeathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) > 0 { + lru.execOnEvict(aggDeathList) + } } // shouldBubble determines if a particular item should be updated on read and @@ -336,16 +390,20 @@ func (lru *LazyLRU[K, V]) Set(key K, value V) { // SetTTL writes to the cache, expiring with the given time-to-live value func (lru *LazyLRU[K, V]) SetTTL(key K, value V, ttl time.Duration) { lru.lock.Lock() - lru.setInternal(key, value, time.Now().Add(ttl)) + deathList := lru.setInternal(key, value, time.Now().Add(ttl)) lru.lock.Unlock() + if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) 
> 0 { + lru.execOnEvict(deathList) + } } // setInternal writes elements. This is NOT thread safe and should always be // called with a write lock -func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) { +func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) []*item[K, V] { if lru.maxItems <= 0 { - return + return nil } + var deathList []*item[K, V] lru.stats.KeysWritten++ if pqi, ok := lru.index[key]; ok { pqi.expiration = expiration @@ -363,11 +421,13 @@ func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) { for lru.items.Len() >= lru.maxItems { deadGuy := heap.Pop[*item[K, V]](&lru.items) delete(lru.index, deadGuy.key) + deathList = append(deathList, deadGuy) lru.stats.Evictions++ } heap.Push[*item[K, V]](&lru.items, pqi) lru.index[key] = pqi } + return deathList } // MSet writes multiple keys and values to the cache. If the "key" and "value" @@ -388,12 +448,16 @@ func (lru *LazyLRU[K, V]) MSetTTL(keys []K, values []V, ttl time.Duration) error return errors.New("Mismatch between number of keys and number of values") } + var deathList []*item[K, V] lru.lock.Lock() expiration := time.Now().Add(ttl) for i := 0; i < len(keys); i++ { - lru.setInternal(keys[i], values[i], expiration) + deathList = append(deathList, lru.setInternal(keys[i], values[i], expiration)...) 
} lru.lock.Unlock() + if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) > 0 { + lru.execOnEvict(deathList) + } return nil } @@ -412,10 +476,13 @@ func (lru *LazyLRU[K, V]) Delete(key K) { lru.lock.Unlock() return } - delete(lru.index, pqi.key) // remove from search index - lru.items.update(pqi, 0) // move this item to the top of the heap - heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap + delete(lru.index, pqi.key) // remove from search index + lru.items.update(pqi, 0) // move this item to the top of the heap + deadguy := heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap lru.lock.Unlock() + if atomic.LoadInt32(&lru.numEvictDb) > 0 { + lru.execOnEvict([]*item[K, V]{deadguy}) + } } // Len returns the number of items in the cache diff --git a/lazylru_test.go b/lazylru_test.go index 1ea6615..13380af 100644 --- a/lazylru_test.go +++ b/lazylru_test.go @@ -446,3 +446,66 @@ func TestDelete(t *testing.T) { ExpectedStats{}.WithKeysWritten(1).WithKeysReadOK(1).WithKeysReadNotFound(1), ) } + +func TestCallbackOnEvict(t *testing.T) { + t.Run("set", func(t *testing.T) { + var evicted []int + lru := lazylru.NewT[int, int](5, time.Hour) + lru.OnEvict(func(k, v int) { + require.Equal(t, k<<4, v) + evicted = append(evicted, k) + }) + for i := 0; i < 5; i++ { + lru.Set(i, i<<4) + } + require.Equal(t, 0, len(evicted)) + for i := 5; i < 10; i++ { + lru.Set(i, i<<4) + } + require.Equal(t, 5, len(evicted)) + }) + t.Run("mset", func(t *testing.T) { + var evicted []int + lru := lazylru.NewT[int, int](5, time.Hour) + lru.OnEvict(func(k, v int) { + require.Equal(t, k<<4, v) + evicted = append(evicted, k) + }) + require.NoError(t, lru.MSet([]int{0, 1, 2, 3, 4}, []int{0 << 4, 1 << 4, 2 << 4, 3 << 4, 4 << 4})) + require.Equal(t, 0, len(evicted)) + require.NoError(t, lru.MSet([]int{5, 6, 7, 8, 9}, []int{5 << 4, 6 << 4, 7 << 4, 8 << 4, 9 << 4})) + require.Equal(t, 5, len(evicted)) + }) +} + +func TestCallbackOnDelete(t *testing.T) { + var 
evicted []int + lru := lazylru.NewT[int, int](5, time.Hour) + lru.OnEvict(func(k, v int) { + require.Equal(t, k<<4, v) + evicted = append(evicted, k) + }) + for i := 0; i < 5; i++ { + lru.Set(i, i<<4) + } + require.Equal(t, 0, len(evicted)) + lru.Delete(3) + require.Equal(t, 1, len(evicted)) +} + +func TestCallbackOnExpire(t *testing.T) { + var evicted []int + lru := lazylru.NewT[int, int](5, time.Hour) + lru.OnEvict(func(k, v int) { + require.Equal(t, k<<4, v) + evicted = append(evicted, k) + }) + for i := 0; i < 5; i++ { + lru.SetTTL(i, i<<4, 5*time.Millisecond) + } + time.Sleep(10 * time.Millisecond) + lru.Reap() + require.Equal(t, 0, lru.Len(), "items left in lru") + time.Sleep(100 * time.Millisecond) + require.Equal(t, 5, len(evicted), "on evict items") +} From fb642f45d42900e5712adbf004735e6679e50693 Mon Sep 17 00:00:00 2001 From: Michael Hurwitz Date: Thu, 30 May 2024 10:18:00 -0400 Subject: [PATCH 2/3] typo --- lazylru.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lazylru.go b/lazylru.go index 151e6d7..07f8df4 100644 --- a/lazylru.go +++ b/lazylru.go @@ -38,7 +38,7 @@ type LazyLRU[K comparable, V any] struct { lock sync.RWMutex isRunning bool isClosing bool - numEvictDb int32 // faster to check than locking and checking the length of onEvict + numEvictCB int32 // faster to check than locking and checking the length of onEvict } // New creates a LazyLRU[string, interface{} with the given capacity and default @@ -96,7 +96,7 @@ func NewT[K comparable, V any](maxItems int, ttl time.Duration) *LazyLRU[K, V] { func (lru *LazyLRU[K, V]) OnEvict(cb EvictCB[K, V]) { lru.lock.Lock() lru.onEvict = append(lru.onEvict, cb) - lru.numEvictDb++ + lru.numEvictCB++ lru.lock.Unlock() } @@ -104,7 +104,7 @@ func (lru *LazyLRU[K, V]) execOnEvict(deathList []*item[K, V]) { if len(deathList) == 0 { return } - if atomic.LoadInt32(&(lru.numEvictDb)) == 0 { + if atomic.LoadInt32(&(lru.numEvictCB)) == 0 { return } @@ -240,7 +240,7 @@ func (lru 
*LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) { lru.lock.Unlock() } atomic.AddUint32(&lru.stats.ReaperCycles, cycles) - if len(aggDeathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) > 0 { + if len(aggDeathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { lru.execOnEvict(aggDeathList) } } @@ -392,7 +392,7 @@ func (lru *LazyLRU[K, V]) SetTTL(key K, value V, ttl time.Duration) { lru.lock.Lock() deathList := lru.setInternal(key, value, time.Now().Add(ttl)) lru.lock.Unlock() - if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) > 0 { + if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { lru.execOnEvict(deathList) } } @@ -455,7 +455,7 @@ func (lru *LazyLRU[K, V]) MSetTTL(keys []K, values []V, ttl time.Duration) error deathList = append(deathList, lru.setInternal(keys[i], values[i], expiration)...) } lru.lock.Unlock() - if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictDb) > 0 { + if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { lru.execOnEvict(deathList) } return nil @@ -480,7 +480,7 @@ func (lru *LazyLRU[K, V]) Delete(key K) { lru.items.update(pqi, 0) // move this item to the top of the heap deadguy := heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap lru.lock.Unlock() - if atomic.LoadInt32(&lru.numEvictDb) > 0 { + if atomic.LoadInt32(&lru.numEvictCB) > 0 { lru.execOnEvict([]*item[K, V]{deadguy}) } } From 13b4fd73a7eca92172a606cb4414d160f1597f7b Mon Sep 17 00:00:00 2001 From: Michael Hurwitz Date: Thu, 30 May 2024 10:46:34 -0400 Subject: [PATCH 3/3] s/int32/atomic.Int32/ --- lazylru.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lazylru.go b/lazylru.go index 07f8df4..41db576 100644 --- a/lazylru.go +++ b/lazylru.go @@ -38,7 +38,7 @@ type LazyLRU[K comparable, V any] struct { lock sync.RWMutex isRunning bool isClosing bool - numEvictCB int32 // faster to check than locking and checking the length of onEvict + numEvictCB atomic.Int32 // faster to check than 
locking and checking the length of onEvict } // New creates a LazyLRU[string, interface{} with the given capacity and default @@ -96,7 +96,7 @@ func NewT[K comparable, V any](maxItems int, ttl time.Duration) *LazyLRU[K, V] { func (lru *LazyLRU[K, V]) OnEvict(cb EvictCB[K, V]) { lru.lock.Lock() lru.onEvict = append(lru.onEvict, cb) - lru.numEvictCB++ + lru.numEvictCB.Add(1) lru.lock.Unlock() } @@ -104,7 +104,7 @@ func (lru *LazyLRU[K, V]) execOnEvict(deathList []*item[K, V]) { if len(deathList) == 0 { return } - if atomic.LoadInt32(&(lru.numEvictCB)) == 0 { + if lru.numEvictCB.Load() == 0 { return } @@ -240,7 +240,7 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) { lru.lock.Unlock() } atomic.AddUint32(&lru.stats.ReaperCycles, cycles) - if len(aggDeathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { + if len(aggDeathList) > 0 && lru.numEvictCB.Load() > 0 { lru.execOnEvict(aggDeathList) } } @@ -392,7 +392,7 @@ func (lru *LazyLRU[K, V]) SetTTL(key K, value V, ttl time.Duration) { lru.lock.Lock() deathList := lru.setInternal(key, value, time.Now().Add(ttl)) lru.lock.Unlock() - if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { + if len(deathList) > 0 && lru.numEvictCB.Load() > 0 { lru.execOnEvict(deathList) } } @@ -455,7 +455,7 @@ func (lru *LazyLRU[K, V]) MSetTTL(keys []K, values []V, ttl time.Duration) error deathList = append(deathList, lru.setInternal(keys[i], values[i], expiration)...) } lru.lock.Unlock() - if len(deathList) > 0 && atomic.LoadInt32(&lru.numEvictCB) > 0 { + if len(deathList) > 0 && lru.numEvictCB.Load() > 0 { lru.execOnEvict(deathList) } return nil @@ -480,7 +480,7 @@ func (lru *LazyLRU[K, V]) Delete(key K) { lru.items.update(pqi, 0) // move this item to the top of the heap deadguy := heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap lru.lock.Unlock() - if atomic.LoadInt32(&lru.numEvictCB) > 0 { + if lru.numEvictCB.Load() > 0 { lru.execOnEvict([]*item[K, V]{deadguy}) } }