From 307d2ba287308681770c636e9cac15ccc2153274 Mon Sep 17 00:00:00 2001 From: Karl McGuire Date: Fri, 27 Sep 2019 18:06:56 -0400 Subject: [PATCH 1/5] coster function and testing --- cache.go | 89 ++++++++++++++++++++++++++++++++------------------- cache_test.go | 53 ++++++++++++++++++++++++++++++ policy.go | 39 ++++++++++++++++------ store.go | 39 +++++++++++----------- store_test.go | 24 +++++++++----- 5 files changed, 173 insertions(+), 71 deletions(-) diff --git a/cache.go b/cache.go index 41c2aef5..9aed68f2 100644 --- a/cache.go +++ b/cache.go @@ -51,6 +51,8 @@ type Cache struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. keyToHash func(interface{}) uint64 + // coster calculates cost from a value + coster func(value interface{}) int64 } // Config is passed to NewCache for creating new Cache instances. @@ -90,14 +92,26 @@ type Config struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. KeyToHash func(key interface{}) uint64 + // Coster is called when a new item passed to Set has a cost of 0. The + // output of Coster is set as the cost of the new item before it is sent + // to the Policy and stored. + Coster func(value interface{}) int64 } +type itemFlag byte + +const ( + itemNew itemFlag = iota + itemDelete + itemUpdate +) + // item is passed to setBuf so items can eventually be added to the cache type item struct { - key uint64 - val interface{} - cost int64 - del bool + flag itemFlag + key uint64 + value interface{} + cost int64 } // NewCache returns a new Cache instance and any configuration errors, if any. @@ -122,6 +136,7 @@ func NewCache(config *Config) (*Cache, error) { setBuf: make(chan *item, 32*1024), onEvict: config.OnEvict, keyToHash: config.KeyToHash, + coster: config.Coster, } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash @@ -162,22 +177,22 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) { // it returns true, there's still a chance it could be dropped by the policy if // its determined that the key-value item isn't worth keeping, but otherwise the // item will be added and other items will be evicted in order to make room. -func (c *Cache) Set(key interface{}, val interface{}, cost int64) bool { +func (c *Cache) Set(key, value interface{}, cost int64) bool { if c == nil { return false } - hash := c.keyToHash(key) - // TODO: Add a c.store.UpdateIfPresent here. This would catch any value updates and avoid having - // to push the key in setBuf. - - // attempt to add the (possibly) new item to the setBuf where it will later - // be processed by the policy and evaluated + i := &item{itemNew, c.keyToHash(key), value, cost} + // attempt to immediately update hashmap value and set flag to update so the + // cost is eventually updated + if c.store.Update(i.key, i.value) { + i.flag = itemUpdate + } + // attempt to send item to policy select { - case c.setBuf <- &item{key: hash, val: val, cost: cost}: + case c.setBuf <- i: return true default: - // drop the set and avoid blocking - c.stats.Add(dropSets, hash, 1) + c.stats.Add(dropSets, i.key, 1) return false } } @@ -187,7 +202,7 @@ func (c *Cache) Del(key interface{}) { if c == nil { return } - c.setBuf <- &item{key: c.keyToHash(key), del: true} + c.setBuf <- &item{itemDelete, c.keyToHash(key), nil, 0} } // Close stops all goroutines and closes all channels. @@ -195,26 +210,34 @@ func (c *Cache) Close() {} // processItems is ran by goroutines processing the Set buffer. func (c *Cache) processItems() { - for item := range c.setBuf { - if item.del { - c.policy.Del(item.key) - c.store.Del(item.key) - continue - } - victims, added := c.policy.Add(item.key, item.cost) - if added { - // item was accepted by the policy, so add to the hashmap - c.store.Set(item.key, item.val) + for i := range c.setBuf { + // calculate item cost value if new or update + if (i.flag == itemNew || i.flag == itemUpdate) && i.cost == 0 { + if c.coster != nil { + i.cost = c.coster(i.value) + } } - // delete victims that are no longer worthy of being in the cache - for _, victim := range victims { - // eviction callback - if c.onEvict != nil { - victim.val, _ = c.store.Get(victim.key) - c.onEvict(victim.key, victim.val, victim.cost) + switch i.flag { + case itemNew: + if victims, added := c.policy.Add(i.key, i.cost); added { + // item was accepted by the policy, so add to the hashmap + c.store.Set(i.key, i.value) + // delete victims + for _, victim := range victims { + // TODO: make Get-Delete atomic + if c.onEvict != nil { + victim.value, _ = c.store.Get(victim.key) + c.onEvict(victim.key, victim.value, victim.cost) + } + c.store.Del(victim.key) + } } - // delete from hashmap - c.store.Del(victim.key) + case itemUpdate: + // TODO: make this nonblocking? + c.policy.Update(i.key, i.cost) + case itemDelete: + c.policy.Del(i.key) + c.store.Del(i.key) } } } diff --git a/cache_test.go b/cache_test.go index 0e94f35a..cb5c9f1b 100644 --- a/cache_test.go +++ b/cache_test.go @@ -21,10 +21,12 @@ import ( "math/rand" "runtime" "sync" + "sync/atomic" "testing" "time" "github.com/dgraph-io/ristretto/sim" + "github.com/dgraph-io/ristretto/z" ) // TestCache is used to pass instances of Ristretto and Clairvoyant around and @@ -112,6 +114,57 @@ func TestCacheSetDel(t *testing.T) { } } +func TestCacheCoster(t *testing.T) { + costerRuns := uint64(0) + cache, err := NewCache(&Config{ + NumCounters: 1000, + MaxCost: 500, + BufferItems: 64, + Coster: func(value interface{}) int64 { + atomic.AddUint64(&costerRuns, 1) + return 5 + }, + }) + if err != nil { + panic(err) + } + for i := 0; i < 100; i++ { + cache.Set(i, i, 0) + } + time.Sleep(time.Second / 100) + for i := 0; i < 100; i++ { + if cache.policy.Cost(z.KeyToHash(i)) != 5 { + t.Fatal("coster not being ran") + } + } + if costerRuns != 100 { + t.Fatal("coster not being ran") + } +} + +// TestCacheUpdate verifies that a Set call on an existing key immediately +// updates the value and cost for that key without using/polluting the Set +// buffer(s). +func TestCacheUpdate(t *testing.T) { + cache := newCache(true) + cache.Set(1, 1, 1) + // wait for new-item Set to go through + time.Sleep(time.Second / 100) + // do 100 updates + for i := 0; i < 100; i++ { + // update the same key (1) with incrementing value and cost, so we can + // verify that they are immediately updated and not going through + // channels + cache.Set(1, i, int64(i)) + if val, ok := cache.Get(1); !ok || val.(int) != i { + t.Fatal("keyUpdate value inconsistent") + } + } + if cache.Metrics().Get(keyUpdate) == 0 { + t.Fatal("keyUpdates not being processed") + } +} + func TestCacheOnEvict(t *testing.T) { mu := &sync.Mutex{} evictions := make(map[uint64]int) diff --git a/policy.go b/policy.go index f51d1c93..746d5458 100644 --- a/policy.go +++ b/policy.go @@ -43,6 +43,10 @@ type policy interface { Del(uint64) // Cap returns the available capacity. Cap() int64 + // Update updates the cost value for the key. + Update(uint64, int64) + // Cost returns the cost value of a key or -1 if missing. + Cost(uint64) int64 // Optionally, set stats object to track how policy is performing. CollectMetrics(stats *metrics) } @@ -157,9 +161,7 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { // store victim in evicted victims slice victims = append(victims, &item{ key: minKey, - val: nil, cost: minCost, - del: false, }) } p.evict.add(key, cost) @@ -185,6 +187,21 @@ func (p *defaultPolicy) Cap() int64 { return int64(p.evict.maxCost - p.evict.used) } +func (p *defaultPolicy) Update(key uint64, cost int64) { + p.Lock() + defer p.Unlock() + p.evict.updateIfHas(key, cost) +} + +func (p *defaultPolicy) Cost(key uint64) int64 { + p.Lock() + defer p.Unlock() + if cost, found := p.evict.keyCosts[key]; found { + return cost + } + return -1 +} + // sampledLFU is an eviction helper storing key-cost pairs. type sampledLFU struct { keyCosts map[uint64]int64 @@ -238,11 +255,8 @@ func (p *sampledLFU) add(key uint64, cost int64) { p.used += cost } -// TODO: Move this to the store itself. So, it can be used by public Set. -func (p *sampledLFU) updateIfHas(key uint64, cost int64) (updated bool) { - if prev, exists := p.keyCosts[key]; exists { - // Update the cost of the existing key. For simplicity, don't worry about evicting anything - // if the updated cost causes the size to grow beyond maxCost. +func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { + if prev, found := p.keyCosts[key]; found { p.stats.Add(keyUpdate, key, 1) p.used += cost - prev p.keyCosts[key] = cost @@ -376,9 +390,7 @@ func (p *lruPolicy) Add(key uint64, cost int64) ([]*item, bool) { delete(p.ptrs, victim.key) victims = append(victims, &item{ key: victim.key, - val: nil, cost: victim.cost, - del: false, }) // adjust room p.room += victim.cost @@ -412,6 +424,15 @@ func (p *lruPolicy) Cap() int64 { return int64(p.vals.Len()) } +// TODO +func (p *lruPolicy) Update(key uint64, cost int64) { +} + +// TODO +func (p *lruPolicy) Cost(key uint64) int64 { + return -1 +} + // TODO func (p *lruPolicy) CollectMetrics(stats *metrics) { } diff --git a/store.go b/store.go index cfc30701..ca316fcf 100644 --- a/store.go +++ b/store.go @@ -34,34 +34,16 @@ type store interface { Set(uint64, interface{}) // Del deletes the key-value pair from the Map. Del(uint64) + // Update attempts to update the key with a new value and returns true if + // successful. + Update(uint64, interface{}) bool } // newStore returns the default store implementation. func newStore() store { - // return newSyncMap() return newShardedMap() } -type syncMap struct { - *sync.Map -} - -func newSyncMap() store { - return &syncMap{&sync.Map{}} -} - -func (m *syncMap) Get(key uint64) (interface{}, bool) { - return m.Load(key) -} - -func (m *syncMap) Set(key uint64, value interface{}) { - m.Store(key, value) -} - -func (m *syncMap) Del(key uint64) { - m.Delete(key) -} - const numShards uint64 = 256 type shardedMap struct { @@ -91,6 +73,11 @@ func (sm *shardedMap) Del(key uint64) { sm.shards[idx].Del(key) } +func (sm *shardedMap) Update(key uint64, value interface{}) bool { + idx := key % numShards + return sm.shards[idx].Update(key, value) +} + type lockedMap struct { sync.RWMutex data map[uint64]interface{} @@ -118,3 +105,13 @@ func (m *lockedMap) Del(key uint64) { defer m.Unlock() delete(m.data, key) } + +func (m *lockedMap) Update(key uint64, value interface{}) bool { + m.Lock() + defer m.Unlock() + if _, found := m.data[key]; found { + m.data[key] = value + return true + } + return false +} diff --git a/store_test.go b/store_test.go index 15f2fb55..c15c2482 100644 --- a/store_test.go +++ b/store_test.go @@ -20,10 +20,6 @@ import ( "testing" ) -func BenchmarkStoreSyncMap(b *testing.B) { - GenerateBench(func() store { return newSyncMap() })(b) -} - func BenchmarkStoreLockedMap(b *testing.B) { GenerateBench(func() store { return newLockedMap() })(b) } @@ -48,10 +44,6 @@ func TestStore(t *testing.T) { GenerateTest(func() store { return newStore() })(t) } -func TestStoreSyncMap(t *testing.T) { - GenerateTest(func() store { return newSyncMap() })(t) -} - func TestStoreLockedMap(t *testing.T) { GenerateTest(func() store { return newLockedMap() })(t) } @@ -83,5 +75,21 @@ func GenerateTest(create func() store) func(*testing.T) { t.Fatal("del error") } }) + t.Run("update", func(t *testing.T) { + m := create() + m.Set(1, 1) + if updated := m.Update(1, 2); !updated { + t.Fatal("value should have been updated") + } + if val, _ := m.Get(1); val.(int) != 2 { + t.Fatal("value wasn't updated") + } + if updated := m.Update(2, 2); updated { + t.Fatal("value should not have been updated") + } + if val, found := m.Get(2); val != nil || found { + t.Fatal("value should not have been updated") + } + }) } } From e0afa0bb4067e2eff8d040f16de84e249dcf4f3f Mon Sep 17 00:00:00 2001 From: Karl McGuire Date: Fri, 27 Sep 2019 18:11:39 -0400 Subject: [PATCH 2/5] coster documentation --- cache.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cache.go b/cache.go index 9aed68f2..d959c9b3 100644 --- a/cache.go +++ b/cache.go @@ -92,9 +92,9 @@ type Config struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. KeyToHash func(key interface{}) uint64 - // Coster is called when a new item passed to Set has a cost of 0. The - // output of Coster is set as the cost of the new item before it is sent - // to the Policy and stored. + // Coster evaluates a value and outputs a corresponding cost. This function + // is ran after Set is called for a new item or an item update with a cost + // param of 0. Coster func(value interface{}) int64 } @@ -177,6 +177,10 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) { // it returns true, there's still a chance it could be dropped by the policy if // its determined that the key-value item isn't worth keeping, but otherwise the // item will be added and other items will be evicted in order to make room. +// +// To dynamically evaluate the items cost using the Config.Coster function, set +// the cost parameter to 0 and Coster will be ran when needed in order to find +// the items true cost. func (c *Cache) Set(key, value interface{}, cost int64) bool { if c == nil { return false From 4363fb709b185b6cd8e1544e2ff5a9891c3533b5 Mon Sep 17 00:00:00 2001 From: Karl McGuire Date: Fri, 27 Sep 2019 18:18:32 -0400 Subject: [PATCH 3/5] cacheUpdate test fix --- cache_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cache_test.go b/cache_test.go index cb5c9f1b..693d4a9a 100644 --- a/cache_test.go +++ b/cache_test.go @@ -160,6 +160,8 @@ func TestCacheUpdate(t *testing.T) { t.Fatal("keyUpdate value inconsistent") } } + // wait for keyUpdates to go through + time.Sleep(time.Second / 100) if cache.Metrics().Get(keyUpdate) == 0 { t.Fatal("keyUpdates not being processed") } From df33c70d7e74ce91051ffaca3e18a1494629c3fe Mon Sep 17 00:00:00 2001 From: Karl McGuire Date: Tue, 1 Oct 2019 10:14:37 -0400 Subject: [PATCH 4/5] cost and set/del fixes --- cache.go | 37 ++++++++++++++++++------------------- cache_test.go | 8 ++++---- policy.go | 2 ++ 3 files changed, 24 insertions(+), 23 deletions(-) diff --git a/cache.go b/cache.go index d959c9b3..3d64476e 100644 --- a/cache.go +++ b/cache.go @@ -51,8 +51,8 @@ type Cache struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. keyToHash func(interface{}) uint64 - // coster calculates cost from a value - coster func(value interface{}) int64 + // cost calculates cost from a value + cost func(value interface{}) int64 } // Config is passed to NewCache for creating new Cache instances. @@ -92,10 +92,10 @@ type Config struct { // Each key will be hashed using the provided function. If keyToHash value // is not set, the default keyToHash function is used. KeyToHash func(key interface{}) uint64 - // Coster evaluates a value and outputs a corresponding cost. This function + // Cost evaluates a value and outputs a corresponding cost. This function // is ran after Set is called for a new item or an item update with a cost // param of 0. - Coster func(value interface{}) int64 + Cost func(value interface{}) int64 } type itemFlag byte @@ -136,7 +136,7 @@ func NewCache(config *Config) (*Cache, error) { setBuf: make(chan *item, 32*1024), onEvict: config.OnEvict, keyToHash: config.KeyToHash, - coster: config.Coster, + cost: config.Cost, } if cache.keyToHash == nil { cache.keyToHash = z.KeyToHash @@ -144,13 +144,7 @@ func NewCache(config *Config) (*Cache, error) { if config.Metrics { cache.collectMetrics() } - // We can possibly make this configurable. But having 2 goroutines - // processing this seems sufficient for now. - // - // TODO: Allow a way to stop these goroutines. - for i := 0; i < 2; i++ { - go cache.processItems() - } + go cache.processItems() return cache, nil } @@ -185,7 +179,12 @@ func (c *Cache) Set(key, value interface{}, cost int64) bool { if c == nil { return false } - i := &item{itemNew, c.keyToHash(key), value, cost} + i := &item{ + flag: itemNew, + key: c.keyToHash(key), + value: value, + cost: cost, + } // attempt to immediately update hashmap value and set flag to update so the // cost is eventually updated if c.store.Update(i.key, i.value) { @@ -206,7 +205,10 @@ func (c *Cache) Del(key interface{}) { if c == nil { return } - c.setBuf <- &item{itemDelete, c.keyToHash(key), nil, 0} + c.setBuf <- &item{ + flag: itemDelete, + key: c.keyToHash(key), + } } // Close stops all goroutines and closes all channels. @@ -216,10 +218,8 @@ func (c *Cache) Close() {} func (c *Cache) processItems() { for i := range c.setBuf { // calculate item cost value if new or update - if (i.flag == itemNew || i.flag == itemUpdate) && i.cost == 0 { - if c.coster != nil { - i.cost = c.coster(i.value) - } + if i.cost == 0 && c.cost != nil && i.flag != itemDelete { + i.cost = c.cost(i.value) } switch i.flag { case itemNew: @@ -237,7 +237,6 @@ func (c *Cache) processItems() { } } case itemUpdate: - // TODO: make this nonblocking? c.policy.Update(i.key, i.cost) case itemDelete: c.policy.Del(i.key) diff --git a/cache_test.go b/cache_test.go index 693d4a9a..1574fdb9 100644 --- a/cache_test.go +++ b/cache_test.go @@ -115,13 +115,13 @@ func TestCacheSetDel(t *testing.T) { } func TestCacheCoster(t *testing.T) { - costerRuns := uint64(0) + costRuns := uint64(0) cache, err := NewCache(&Config{ NumCounters: 1000, MaxCost: 500, BufferItems: 64, - Coster: func(value interface{}) int64 { - atomic.AddUint64(&costerRuns, 1) + Cost: func(value interface{}) int64 { + atomic.AddUint64(&costRuns, 1) return 5 }, }) @@ -137,7 +137,7 @@ func TestCacheCoster(t *testing.T) { t.Fatal("coster not being ran") } } - if costerRuns != 100 { + if costRuns != 100 { t.Fatal("coster not being ran") } } diff --git a/policy.go b/policy.go index 746d5458..4d6c9d1f 100644 --- a/policy.go +++ b/policy.go @@ -257,6 +257,8 @@ func (p *sampledLFU) add(key uint64, cost int64) { func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool { if prev, found := p.keyCosts[key]; found { + // update the cost of an existing key, but don't worry about evicting, + // evictions will be handled the next time a new item is added p.stats.Add(keyUpdate, key, 1) p.used += cost - prev p.keyCosts[key] = cost From 043c7523edb7214abe92c7af870df64df744e64e Mon Sep 17 00:00:00 2001 From: Karl McGuire Date: Tue, 1 Oct 2019 10:18:03 -0400 Subject: [PATCH 5/5] remove defers --- policy.go | 14 ++++++-------- store.go | 6 +++--- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/policy.go b/policy.go index 4d6c9d1f..98c369b7 100644 --- a/policy.go +++ b/policy.go @@ -170,27 +170,28 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) { func (p *defaultPolicy) Has(key uint64) bool { p.Lock() - defer p.Unlock() _, exists := p.evict.keyCosts[key] + p.Unlock() return exists } func (p *defaultPolicy) Del(key uint64) { p.Lock() - defer p.Unlock() p.evict.del(key) + p.Unlock() } func (p *defaultPolicy) Cap() int64 { p.Lock() - defer p.Unlock() - return int64(p.evict.maxCost - p.evict.used) + capacity := int64(p.evict.maxCost - p.evict.used) + p.Unlock() + return capacity } func (p *defaultPolicy) Update(key uint64, cost int64) { p.Lock() - defer p.Unlock() p.evict.updateIfHas(key, cost) + p.Unlock() } func (p *defaultPolicy) Cost(key uint64) int64 { @@ -239,10 +240,8 @@ func (p *sampledLFU) del(key uint64) { if !ok { return } - p.stats.Add(keyEvict, key, 1) p.stats.Add(costEvict, key, uint64(cost)) - p.used -= cost delete(p.keyCosts, key) } @@ -250,7 +249,6 @@ func (p *sampledLFU) del(key uint64) { func (p *sampledLFU) add(key uint64, cost int64) { p.stats.Add(keyAdd, key, 1) p.stats.Add(costAdd, key, uint64(cost)) - p.keyCosts[key] = cost p.used += cost } diff --git a/store.go b/store.go index ca316fcf..6f2ef499 100644 --- a/store.go +++ b/store.go @@ -89,21 +89,21 @@ func newLockedMap() *lockedMap { func (m *lockedMap) Get(key uint64) (interface{}, bool) { m.RLock() - defer m.RUnlock() val, found := m.data[key] + m.RUnlock() return val, found } func (m *lockedMap) Set(key uint64, value interface{}) { m.Lock() - defer m.Unlock() m.data[key] = value + m.Unlock() } func (m *lockedMap) Del(key uint64) { m.Lock() - defer m.Unlock() delete(m.data, key) + m.Unlock() } func (m *lockedMap) Update(key uint64, value interface{}) bool {