Skip to content

Commit

Permalink
Sets with TTL (#122)
Browse files Browse the repository at this point in the history
Add a new method SetWithTTL that supports adding key-value pairs to ristretto
that expire after the given duration.
  • Loading branch information
martinmr authored Jan 28, 2020
1 parent d3e7c37 commit 51e97ad
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 74 deletions.
77 changes: 54 additions & 23 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync/atomic"
"time"

"github.com/dgraph-io/ristretto/z"
)
Expand All @@ -33,6 +34,8 @@ const (
setBufSize = 32 * 1024
)

type onEvictFunc func(uint64, uint64, interface{}, int64)

// Cache is a thread-safe implementation of a hashmap with a TinyLFU admission
// policy and a Sampled LFU eviction policy. You can use the same Cache instance
// from as many goroutines as you want.
Expand All @@ -48,7 +51,7 @@ type Cache struct {
// contention.
setBuf chan *item
// onEvict is called for item evictions.
onEvict func(uint64, uint64, interface{}, int64)
onEvict onEvictFunc
// KeyToHash function is used to customize the key hashing algorithm.
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
Expand All @@ -57,6 +60,8 @@ type Cache struct {
stop chan struct{}
// cost calculates cost from a value.
cost func(value interface{}) int64
// cleanupTicker is used to periodically check for entries whose TTL has passed.
cleanupTicker *time.Ticker
// Metrics contains a running log of important statistics like hits, misses,
// and dropped items.
Metrics *Metrics
Expand Down Expand Up @@ -115,11 +120,12 @@ const (

// item is passed to setBuf so items can eventually be added to the cache.
type item struct {
flag itemFlag
key uint64
conflict uint64
value interface{}
cost int64
flag itemFlag
key uint64
conflict uint64
value interface{}
cost int64
expiration time.Time
}

// NewCache returns a new Cache instance and any configuration errors, if any.
Expand All @@ -134,14 +140,15 @@ func NewCache(config *Config) (*Cache, error) {
}
policy := newPolicy(config.NumCounters, config.MaxCost)
cache := &Cache{
store: newStore(),
policy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *item, setBufSize),
onEvict: config.OnEvict,
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
cost: config.Cost,
store: newStore(),
policy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *item, setBufSize),
onEvict: config.OnEvict,
keyToHash: config.KeyToHash,
stop: make(chan struct{}),
cost: config.Cost,
cleanupTicker: time.NewTicker(time.Duration(bucketDurationSecs) * time.Second / 2),
}
if cache.keyToHash == nil {
cache.keyToHash = z.KeyToHash
Expand Down Expand Up @@ -184,20 +191,42 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) {
// the cost parameter to 0 and Coster will be ran when needed in order to find
// the items true cost.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
return c.SetWithTTL(key, value, cost, 0*time.Second)
}

// SetWithTTL works like Set but adds a key-value pair to the cache that will expire
// after the specified TTL (time to live) has passed. A zero value means the value never
// expires, which is identical to calling Set. A negative value is a no-op and the value
// is discarded.
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
if c == nil || key == nil {
return false
}

var expiration time.Time
switch {
case ttl == 0:
// No expiration.
break
case ttl < 0:
// Treat this a a no-op.
return false
default:
expiration = time.Now().Add(ttl)
}

keyHash, conflictHash := c.keyToHash(key)
i := &item{
flag: itemNew,
key: keyHash,
conflict: conflictHash,
value: value,
cost: cost,
flag: itemNew,
key: keyHash,
conflict: conflictHash,
value: value,
cost: cost,
expiration: expiration,
}
// Attempt to immediately update hashmap value and set flag to update so the
// cost is eventually updated.
if c.store.Update(keyHash, conflictHash, i.value) {
// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
if c.store.Update(i) {
i.flag = itemUpdate
}
// Attempt to send item to policy.
Expand Down Expand Up @@ -277,7 +306,7 @@ func (c *Cache) processItems() {
case itemNew:
victims, added := c.policy.Add(i.key, i.cost)
if added {
c.store.Set(i.key, i.conflict, i.value)
c.store.Set(i)
c.Metrics.add(keyAdd, i.key, 1)
}
for _, victim := range victims {
Expand All @@ -294,6 +323,8 @@ func (c *Cache) processItems() {
c.policy.Del(i.key) // Deals with metrics updates.
c.store.Del(i.key, i.conflict)
}
case <-c.cleanupTicker.C:
c.store.Cleanup(c.policy, c.onEvict)
case <-c.stop:
return
}
Expand Down
97 changes: 96 additions & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/dgraph-io/ristretto/z"
"github.com/stretchr/testify/require"
)

var wait = time.Millisecond * 10
Expand Down Expand Up @@ -273,7 +274,12 @@ func TestCacheGet(t *testing.T) {
panic(err)
}
key, conflict := z.KeyToHash(1)
c.store.Set(key, conflict, 1)
i := item{
key: key,
conflict: conflict,
value: 1,
}
c.store.Set(&i)
if val, ok := c.Get(1); val == nil || !ok {
t.Fatal("get should be successful")
}
Expand Down Expand Up @@ -338,6 +344,73 @@ func TestCacheSet(t *testing.T) {
}
}

// retrySet calls SetWithTTL until the item is accepted by the cache.
func retrySet(t *testing.T, c *Cache, key, value int, cost int64, ttl time.Duration) {
for {
if set := c.SetWithTTL(key, value, cost, ttl); !set {
time.Sleep(wait)
continue
}

time.Sleep(wait)
val, ok := c.Get(key)
require.True(t, ok)
require.NotNil(t, val)
require.Equal(t, value, val.(int))
return
}
}

func TestCacheSetWithTTL(t *testing.T) {
m := &sync.Mutex{}
evicted := make(map[uint64]struct{})
c, err := NewCache(&Config{
NumCounters: 100,
MaxCost: 10,
BufferItems: 64,
Metrics: true,
OnEvict: func(key, conflict uint64, value interface{}, cost int64) {
m.Lock()
defer m.Unlock()
evicted[key] = struct{}{}
},
})
require.NoError(t, err)

retrySet(t, c, 1, 1, 1, time.Second)

// Sleep to make sure the item has expired after execution resumes.
time.Sleep(2 * time.Second)
val, ok := c.Get(1)
require.False(t, ok)
require.Nil(t, val)

// Sleep to ensure that the bucket where the item was stored has been cleared
// from the expiraton map.
time.Sleep(5 * time.Second)
m.Lock()
require.Equal(t, 1, len(evicted))
_, ok = evicted[1]
require.True(t, ok)
m.Unlock()

// Verify that expiration times are overwritten.
retrySet(t, c, 2, 1, 1, time.Second)
retrySet(t, c, 2, 2, 1, 100*time.Second)
time.Sleep(3 * time.Second)
val, ok = c.Get(2)
require.True(t, ok)
require.Equal(t, 2, val.(int))

// Verify that entries with no expiration are overwritten.
retrySet(t, c, 3, 1, 1, 0)
retrySet(t, c, 3, 2, 1, time.Second)
time.Sleep(3 * time.Second)
val, ok = c.Get(3)
require.False(t, ok)
require.Nil(t, val)
}

func TestCacheDel(t *testing.T) {
c, err := NewCache(&Config{
NumCounters: 100,
Expand All @@ -361,6 +434,23 @@ func TestCacheDel(t *testing.T) {
c.Del(1)
}

func TestCacheDelWithTTL(t *testing.T) {
c, err := NewCache(&Config{
NumCounters: 100,
MaxCost: 10,
BufferItems: 64,
})
require.NoError(t, err)
retrySet(t, c, 3, 1, 1, 10*time.Second)
time.Sleep(1 * time.Second)
// Delete the item
c.Del(3)
// Ensure the key is deleted.
val, ok := c.Get(3)
require.False(t, ok)
require.Nil(t, val)
}

func TestCacheClear(t *testing.T) {
c, err := NewCache(&Config{
NumCounters: 100,
Expand Down Expand Up @@ -524,3 +614,8 @@ func TestCacheMetricsClear(t *testing.T) {
c.Metrics = nil
c.Metrics.Clear()
}

func init() {
// Set bucketSizeSecs to 1 to avoid waiting too much during the tests.
bucketDurationSecs = 1
}
Loading

0 comments on commit 51e97ad

Please sign in to comment.