From 40275a30c8889bd6a3f230892801c26a181917e1 Mon Sep 17 00:00:00 2001
From: Karl Seguin
Date: Fri, 26 Jun 2020 20:22:30 +0800
Subject: [PATCH] Ability to dynamically SetMaxSize

To support this, rather than adding another field/channel like
`getDroppedReq`, I added a `control` channel that can be used for these
miscellaneous interactions with the worker. The control channel also
takes over the role of the `donec` channel.
---
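Note (placed below the ---, so `git am` ignores it): a minimal sketch of how
the new call is meant to be used. The import path is assumed to be
github.com/karlseguin/ccache (adjust for your module version), and the sizes
and keys are made up for illustration.

	package main

	import (
		"fmt"
		"time"

		"github.com/karlseguin/ccache"
	)

	func main() {
		// A cache capped at 5000 "units"; by default each item costs 1.
		cache := ccache.New(ccache.Configure().MaxSize(5000).ItemsToPrune(100))
		for i := 0; i < 6000; i++ {
			cache.Set(fmt.Sprintf("key-%d", i), i, time.Minute)
		}

		// Shrink the limit at runtime. If the cached size now exceeds the
		// new maximum, the worker runs a GC to bring it back under.
		cache.SetMaxSize(1000)

		// SetMaxSize is handled asynchronously by the worker goroutine, so
		// give it a moment before sampling the drop counter (as the tests do).
		time.Sleep(10 * time.Millisecond)
		fmt.Println("dropped since last call:", cache.GetDropped())
	}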
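For reviewers who want the control-channel idea in isolation: a self-contained
sketch of the pattern (illustrative only; getCount/setLimit and the counting
logic are made up, not part of this patch). The worker owns all mutable state,
callers reach it through typed messages on a single chan interface{}, and
closing that channel on exit is what lets a Stop-style caller block on
<-control instead of a dedicated donec channel.

	package main

	import "fmt"

	// Messages understood by the worker.
	type getCount struct{ res chan int }
	type setLimit struct{ limit int }

	func worker(work chan int, control chan interface{}) {
		// Closing control on exit is the shutdown signal callers wait on.
		defer close(control)
		count, limit := 0, 10
		for {
			select {
			case n, ok := <-work:
				if !ok {
					return // work channel closed: shut down
				}
				if n > limit {
					count++
				}
			case msg := <-control:
				switch m := msg.(type) {
				case getCount:
					m.res <- count
					count = 0
				case setLimit:
					limit = m.limit
				}
			}
		}
	}

	func main() {
		work := make(chan int)
		control := make(chan interface{})
		go worker(work, control)

		work <- 42 // unbuffered: returns once the worker has taken it

		res := make(chan int)
		control <- getCount{res: res}
		fmt.Println("count:", <-res) // 1

		control <- setLimit{limit: 100}
		close(work)
		<-control // wait for the worker to exit, as Stop() does
	}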
 cache.go             | 59 ++++++++++++++++++++++++++++++--------------
 cache_test.go        | 32 ++++++++++++++++++++++++
 layeredcache.go      | 50 +++++++++++++++++++++++--------------
 layeredcache_test.go | 32 ++++++++++++++++++++++++
 4 files changed, 135 insertions(+), 38 deletions(-)

diff --git a/cache.go b/cache.go
index 63da72e..1190b57 100644
--- a/cache.go
+++ b/cache.go
@@ -8,17 +8,24 @@ import (
 	"time"
 )
 
+// The cache has a generic 'control' channel that is used to send
+// messages to the worker. These are the messages that can be sent to it.
+type getDropped struct {
+	res chan int
+}
+type setMaxSize struct {
+	size int64
+}
+
 type Cache struct {
 	*Configuration
-	list          *list.List
-	size          int64
-	buckets       []*bucket
-	bucketMask    uint32
-	deletables    chan *Item
-	promotables   chan *Item
-	donec         chan struct{}
-	getDroppedReq chan struct{}
-	getDroppedRes chan int
+	list        *list.List
+	size        int64
+	buckets     []*bucket
+	bucketMask  uint32
+	deletables  chan *Item
+	promotables chan *Item
+	control     chan interface{}
 }
 
 // Create a new cache with the specified configuration
@@ -29,8 +36,7 @@ func New(config *Configuration) *Cache {
 		Configuration: config,
 		bucketMask:    uint32(config.buckets) - 1,
 		buckets:       make([]*bucket, config.buckets),
-		getDroppedReq: make(chan struct{}),
-		getDroppedRes: make(chan int),
+		control:       make(chan interface{}),
 	}
 	for i := 0; i < int(config.buckets); i++ {
 		c.buckets[i] = &bucket{
@@ -138,20 +144,27 @@ func (c *Cache) Clear() {
 // is called are likely to panic
 func (c *Cache) Stop() {
 	close(c.promotables)
-	<-c.donec
+	<-c.control
 }
 
 // Gets the number of items removed from the cache due to memory pressure since
 // the last time GetDropped was called
 func (c *Cache) GetDropped() int {
-	c.getDroppedReq <- struct{}{}
-	return <-c.getDroppedRes
+	res := make(chan int)
+	c.control <- getDropped{res: res}
+	return <-res
+}
+
+// Sets a new max size. This can result in a GC being run if the new
+// maximum size is smaller than the cached size.
+func (c *Cache) SetMaxSize(size int64) {
+	c.control <- setMaxSize{size}
 }
 
 func (c *Cache) restart() {
 	c.deletables = make(chan *Item, c.deleteBuffer)
 	c.promotables = make(chan *Item, c.promoteBuffer)
-	c.donec = make(chan struct{})
+	c.control = make(chan interface{})
 	go c.worker()
 }
 
@@ -180,7 +193,7 @@ func (c *Cache) promote(item *Item) {
 }
 
 func (c *Cache) worker() {
-	defer close(c.donec)
+	defer close(c.control)
 	dropped := 0
 	for {
 		select {
@@ -193,9 +206,17 @@ func (c *Cache) worker() {
 			}
 		case item := <-c.deletables:
 			c.doDelete(item)
-		case _ = <-c.getDroppedReq:
-			c.getDroppedRes <- dropped
-			dropped = 0
+		case control := <-c.control:
+			switch msg := control.(type) {
+			case getDropped:
+				msg.res <- dropped
+				dropped = 0
+			case setMaxSize:
+				c.maxSize = msg.size
+				if c.size > c.maxSize {
+					dropped += c.gc()
+				}
+			}
 		}
 	}
 }
diff --git a/cache_test.go b/cache_test.go
index db9614f..5db2264 100644
--- a/cache_test.go
+++ b/cache_test.go
@@ -208,6 +208,38 @@ func (_ CacheTests) ReplaceChangesSize() {
 	checkSize(cache, 5)
 }
 
+func (_ CacheTests) ResizeOnTheFly() {
+	cache := New(Configure().MaxSize(9).ItemsToPrune(1))
+	for i := 0; i < 5; i++ {
+		cache.Set(strconv.Itoa(i), i, time.Minute)
+	}
+	cache.SetMaxSize(3)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(2)
+	Expect(cache.Get("0")).To.Equal(nil)
+	Expect(cache.Get("1")).To.Equal(nil)
+	Expect(cache.Get("2").Value()).To.Equal(2)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+
+	cache.Set("5", 5, time.Minute)
+	time.Sleep(time.Millisecond * 5)
+	Expect(cache.GetDropped()).To.Equal(1)
+	Expect(cache.Get("2")).To.Equal(nil)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+	Expect(cache.Get("5").Value()).To.Equal(5)
+
+	cache.SetMaxSize(10)
+	cache.Set("6", 6, time.Minute)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(0)
+	Expect(cache.Get("3").Value()).To.Equal(3)
+	Expect(cache.Get("4").Value()).To.Equal(4)
+	Expect(cache.Get("5").Value()).To.Equal(5)
+	Expect(cache.Get("6").Value()).To.Equal(6)
+}
+
 type SizedItem struct {
 	id int
 	s  int64
diff --git a/layeredcache.go b/layeredcache.go
index 2d87da0..65174f5 100644
--- a/layeredcache.go
+++ b/layeredcache.go
@@ -10,15 +10,13 @@ import (
 
 type LayeredCache struct {
 	*Configuration
-	list          *list.List
-	buckets       []*layeredBucket
-	bucketMask    uint32
-	size          int64
-	deletables    chan *Item
-	promotables   chan *Item
-	donec         chan struct{}
-	getDroppedReq chan struct{}
-	getDroppedRes chan int
+	list        *list.List
+	buckets     []*layeredBucket
+	bucketMask  uint32
+	size        int64
+	deletables  chan *Item
+	promotables chan *Item
+	control     chan interface{}
 }
 
 // Create a new layered cache with the specified configuration.
@@ -41,8 +39,7 @@ func Layered(config *Configuration) *LayeredCache {
 		bucketMask:    uint32(config.buckets) - 1,
 		buckets:       make([]*layeredBucket, config.buckets),
 		deletables:    make(chan *Item, config.deleteBuffer),
-		getDroppedReq: make(chan struct{}),
-		getDroppedRes: make(chan int),
+		control:       make(chan interface{}),
 	}
 	for i := 0; i < int(config.buckets); i++ {
 		c.buckets[i] = &layeredBucket{
@@ -163,19 +160,26 @@ func (c *LayeredCache) Clear() {
 
 func (c *LayeredCache) Stop() {
 	close(c.promotables)
-	<-c.donec
+	<-c.control
 }
 
 // Gets the number of items removed from the cache due to memory pressure since
 // the last time GetDropped was called
 func (c *LayeredCache) GetDropped() int {
-	c.getDroppedReq <- struct{}{}
-	return <-c.getDroppedRes
+	res := make(chan int)
+	c.control <- getDropped{res: res}
+	return <-res
+}
+
+// Sets a new max size. This can result in a GC being run if the new
+// maximum size is smaller than the cached size.
+func (c *LayeredCache) SetMaxSize(size int64) {
+	c.control <- setMaxSize{size}
 }
 
 func (c *LayeredCache) restart() {
 	c.promotables = make(chan *Item, c.promoteBuffer)
-	c.donec = make(chan struct{})
+	c.control = make(chan interface{})
 	go c.worker()
 }
 
@@ -199,7 +203,7 @@ func (c *LayeredCache) promote(item *Item) {
 }
 
 func (c *LayeredCache) worker() {
-	defer close(c.donec)
+	defer close(c.control)
 	dropped := 0
 	for {
 		select {
@@ -220,9 +224,17 @@ func (c *LayeredCache) worker() {
 				}
 				c.list.Remove(item.element)
 			}
-		case _ = <-c.getDroppedReq:
-			c.getDroppedRes <- dropped
-			dropped = 0
+		case control := <-c.control:
+			switch msg := control.(type) {
+			case getDropped:
+				msg.res <- dropped
+				dropped = 0
+			case setMaxSize:
+				c.maxSize = msg.size
+				if c.size > c.maxSize {
+					dropped += c.gc()
+				}
+			}
 		}
 	}
 }
diff --git a/layeredcache_test.go b/layeredcache_test.go
index a1edc64..2493edb 100644
--- a/layeredcache_test.go
+++ b/layeredcache_test.go
@@ -174,6 +174,38 @@ func (_ LayeredCacheTests) RemovesOldestItemWhenFull() {
 	Expect(cache.GetDropped()).To.Equal(0)
 }
 
+func (_ LayeredCacheTests) ResizeOnTheFly() {
+	cache := Layered(Configure().MaxSize(9).ItemsToPrune(1))
+	for i := 0; i < 5; i++ {
+		cache.Set(strconv.Itoa(i), "a", i, time.Minute)
+	}
+	cache.SetMaxSize(3)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(2)
+	Expect(cache.Get("0", "a")).To.Equal(nil)
+	Expect(cache.Get("1", "a")).To.Equal(nil)
+	Expect(cache.Get("2", "a").Value()).To.Equal(2)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+
+	cache.Set("5", "a", 5, time.Minute)
+	time.Sleep(time.Millisecond * 5)
+	Expect(cache.GetDropped()).To.Equal(1)
+	Expect(cache.Get("2", "a")).To.Equal(nil)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+	Expect(cache.Get("5", "a").Value()).To.Equal(5)
+
+	cache.SetMaxSize(10)
+	cache.Set("6", "a", 6, time.Minute)
+	time.Sleep(time.Millisecond * 10)
+	Expect(cache.GetDropped()).To.Equal(0)
+	Expect(cache.Get("3", "a").Value()).To.Equal(3)
+	Expect(cache.Get("4", "a").Value()).To.Equal(4)
+	Expect(cache.Get("5", "a").Value()).To.Equal(5)
+	Expect(cache.Get("6", "a").Value()).To.Equal(6)
+}
+
 func newLayered() *LayeredCache {
 	return Layered(Configure())
 }