Immediate updates and lazy cost evaluation #75

Merged: 5 commits, Oct 1, 2019
106 changes: 66 additions & 40 deletions cache.go
@@ -51,6 +51,8 @@ type Cache struct {
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
keyToHash func(interface{}) uint64
// cost calculates cost from a value
cost func(value interface{}) int64
}

// Config is passed to NewCache for creating new Cache instances.
@@ -90,14 +92,26 @@ type Config struct {
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
KeyToHash func(key interface{}) uint64
// Cost evaluates a value and outputs a corresponding cost. This function
// is run after Set is called for a new item or an item update with a cost
// param of 0.
Cost func(value interface{}) int64
}
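
For context, a minimal usage sketch of the new Cost option, written in the same package-internal style as the tests in this PR (the field values are illustrative only):

	costedCache, err := NewCache(&Config{
		NumCounters: 1000, // number of keys to track frequency of
		MaxCost:     500,  // total cost budget for the cache
		BufferItems: 64,
		Cost: func(value interface{}) int64 {
			// run lazily by processItems for items Set with a cost of 0
			return int64(len(value.(string)))
		},
	})
	if err != nil {
		panic(err)
	}
	// a cost of 0 defers cost calculation to the Cost function above
	costedCache.Set(1, "some value", 0)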

type itemFlag byte

const (
itemNew itemFlag = iota
itemDelete
itemUpdate
)

// item is passed to setBuf so items can eventually be added to the cache
type item struct {
key uint64
val interface{}
cost int64
del bool
flag itemFlag
key uint64
value interface{}
cost int64
}

// NewCache returns a new Cache instance along with any configuration errors.
@@ -122,20 +136,15 @@ func NewCache(config *Config) (*Cache, error) {
setBuf: make(chan *item, 32*1024),
onEvict: config.OnEvict,
keyToHash: config.KeyToHash,
cost: config.Cost,
}
if cache.keyToHash == nil {
cache.keyToHash = z.KeyToHash
}
if config.Metrics {
cache.collectMetrics()
}
// We can possibly make this configurable. But having 2 goroutines
// processing this seems sufficient for now.
//
// TODO: Allow a way to stop these goroutines.
for i := 0; i < 2; i++ {
go cache.processItems()
}
go cache.processItems()
return cache, nil
}

@@ -162,22 +171,31 @@ func (c *Cache) Get(key interface{}) (interface{}, bool) {
// it returns true, there's still a chance it could be dropped by the policy if
// it's determined that the key-value item isn't worth keeping, but otherwise the
// item will be added and other items will be evicted in order to make room.
func (c *Cache) Set(key interface{}, val interface{}, cost int64) bool {
//
// To dynamically evaluate the item's cost using the Config.Cost function, set
// the cost parameter to 0 and the Cost function will be run when needed to
// determine the item's true cost.
func (c *Cache) Set(key, value interface{}, cost int64) bool {
if c == nil {
return false
}
hash := c.keyToHash(key)
// TODO: Add a c.store.UpdateIfPresent here. This would catch any value updates and avoid having
// to push the key in setBuf.

// attempt to add the (possibly) new item to the setBuf where it will later
// be processed by the policy and evaluated
i := &item{
flag: itemNew,
key: c.keyToHash(key),
value: value,
cost: cost,
}
// attempt to update the hashmap value immediately; if the key is already
// present, flag the item as an update so its cost is eventually updated too
if c.store.Update(i.key, i.value) {
i.flag = itemUpdate
}
// attempt to send item to policy
select {
case c.setBuf <- &item{key: hash, val: val, cost: cost}:
case c.setBuf <- i:
return true
default:
// drop the set and avoid blocking
c.stats.Add(dropSets, hash, 1)
c.stats.Add(dropSets, i.key, 1)
return false
}
}
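
As a usage sketch of the new update path, reusing costedCache from the sketch above (the sleep mirrors TestCacheUpdate below and assumes the time package is imported; it gives the buffered new-item Set time to be applied before the update):

	costedCache.Set(2, "first", 1)    // new key: buffered and admitted asynchronously
	time.Sleep(10 * time.Millisecond) // let the buffered Set be processed
	costedCache.Set(2, "second", 2)   // existing key: the stored value is updated immediately
	if val, ok := costedCache.Get(2); ok {
		// val is already "second" here; only the cost update is deferred to the policy
		_ = val
	}
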
@@ -187,34 +205,42 @@ func (c *Cache) Del(key interface{}) {
if c == nil {
return
}
c.setBuf <- &item{key: c.keyToHash(key), del: true}
c.setBuf <- &item{
flag: itemDelete,
key: c.keyToHash(key),
}
}

// Close stops all goroutines and closes all channels.
func (c *Cache) Close() {}

// processItems is run by goroutines processing the Set buffer.
func (c *Cache) processItems() {
for item := range c.setBuf {
if item.del {
c.policy.Del(item.key)
c.store.Del(item.key)
continue
}
victims, added := c.policy.Add(item.key, item.cost)
if added {
// item was accepted by the policy, so add to the hashmap
c.store.Set(item.key, item.val)
for i := range c.setBuf {
// lazily calculate the item's cost if it's a new item or update with a cost of 0
if i.cost == 0 && c.cost != nil && i.flag != itemDelete {
i.cost = c.cost(i.value)
}
// delete victims that are no longer worthy of being in the cache
for _, victim := range victims {
// eviction callback
if c.onEvict != nil {
victim.val, _ = c.store.Get(victim.key)
c.onEvict(victim.key, victim.val, victim.cost)
switch i.flag {
case itemNew:
if victims, added := c.policy.Add(i.key, i.cost); added {
// item was accepted by the policy, so add to the hashmap
c.store.Set(i.key, i.value)
// delete victims
for _, victim := range victims {
// TODO: make Get-Delete atomic
if c.onEvict != nil {
victim.value, _ = c.store.Get(victim.key)
c.onEvict(victim.key, victim.value, victim.cost)
}
c.store.Del(victim.key)
}
}
// delete from hashmap
c.store.Del(victim.key)
case itemUpdate:
c.policy.Update(i.key, i.cost)
case itemDelete:
c.policy.Del(i.key)
c.store.Del(i.key)
}
}
}
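
For reference, a small sketch of registering the eviction callback that processItems invokes above; the OnEvict field already exists in Config, the handler body is illustrative, and the log package is assumed to be imported:

	evictingCache, err := NewCache(&Config{
		NumCounters: 1000,
		MaxCost:     100,
		BufferItems: 64,
		OnEvict: func(key uint64, value interface{}, cost int64) {
			// value is read from the store just before the victim is deleted,
			// so it may be nil if the victim was never added to the hashmap
			log.Printf("evicted key=%d cost=%d value=%v", key, cost, value)
		},
	})
	if err != nil {
		panic(err)
	}
	_ = evictingCache
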
55 changes: 55 additions & 0 deletions cache_test.go
@@ -21,10 +21,12 @@ import (
"math/rand"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/dgraph-io/ristretto/sim"
"github.com/dgraph-io/ristretto/z"
)

// TestCache is used to pass instances of Ristretto and Clairvoyant around and
@@ -112,6 +114,59 @@ func TestCacheSetDel(t *testing.T) {
}
}

func TestCacheCoster(t *testing.T) {
costRuns := uint64(0)
cache, err := NewCache(&Config{
NumCounters: 1000,
MaxCost: 500,
BufferItems: 64,
Cost: func(value interface{}) int64 {
atomic.AddUint64(&costRuns, 1)
return 5
},
})
if err != nil {
panic(err)
}
for i := 0; i < 100; i++ {
cache.Set(i, i, 0)
}
time.Sleep(time.Second / 100)
for i := 0; i < 100; i++ {
if cache.policy.Cost(z.KeyToHash(i)) != 5 {
t.Fatal("coster not being ran")
}
}
if atomic.LoadUint64(&costRuns) != 100 {
t.Fatal("coster not being ran")
}
}

// TestCacheUpdate verifies that a Set call on an existing key immediately
// updates the value for that key, with only the cost update deferred to the
// policy via the Set buffer.
func TestCacheUpdate(t *testing.T) {
cache := newCache(true)
cache.Set(1, 1, 1)
// wait for new-item Set to go through
time.Sleep(time.Second / 100)
// do 100 updates
for i := 0; i < 100; i++ {
// update the same key (1) with an incrementing value and cost, so we can
// verify that the value is updated immediately rather than waiting on
// channels
cache.Set(1, i, int64(i))
if val, ok := cache.Get(1); !ok || val.(int) != i {
t.Fatal("keyUpdate value inconsistent")
}
}
// wait for keyUpdates to go through
time.Sleep(time.Second / 100)
if cache.Metrics().Get(keyUpdate) == 0 {
t.Fatal("keyUpdates not being processed")
}
}

func TestCacheOnEvict(t *testing.T) {
mu := &sync.Mutex{}
evictions := make(map[uint64]int)
51 changes: 36 additions & 15 deletions policy.go
@@ -43,6 +43,10 @@ type policy interface {
Del(uint64)
// Cap returns the available capacity.
Cap() int64
// Update updates the cost value for the key.
Update(uint64, int64)
// Cost returns the cost value of a key or -1 if missing.
Cost(uint64) int64
// Optionally, set stats object to track how policy is performing.
CollectMetrics(stats *metrics)
}
Expand Down Expand Up @@ -157,9 +161,7 @@ func (p *defaultPolicy) Add(key uint64, cost int64) ([]*item, bool) {
// store victim in evicted victims slice
victims = append(victims, &item{
key: minKey,
val: nil,
cost: minCost,
del: false,
})
}
p.evict.add(key, cost)
@@ -168,21 +170,37 @@

func (p *defaultPolicy) Has(key uint64) bool {
p.Lock()
defer p.Unlock()
_, exists := p.evict.keyCosts[key]
p.Unlock()
return exists
}

func (p *defaultPolicy) Del(key uint64) {
p.Lock()
defer p.Unlock()
p.evict.del(key)
p.Unlock()
}

func (p *defaultPolicy) Cap() int64 {
p.Lock()
capacity := int64(p.evict.maxCost - p.evict.used)
p.Unlock()
return capacity
}

func (p *defaultPolicy) Update(key uint64, cost int64) {
p.Lock()
p.evict.updateIfHas(key, cost)
p.Unlock()
}

func (p *defaultPolicy) Cost(key uint64) int64 {
p.Lock()
defer p.Unlock()
return int64(p.evict.maxCost - p.evict.used)
if cost, found := p.evict.keyCosts[key]; found {
return cost
}
return -1
}

// sampledLFU is an eviction helper storing key-cost pairs.
@@ -222,27 +240,23 @@ func (p *sampledLFU) del(key uint64) {
if !ok {
return
}

p.stats.Add(keyEvict, key, 1)
p.stats.Add(costEvict, key, uint64(cost))

p.used -= cost
delete(p.keyCosts, key)
}

func (p *sampledLFU) add(key uint64, cost int64) {
p.stats.Add(keyAdd, key, 1)
p.stats.Add(costAdd, key, uint64(cost))

p.keyCosts[key] = cost
p.used += cost
}

// TODO: Move this to the store itself. So, it can be used by public Set.
func (p *sampledLFU) updateIfHas(key uint64, cost int64) (updated bool) {
if prev, exists := p.keyCosts[key]; exists {
// Update the cost of the existing key. For simplicity, don't worry about evicting anything
// if the updated cost causes the size to grow beyond maxCost.
func (p *sampledLFU) updateIfHas(key uint64, cost int64) bool {
if prev, found := p.keyCosts[key]; found {
// update the cost of an existing key, but don't worry about evicting,
// evictions will be handled the next time a new item is added
p.stats.Add(keyUpdate, key, 1)
p.used += cost - prev
p.keyCosts[key] = cost
Expand Down Expand Up @@ -376,9 +390,7 @@ func (p *lruPolicy) Add(key uint64, cost int64) ([]*item, bool) {
delete(p.ptrs, victim.key)
victims = append(victims, &item{
key: victim.key,
val: nil,
cost: victim.cost,
del: false,
})
// adjust room
p.room += victim.cost
@@ -412,6 +424,15 @@ func (p *lruPolicy) Cap() int64 {
return int64(p.vals.Len())
}

// TODO
func (p *lruPolicy) Update(key uint64, cost int64) {
}

// TODO
func (p *lruPolicy) Cost(key uint64) int64 {
return -1
}

// TODO
func (p *lruPolicy) CollectMetrics(stats *metrics) {
}