From ce2d087394dc74dbe45e6c9f9bb263ae6f105a26 Mon Sep 17 00:00:00 2001 From: RW Date: Wed, 19 Oct 2022 16:22:42 +0200 Subject: [PATCH] Improve memory storage (#2162) * improve memory storage code and performance * improve memory storage code and performance * improve memory storage code and performance * improve memory storage code and performance * improve memory storage code and performance * improve memory storage code and performance --- internal/memory/memory.go | 49 ++++---- internal/memory/memory_test.go | 2 +- internal/storage/memory/config.go | 33 ++++++ internal/storage/memory/memory.go | 42 +++++-- internal/storage/memory/memory_test.go | 153 +++++++++++++++++++++++++ middleware/limiter/limiter_fixed.go | 12 +- middleware/limiter/limiter_sliding.go | 11 +- utils/time.go | 32 ++++++ utils/time_test.go | 45 ++++++++ 9 files changed, 326 insertions(+), 53 deletions(-) create mode 100644 internal/storage/memory/config.go create mode 100644 internal/storage/memory/memory_test.go create mode 100644 utils/time.go create mode 100644 utils/time_test.go diff --git a/internal/memory/memory.go b/internal/memory/memory.go index dd8111a361..0c37aa9c2e 100644 --- a/internal/memory/memory.go +++ b/internal/memory/memory.go @@ -1,15 +1,18 @@ +// Package memory Is a slight copy of the memory storage, but far from the storage interface it can not only work with bytes +// but directly store any kind of data without having to encode it each time, which gives a huge speed advantage package memory import ( "sync" "sync/atomic" "time" + + "github.com/gofiber/fiber/v2/utils" ) type Storage struct { sync.RWMutex data map[string]item // data - ts uint32 // timestamp } type item struct { @@ -21,10 +24,9 @@ type item struct { func New() *Storage { store := &Storage{ data: make(map[string]item), - ts: uint32(time.Now().Unix()), } + utils.StartTimeStampUpdater() go store.gc(1 * time.Second) - go store.updater(1 * time.Second) return store } @@ -33,7 +35,7 @@ func (s *Storage) Get(key string) interface{} { s.RLock() v, ok := s.data[key] s.RUnlock() - if !ok || v.e != 0 && v.e <= atomic.LoadUint32(&s.ts) { + if !ok || v.e != 0 && v.e <= atomic.LoadUint32(&utils.Timestamp) { return nil } return v.v @@ -43,7 +45,7 @@ func (s *Storage) Get(key string) interface{} { func (s *Storage) Set(key string, val interface{}, ttl time.Duration) { var exp uint32 if ttl > 0 { - exp = uint32(ttl.Seconds()) + atomic.LoadUint32(&s.ts) + exp = uint32(ttl.Seconds()) + atomic.LoadUint32(&utils.Timestamp) } s.Lock() s.data[key] = item{exp, val} @@ -64,28 +66,27 @@ func (s *Storage) Reset() { s.Unlock() } -func (s *Storage) updater(sleep time.Duration) { - for { - time.Sleep(sleep) - atomic.StoreUint32(&s.ts, uint32(time.Now().Unix())) - } -} func (s *Storage) gc(sleep time.Duration) { - expired := []string{} + ticker := time.NewTicker(sleep) + defer ticker.Stop() + var expired []string + for { - time.Sleep(sleep) - expired = expired[:0] - s.RLock() - for key, v := range s.data { - if v.e != 0 && v.e <= atomic.LoadUint32(&s.ts) { - expired = append(expired, key) + select { + case <-ticker.C: + expired = expired[:0] + s.RLock() + for key, v := range s.data { + if v.e != 0 && v.e <= atomic.LoadUint32(&utils.Timestamp) { + expired = append(expired, key) + } } + s.RUnlock() + s.Lock() + for i := range expired { + delete(s.data, expired[i]) + } + s.Unlock() } - s.RUnlock() - s.Lock() - for i := range expired { - delete(s.data, expired[i]) - } - s.Unlock() } } diff --git a/internal/memory/memory_test.go b/internal/memory/memory_test.go index a592bb7cba..12bcf3884c 100644 --- a/internal/memory/memory_test.go +++ b/internal/memory/memory_test.go @@ -58,7 +58,7 @@ func Benchmark_Memory(b *testing.B) { for i := 0; i < keyLength; i++ { keys[i] = utils.UUID() } - value := []string{"some", "random", "value"} + value := []byte("joe") ttl := 2 * time.Second b.Run("fiber_memory", func(b *testing.B) { diff --git a/internal/storage/memory/config.go b/internal/storage/memory/config.go new file mode 100644 index 0000000000..07d13edb5b --- /dev/null +++ b/internal/storage/memory/config.go @@ -0,0 +1,33 @@ +package memory + +import "time" + +// Config defines the config for storage. +type Config struct { + // Time before deleting expired keys + // + // Default is 10 * time.Second + GCInterval time.Duration +} + +// ConfigDefault is the default config +var ConfigDefault = Config{ + GCInterval: 10 * time.Second, +} + +// configDefault is a helper function to set default values +func configDefault(config ...Config) Config { + // Return default config if nothing provided + if len(config) < 1 { + return ConfigDefault + } + + // Override default config + cfg := config[0] + + // Set default values + if int(cfg.GCInterval.Seconds()) <= 0 { + cfg.GCInterval = ConfigDefault.GCInterval + } + return cfg +} diff --git a/internal/storage/memory/memory.go b/internal/storage/memory/memory.go index 048fb3a325..ff43c30533 100644 --- a/internal/storage/memory/memory.go +++ b/internal/storage/memory/memory.go @@ -1,8 +1,13 @@ +// Package memory Is a copy of the storage memory from the external storage packet as a purpose to test the behavior +// in the unittests when using a storages from these packets package memory import ( "sync" + "sync/atomic" "time" + + "github.com/gofiber/fiber/v2/utils" ) // Storage interface that is implemented by storage providers @@ -14,21 +19,25 @@ type Storage struct { } type entry struct { + data []byte // max value is 4294967295 -> Sun Feb 07 2106 06:28:15 GMT+0000 expiry uint32 - data []byte } // New creates a new memory storage -func New() *Storage { +func New(config ...Config) *Storage { + // Set default config + cfg := configDefault(config...) + // Create storage store := &Storage{ db: make(map[string]entry), - gcInterval: 10 * time.Second, + gcInterval: cfg.GCInterval, done: make(chan struct{}), } // Start garbage collector + utils.StartTimeStampUpdater() go store.gc() return store @@ -42,7 +51,7 @@ func (s *Storage) Get(key string) ([]byte, error) { s.mux.RLock() v, ok := s.db[key] s.mux.RUnlock() - if !ok || v.expiry != 0 && v.expiry <= uint32(time.Now().Unix()) { + if !ok || v.expiry != 0 && v.expiry <= atomic.LoadUint32(&utils.Timestamp) { return nil, nil } @@ -58,11 +67,11 @@ func (s *Storage) Set(key string, val []byte, exp time.Duration) error { var expire uint32 if exp != 0 { - expire = uint32(time.Now().Add(exp).Unix()) + expire = uint32(exp.Seconds()) + atomic.LoadUint32(&utils.Timestamp) } s.mux.Lock() - s.db[key] = entry{expire, val} + s.db[key] = entry{val, expire} s.mux.Unlock() return nil } @@ -96,20 +105,31 @@ func (s *Storage) Close() error { func (s *Storage) gc() { ticker := time.NewTicker(s.gcInterval) defer ticker.Stop() + var expired []string for { select { case <-s.done: return - case t := <-ticker.C: - now := uint32(t.Unix()) - s.mux.Lock() + case <-ticker.C: + expired = expired[:0] + s.mux.RLock() for id, v := range s.db { - if v.expiry != 0 && v.expiry < now { - delete(s.db, id) + if v.expiry != 0 && v.expiry < atomic.LoadUint32(&utils.Timestamp) { + expired = append(expired, id) } } + s.mux.RUnlock() + s.mux.Lock() + for i := range expired { + delete(s.db, expired[i]) + } s.mux.Unlock() } } } + +// Return database client +func (s *Storage) Conn() map[string]entry { + return s.db +} diff --git a/internal/storage/memory/memory_test.go b/internal/storage/memory/memory_test.go new file mode 100644 index 0000000000..1fc527f2cd --- /dev/null +++ b/internal/storage/memory/memory_test.go @@ -0,0 +1,153 @@ +package memory + +import ( + "testing" + "time" + + "github.com/gofiber/fiber/v2/utils" +) + +var testStore = New() + +func Test_Storage_Memory_Set(t *testing.T) { + var ( + key = "john" + val = []byte("doe") + ) + + err := testStore.Set(key, val, 0) + utils.AssertEqual(t, nil, err) +} + +func Test_Storage_Memory_Set_Override(t *testing.T) { + var ( + key = "john" + val = []byte("doe") + ) + + err := testStore.Set(key, val, 0) + utils.AssertEqual(t, nil, err) + + err = testStore.Set(key, val, 0) + utils.AssertEqual(t, nil, err) +} + +func Test_Storage_Memory_Get(t *testing.T) { + var ( + key = "john" + val = []byte("doe") + ) + + err := testStore.Set(key, val, 0) + utils.AssertEqual(t, nil, err) + + result, err := testStore.Get(key) + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, val, result) +} + +func Test_Storage_Memory_Set_Expiration(t *testing.T) { + var ( + key = "john" + val = []byte("doe") + exp = 1 * time.Second + ) + + err := testStore.Set(key, val, exp) + utils.AssertEqual(t, nil, err) + + time.Sleep(1100 * time.Millisecond) +} + +func Test_Storage_Memory_Get_Expired(t *testing.T) { + var ( + key = "john" + ) + + result, err := testStore.Get(key) + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, true, len(result) == 0) +} + +func Test_Storage_Memory_Get_NotExist(t *testing.T) { + + result, err := testStore.Get("notexist") + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, true, len(result) == 0) +} + +func Test_Storage_Memory_Delete(t *testing.T) { + var ( + key = "john" + val = []byte("doe") + ) + + err := testStore.Set(key, val, 0) + utils.AssertEqual(t, nil, err) + + err = testStore.Delete(key) + utils.AssertEqual(t, nil, err) + + result, err := testStore.Get(key) + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, true, len(result) == 0) +} + +func Test_Storage_Memory_Reset(t *testing.T) { + var ( + val = []byte("doe") + ) + + err := testStore.Set("john1", val, 0) + utils.AssertEqual(t, nil, err) + + err = testStore.Set("john2", val, 0) + utils.AssertEqual(t, nil, err) + + err = testStore.Reset() + utils.AssertEqual(t, nil, err) + + result, err := testStore.Get("john1") + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, true, len(result) == 0) + + result, err = testStore.Get("john2") + utils.AssertEqual(t, nil, err) + utils.AssertEqual(t, true, len(result) == 0) +} + +func Test_Storage_Memory_Close(t *testing.T) { + utils.AssertEqual(t, nil, testStore.Close()) +} + +func Test_Storage_Memory_Conn(t *testing.T) { + utils.AssertEqual(t, true, testStore.Conn() != nil) +} + +// go test -v -run=^$ -bench=Benchmark_Storage_Memory -benchmem -count=4 +func Benchmark_Storage_Memory(b *testing.B) { + keyLength := 1000 + keys := make([]string, keyLength) + for i := 0; i < keyLength; i++ { + keys[i] = utils.UUID() + } + value := []byte("joe") + + ttl := 2 * time.Second + b.Run("fiber_memory", func(b *testing.B) { + d := New() + b.ReportAllocs() + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, key := range keys { + d.Set(key, value, ttl) + } + for _, key := range keys { + _, _ = d.Get(key) + } + for _, key := range keys { + d.Delete(key) + } + } + }) +} diff --git a/middleware/limiter/limiter_fixed.go b/middleware/limiter/limiter_fixed.go index ed1f23d523..b6b6d35939 100644 --- a/middleware/limiter/limiter_fixed.go +++ b/middleware/limiter/limiter_fixed.go @@ -4,9 +4,9 @@ import ( "strconv" "sync" "sync/atomic" - "time" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/utils" ) type FixedWindow struct{} @@ -17,7 +17,6 @@ func (FixedWindow) New(cfg Config) fiber.Handler { // Limiter variables mux = &sync.RWMutex{} max = strconv.Itoa(cfg.Max) - timestamp = uint64(time.Now().Unix()) expiration = uint64(cfg.Expiration.Seconds()) ) @@ -25,12 +24,7 @@ func (FixedWindow) New(cfg Config) fiber.Handler { manager := newManager(cfg.Storage) // Update timestamp every second - go func() { - for { - atomic.StoreUint64(×tamp, uint64(time.Now().Unix())) - time.Sleep(1 * time.Second) - } - }() + utils.StartTimeStampUpdater() // Return new handler return func(c *fiber.Ctx) error { @@ -49,7 +43,7 @@ func (FixedWindow) New(cfg Config) fiber.Handler { e := manager.get(key) // Get timestamp - ts := atomic.LoadUint64(×tamp) + ts := uint64(atomic.LoadUint32(&utils.Timestamp)) // Set expiration if entry does not exist if e.exp == 0 { diff --git a/middleware/limiter/limiter_sliding.go b/middleware/limiter/limiter_sliding.go index 9369998d29..7f49863d7a 100644 --- a/middleware/limiter/limiter_sliding.go +++ b/middleware/limiter/limiter_sliding.go @@ -7,6 +7,7 @@ import ( "time" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/utils" ) type SlidingWindow struct{} @@ -17,7 +18,6 @@ func (SlidingWindow) New(cfg Config) fiber.Handler { // Limiter variables mux = &sync.RWMutex{} max = strconv.Itoa(cfg.Max) - timestamp = uint64(time.Now().Unix()) expiration = uint64(cfg.Expiration.Seconds()) ) @@ -25,12 +25,7 @@ func (SlidingWindow) New(cfg Config) fiber.Handler { manager := newManager(cfg.Storage) // Update timestamp every second - go func() { - for { - atomic.StoreUint64(×tamp, uint64(time.Now().Unix())) - time.Sleep(1 * time.Second) - } - }() + utils.StartTimeStampUpdater() // Return new handler return func(c *fiber.Ctx) error { @@ -49,7 +44,7 @@ func (SlidingWindow) New(cfg Config) fiber.Handler { e := manager.get(key) // Get timestamp - ts := atomic.LoadUint64(×tamp) + ts := uint64(atomic.LoadUint32(&utils.Timestamp)) // Set expiration if entry does not exist if e.exp == 0 { diff --git a/utils/time.go b/utils/time.go new file mode 100644 index 0000000000..8ea13c2262 --- /dev/null +++ b/utils/time.go @@ -0,0 +1,32 @@ +package utils + +import ( + "sync" + "sync/atomic" + "time" +) + +var ( + timestampTimer sync.Once + // Timestamp please start the timer function before you use this value + // please load the value with atomic `atomic.LoadUint32(&utils.Timestamp)` + Timestamp uint32 +) + +// StartTimeStampUpdater starts a concurrent function which stores the timestamp to an atomic value per second, +// which is much better for performance than determining it at runtime each time +func StartTimeStampUpdater() { + timestampTimer.Do(func() { + // set initial value + atomic.StoreUint32(&Timestamp, uint32(time.Now().Unix())) + go func(sleep time.Duration) { + ticker := time.NewTicker(sleep) + defer ticker.Stop() + + for t := range ticker.C { + // update timestamp + atomic.StoreUint32(&Timestamp, uint32(t.Unix())) + } + }(1 * time.Second) // duration + }) +} diff --git a/utils/time_test.go b/utils/time_test.go new file mode 100644 index 0000000000..75c13b1dd8 --- /dev/null +++ b/utils/time_test.go @@ -0,0 +1,45 @@ +package utils + +import ( + "sync/atomic" + "testing" + "time" +) + +func checkTimeStamp(t testing.TB, expectedCurrent, actualCurrent uint32) { + // test with some buffer in front and back of the expectedCurrent time -> because of the timing on the work machine + AssertEqual(t, true, actualCurrent >= expectedCurrent-1 || actualCurrent <= expectedCurrent+1) +} + +func Test_TimeStampUpdater(t *testing.T) { + t.Parallel() + + StartTimeStampUpdater() + + now := uint32(time.Now().Unix()) + checkTimeStamp(t, now, atomic.LoadUint32(&Timestamp)) + // one second later + time.Sleep(1 * time.Second) + checkTimeStamp(t, now+1, atomic.LoadUint32(&Timestamp)) + // two seconds later + time.Sleep(1 * time.Second) + checkTimeStamp(t, now+2, atomic.LoadUint32(&Timestamp)) +} + +func Benchmark_CalculateTimestamp(b *testing.B) { + StartTimeStampUpdater() + + var res uint32 + b.Run("fiber", func(b *testing.B) { + for n := 0; n < b.N; n++ { + res = atomic.LoadUint32(&Timestamp) + } + checkTimeStamp(b, uint32(time.Now().Unix()), res) + }) + b.Run("default", func(b *testing.B) { + for n := 0; n < b.N; n++ { + res = uint32(time.Now().Unix()) + } + checkTimeStamp(b, uint32(time.Now().Unix()), res) + }) +}