diff --git a/README.md b/README.md index 2041d190..a71b1fb4 100644 --- a/README.md +++ b/README.md @@ -108,11 +108,12 @@ If for some reason you see Get performance decreasing with lots of contention (y Metrics is true when you want real-time logging of a variety of stats. The reason this is a Config flag is because there's a 10% throughput performance overhead. -**OnEvict** `func(hashes [2]uint64, value interface{}, cost int64)` +**OnEvict** `func(key, conflict uint64, value interface{}, cost int64)` -OnEvict is called for every eviction. +OnEvict is called for every eviction. The "key, conflict" param pair is the two +hashes used internally (the same two hashes returned by KeyToHash). -**KeyToHash** `func(key interface{}) [2]uint64` +**KeyToHash** `func(key interface{}) (uint64, uint64)` KeyToHash is the hashing algorithm used for every key. If this is nil, Ristretto has a variety of [defaults depending on the underlying interface type](https://github.com/dgraph-io/ristretto/blob/master/z/z.go#L19-L41). diff --git a/cache.go b/cache.go index 6fdab869..6a5d7260 100644 --- a/cache.go +++ b/cache.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync/atomic" + "time" "github.com/dgraph-io/ristretto/z" ) @@ -120,6 +121,7 @@ type item struct { conflict uint64 value interface{} cost int64 + ttl int64 } // NewCache returns a new Cache instance and any configuration errors, if any. @@ -183,17 +185,24 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) { // To dynamically evaluate the items cost using the Config.Coster function, set // the cost parameter to 0 and Coster will be ran when needed in order to find // the items true cost. -func (c *Cache) Set(key, value interface{}, cost int64) bool { +// +// TTL is the amount of time (in seconds) that the item will remain in the +// cache. To make an item stay indefinitely, set TTL to -1. 
+func (c *Cache) Set(key, value interface{}, cost, ttl int64) bool { if c == nil || key == nil { return false } keyHash, conflictHash := c.keyToHash(key) + if ttl != -1 { + ttl = time.Now().Unix() + ttl + } i := &item{ flag: itemNew, key: keyHash, conflict: conflictHash, value: value, cost: cost, + ttl: ttl, } // attempt to immediately update hashmap value and set flag to update so the // cost is eventually updated @@ -262,7 +271,7 @@ func (c *Cache) processItems() { } switch i.flag { case itemNew: - victims, added := c.policy.Add(i.key, i.cost) + victims, added := c.policy.Add(i.key, i.cost, i.ttl) if added { c.store.Set(i.key, i.conflict, i.value) c.Metrics.add(keyAdd, i.key, 1) diff --git a/cache_test.go b/cache_test.go index 914090f5..1d4bb51c 100644 --- a/cache_test.go +++ b/cache_test.go @@ -12,6 +12,58 @@ import ( var wait time.Duration = time.Millisecond * 10 +func TestCacheTTL(t *testing.T) { + m := &sync.Mutex{} + evicted := make(map[uint64]struct{}) + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + OnEvict: func(key, conflict uint64, value interface{}, cost int64) { + m.Lock() + defer m.Unlock() + evicted[key] = struct{}{} + }, + }) + if err != nil { + panic(err) + } + // item 1 will live for 1 second + c.Set(1, 1, 2, 1) + // item 2 will live for 2 seconds + c.Set(2, 2, 3, 2) + // items 11-13 will live indefinitely (TTL -1); item 14 lives for 10 seconds + c.Set(11, 1, 1, -1) + c.Set(12, 1, 1, -1) + c.Set(13, 1, 1, -1) + c.Set(14, 1, 1, 10) + // sleep for 3 seconds (2+1 for good measure) + time.Sleep(time.Second * 3) + // gets to simulate load (1 and 2 should be evicted despite this because + // they're expiring) + c.Get(1) + c.Get(1) + c.Get(1) + c.Get(2) + c.Get(2) + // try to set a new item to force 1 and 2 expiration + c.Set(3, 3, 7, 3) + // wait for new set to go through + time.Sleep(time.Millisecond) + m.Lock() + defer m.Unlock() + // should be 1, 2, and some other item with a cost of 1 + if len(evicted) != 3 { + t.Fatal("items 1 and 2 should have 
expired") + } + if _, ok := evicted[1]; !ok { + t.Fatal("items 1 and 2 should have expired") + } + if _, ok := evicted[2]; !ok { + t.Fatal("items 1 and 2 should have expired") + } +} + func TestCacheKeyToHash(t *testing.T) { keyToHashCount := 0 c, err := NewCache(&Config{ @@ -26,7 +78,7 @@ func TestCacheKeyToHash(t *testing.T) { if err != nil { panic(err) } - if c.Set(1, 1, 1) { + if c.Set(1, 1, 1, -1) { time.Sleep(wait) if val, ok := c.Get(1); val == nil || !ok { t.Fatal("get should be successful") @@ -75,7 +127,7 @@ func TestCacheMaxCost(t *testing.T) { } else { val = strings.Repeat("a", 1000) } - c.Set(key(), val, int64(2+len(val))) + c.Set(key(), val, int64(2+len(val)), -1) } } } @@ -270,7 +322,7 @@ func TestCacheSet(t *testing.T) { if err != nil { panic(err) } - if c.Set(1, 1, 1) { + if c.Set(1, 1, 1, -1) { time.Sleep(wait) if val, ok := c.Get(1); val == nil || val.(int) != 1 || !ok { t.Fatal("set/get returned wrong value") @@ -280,7 +332,7 @@ func TestCacheSet(t *testing.T) { t.Fatal("set was dropped but value still added") } } - c.Set(1, 2, 2) + c.Set(1, 2, 2, -1) val, ok := c.store.Get(z.KeyToHash(1)) if val == nil || val.(int) != 2 || !ok { t.Fatal("set/update was unsuccessful") @@ -296,7 +348,7 @@ func TestCacheSet(t *testing.T) { cost: 1, } } - if c.Set(2, 2, 1) { + if c.Set(2, 2, 1, -1) { t.Fatal("set should be dropped with full setBuf") } if c.Metrics.SetsDropped() != 1 { @@ -305,7 +357,7 @@ func TestCacheSet(t *testing.T) { close(c.setBuf) close(c.stop) c = nil - if c.Set(1, 1, 1) { + if c.Set(1, 1, 1, -1) { t.Fatal("set shouldn't be successful with nil cache") } } @@ -319,7 +371,7 @@ func TestCacheDel(t *testing.T) { if err != nil { panic(err) } - c.Set(1, 1, 1) + c.Set(1, 1, 1, -1) c.Del(1) time.Sleep(wait) if val, ok := c.Get(1); val != nil || ok { @@ -345,7 +397,7 @@ func TestCacheClear(t *testing.T) { panic(err) } for i := 0; i < 10; i++ { - c.Set(i, i, 1) + c.Set(i, i, 1, -1) } time.Sleep(wait) if c.Metrics.KeysAdded() != 10 { @@ -373,7 +425,7 
@@ func TestCacheMetrics(t *testing.T) { panic(err) } for i := 0; i < 10; i++ { - c.Set(i, i, 1) + c.Set(i, i, 1, -1) } time.Sleep(wait) m := c.Metrics @@ -460,7 +512,7 @@ func TestCacheMetricsClear(t *testing.T) { if err != nil { panic(err) } - c.Set(1, 1, 1) + c.Set(1, 1, 1, -1) stop := make(chan struct{}) go func() { for { diff --git a/policy.go b/policy.go index 19d74962..fe75c8aa 100644 --- a/policy.go +++ b/policy.go @@ -17,8 +17,10 @@ package ristretto import ( + "container/list" "math" "sync" + "time" "github.com/dgraph-io/ristretto/z" ) @@ -38,7 +40,7 @@ type policy interface { // Add attempts to Add the key-cost pair to the Policy. It returns a slice // of evicted keys and a bool denoting whether or not the key-cost pair // was added. If it returns true, the key should be stored in cache. - Add(uint64, int64) ([]*item, bool) + Add(uint64, int64, int64) ([]*item, bool) // Has returns true if the key exists in the Policy. Has(uint64) bool // Del deletes the key from the Policy. @@ -68,6 +70,7 @@ type defaultPolicy struct { itemsCh chan []uint64 stop chan struct{} metrics *Metrics + times *list.List } func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy { @@ -76,6 +79,7 @@ func newDefaultPolicy(numCounters, maxCost int64) *defaultPolicy { evict: newSampledLFU(maxCost), itemsCh: make(chan []uint64, 3), stop: make(chan struct{}), + times: list.New(), } go p.processItems() return p @@ -118,7 +122,7 @@ func (p *defaultPolicy) Push(keys []uint64) bool { } } -func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { +func (p *defaultPolicy) Add(key uint64, cost, ttl int64) ([]*item, bool) { p.Lock() defer p.Unlock() // can't add an item bigger than entire cache @@ -137,8 +141,36 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { // there's enough room in the cache to store the new item without // overflowing, so we can do that now and stop here p.evict.add(key, cost) + if ttl != -1 { + p.times.PushFront([2]uint64{key, 
uint64(ttl)}) + } return nil, true } + // as items are evicted they will be appended to victims + victims := make([]*item, 0) + // delete expired items before doing any evictions, we may not even need to + for e := p.times.Back(); e != nil; { + i := e.Value.([2]uint64) + if i[1] > uint64(time.Now().Unix()) { + break + } + next := e.Prev() + p.times.Remove(e) + e = next + victims = append(victims, &item{ + key: i[0], + // TODO: better way of getting cost? + cost: p.evict.keys[i[0]], + }) + p.evict.del(i[0]) + } + if misc := p.evict.roomLeft(cost); misc >= 0 { + p.evict.add(key, cost) + if ttl != -1 { + p.times.PushFront([2]uint64{key, uint64(ttl)}) + } + return victims, true + } // incHits is the hit count for the incoming item incHits := p.admit.Estimate(key) // sample is the eviction candidate pool to be filled via random sampling @@ -147,8 +179,6 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { // complexity is N for finding the min. Min heap should bring it down to // O(lg N). sample := make([]*policyPair, 0, lfuSample) - // as items are evicted they will be appended to victims - victims := make([]*item, 0) // delete victims until there's enough space or a minKey is found that has // more hits than incoming item. 
for ; room < 0; room = p.evict.roomLeft(cost) { @@ -173,19 +203,18 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { sample[minId] = sample[len(sample)-1] sample = sample[:len(sample)-1] // store victim in evicted victims slice - victims = append(victims, &item{ - key: minKey, - conflict: 0, - cost: minCost, - }) + victims = append(victims, &item{key: minKey, cost: minCost}) } p.evict.add(key, cost) + if ttl != -1 { + p.times.PushFront([2]uint64{key, uint64(ttl)}) + } return victims, true } func (p *defaultPolicy) Has(key uint64) bool { p.Lock() - _, exists := p.evict.keyCosts[key] + _, exists := p.evict.keys[key] p.Unlock() return exists } @@ -211,7 +240,7 @@ func (p *defaultPolicy) Update(key uint64, cost int64) { func (p *defaultPolicy) Cost(key uint64) int64 { p.Lock() - if cost, found := p.evict.keyCosts[key]; found { + if cost, found := p.evict.keys[key]; found { p.Unlock() return cost } @@ -235,16 +264,16 @@ func (p *defaultPolicy) Close() { // sampledLFU is an eviction helper storing key-cost pairs. 
type sampledLFU struct { - keyCosts map[uint64]int64 - maxCost int64 - used int64 - metrics *Metrics + keys map[uint64]int64 + maxCost int64 + used int64 + metrics *Metrics } func newSampledLFU(maxCost int64) *sampledLFU { return &sampledLFU{ - keyCosts: make(map[uint64]int64), - maxCost: maxCost, + keys: make(map[uint64]int64), + maxCost: maxCost, } } @@ -256,7 +285,7 @@ func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair { if len(in) >= lfuSample { return in } - for key, cost := range p.keyCosts { + for key, cost := range p.keys { in = append(in, &policyPair{key, cost}) if len(in) >= lfuSample { return in @@ -266,26 +295,26 @@ func (p *sampledLFU) fillSample(in []*policyPair) []*policyPair { } func (p *sampledLFU) del(key uint64) { - cost, ok := p.keyCosts[key] + cost, ok := p.keys[key] if !ok { return } p.used -= cost - delete(p.keyCosts, key) + delete(p.keys, key) } func (p *sampledLFU) add(key uint64, cost int64) { - p.keyCosts[key] = cost + p.keys[key] = cost p.used += cost } func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { - if prev, found := p.keyCosts[key]; found { + if keyCost, found := p.keys[key]; found { // update the cost of an existing key, but don't worry about evicting, // evictions will be handled the next time a new item is added p.metrics.add(keyUpdate, key, 1) - p.used += cost - prev - p.keyCosts[key] = cost + p.used += cost - keyCost + p.keys[key] = cost return true } return false @@ -293,7 +322,7 @@ func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { func (p *sampledLFU) clear() { p.used = 0 - p.keyCosts = make(map[uint64]int64) + p.keys = make(map[uint64]int64) } // tinyLFU is an admission helper that keeps track of access frequency using diff --git a/policy_test.go b/policy_test.go index 8bb3291a..0bd3e79f 100644 --- a/policy_test.go +++ b/policy_test.go @@ -61,7 +61,7 @@ func TestPolicyPush(t *testing.T) { func TestPolicyAdd(t *testing.T) { p := newDefaultPolicy(1000, 100) - if victims, added := 
p.Add(1, 101); victims != nil || added { + if victims, added := p.Add(1, 101, -1); victims != nil || added { t.Fatal("can't add an item bigger than entire cache") } p.Lock() @@ -70,23 +70,23 @@ func TestPolicyAdd(t *testing.T) { p.admit.Increment(2) p.admit.Increment(3) p.Unlock() - if victims, added := p.Add(1, 1); victims != nil || !added { + if victims, added := p.Add(1, 1, -1); victims != nil || !added { t.Fatal("item should already exist") } - if victims, added := p.Add(2, 20); victims != nil || !added { + if victims, added := p.Add(2, 20, -1); victims != nil || !added { t.Fatal("item should be added with no eviction") } - if victims, added := p.Add(3, 90); victims == nil || !added { + if victims, added := p.Add(3, 90, -1); victims == nil || !added { t.Fatal("item should be added with eviction") } - if victims, added := p.Add(4, 20); victims == nil || added { + if victims, added := p.Add(4, 20, -1); victims == nil || added { t.Fatal("item should not be added") } } func TestPolicyHas(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 1) + p.Add(1, 1, -1) if !p.Has(1) { t.Fatal("policy should have key") } @@ -97,7 +97,7 @@ func TestPolicyHas(t *testing.T) { func TestPolicyDel(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 1) + p.Add(1, 1, -1) p.Del(1) p.Del(2) if p.Has(1) { @@ -110,7 +110,7 @@ func TestPolicyDel(t *testing.T) { func TestPolicyCap(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 1) + p.Add(1, 1, -1) if p.Cap() != 9 { t.Fatal("cap returned wrong value") } @@ -118,10 +118,10 @@ func TestPolicyCap(t *testing.T) { func TestPolicyUpdate(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 1) + p.Add(1, 1, -1) p.Update(1, 2) p.Lock() - if p.evict.keyCosts[1] != 2 { + if p.evict.keys[1] != 2 { p.Unlock() t.Fatal("update failed") } @@ -130,7 +130,7 @@ func TestPolicyUpdate(t *testing.T) { func TestPolicyCost(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 2) + p.Add(1, 2, -1) if p.Cost(1) != 2 { t.Fatal("cost for 
existing key returned wrong value") } @@ -141,9 +141,9 @@ func TestPolicyCost(t *testing.T) { func TestPolicyClear(t *testing.T) { p := newDefaultPolicy(100, 10) - p.Add(1, 1) - p.Add(2, 2) - p.Add(3, 3) + p.Add(1, 1, -1) + p.Add(2, 2, -1) + p.Add(3, 3, -1) p.Clear() if p.Cap() != 10 || p.Has(1) || p.Has(2) || p.Has(3) { t.Fatal("clear didn't clear properly") @@ -157,7 +157,7 @@ func TestPolicyClose(t *testing.T) { } }() p := newDefaultPolicy(100, 10) - p.Add(1, 1) + p.Add(1, 1, -1) p.Close() p.itemsCh <- []uint64{1} } @@ -170,7 +170,7 @@ func TestSampledLFUAdd(t *testing.T) { if e.used != 4 { t.Fatal("used not being incremented") } - if e.keyCosts[2] != 2 { + if e.keys[2] != 2 { t.Fatal("keyCosts not being updated") } } @@ -183,7 +183,7 @@ func TestSampledLFUDel(t *testing.T) { if e.used != 1 { t.Fatal("del not updating used field") } - if _, ok := e.keyCosts[2]; ok { + if _, ok := e.keys[2]; ok { t.Fatal("del not deleting value from keyCosts") } e.del(4) @@ -209,7 +209,7 @@ func TestSampledLFUClear(t *testing.T) { e.add(2, 2) e.add(3, 1) e.clear() - if len(e.keyCosts) != 0 || e.used != 0 { + if len(e.keys) != 0 || e.used != 0 { t.Fatal("clear not deleting keyCosts or zeroing used field") } } diff --git a/stress_test.go b/stress_test.go index 4f5174d3..9cb3500b 100644 --- a/stress_test.go +++ b/stress_test.go @@ -23,7 +23,7 @@ func TestStressSetGet(t *testing.T) { panic(err) } for i := 0; i < 100; i++ { - c.Set(i, i, 1) + c.Set(i, i, 1, -1) } time.Sleep(wait) wg := &sync.WaitGroup{} @@ -74,7 +74,7 @@ func TestStressHitRatio(t *testing.T) { o.Set(k, k, 1) } if _, ok := c.Get(k); !ok { - c.Set(k, k, 1) + c.Set(k, k, 1, -1) } } t.Logf("actual: %.2f, optimal: %.2f", c.Metrics.Ratio(), o.Metrics().Ratio())