From 3cd0e5aa35fd10f3288b53d95f12431a50515a63 Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Thu, 16 Apr 2020 18:41:06 +0200 Subject: [PATCH 1/6] fix(store/memory): Try an experiment to uses sync.Map to store counters --- drivers/store/memory/cache.go | 163 +++++++++++++++++++++++----------- 1 file changed, 113 insertions(+), 50 deletions(-) diff --git a/drivers/store/memory/cache.go b/drivers/store/memory/cache.go index 361471f..7c39a26 100644 --- a/drivers/store/memory/cache.go +++ b/drivers/store/memory/cache.go @@ -37,6 +37,7 @@ func (cleaner *cleaner) Run(cache *Cache) { // stopCleaner is a callback from GC used to stop cleaner goroutine. func stopCleaner(wrapper *CacheWrapper) { wrapper.cleaner.stop <- true + wrapper.cleaner = nil } // startCleaner will start a cleaner goroutine for given cache. @@ -50,34 +51,64 @@ func startCleaner(cache *Cache, interval time.Duration) { go cleaner.Run(cache) } -// Counter is a simple counter with an optional expiration. +// Counter is a simple counter with an expiration. type Counter struct { - Value int64 - Expiration int64 + mutex sync.RWMutex + value int64 + expiration int64 } // Expired returns true if the counter has expired. -func (counter Counter) Expired() bool { - if counter.Expiration == 0 { - return false +func (counter *Counter) Expired() bool { + counter.mutex.RLock() + defer counter.mutex.RUnlock() + + if counter.expiration == 0 { + return true + } + return time.Now().UnixNano() > counter.expiration +} + +// Load returns the value and the expiration of this counter. +// If the counter is expired, it will use the given expiration. +func (counter *Counter) Load(expiration int64) (int64, int64) { + counter.mutex.RLock() + defer counter.mutex.RUnlock() + + if counter.expiration == 0 || time.Now().UnixNano() > counter.expiration { + return 0, expiration + } + + return counter.value, counter.expiration +} + +// Increment increments given value on this counter. +// If the counter is expired, it will use the given expiration. +// It returns its current value and expiration. +func (counter *Counter) Increment(value int64, expiration int64) (int64, int64) { + counter.mutex.Lock() + defer counter.mutex.Unlock() + + if counter.expiration == 0 || time.Now().UnixNano() > counter.expiration { + counter.value = value + counter.expiration = expiration + return counter.value, counter.expiration } - return time.Now().UnixNano() > counter.Expiration + + counter.value += value + return counter.value, counter.expiration } // Cache contains a collection of counters. type Cache struct { - mutex sync.RWMutex - counters map[string]Counter + counters sync.Map cleaner *cleaner } // NewCache returns a new cache. func NewCache(cleanInterval time.Duration) *CacheWrapper { - cache := &Cache{ - counters: map[string]Counter{}, - } - + cache := &Cache{} wrapper := &CacheWrapper{Cache: cache} if cleanInterval > 0 { @@ -88,71 +119,103 @@ func NewCache(cleanInterval time.Duration) *CacheWrapper { return wrapper } +// LoadOrStore returns the existing counter for the key if present. +// Otherwise, it stores and returns the given counter. +// The loaded result is true if the counter was loaded, false if stored. +func (cache *Cache) LoadOrStore(key string, counter *Counter) (*Counter, bool) { + val, loaded := cache.counters.LoadOrStore(key, counter) + if val == nil { + return counter, false + } + + actual := val.(*Counter) + return actual, loaded +} + +// Load returns the counter stored in the map for a key, or nil if no counter is present. 
+// The ok result indicates whether counter was found in the map. +func (cache *Cache) Load(key string) (*Counter, bool) { + val, ok := cache.counters.Load(key) + if val == nil || !ok { + return nil, false + } + actual := val.(*Counter) + return actual, true +} + +// Store sets the counter for a key. +func (cache *Cache) Store(key string, counter *Counter) { + cache.counters.Store(key, counter) +} + +// Delete deletes the value for a key. +func (cache *Cache) Delete(key string) { + cache.counters.Delete(key) +} + +// Range calls handler sequentially for each key and value present in the cache. +// If handler returns false, range stops the iteration. +func (cache *Cache) Range(handler func(key string, counter *Counter)) { + cache.counters.Range(func(k interface{}, v interface{}) bool { + if v == nil { + return true + } + + key := k.(string) + counter := v.(*Counter) + + handler(key, counter) + + return true + }) +} + // Increment increments given value on key. // If key is undefined or expired, it will create it. func (cache *Cache) Increment(key string, value int64, duration time.Duration) (int64, time.Time) { - cache.mutex.Lock() - - counter, ok := cache.counters[key] - if !ok || counter.Expired() { - expiration := time.Now().Add(duration).UnixNano() - counter = Counter{ - Value: value, - Expiration: expiration, - } + expiration := time.Now().Add(duration).UnixNano() - cache.counters[key] = counter - cache.mutex.Unlock() + counter, loaded := cache.LoadOrStore(key, &Counter{ + mutex: sync.RWMutex{}, + value: value, + expiration: expiration, + }) + if !loaded { return value, time.Unix(0, expiration) } - value = counter.Value + value - counter.Value = value - expiration := counter.Expiration - - cache.counters[key] = counter - cache.mutex.Unlock() - + value, expiration = counter.Increment(value, expiration) + cache.Store(key, counter) return value, time.Unix(0, expiration) } // Get returns key's value and expiration. func (cache *Cache) Get(key string, duration time.Duration) (int64, time.Time) { - cache.mutex.RLock() + expiration := time.Now().Add(duration).UnixNano() - counter, ok := cache.counters[key] - if !ok || counter.Expired() { - expiration := time.Now().Add(duration).UnixNano() - cache.mutex.RUnlock() + counter, ok := cache.Load(key) + if !ok { return 0, time.Unix(0, expiration) } - value := counter.Value - expiration := counter.Expiration - cache.mutex.RUnlock() + value, expiration := counter.Load(expiration) return value, time.Unix(0, expiration) } // Clean will deleted any expired keys. func (cache *Cache) Clean() { - now := time.Now().UnixNano() - - cache.mutex.Lock() - for key, counter := range cache.counters { - if now > counter.Expiration { - delete(cache.counters, key) + cache.Range(func(key string, counter *Counter) { + if counter.Expired() { + cache.counters.Delete(key) } - } - cache.mutex.Unlock() + }) } // Reset changes the key's value and resets the expiration. 
func (cache *Cache) Reset(key string, duration time.Duration) (int64, time.Time) { - cache.mutex.Lock() - delete(cache.counters, key) - cache.mutex.Unlock() + cache.counters.Delete(key) expiration := time.Now().Add(duration).UnixNano() return 0, time.Unix(0, expiration) From e64d32b5d43cf170381f5813a843a59985b8615c Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Thu, 16 Apr 2020 19:19:27 +0200 Subject: [PATCH 2/6] refactor(store/memory): Use better naming --- drivers/store/memory/cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drivers/store/memory/cache.go b/drivers/store/memory/cache.go index 7c39a26..cae0dee 100644 --- a/drivers/store/memory/cache.go +++ b/drivers/store/memory/cache.go @@ -208,14 +208,14 @@ func (cache *Cache) Get(key string, duration time.Duration) (int64, time.Time) { func (cache *Cache) Clean() { cache.Range(func(key string, counter *Counter) { if counter.Expired() { - cache.counters.Delete(key) + cache.Delete(key) } }) } // Reset changes the key's value and resets the expiration. func (cache *Cache) Reset(key string, duration time.Duration) (int64, time.Time) { - cache.counters.Delete(key) + cache.Delete(key) expiration := time.Now().Add(duration).UnixNano() return 0, time.Unix(0, expiration) From c316c0bacbcce8e738c6e75da1476ebdc36c86f4 Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Fri, 13 Nov 2020 17:57:22 +0100 Subject: [PATCH 3/6] chore(test): Improve benchmark --- drivers/store/memory/cache.go | 6 +----- drivers/store/tests/tests.go | 14 ++++---------- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/drivers/store/memory/cache.go b/drivers/store/memory/cache.go index cae0dee..4728d89 100644 --- a/drivers/store/memory/cache.go +++ b/drivers/store/memory/cache.go @@ -63,10 +63,7 @@ func (counter *Counter) Expired() bool { counter.mutex.RLock() defer counter.mutex.RUnlock() - if counter.expiration == 0 { - return true - } - return time.Now().UnixNano() > counter.expiration + return counter.expiration == 0 || time.Now().UnixNano() > counter.expiration } // Load returns the value and the expiration of this counter. @@ -200,7 +197,6 @@ func (cache *Cache) Get(key string, duration time.Duration) (int64, time.Time) { } value, expiration := counter.Load(expiration) - return value, time.Unix(0, expiration) } diff --git a/drivers/store/tests/tests.go b/drivers/store/tests/tests.go index b08c086..fefd587 100644 --- a/drivers/store/tests/tests.go +++ b/drivers/store/tests/tests.go @@ -145,28 +145,24 @@ func TestStoreConcurrentAccess(t *testing.T, store limiter.Store) { // BenchmarkStoreSequentialAccess executes a benchmark against a store without parallel setting. func BenchmarkStoreSequentialAccess(b *testing.B, store limiter.Store) { - is := require.New(b) ctx := context.Background() - limiter := limiter.New(store, limiter.Rate{ + instance := limiter.New(store, limiter.Rate{ Limit: 100000, Period: 10 * time.Second, }) b.ResetTimer() for i := 0; i < b.N; i++ { - lctx, err := limiter.Get(ctx, "foo") - is.NoError(err) - is.NotZero(lctx) + _, _ = instance.Get(ctx, "foo") } } // BenchmarkStoreConcurrentAccess executes a benchmark against a store with parallel setting. 
func BenchmarkStoreConcurrentAccess(b *testing.B, store limiter.Store) { - is := require.New(b) ctx := context.Background() - limiter := limiter.New(store, limiter.Rate{ + instance := limiter.New(store, limiter.Rate{ Limit: 100000, Period: 10 * time.Second, }) @@ -174,9 +170,7 @@ func BenchmarkStoreConcurrentAccess(b *testing.B, store limiter.Store) { b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - lctx, err := limiter.Get(ctx, "foo") - is.NoError(err) - is.NotZero(lctx) + _, _ = instance.Get(ctx, "foo") } }) } From 869e7a05b158ae66eb38b1b9454e50352d8f5a9e Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Tue, 17 Nov 2020 11:44:09 +0100 Subject: [PATCH 4/6] refactor(store/memory): Improve performance --- drivers/store/memory/cache.go | 18 ++++++--- drivers/store/memory/store.go | 29 ++++++++------ drivers/store/memory/store_test.go | 8 ++-- internal/bytebuffer/pool.go | 58 +++++++++++++++++++++++++++ internal/fasttime/fasttime.go | 21 ++++++++++ internal/fasttime/fasttime_windows.go | 13 ++++++ store.go | 7 +++- 7 files changed, 130 insertions(+), 24 deletions(-) create mode 100644 internal/bytebuffer/pool.go create mode 100644 internal/fasttime/fasttime.go create mode 100644 internal/fasttime/fasttime_windows.go diff --git a/drivers/store/memory/cache.go b/drivers/store/memory/cache.go index 4728d89..4eab50f 100644 --- a/drivers/store/memory/cache.go +++ b/drivers/store/memory/cache.go @@ -172,18 +172,26 @@ func (cache *Cache) Range(handler func(key string, counter *Counter)) { func (cache *Cache) Increment(key string, value int64, duration time.Duration) (int64, time.Time) { expiration := time.Now().Add(duration).UnixNano() - counter, loaded := cache.LoadOrStore(key, &Counter{ + // If counter is in cache, try to load it first. + counter, loaded := cache.Load(key) + if loaded { + value, expiration = counter.Increment(value, expiration) + return value, time.Unix(0, expiration) + } + + // If it's not in cache, try to atomically create it. + // We do that in two step to reduce memory allocation. + counter, loaded = cache.LoadOrStore(key, &Counter{ mutex: sync.RWMutex{}, value: value, expiration: expiration, }) - - if !loaded { + if loaded { + value, expiration = counter.Increment(value, expiration) return value, time.Unix(0, expiration) } - value, expiration = counter.Increment(value, expiration) - cache.Store(key, counter) + // Otherwise, it has been created, return given value. return value, time.Unix(0, expiration) } diff --git a/drivers/store/memory/store.go b/drivers/store/memory/store.go index db36ce1..6a71803 100644 --- a/drivers/store/memory/store.go +++ b/drivers/store/memory/store.go @@ -2,11 +2,11 @@ package memory import ( "context" - "fmt" "time" "github.com/ulule/limiter/v3" "github.com/ulule/limiter/v3/drivers/store/common" + "github.com/ulule/limiter/v3/internal/bytebuffer" ) // Store is the in-memory store. @@ -35,33 +35,36 @@ func NewStoreWithOptions(options limiter.StoreOptions) limiter.Store { // Get returns the limit for given identifier. 
func (store *Store) Get(ctx context.Context, key string, rate limiter.Rate) (limiter.Context, error) { - key = fmt.Sprintf("%s:%s", store.Prefix, key) - now := time.Now() + buffer := bytebuffer.New() + defer buffer.Close() + buffer.Concat(store.Prefix, ":", key) - count, expiration := store.cache.Increment(key, 1, rate.Period) + count, expiration := store.cache.Increment(buffer.String(), 1, rate.Period) - lctx := common.GetContextFromState(now, rate, expiration, count) + lctx := common.GetContextFromState(time.Now(), rate, expiration, count) return lctx, nil } // Peek returns the limit for given identifier, without modification on current values. func (store *Store) Peek(ctx context.Context, key string, rate limiter.Rate) (limiter.Context, error) { - key = fmt.Sprintf("%s:%s", store.Prefix, key) - now := time.Now() + buffer := bytebuffer.New() + defer buffer.Close() + buffer.Concat(store.Prefix, ":", key) - count, expiration := store.cache.Get(key, rate.Period) + count, expiration := store.cache.Get(buffer.String(), rate.Period) - lctx := common.GetContextFromState(now, rate, expiration, count) + lctx := common.GetContextFromState(time.Now(), rate, expiration, count) return lctx, nil } // Reset returns the limit for given identifier. func (store *Store) Reset(ctx context.Context, key string, rate limiter.Rate) (limiter.Context, error) { - key = fmt.Sprintf("%s:%s", store.Prefix, key) - now := time.Now() + buffer := bytebuffer.New() + defer buffer.Close() + buffer.Concat(store.Prefix, ":", key) - count, expiration := store.cache.Reset(key, rate.Period) + count, expiration := store.cache.Reset(buffer.String(), rate.Period) - lctx := common.GetContextFromState(now, rate, expiration, count) + lctx := common.GetContextFromState(time.Now(), rate, expiration, count) return lctx, nil } diff --git a/drivers/store/memory/store_test.go b/drivers/store/memory/store_test.go index 2a5bf7e..88ad3dc 100644 --- a/drivers/store/memory/store_test.go +++ b/drivers/store/memory/store_test.go @@ -23,16 +23,16 @@ func TestMemoryStoreConcurrentAccess(t *testing.T) { })) } -func BenchmarkRedisStoreSequentialAccess(b *testing.B) { +func BenchmarkMemoryStoreSequentialAccess(b *testing.B) { tests.BenchmarkStoreSequentialAccess(b, memory.NewStoreWithOptions(limiter.StoreOptions{ Prefix: "limiter:memory:sequential-benchmark", - CleanUpInterval: 1 * time.Second, + CleanUpInterval: 1 * time.Hour, })) } -func BenchmarkRedisStoreConcurrentAccess(b *testing.B) { +func BenchmarkMemoryStoreConcurrentAccess(b *testing.B) { tests.BenchmarkStoreConcurrentAccess(b, memory.NewStoreWithOptions(limiter.StoreOptions{ Prefix: "limiter:memory:concurrent-benchmark", - CleanUpInterval: 1 * time.Second, + CleanUpInterval: 1 * time.Hour, })) } diff --git a/internal/bytebuffer/pool.go b/internal/bytebuffer/pool.go new file mode 100644 index 0000000..13f101b --- /dev/null +++ b/internal/bytebuffer/pool.go @@ -0,0 +1,58 @@ +package bytebuffer + +import ( + "sync" + "unsafe" +) + +// ByteBuffer is a wrapper around a slice to reduce memory allocation while handling blob of data. +type ByteBuffer struct { + blob []byte +} + +// NewByteBuffer creates a new ByteBuffer instance. +func New() *ByteBuffer { + entry := bufferPool.Get().(*ByteBuffer) + entry.blob = entry.blob[:0] + return entry +} + +// Bytes returns the content buffer. +func (buffer *ByteBuffer) Bytes() []byte { + return buffer.blob +} + +// String returns the content buffer. 
+func (buffer *ByteBuffer) String() string { + // Copied from strings.(*Builder).String + return *(*string)(unsafe.Pointer(&buffer.blob)) // nolint: gosec +} + +// Concat appends given arguments to blob content +func (buffer *ByteBuffer) Concat(args ...string) { + for i := range args { + buffer.blob = append(buffer.blob, args[i]...) + } +} + +// Close recycles underlying resources of encoder. +func (buffer *ByteBuffer) Close() { + // Proper usage of a sync.Pool requires each entry to have approximately + // the same memory cost. To obtain this property when the stored type + // contains a variably-sized buffer, we add a hard limit on the maximum buffer + // to place back in the pool. + // + // See https://golang.org/issue/23199 + if buffer != nil && cap(buffer.blob) < (1<<16) { + bufferPool.Put(buffer) + } +} + +// A byte buffer pool to reduce memory allocation pressure. +var bufferPool = &sync.Pool{ + New: func() interface{} { + return &ByteBuffer{ + blob: make([]byte, 0, 1024), + } + }, +} diff --git a/internal/fasttime/fasttime.go b/internal/fasttime/fasttime.go new file mode 100644 index 0000000..a1a74d0 --- /dev/null +++ b/internal/fasttime/fasttime.go @@ -0,0 +1,21 @@ +// +build !windows + +// Package fasttime gets wallclock time, but super fast. +package fasttime + +import ( + _ "unsafe" +) + +// Forked from https://github.com/sethvargo/go-limiter + +//go:noescape +//go:linkname walltime runtime.walltime +func walltime() (int64, int32) + +// Now returns a monotonic clock value. The actual value will differ across +// systems, but that's okay because we generally only care about the deltas. +func Now() uint64 { + x, y := walltime() + return uint64(x)*1e9 + uint64(y) +} diff --git a/internal/fasttime/fasttime_windows.go b/internal/fasttime/fasttime_windows.go new file mode 100644 index 0000000..447bf46 --- /dev/null +++ b/internal/fasttime/fasttime_windows.go @@ -0,0 +1,13 @@ +// +build windows + +package fasttime + +import "time" + +// Forked from https://github.com/sethvargo/go-limiter + +// Now returns a monotonic clock value. On Windows, no such clock exists, so we +// fallback to time.Now(). +func Now() uint64 { + return uint64(time.Now().UnixNano()) +} diff --git a/store.go b/store.go index c797680..3290c75 100644 --- a/store.go +++ b/store.go @@ -20,10 +20,13 @@ type StoreOptions struct { // Prefix is the prefix to use for the key. Prefix string - // MaxRetry is the maximum number of retry under race conditions. + // MaxRetry is the maximum number of retry under race conditions on redis store. // Deprecated: this option is no longer required since all operations are atomic now. MaxRetry int - // CleanUpInterval is the interval for cleanup. + // CleanUpInterval is the interval for cleanup (run garbage collection) on stale entries on memory store. + // Setting this to a low value will optimize memory consumption, but will likely + // reduce performance and increase lock contention. + // Setting this to a high value will maximum throughput, but will increase the memory footprint. 
CleanUpInterval time.Duration } From 547fc6d9c10bfceb9266b3fafd6520dc1d1c6370 Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Tue, 17 Nov 2020 11:54:52 +0100 Subject: [PATCH 5/6] chore(store/memory): Decrease number of go routines for CircleCI --- drivers/store/memory/cache_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/drivers/store/memory/cache_test.go b/drivers/store/memory/cache_test.go index 78e5757..7ab05b0 100644 --- a/drivers/store/memory/cache_test.go +++ b/drivers/store/memory/cache_test.go @@ -38,7 +38,7 @@ func TestCacheIncrementSequential(t *testing.T) { func TestCacheIncrementConcurrent(t *testing.T) { is := require.New(t) - goroutines := 300 + goroutines := 200 ops := 500 expected := int64(0) From 11f537dc6eb3a97c6ad60f10b98ab66643683934 Mon Sep 17 00:00:00 2001 From: Thomas LE ROUX Date: Tue, 17 Nov 2020 12:02:33 +0100 Subject: [PATCH 6/6] chore: fix linter --- internal/bytebuffer/pool.go | 2 +- internal/fasttime/fasttime.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/bytebuffer/pool.go b/internal/bytebuffer/pool.go index 13f101b..5e761ad 100644 --- a/internal/bytebuffer/pool.go +++ b/internal/bytebuffer/pool.go @@ -10,7 +10,7 @@ type ByteBuffer struct { blob []byte } -// NewByteBuffer creates a new ByteBuffer instance. +// New creates a new ByteBuffer instance. func New() *ByteBuffer { entry := bufferPool.Get().(*ByteBuffer) entry.blob = entry.blob[:0] diff --git a/internal/fasttime/fasttime.go b/internal/fasttime/fasttime.go index a1a74d0..ac6d78b 100644 --- a/internal/fasttime/fasttime.go +++ b/internal/fasttime/fasttime.go @@ -4,7 +4,7 @@ package fasttime import ( - _ "unsafe" + _ "unsafe" // import unsafe because we use go:linkname directive. ) // Forked from https://github.com/sethvargo/go-limiter
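A few usage sketches follow; they are not part of the patches above and only illustrate how the new code is meant to be driven. This first one exercises the sync.Map-backed cache introduced in PATCH 1/6 and tuned in PATCH 4/6: Increment loads an existing *Counter when one is present and otherwise falls back to LoadOrStore, so concurrent callers on the same key contend only on that counter's RWMutex. The sketch assumes CacheWrapper embeds *Cache (as the NewCache constructor in the patch suggests), so the cache methods are promoted; the key and durations are arbitrary examples.

package main

import (
	"fmt"
	"time"

	"github.com/ulule/limiter/v3/drivers/store/memory"
)

func main() {
	// The cleaner goroutine wakes up every minute to drop expired counters.
	cache := memory.NewCache(1 * time.Minute)

	// The first call stores a fresh counter; subsequent calls increment it in place.
	for i := 0; i < 3; i++ {
		count, expiration := cache.Increment("client:203.0.113.7", 1, 10*time.Second)
		fmt.Println(count, expiration) // 1, 2, 3 with the expiration set by the first call
	}

	// Get reads the current value without incrementing it.
	count, _ := cache.Get("client:203.0.113.7", 10*time.Second)
	fmt.Println(count) // 3
}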
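At the public API level nothing changes for callers: the refactored memory store is still built with NewStoreWithOptions and driven through limiter.Get, exactly as the updated benchmarks in PATCH 3/6 do. The sketch below mirrors that benchmark setup; the prefix string and rate are placeholders, and the one-hour CleanUpInterval follows the trade-off described in the new StoreOptions comment (a longer interval favors throughput at the cost of memory).

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ulule/limiter/v3"
	"github.com/ulule/limiter/v3/drivers/store/memory"
)

func main() {
	store := memory.NewStoreWithOptions(limiter.StoreOptions{
		Prefix:          "limiter:memory:example",
		CleanUpInterval: 1 * time.Hour,
	})

	instance := limiter.New(store, limiter.Rate{
		Limit:  100000,
		Period: 10 * time.Second,
	})

	lctx, err := instance.Get(context.Background(), "foo")
	if err != nil {
		panic(err)
	}
	fmt.Println(lctx.Limit, lctx.Remaining, lctx.Reached)
}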
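PATCH 4/6 also replaces the fmt.Sprintf key construction in store.go with the pooled buffer from the new internal/bytebuffer package: New takes a buffer from a sync.Pool, Concat appends prefix, separator and key, String exposes the bytes without copying, and Close recycles the buffer unless its capacity reached 64 KiB. Because the package sits under internal/, it is only importable from within the limiter module; the test below is a hypothetical placement inside that module, and the key parts are made-up values. One caveat worth keeping in mind: String aliases the pooled slice, so a value that must outlive Close is safer copied first, for example with string(buffer.Bytes()).

package bytebuffer_test

import (
	"testing"

	"github.com/ulule/limiter/v3/internal/bytebuffer"
)

func TestConcatKey(t *testing.T) {
	buffer := bytebuffer.New()
	defer buffer.Close() // hands the buffer back to the pool for reuse

	// Build "prefix:key" the same way Get, Peek and Reset do in store.go.
	buffer.Concat("limiter:memory:example", ":", "client-42")

	if got := buffer.String(); got != "limiter:memory:example:client-42" {
		t.Fatalf("unexpected key: %q", got)
	}
}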