Skip to content

Commit

Permalink
Add a way to stop goroutines (#53)
Browse files Browse the repository at this point in the history
* Let Close to stop goroutines

* formatting

* fix test
  • Loading branch information
hnakamur authored and karlmcguire committed Oct 2, 2019
1 parent 677a7df commit 3728a36
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
8 changes: 7 additions & 1 deletion cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,13 @@ func (c *Cache) Del(key interface{}) {
}

// Close stops all goroutines and closes all channels.
func (c *Cache) Close() {}
func (c *Cache) Close() {
// block until processItems goroutine is returned
c.stop <- struct{}{}
close(c.stop)
close(c.setBuf)
c.policy.Close()
}

// Clear empties the hashmap and zeroes all policy counters. Note that this is
// not an atomic operation (but that shouldn't be a problem as it's assumed that
Expand Down
8 changes: 2 additions & 6 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,8 @@ func TestCacheSetDel(t *testing.T) {
cache := newCache(true)
cache.Set(1, 1, 1)
cache.Del(1)
found := false
// make sure the item is eventually deleted
for i := 0; i < 10; i++ {
_, found = cache.Get(1)
}
if found {
time.Sleep(time.Second / 100)
if _, found := cache.Get(1); found {
t.Fatal("value shouldn't exist")
}
}
Expand Down
27 changes: 22 additions & 5 deletions policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type policy interface {
Del(uint64)
// Cap returns the available capacity.
Cap() int64
// Close stops all goroutines and closes all channels.
Close()
// Update updates the cost value for the key.
Update(uint64, int64)
// Cost returns the cost value of a key or -1 if missing.
Expand All @@ -58,8 +60,8 @@ func newPolicy(numCounters, maxCost int64) policy {
admit: newTinyLFU(numCounters),
evict: newSampledLFU(maxCost),
itemsCh: make(chan []uint64, 3),
stop: make(chan struct{}),
}
// TODO: Add a way to stop the goroutine.
go p.processItems()
return p
}
Expand All @@ -71,6 +73,7 @@ type defaultPolicy struct {
admit *tinyLFU
evict *sampledLFU
itemsCh chan []uint64
stop chan struct{}
stats *metrics
}

Expand All @@ -85,10 +88,15 @@ type policyPair struct {
}

func (p *defaultPolicy) processItems() {
for items := range p.itemsCh {
p.Lock()
p.admit.Push(items)
p.Unlock()
for {
select {
case items := <-p.itemsCh:
p.Lock()
p.admit.Push(items)
p.Unlock()
case <-p.stop:
return
}
}
}

Expand Down Expand Up @@ -212,6 +220,13 @@ func (p *defaultPolicy) Clear() {
p.evict.clear()
}

func (p *defaultPolicy) Close() {
// block until p.processItems goroutine is returned
p.stop <- struct{}{}
close(p.stop)
close(p.itemsCh)
}

// sampledLFU is an eviction helper storing key-cost pairs.
type sampledLFU struct {
keyCosts map[uint64]int64
Expand Down Expand Up @@ -444,6 +459,8 @@ func (p *lruPolicy) Cap() int64 {
return int64(p.vals.Len())
}

func (p *lruPolicy) Close() {}

// TODO
func (p *lruPolicy) Update(key uint64, cost int64) {
}
Expand Down

0 comments on commit 3728a36

Please sign in to comment.