Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding OnEvict callback for when items are removed #28

Merged
merged 3 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 84 additions & 17 deletions lazylru.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
heap "github.com/TriggerMail/lazylru/containers/heap"
)

// EvictCB is a callback function that is invoked with the key and value of
// each item removed from the cache — whether evicted for max size, expired
// because its TTL was exceeded, or removed via Delete. Callbacks are invoked
// without the cache lock held and will not block future reaping. Be sure any
// callback registered can complete the number of expected calls (based on
// your expire/eviction rates) or you may create a backlog of goroutines.
type EvictCB[K comparable, V any] func(K, V)

// LazyLRU is an LRU cache that only reshuffles values if it is somewhat full.
// This is a cache implementation that uses a hash table for lookups and a
// priority queue to approximate LRU. Approximate because the usage is not
Expand All @@ -19,16 +27,18 @@ import (
// undersized and churning a lot, this implementation will perform worse than an
// LRU that updates on every read.
type LazyLRU[K comparable, V any] struct {
doneCh chan int
index map[K]*item[K, V]
items itemPQ[K, V]
maxItems int
itemIx uint64
ttl time.Duration
stats Stats
lock sync.RWMutex
isRunning bool
isClosing bool
onEvict []EvictCB[K, V]
doneCh chan int
index map[K]*item[K, V]
items itemPQ[K, V]
maxItems int
itemIx uint64
ttl time.Duration
stats Stats
lock sync.RWMutex
isRunning bool
isClosing bool
numEvictCB atomic.Int32 // faster to check than locking and checking the length of onEvict
}

// New creates a LazyLRU[string, interface{} with the given capacity and default
Expand Down Expand Up @@ -74,6 +84,45 @@ func NewT[K comparable, V any](maxItems int, ttl time.Duration) *LazyLRU[K, V] {
return lru
}

// OnEvict registers a callback to be invoked for every item removed from the
// cache, whether evicted for max size or expired past its TTL. Callbacks run
// without the cache lock held and do not block future reaping, but make sure
// each registered callback can keep up with the expected removal rate or a
// backlog of goroutines may build up.
//
// When a Set or MSet operation itself triggers an eviction, the callback is
// invoked synchronously within that Set or MSet call.
func (lru *LazyLRU[K, V]) OnEvict(cb EvictCB[K, V]) {
	lru.lock.Lock()
	defer lru.lock.Unlock()
	lru.onEvict = append(lru.onEvict, cb)
	lru.numEvictCB.Add(1)
}

// execOnEvict invokes every registered eviction callback for each entry in
// deathList. The callback slice is snapshotted under a read lock; the
// callbacks themselves run with no lock held.
func (lru *LazyLRU[K, V]) execOnEvict(deathList []*item[K, V]) {
	if len(deathList) == 0 || lru.numEvictCB.Load() == 0 {
		return
	}

	lru.lock.RLock()
	cbs := lru.onEvict
	lru.lock.RUnlock()
	if len(cbs) == 0 {
		return // this should never happen
	}

	for _, victim := range deathList {
		for _, cb := range cbs {
			cb(victim.key, victim.value)
		}
	}
}

// IsRunning indicates whether the background reaper is active
func (lru *LazyLRU[K, V]) IsRunning() bool {
lru.lock.RLock()
Expand Down Expand Up @@ -140,6 +189,7 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) {
}

cycles := uint32(0)
var aggDeathList []*item[K, V]
for {
cycles++
// grab a read lock while we are looking for items to kill
Expand All @@ -161,6 +211,7 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) {
for i := start; i < end; i++ {
if lru.items[i].expiration.Before(timestamp) {
deathList = append(deathList, lru.items[i])
aggDeathList = append(aggDeathList, lru.items[i])
}
}
lru.lock.RUnlock()
Expand Down Expand Up @@ -189,6 +240,9 @@ func (lru *LazyLRU[K, V]) reap(start int, deathList []*item[K, V]) {
lru.lock.Unlock()
}
atomic.AddUint32(&lru.stats.ReaperCycles, cycles)
if len(aggDeathList) > 0 && lru.numEvictCB.Load() > 0 {
lru.execOnEvict(aggDeathList)
}
}

// shouldBubble determines if a particular item should be updated on read and
Expand Down Expand Up @@ -336,16 +390,20 @@ func (lru *LazyLRU[K, V]) Set(key K, value V) {
// SetTTL writes to the cache, expiring with the given time-to-live value.
// If the write evicts existing items (cache at max size), any registered
// OnEvict callbacks are invoked synchronously after the lock is released,
// so user callbacks never run while the cache is locked.
func (lru *LazyLRU[K, V]) SetTTL(key K, value V, ttl time.Duration) {
	lru.lock.Lock()
	deathList := lru.setInternal(key, value, time.Now().Add(ttl))
	lru.lock.Unlock()
	if len(deathList) > 0 && lru.numEvictCB.Load() > 0 {
		lru.execOnEvict(deathList)
	}
}

// setInternal writes elements. This is NOT thread safe and should always be
// called with a write lock
func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) {
func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) []*item[K, V] {
if lru.maxItems <= 0 {
return
return nil
}
var deathList []*item[K, V]
lru.stats.KeysWritten++
if pqi, ok := lru.index[key]; ok {
pqi.expiration = expiration
Expand All @@ -363,11 +421,13 @@ func (lru *LazyLRU[K, V]) setInternal(key K, value V, expiration time.Time) {
for lru.items.Len() >= lru.maxItems {
deadGuy := heap.Pop[*item[K, V]](&lru.items)
delete(lru.index, deadGuy.key)
deathList = append(deathList, deadGuy)
lru.stats.Evictions++
}
heap.Push[*item[K, V]](&lru.items, pqi)
lru.index[key] = pqi
}
return deathList
}

// MSet writes multiple keys and values to the cache. If the "key" and "value"
Expand All @@ -388,12 +448,16 @@ func (lru *LazyLRU[K, V]) MSetTTL(keys []K, values []V, ttl time.Duration) error
return errors.New("Mismatch between number of keys and number of values")
}

var deathList []*item[K, V]
lru.lock.Lock()
expiration := time.Now().Add(ttl)
for i := 0; i < len(keys); i++ {
lru.setInternal(keys[i], values[i], expiration)
deathList = append(deathList, lru.setInternal(keys[i], values[i], expiration)...)
}
lru.lock.Unlock()
if len(deathList) > 0 && lru.numEvictCB.Load() > 0 {
lru.execOnEvict(deathList)
}
return nil
}

Expand All @@ -412,10 +476,13 @@ func (lru *LazyLRU[K, V]) Delete(key K) {
lru.lock.Unlock()
return
}
delete(lru.index, pqi.key) // remove from search index
lru.items.update(pqi, 0) // move this item to the top of the heap
heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap
delete(lru.index, pqi.key) // remove from search index
lru.items.update(pqi, 0) // move this item to the top of the heap
deadguy := heap.Pop[*item[K, V]](&lru.items) // pop item from the top of the heap
lru.lock.Unlock()
if lru.numEvictCB.Load() > 0 {
lru.execOnEvict([]*item[K, V]{deadguy})
}
}

// Len returns the number of items in the cache
Expand Down
63 changes: 63 additions & 0 deletions lazylru_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,66 @@ func TestDelete(t *testing.T) {
ExpectedStats{}.WithKeysWritten(1).WithKeysReadOK(1).WithKeysReadNotFound(1),
)
}

// TestCallbackOnEvict verifies that size-based evictions from Set and MSet
// invoke registered callbacks with the evicted key/value pairs.
func TestCallbackOnEvict(t *testing.T) {
	t.Run("set", func(t *testing.T) {
		var gone []int
		lru := lazylru.NewT[int, int](5, time.Hour)
		lru.OnEvict(func(k, v int) {
			require.Equal(t, k<<4, v)
			gone = append(gone, k)
		})
		// fill to capacity: nothing should be evicted yet
		for i := 0; i < 5; i++ {
			lru.Set(i, i<<4)
		}
		require.Equal(t, 0, len(gone))
		// overwrite capacity: each write evicts one item
		for i := 5; i < 10; i++ {
			lru.Set(i, i<<4)
		}
		require.Equal(t, 5, len(gone))
	})
	t.Run("mset", func(t *testing.T) {
		var gone []int
		lru := lazylru.NewT[int, int](5, time.Hour)
		lru.OnEvict(func(k, v int) {
			require.Equal(t, k<<4, v)
			gone = append(gone, k)
		})
		batch := func(lo, hi int) ([]int, []int) {
			keys := make([]int, 0, hi-lo)
			vals := make([]int, 0, hi-lo)
			for i := lo; i < hi; i++ {
				keys = append(keys, i)
				vals = append(vals, i<<4)
			}
			return keys, vals
		}
		k1, v1 := batch(0, 5)
		require.NoError(t, lru.MSet(k1, v1))
		require.Equal(t, 0, len(gone))
		k2, v2 := batch(5, 10)
		require.NoError(t, lru.MSet(k2, v2))
		require.Equal(t, 5, len(gone))
	})
}

// TestCallbackOnDelete verifies that an explicit Delete invokes registered
// eviction callbacks for the removed item.
func TestCallbackOnDelete(t *testing.T) {
	var gone []int
	lru := lazylru.NewT[int, int](5, time.Hour)
	lru.OnEvict(func(k, v int) {
		require.Equal(t, k<<4, v)
		gone = append(gone, k)
	})
	// fill to capacity without triggering any eviction
	for i := 0; i < 5; i++ {
		lru.Set(i, i<<4)
	}
	require.Equal(t, 0, len(gone))
	// deleting a key should fire exactly one callback
	lru.Delete(3)
	require.Equal(t, 1, len(gone))
}

// TestCallbackOnExpire verifies that TTL expiration via Reap invokes the
// registered eviction callbacks for every expired item.
func TestCallbackOnExpire(t *testing.T) {
	var gone []int
	lru := lazylru.NewT[int, int](5, time.Hour)
	lru.OnEvict(func(k, v int) {
		require.Equal(t, k<<4, v)
		gone = append(gone, k)
	})
	// write items with a very short TTL so they expire quickly
	for i := 0; i < 5; i++ {
		lru.SetTTL(i, i<<4, 5*time.Millisecond)
	}
	time.Sleep(10 * time.Millisecond)
	lru.Reap()
	require.Equal(t, 0, lru.Len(), "items left in lru")
	// allow the (possibly asynchronous) callbacks time to complete
	time.Sleep(100 * time.Millisecond)
	require.Equal(t, 5, len(gone), "on evict items")
}