From aa358bc887470ad5a8b6bb001175b9e34d2b0279 Mon Sep 17 00:00:00 2001 From: louyuting <1849491904@qq.com> Date: Sun, 24 May 2020 00:01:52 +0800 Subject: [PATCH] frequency parameters flow control implementation --- api/api.go | 7 + api/slot_chain.go | 3 + core/base/result.go | 1 + core/config/config.go | 2 +- .../cache/concurrent_cache.go | 32 ++ .../cache/concurrent_lru.go | 88 ++++++ .../cache/concurrent_lru_benchmark_test.go | 63 ++++ .../cache/concurrent_lru_test.go | 96 ++++++ core/freq_params_traffic/cache/lru.go | 208 ++++++++++++ .../concurrency_stat_slot.go | 55 ++++ core/freq_params_traffic/params_metric.go | 22 ++ core/freq_params_traffic/rule.go | 176 +++++++++++ core/freq_params_traffic/rule_manager.go | 289 +++++++++++++++++ core/freq_params_traffic/rule_manager_test.go | 297 ++++++++++++++++++ core/freq_params_traffic/rule_test.go | 137 ++++++++ core/freq_params_traffic/slot.go | 126 ++++++++ core/freq_params_traffic/slot_test.go | 109 +++++++ core/freq_params_traffic/traffic_shaping.go | 273 ++++++++++++++++ .../traffic_shaping_test.go | 266 ++++++++++++++++ core/stat/base/leap_array.go | 2 +- .../hot-pramas-sentinel.yml | 8 + .../hot_params_flow_example.go | 131 ++++++++ logging/logging.go | 2 +- logging/logging_test.go | 2 +- util/atomic.go | 17 + 25 files changed, 2408 insertions(+), 4 deletions(-) create mode 100644 core/freq_params_traffic/cache/concurrent_cache.go create mode 100644 core/freq_params_traffic/cache/concurrent_lru.go create mode 100644 core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go create mode 100644 core/freq_params_traffic/cache/concurrent_lru_test.go create mode 100644 core/freq_params_traffic/cache/lru.go create mode 100644 core/freq_params_traffic/concurrency_stat_slot.go create mode 100644 core/freq_params_traffic/params_metric.go create mode 100644 core/freq_params_traffic/rule.go create mode 100644 core/freq_params_traffic/rule_manager.go create mode 100644 core/freq_params_traffic/rule_manager_test.go create mode 100644 core/freq_params_traffic/rule_test.go create mode 100644 core/freq_params_traffic/slot.go create mode 100644 core/freq_params_traffic/slot_test.go create mode 100644 core/freq_params_traffic/traffic_shaping.go create mode 100644 core/freq_params_traffic/traffic_shaping_test.go create mode 100644 example/freq_params_traffic/hot-pramas-sentinel.yml create mode 100644 example/freq_params_traffic/hot_params_flow_example.go diff --git a/api/api.go b/api/api.go index ecaade3ba..2968406ec 100644 --- a/api/api.go +++ b/api/api.go @@ -52,6 +52,13 @@ func WithArgs(args ...interface{}) EntryOption { } } +// WithSlotChain sets the slot chain. +func WithSlotChain(chain *base.SlotChain) EntryOption { + return func(opts *EntryOptions) { + opts.slotChain = chain + } +} + // WithAttachment set the resource entry with the given k-v pair func WithAttachment(key interface{}, value interface{}) EntryOption { return func(opts *EntryOptions) { diff --git a/api/slot_chain.go b/api/slot_chain.go index 4649af06e..66ab3a92f 100644 --- a/api/slot_chain.go +++ b/api/slot_chain.go @@ -4,6 +4,7 @@ import ( "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/flow" + "github.com/alibaba/sentinel-golang/core/freq_params_traffic" "github.com/alibaba/sentinel-golang/core/log" "github.com/alibaba/sentinel-golang/core/stat" "github.com/alibaba/sentinel-golang/core/system" @@ -30,8 +31,10 @@ func BuildDefaultSlotChain() *base.SlotChain { sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{}) sc.AddRuleCheckSlotLast(&flow.FlowSlot{}) sc.AddRuleCheckSlotLast(&circuitbreaker.CircuitBreakerSlot{}) + sc.AddRuleCheckSlotLast(&freq_params_traffic.FreqPramsTrafficSlot{}) sc.AddStatSlotLast(&stat.StatisticSlot{}) sc.AddStatSlotLast(&log.LogSlot{}) sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{}) + sc.AddStatSlotLast(&freq_params_traffic.ConcurrencyStatSlot{}) return sc } diff --git a/core/base/result.go b/core/base/result.go index 85ad0e408..a4cea3d7b 100644 --- a/core/base/result.go +++ b/core/base/result.go @@ -11,6 +11,7 @@ const ( BlockTypeFlow BlockTypeCircuitBreaking BlockTypeSystemFlow + BlockTypeFreqParamsFlow ) func (t BlockType) String() string { diff --git a/core/config/config.go b/core/config/config.go index 168907ec2..2d9d6e1ae 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -134,7 +134,7 @@ func reconfigureRecordLogger(logBaseDir string, withPid bool) error { } // Note: not thread-safe! - logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags), logging.DefaultNamespace) + logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags|log.Lshortfile), logging.DefaultNamespace) fmt.Println("INFO: log base directory is: " + logDir) return nil diff --git a/core/freq_params_traffic/cache/concurrent_cache.go b/core/freq_params_traffic/cache/concurrent_cache.go new file mode 100644 index 000000000..4d124a63a --- /dev/null +++ b/core/freq_params_traffic/cache/concurrent_cache.go @@ -0,0 +1,32 @@ +package cache + +// ConcurrentCounterCache cache the hotspot parameter +type ConcurrentCounterCache interface { + // Add add a value to the cache, + // Updates the "recently used"-ness of the key. + Add(key interface{}, value *int64) + + // If the key is not existed in the cache, adds a value to the cache then return nil. And updates the "recently used"-ness of the key + // If the key is already existed in the cache, do nothing and return the prior value + AddIfAbsent(key interface{}, value *int64) (priorValue *int64) + + // Get returns key's value from the cache and updates the "recently used"-ness of the key. + Get(key interface{}) (value *int64, isFound bool) + + // Remove removes a key from the cache. + // Return true if the key was contained. + Remove(key interface{}) (isFound bool) + + // Contains checks if a key exists in cache + // Without updating the recent-ness. + Contains(key interface{}) (ok bool) + + // Keys returns a slice of the keys in the cache, from oldest to newest. + Keys() []interface{} + + // Len returns the number of items in the cache. + Len() int + + // Purge clears all cache entries. + Purge() +} diff --git a/core/freq_params_traffic/cache/concurrent_lru.go b/core/freq_params_traffic/cache/concurrent_lru.go new file mode 100644 index 000000000..e9239ec48 --- /dev/null +++ b/core/freq_params_traffic/cache/concurrent_lru.go @@ -0,0 +1,88 @@ +package cache + +import ( + "sync" +) + +// LruCacheMap use LRU strategy to cache the most frequently accessed hotspot parameter +type LruCacheMap struct { + // Not thread safe + lru *LRU + lock *sync.RWMutex +} + +func (c *LruCacheMap) Add(key interface{}, value *int64) { + c.lock.Lock() + defer c.lock.Unlock() + + c.lru.Add(key, value) + return +} + +func (c *LruCacheMap) AddIfAbsent(key interface{}, value *int64) (priorValue *int64) { + c.lock.Lock() + defer c.lock.Unlock() + val := c.lru.AddIfAbsent(key, value) + if val == nil { + return nil + } + priorValue = val.(*int64) + return +} + +func (c *LruCacheMap) Get(key interface{}) (value *int64, isFound bool) { + c.lock.Lock() + defer c.lock.Unlock() + + val, found := c.lru.Get(key) + if found { + return val.(*int64), true + } + return nil, false +} + +func (c *LruCacheMap) Remove(key interface{}) (isFound bool) { + c.lock.Lock() + defer c.lock.Unlock() + + return c.lru.Remove(key) +} + +func (c *LruCacheMap) Contains(key interface{}) (ok bool) { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.lru.Contains(key) +} + +func (c *LruCacheMap) Keys() []interface{} { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.lru.Keys() +} + +func (c *LruCacheMap) Len() int { + c.lock.RLock() + defer c.lock.RUnlock() + + return c.lru.Len() +} + +func (c *LruCacheMap) Purge() { + c.lock.Lock() + defer c.lock.Unlock() + + c.lru.Purge() +} + +func NewLRUCacheMap(size int) ConcurrentCounterCache { + lru, err := NewLRU(size, nil) + if err != nil { + return nil + } + return &LruCacheMap{ + lru: lru, + lock: new(sync.RWMutex), + } +} diff --git a/core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go b/core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go new file mode 100644 index 000000000..cde5060b4 --- /dev/null +++ b/core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go @@ -0,0 +1,63 @@ +package cache + +import ( + "strconv" + "testing" +) + +const CacheSize = 50000 + +func Benchmark_LRU_AddIfAbsent(b *testing.B) { + c := NewLRUCacheMap(CacheSize) + for a := 1; a <= CacheSize; a++ { + val := new(int64) + *val = int64(a) + c.Add(strconv.Itoa(a), val) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := 1000; j <= 1001; j++ { + newVal := new(int64) + *newVal = int64(j) + prior := c.AddIfAbsent(strconv.Itoa(j), newVal) + if *prior != int64(j) { + b.Fatal("error!") + } + } + } +} + +func Benchmark_LRU_Add(b *testing.B) { + c := NewLRUCacheMap(CacheSize) + for a := 1; a <= CacheSize; a++ { + val := new(int64) + *val = int64(a) + c.Add(strconv.Itoa(a), val) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := CacheSize - 100; j <= CacheSize-99; j++ { + newVal := new(int64) + *newVal = int64(j) + c.Add(strconv.Itoa(j), newVal) + } + } +} + +func Benchmark_LRU_Get(b *testing.B) { + c := NewLRUCacheMap(CacheSize) + for a := 1; a <= CacheSize; a++ { + val := new(int64) + *val = int64(a) + c.Add(strconv.Itoa(a), val) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + for j := CacheSize - 100; j <= CacheSize-99; j++ { + val, found := c.Get(strconv.Itoa(j)) + if *val != int64(j) || !found { + b.Fatal("error") + } + } + } +} diff --git a/core/freq_params_traffic/cache/concurrent_lru_test.go b/core/freq_params_traffic/cache/concurrent_lru_test.go new file mode 100644 index 000000000..42d079a8d --- /dev/null +++ b/core/freq_params_traffic/cache/concurrent_lru_test.go @@ -0,0 +1,96 @@ +package cache + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_concurrentLruCounterCacheMap_Add_Get(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_Add_Get", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 100; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + assert.True(t, c.Len() == 100) + val, found := c.Get("1") + assert.True(t, found && *val == 1) + }) +} + +func Test_concurrentLruCounterCacheMap_AddIfAbsent(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_AddIfAbsent", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 99; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + v := int64(100) + prior := c.AddIfAbsent("100", &v) + assert.True(t, prior == nil) + v2 := int64(1000) + prior2 := c.AddIfAbsent("100", &v2) + assert.True(t, *prior2 == 100) + }) +} + +func Test_concurrentLruCounterCacheMap_Contains(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_Contains", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 100; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + assert.True(t, c.Contains("100") == true) + assert.True(t, c.Contains("1") == true) + assert.True(t, c.Contains("101") == false) + + val := int64(101) + c.Add(strconv.Itoa(int(val)), &val) + assert.True(t, c.Contains("1") == false) + }) +} + +func Test_concurrentLruCounterCacheMap_Keys(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 100; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + assert.True(t, len(c.Keys()) == 100) + assert.True(t, c.Keys()[0] == "1") + assert.True(t, c.Keys()[99] == "100") + }) +} + +func Test_concurrentLruCounterCacheMap_Purge(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 100; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + assert.True(t, c.Len() == 100) + c.Purge() + assert.True(t, c.Len() == 0) + }) +} + +func Test_concurrentLruCounterCacheMap_Remove(t *testing.T) { + t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) { + c := NewLRUCacheMap(100) + for i := 1; i <= 100; i++ { + val := int64(i) + c.Add(strconv.Itoa(i), &val) + } + assert.True(t, c.Len() == 100) + + c.Remove("100") + assert.True(t, c.Len() == 99) + val, existed := c.Get("100") + assert.True(t, existed == false && val == nil) + }) +} diff --git a/core/freq_params_traffic/cache/lru.go b/core/freq_params_traffic/cache/lru.go new file mode 100644 index 000000000..95112ae48 --- /dev/null +++ b/core/freq_params_traffic/cache/lru.go @@ -0,0 +1,208 @@ +package cache + +import ( + "container/list" + + "github.com/alibaba/sentinel-golang/logging" + "github.com/pkg/errors" +) + +// EvictCallback is used to get a callback when a cache entry is evicted +type EvictCallback func(key interface{}, value interface{}) + +// LRU implements a non-thread safe fixed size LRU cache +type LRU struct { + size int + evictList *list.List + items map[interface{}]*list.Element + onEvict EvictCallback +} + +// entry is used to hold a value in the evictList +type entry struct { + key interface{} + value interface{} +} + +// NewLRU constructs an LRU of the given size +func NewLRU(size int, onEvict EvictCallback) (*LRU, error) { + if size <= 0 { + return nil, errors.New("Must provide a positive size") + } + c := &LRU{ + size: size, + evictList: list.New(), + items: make(map[interface{}]*list.Element), + onEvict: onEvict, + } + return c, nil +} + +// Purge is used to completely clear the cache. +func (c *LRU) Purge() { + for k, v := range c.items { + if c.onEvict != nil { + c.onEvict(k, v.Value.(*entry).value) + } + delete(c.items, k) + } + c.evictList.Init() +} + +func (c *LRU) Add(key, value interface{}) { + // Check for existing item + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + ent.Value.(*entry).value = value + return + } + + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + return +} + +// AddIfAbsent adds item only if key is not existed. +func (c *LRU) AddIfAbsent(key interface{}, value interface{}) (priorValue interface{}) { + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + if ent.Value == nil { + return nil + } + return ent.Value.(*entry).value + } + // Add new item + ent := &entry{key, value} + entry := c.evictList.PushFront(ent) + c.items[key] = entry + + evict := c.evictList.Len() > c.size + // Verify size not exceeded + if evict { + c.removeOldest() + } + return nil +} + +// Get looks up a key's value from the cache. +func (c *LRU) Get(key interface{}) (value interface{}, isFound bool) { + if ent, ok := c.items[key]; ok { + c.evictList.MoveToFront(ent) + if ent.Value.(*entry) == nil { + return nil, false + } + return ent.Value.(*entry).value, true + } + return +} + +// Contains checks if a key is in the cache, without updating the recent-ness +// or deleting it for being stale. +func (c *LRU) Contains(key interface{}) (ok bool) { + _, ok = c.items[key] + return ok +} + +// Peek returns the key value (or undefined if not found) without updating +// the "recently used"-ness of the key. +func (c *LRU) Peek(key interface{}) (value interface{}, isFound bool) { + var ent *list.Element + if ent, isFound = c.items[key]; isFound { + return ent.Value.(*entry).value, true + } + return nil, isFound +} + +// Remove removes the provided key from the cache, returning if the +// key was contained. +func (c *LRU) Remove(key interface{}) (isFound bool) { + if ent, ok := c.items[key]; ok { + c.removeElement(ent) + return true + } + return false +} + +// RemoveOldest removes the oldest item from the cache. +func (c *LRU) RemoveOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// GetOldest returns the oldest entry +func (c *LRU) GetOldest() (key interface{}, value interface{}, ok bool) { + ent := c.evictList.Back() + if ent != nil { + kv := ent.Value.(*entry) + return kv.key, kv.value, true + } + return nil, nil, false +} + +// Keys returns a slice of the keys in the cache, from oldest to newest. +func (c *LRU) Keys() []interface{} { + keys := make([]interface{}, len(c.items)) + i := 0 + for ent := c.evictList.Back(); ent != nil; ent = ent.Prev() { + keys[i] = ent.Value.(*entry).key + i++ + } + return keys +} + +// Len returns the number of items in the cache. +func (c *LRU) Len() int { + return c.evictList.Len() +} + +// Resize changes the cache size. +func (c *LRU) Resize(size int) (evicted int) { + diff := c.Len() - size + if diff < 0 { + diff = 0 + } + for i := 0; i < diff; i++ { + c.removeOldest() + } + c.size = size + return diff +} + +// removeOldest removes the oldest item from the cache. +func (c *LRU) removeOldest() { + ent := c.evictList.Back() + if ent != nil { + c.removeElement(ent) + } +} + +// removeElement is used to remove a given list element from the cache +func (c *LRU) removeElement(e *list.Element) { + c.evictList.Remove(e) + if e.Value == nil { + logging.GetDefaultLogger().Errorf("The Value of evictList's Element is nil.") + return + } + kv, ok := e.Value.(*entry) + if !ok { + logging.GetDefaultLogger().Errorf("Fail to assert the Value of evictList's Element as *entry.") + return + } + delete(c.items, kv.key) + if c.onEvict != nil { + c.onEvict(kv.key, kv.value) + } +} diff --git a/core/freq_params_traffic/concurrency_stat_slot.go b/core/freq_params_traffic/concurrency_stat_slot.go new file mode 100644 index 000000000..92ed2c95f --- /dev/null +++ b/core/freq_params_traffic/concurrency_stat_slot.go @@ -0,0 +1,55 @@ +package freq_params_traffic + +import ( + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/util" +) + +// ConcurrencyStatSlot is to record the Concurrency statistic for all arguments +type ConcurrencyStatSlot struct { +} + +func (c *ConcurrencyStatSlot) OnEntryPassed(ctx *base.EntryContext) { + res := ctx.Resource.Name() + args := ctx.Input.Args + tcs := getTrafficControllersFor(res) + for _, tc := range tcs { + arg := matchArg(tc, args) + if arg == nil { + continue + } + metric := tc.BoundMetric() + concurrencyPtr, existed := metric.ConcurrencyCounter.Get(arg) + if !existed || concurrencyPtr == nil { + logger.Debugf("Parameter %+v does not exist in ConcurrencyCounter.", arg) + continue + } + util.IncrementAndGetInt64(concurrencyPtr) + } +} + +func (c *ConcurrencyStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) { + // Do nothing +} + +func (c *ConcurrencyStatSlot) OnCompleted(ctx *base.EntryContext) { + if ctx.IsBlocked() { + return + } + res := ctx.Resource.Name() + args := ctx.Input.Args + tcs := getTrafficControllersFor(res) + for _, tc := range tcs { + arg := matchArg(tc, args) + if arg == nil { + continue + } + metric := tc.BoundMetric() + concurrencyPtr, existed := metric.ConcurrencyCounter.Get(arg) + if !existed || concurrencyPtr == nil { + logger.Debugf("Parameter: %+v does not exist in ConcurrencyCounter.", arg) + continue + } + util.DecrementAndGetInt64(concurrencyPtr) + } +} diff --git a/core/freq_params_traffic/params_metric.go b/core/freq_params_traffic/params_metric.go new file mode 100644 index 000000000..cb64927e0 --- /dev/null +++ b/core/freq_params_traffic/params_metric.go @@ -0,0 +1,22 @@ +package freq_params_traffic + +import "github.com/alibaba/sentinel-golang/core/freq_params_traffic/cache" + +const ( + ConcurrencyMaxCount = 4000 + ParamsCapacityBase = 4000 + ParamsMaxCapacity = 20000 +) + +// ParamsMetric cache the frequent(hot spot) parameters for each value. +// ParamsMetric is used for pair . +type ParamsMetric struct { + // cache's key is the hot value + // cache's value is the counter + // RuleTimeCounter record the last add token time + RuleTimeCounter cache.ConcurrentCounterCache + // RuleTokenCounter record the number of token + RuleTokenCounter cache.ConcurrentCounterCache + // ConcurrencyCounter record the number of goroutine + ConcurrencyCounter cache.ConcurrentCounterCache +} diff --git a/core/freq_params_traffic/rule.go b/core/freq_params_traffic/rule.go new file mode 100644 index 000000000..469d4cddf --- /dev/null +++ b/core/freq_params_traffic/rule.go @@ -0,0 +1,176 @@ +package freq_params_traffic + +import ( + "fmt" + "reflect" + "strconv" +) + +// ControlBehavior indicates the traffic shaping behaviour. +type ControlBehavior int8 + +const ( + Reject ControlBehavior = iota + Throttling +) + +func (t ControlBehavior) String() string { + switch t { + case Reject: + return "Reject" + case Throttling: + return "Throttling" + default: + return strconv.Itoa(int(t)) + } +} + +// MetricType represents the target metric type. +type MetricType int8 + +const ( + // Concurrency represents concurrency count. + Concurrency MetricType = iota + // QPS represents request count per second. + QPS +) + +func (t MetricType) String() string { + switch t { + case Concurrency: + return "Concurrency" + case QPS: + return "QPS" + default: + return "Undefined" + } +} + +// ParamKind represents the Param kind. +type ParamKind int + +const ( + kindInt ParamKind = iota + kindString + KindBool + KindFloat64 + KindSum +) + +func (t ParamKind) String() string { + switch t { + case kindInt: + return "kindInt" + case kindString: + return "kindString" + case KindBool: + return "KindBool" + case KindFloat64: + return "KindFloat64" + default: + return "Undefined" + } +} + +// SpecificValue indicates the specific param, contain the supported param kind and concrete value. +type SpecificValue struct { + ValKind ParamKind + ValStr string +} + +func (s *SpecificValue) String() string { + return fmt.Sprintf("SpecificValue:[ValKind: %+v, ValStr: %s]", s.ValKind, s.ValStr) +} + +// Rule represents the frequency parameter flow control rule +type Rule struct { + // Id is the unique id + Id string + // Resource is the resource name + Resource string + MetricType MetricType + Behavior ControlBehavior + // ParamIndex is the index in context arguments slice. + ParamIndex int + Threshold float64 + // MaxQueueingTimeMs is the max queueing time in Throttling ControlBehavior + MaxQueueingTimeMs int64 + BurstCount int64 + DurationInSec int64 + SpecificItems map[SpecificValue]int64 +} + +func (r *Rule) String() string { + return fmt.Sprintf("{Id:%s, Resource:%s, MetricType:%+v, Behavior:%+v, ParamIndex:%d, Threshold:%f, MaxQueueingTimeMs:%d, BurstCount:%d, DurationInSec:%d, SpecificItems:%+v},", + r.Id, r.Resource, r.MetricType, r.Behavior, r.ParamIndex, r.Threshold, r.MaxQueueingTimeMs, r.BurstCount, r.DurationInSec, r.SpecificItems) +} +func (r *Rule) ResourceName() string { + return r.Resource +} + +// IsStatReusable checks whether current rule is "statistically" equal to the given rule. +func (r *Rule) IsStatReusable(newRule *Rule) bool { + return r.Resource == newRule.Resource && r.Behavior == newRule.Behavior && r.DurationInSec == newRule.DurationInSec +} + +// IsEqualsTo checks whether current rule is consistent with the given rule. +func (r *Rule) Equals(newRule *Rule) bool { + baseCheck := r.Resource == newRule.Resource && r.MetricType == newRule.MetricType && r.Behavior == newRule.Behavior && r.ParamIndex == newRule.ParamIndex && r.Threshold == newRule.Threshold && r.DurationInSec == newRule.DurationInSec && reflect.DeepEqual(r.SpecificItems, newRule.SpecificItems) + if !baseCheck { + return false + } + if r.Behavior == Reject { + return r.BurstCount == newRule.BurstCount + } else if r.Behavior == Throttling { + return r.MaxQueueingTimeMs == newRule.MaxQueueingTimeMs + } else { + return false + } +} + +// parseSpecificItems parses the SpecificValue as real value. +func parseSpecificItems(source map[SpecificValue]int64) map[interface{}]int64 { + ret := make(map[interface{}]int64) + if len(source) == 0 { + return ret + } + for k, v := range source { + switch k.ValKind { + case kindInt: + realVal, err := strconv.Atoi(k.ValStr) + if err != nil { + logger.Errorf("Fail to parse value for int specific item. paramKind: %+v, value: %s, err: %+v", k.ValKind, k.ValStr, err) + continue + } + ret[realVal] = v + + case kindString: + ret[k.ValStr] = v + + case KindBool: + realVal, err := strconv.ParseBool(k.ValStr) + if err != nil { + logger.Errorf("Fail to parse value for int specific item. value: %s, err: %+v", k.ValStr, err) + continue + } + ret[realVal] = v + + case KindFloat64: + realVal, err := strconv.ParseFloat(k.ValStr, 64) + if err != nil { + logger.Errorf("Fail to parse value for int specific item. value: %s, err: %+v", k.ValStr, err) + continue + } + realVal, err = strconv.ParseFloat(fmt.Sprintf("%.5f", realVal), 64) + if err != nil { + logger.Errorf("Fail to parse value for int specific item. value: %s, err: %+v", k.ValStr, err) + continue + } + ret[realVal] = v + + default: + logger.Errorf("Unsupported kind(%d) for specific item.", k.ValKind) + } + } + return ret +} diff --git a/core/freq_params_traffic/rule_manager.go b/core/freq_params_traffic/rule_manager.go new file mode 100644 index 000000000..fcd7ba22d --- /dev/null +++ b/core/freq_params_traffic/rule_manager.go @@ -0,0 +1,289 @@ +package freq_params_traffic + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/alibaba/sentinel-golang/logging" + "github.com/pkg/errors" +) + +// TrafficControllerGenFunc represents the TrafficShapingController generator function of a specific control behavior. +type TrafficControllerGenFunc func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController + +// trafficControllerMap represents the map storage for TrafficShapingController. +type trafficControllerMap map[string][]TrafficShapingController + +var ( + logger = logging.GetDefaultLogger() + + tcGenFuncMap = make(map[ControlBehavior]TrafficControllerGenFunc) + tcMap = make(trafficControllerMap) + tcMux = new(sync.RWMutex) +) + +func init() { + // Initialize the traffic shaping controller generator map for existing control behaviors. + tcGenFuncMap[Reject] = func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController { + var baseTc *baseTrafficShapingController + if reuseMetric != nil { + // new BaseTrafficShapingController with reuse statistic metric + baseTc = newBaseTrafficShapingControllerWithMetric(r, reuseMetric) + } else { + baseTc = newBaseTrafficShapingController(r) + } + return &rejectTrafficShapingController{ + baseTrafficShapingController: *baseTc, + burstCount: r.BurstCount, + } + } + + tcGenFuncMap[Throttling] = func(r *Rule, reuseMetric *ParamsMetric) TrafficShapingController { + var baseTc *baseTrafficShapingController + if reuseMetric != nil { + baseTc = newBaseTrafficShapingControllerWithMetric(r, reuseMetric) + } else { + baseTc = newBaseTrafficShapingController(r) + } + return &throttlingTrafficShapingController{ + baseTrafficShapingController: *baseTc, + maxQueueingTimeMs: r.MaxQueueingTimeMs, + } + } +} + +func getTrafficControllersFor(res string) []TrafficShapingController { + tcMux.RLock() + defer tcMux.RUnlock() + + return tcMap[res] +} + +// LoadRules replaces old rules with the given frequency parameters flow control rules. +// return value: +// +// bool: was designed to indicate whether the internal map has been changed +// error: was designed to indicate whether occurs the error. +func LoadRules(rules []*Rule) (bool, error) { + err := onRuleUpdate(rules) + return true, err +} + +// GetRules return the whole of rules +func GetRules() []*Rule { + tcMux.RLock() + defer tcMux.RUnlock() + + return rulesFrom(tcMap) +} + +// ClearRules clears all rules in frequency parameters flow control components +func ClearRules() error { + _, err := LoadRules(nil) + return err +} + +func onRuleUpdate(rules []*Rule) (err error) { + defer func() { + if r := recover(); r != nil { + var ok bool + err, ok = r.(error) + if !ok { + err = fmt.Errorf("%+v", r) + } + } + }() + + tcMux.Lock() + defer tcMux.Unlock() + + m := buildTcMap(rules) + tcMap = m + + logRuleUpdate(m) + return nil +} + +func logRuleUpdate(m trafficControllerMap) { + s, err := json.Marshal(m) + if err != nil { + logger.Info("Frequency parameters flow control rules loaded") + } else { + logger.Infof("Frequency parameters flow control rules loaded: %s", s) + } +} + +func rulesFrom(m trafficControllerMap) []*Rule { + rules := make([]*Rule, 0) + if len(m) == 0 { + return rules + } + for _, rs := range m { + if len(rs) == 0 { + continue + } + for _, r := range rs { + if r != nil && r.BoundRule() != nil { + rules = append(rules, r.BoundRule()) + } + } + } + return rules +} + +// buildTcMap be called on the condition that the mutex is locked +func buildTcMap(rules []*Rule) trafficControllerMap { + m := make(trafficControllerMap) + if len(rules) == 0 { + return m + } + + for _, r := range rules { + if err := IsValidRule(r); err != nil { + logger.Warnf("Ignoring invalid frequency params Rule: %+v, reason: %s", r, err.Error()) + continue + } + + res := r.Resource + oldResTcs := tcMap[res] + + // the index of statistic reusable rule in old traffic shaping controller slice + reuseStatIdx := -1 + // the index of equivalent rule in old traffic shaping controller slice + equalIdx := -1 + for idx, oldTc := range oldResTcs { + oldRule := oldTc.BoundRule() + if oldRule.Equals(r) { + // break if there is equivalent rule + equalIdx = idx + break + } + // find the index of first StatReusable rule + if !oldRule.IsStatReusable(r) { + continue + } + if reuseStatIdx >= 0 { + // had find reuse rule. + continue + } + reuseStatIdx = idx + } + + // there is equivalent rule in old traffic shaping controller slice + if equalIdx >= 0 { + equalOldTC := oldResTcs[equalIdx] + tcsOfRes, exists := m[res] + if !exists { + tcsOfRes = make([]TrafficShapingController, 0, 1) + m[r.Resource] = append(tcsOfRes, equalOldTC) + } else { + m[r.Resource] = append(tcsOfRes, equalOldTC) + } + // remove old tc from old resTcs + oldResTcs = append(oldResTcs[:equalIdx], oldResTcs[equalIdx+1:]...) + tcMap[res] = oldResTcs + continue + } + + // generate new traffic shaping controller + generator, supported := tcGenFuncMap[r.Behavior] + if !supported { + logger.Warnf("Ignoring the frequency params Rule due to unsupported control strategy: %+v", r) + continue + } + var tc TrafficShapingController + if reuseStatIdx >= 0 { + // generate new traffic shaping controller with reusable statistic metric. + tc = generator(r, oldResTcs[reuseStatIdx].BoundMetric()) + } else { + tc = generator(r, nil) + } + if tc == nil { + logger.Debugf("Ignoring the frequency params Rule due to bad generated traffic controller: %+v", r) + continue + } + + // remove the reused traffic shaping controller old res tcs + if reuseStatIdx >= 0 { + tcMap[res] = append(oldResTcs[:reuseStatIdx], oldResTcs[reuseStatIdx+1:]...) + } + tcsOfRes, exists := m[r.Resource] + if !exists { + tcsOfRes = make([]TrafficShapingController, 0, 1) + m[r.Resource] = append(tcsOfRes, tc) + } else { + m[r.Resource] = append(tcsOfRes, tc) + } + } + return m +} + +func IsValidRule(rule *Rule) error { + if rule == nil { + return errors.New("nil freq params Rule") + } + if len(rule.Resource) == 0 { + return errors.New("empty resource name") + } + if rule.Threshold < 0 { + return errors.New("negative threshold") + } + if rule.MetricType < 0 { + return errors.New("invalid metric type") + } + if rule.Behavior < 0 { + return errors.New("invalid control strategy") + } + if rule.ParamIndex < 0 { + return errors.New("invalid param index") + } + if rule.DurationInSec < 0 { + return errors.New("invalid duration") + } + return checkControlBehaviorField(rule) +} + +func checkControlBehaviorField(rule *Rule) error { + switch rule.Behavior { + case Reject: + if rule.BurstCount < 0 { + return errors.New("invalid BurstCount") + } + return nil + case Throttling: + if rule.MaxQueueingTimeMs < 0 { + return errors.New("invalid MaxQueueingTimeMs") + } + return nil + default: + } + return nil +} + +// SetTrafficShapingGenerator sets the traffic controller generator for the given control behavior. +// Note that modifying the generator of default control behaviors is not allowed. +func SetTrafficShapingGenerator(cb ControlBehavior, generator TrafficControllerGenFunc) error { + if generator == nil { + return errors.New("nil generator") + } + if cb >= Reject && cb <= Throttling { + return errors.New("not allowed to replace the generator for default control behaviors") + } + tcMux.Lock() + defer tcMux.Unlock() + + tcGenFuncMap[cb] = generator + return nil +} + +func RemoveTrafficShapingGenerator(cb ControlBehavior) error { + if cb >= Reject && cb <= Throttling { + return errors.New("not allowed to replace the generator for default control behaviors") + } + tcMux.Lock() + defer tcMux.Unlock() + + delete(tcGenFuncMap, cb) + return nil +} diff --git a/core/freq_params_traffic/rule_manager_test.go b/core/freq_params_traffic/rule_manager_test.go new file mode 100644 index 000000000..28f8221cf --- /dev/null +++ b/core/freq_params_traffic/rule_manager_test.go @@ -0,0 +1,297 @@ +package freq_params_traffic + +import ( + "fmt" + "math" + "testing" + + "github.com/alibaba/sentinel-golang/core/freq_params_traffic/cache" + "github.com/stretchr/testify/assert" +) + +func Test_tcGenFuncMap(t *testing.T) { + t.Run("Test_tcGenFuncMap_withoutMetric", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindInt, + ValStr: "100", + }] = 100 + + parsedM := make(map[interface{}]int64) + parsedM[100] = 100 + r1 := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + generator, supported := tcGenFuncMap[r1.Behavior] + assert.True(t, supported && generator != nil) + tc := generator(r1, nil) + assert.True(t, tc.BoundMetric() != nil && tc.BoundRule() == r1 && tc.BoundParamIndex() == 0) + rejectTC := tc.(*rejectTrafficShapingController) + assert.True(t, rejectTC != nil) + assert.True(t, rejectTC.res == r1.Resource && rejectTC.metricType == r1.MetricType && rejectTC.paramIndex == r1.ParamIndex && rejectTC.burstCount == r1.BurstCount) + assert.True(t, rejectTC.threshold == r1.Threshold && rejectTC.durationInSec == r1.DurationInSec) + }) + + t.Run("Test_tcGenFuncMap_withMetric", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindInt, + ValStr: "100", + }] = 100 + + parsedM := make(map[interface{}]int64) + parsedM[100] = 100 + r1 := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + generator, supported := tcGenFuncMap[r1.Behavior] + assert.True(t, supported && generator != nil) + + size := int(math.Min(float64(ParamsMaxCapacity), float64(ParamsCapacityBase*r1.DurationInSec))) + if size <= 0 { + size = ParamsMaxCapacity + } + metric := &ParamsMetric{ + RuleTimeCounter: cache.NewLRUCacheMap(size), + RuleTokenCounter: cache.NewLRUCacheMap(size), + ConcurrencyCounter: cache.NewLRUCacheMap(ConcurrencyMaxCount), + } + + tc := generator(r1, metric) + assert.True(t, tc.BoundMetric() != nil && tc.BoundRule() == r1 && tc.BoundParamIndex() == 0) + rejectTC := tc.(*rejectTrafficShapingController) + assert.True(t, rejectTC != nil) + assert.True(t, rejectTC.metric == metric) + assert.True(t, rejectTC.res == r1.Resource && rejectTC.metricType == r1.MetricType && rejectTC.paramIndex == r1.ParamIndex && rejectTC.burstCount == r1.BurstCount) + assert.True(t, rejectTC.threshold == r1.Threshold && rejectTC.durationInSec == r1.DurationInSec) + + }) +} + +func Test_IsValidRule(t *testing.T) { + t.Run("Test_IsValidRule", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindInt, + ValStr: "100", + }] = 100 + + parsedM := make(map[interface{}]int64) + parsedM[100] = 100 + r1 := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + assert.True(t, IsValidRule(r1) == nil) + }) + + t.Run("Test_InValidRule", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindInt, + ValStr: "100", + }] = 100 + + parsedM := make(map[interface{}]int64) + parsedM[100] = 100 + r1 := &Rule{ + Id: "", + Resource: "", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + assert.True(t, IsValidRule(r1) != nil) + }) +} + +func Test_buildTcMap(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r1 := &Rule{ + Id: "1", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + + m2 := make(map[SpecificValue]int64) + m2[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m2[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r2 := &Rule{ + Id: "2", + Resource: "abc", + MetricType: QPS, + Behavior: Throttling, + ParamIndex: 1, + Threshold: 100, + MaxQueueingTimeMs: 20, + BurstCount: 0, + DurationInSec: 1, + SpecificItems: m2, + } + + m3 := make(map[SpecificValue]int64) + m3[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m3[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r3 := &Rule{ + Id: "3", + Resource: "abc", + MetricType: Concurrency, + Behavior: Throttling, + ParamIndex: 2, + Threshold: 100, + MaxQueueingTimeMs: 20, + BurstCount: 0, + DurationInSec: 1, + SpecificItems: m3, + } + + r4 := &Rule{ + Id: "4", + Resource: "abc", + MetricType: Concurrency, + Behavior: Throttling, + ParamIndex: 2, + Threshold: 100, + MaxQueueingTimeMs: 20, + BurstCount: 0, + DurationInSec: 2, + SpecificItems: m3, + } + + updated, err := LoadRules([]*Rule{r1, r2, r3, r4}) + if !updated || err != nil { + t.Errorf("Fail to prepare data, err: %+v", err) + } + assert.True(t, len(tcMap["abc"]) == 4) + + r21 := &Rule{ + Id: "21", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + r22 := &Rule{ + Id: "22", + Resource: "abc", + MetricType: QPS, + Behavior: Throttling, + ParamIndex: 1, + Threshold: 101, + MaxQueueingTimeMs: 20, + BurstCount: 0, + DurationInSec: 1, + SpecificItems: m2, + } + r23 := &Rule{ + Id: "23", + Resource: "abc", + MetricType: Concurrency, + Behavior: Throttling, + ParamIndex: 2, + Threshold: 100, + MaxQueueingTimeMs: 20, + BurstCount: 0, + DurationInSec: 12, + SpecificItems: m3, + } + + oldTc1Ptr := tcMap["abc"][0] + oldTc2Ptr := tcMap["abc"][1] + oldTc3Ptr := tcMap["abc"][2] + oldTc4Ptr := tcMap["abc"][3] + oldTc1PtrAddr := fmt.Sprintf("%p", oldTc1Ptr) + oldTc2PtrAddr := fmt.Sprintf("%p", oldTc2Ptr) + oldTc3PtrAddr := fmt.Sprintf("%p", oldTc3Ptr) + oldTc4PtrAddr := fmt.Sprintf("%p", oldTc4Ptr) + fmt.Println(oldTc1PtrAddr) + fmt.Println(oldTc2PtrAddr) + fmt.Println(oldTc3PtrAddr) + fmt.Println(oldTc4PtrAddr) + oldTc2MetricPtrAddr := fmt.Sprintf("%p", tcMap["abc"][1].BoundMetric()) + fmt.Println("oldTc2MetricPtr:", oldTc2MetricPtrAddr) + + newTcMap := buildTcMap([]*Rule{r21, r22, r23}) + assert.True(t, len(newTcMap) == 1) + abcTcs := newTcMap["abc"] + assert.True(t, len(abcTcs) == 3) + newTc1Ptr := abcTcs[0] + newTc2Ptr := abcTcs[1] + newTc3Ptr := abcTcs[2] + newTc1PtrAddr := fmt.Sprintf("%p", newTc1Ptr) + newTc2PtrAddr := fmt.Sprintf("%p", newTc2Ptr) + newTc3PtrAddr := fmt.Sprintf("%p", newTc3Ptr) + fmt.Println(newTc1PtrAddr) + fmt.Println(newTc2PtrAddr) + fmt.Println(newTc3PtrAddr) + newTc2MetricPtrAddr := fmt.Sprintf("%p", newTc2Ptr.BoundMetric()) + fmt.Println("newTc2MetricPtrAddr:", newTc2MetricPtrAddr) + assert.True(t, newTc1PtrAddr == oldTc1PtrAddr && newTc2MetricPtrAddr == oldTc2MetricPtrAddr) + assert.True(t, abcTcs[0].BoundRule() == r1 && abcTcs[0] == oldTc1Ptr) + assert.True(t, abcTcs[1].BoundMetric() == oldTc2Ptr.BoundMetric()) + + tcMap = make(trafficControllerMap) +} diff --git a/core/freq_params_traffic/rule_test.go b/core/freq_params_traffic/rule_test.go new file mode 100644 index 000000000..15c8b693c --- /dev/null +++ b/core/freq_params_traffic/rule_test.go @@ -0,0 +1,137 @@ +package freq_params_traffic + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_parseSpecificItems(t *testing.T) { + t.Run("Test_parseSpecificItems", func(t *testing.T) { + source := make(map[SpecificValue]int64) + s1 := SpecificValue{ + ValKind: kindInt, + ValStr: "10010", + } + s2 := SpecificValue{ + ValKind: kindInt, + ValStr: "10010aaa", + } + s3 := SpecificValue{ + ValKind: kindString, + ValStr: "test-string", + } + s4 := SpecificValue{ + ValKind: KindBool, + ValStr: "true", + } + s5 := SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.234", + } + s6 := SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.2345678", + } + source[s1] = 100 + source[s2] = 100 + source[s3] = 100 + source[s4] = 100 + source[s5] = 100 + source[s6] = 100 + + got := parseSpecificItems(source) + assert.True(t, len(got) == 5) + assert.True(t, got[10010] == 100) + assert.True(t, got[true] == 100) + assert.True(t, got[1.234] == 100) + assert.True(t, got[1.23400] == 100) + assert.True(t, got["test-string"] == 100) + assert.True(t, got[1.23457] == 100) + }) +} + +func TestMetricType_String(t *testing.T) { + t.Run("TestMetricType_String", func(t *testing.T) { + assert.True(t, fmt.Sprintf("%+v", Concurrency) == "Concurrency") + + }) +} + +func Test_Rule_String(t *testing.T) { + t.Run("Test_Rule_String_Normal", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 5, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + fmt.Println(fmt.Sprintf("%+v", []*Rule{r})) + assert.True(t, fmt.Sprintf("%+v", []*Rule{r}) == "[{Id:abc, Resource:abc, MetricType:Concurrency, Behavior:Reject, ParamIndex:0, Threshold:110.000000, MaxQueueingTimeMs:5, BurstCount:10, DurationInSec:1, SpecificItems:map[{ValKind:kindString ValStr:sss}:1 {ValKind:KindFloat64 ValStr:1.123}:3]},]") + }) +} + +func Test_Rule_Equals(t *testing.T) { + t.Run("Test_Rule_Equals", func(t *testing.T) { + m := make(map[SpecificValue]int64) + m[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r1 := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 5, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m, + } + + m2 := make(map[SpecificValue]int64) + m2[SpecificValue{ + ValKind: kindString, + ValStr: "sss", + }] = 1 + m2[SpecificValue{ + ValKind: KindFloat64, + ValStr: "1.123", + }] = 3 + r2 := &Rule{ + Id: "abc", + Resource: "abc", + MetricType: Concurrency, + Behavior: Reject, + ParamIndex: 0, + Threshold: 110, + MaxQueueingTimeMs: 5, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: m2, + } + assert.True(t, r1.Equals(r2)) + }) +} diff --git a/core/freq_params_traffic/slot.go b/core/freq_params_traffic/slot.go new file mode 100644 index 000000000..dfa4cb9f7 --- /dev/null +++ b/core/freq_params_traffic/slot.go @@ -0,0 +1,126 @@ +package freq_params_traffic + +import ( + "fmt" + "strconv" + "time" + + "github.com/alibaba/sentinel-golang/core/base" +) + +type FreqPramsTrafficSlot struct { +} + +// matchArg matches the arg from args based on TrafficShapingController +// return nil if match failed. +func matchArg(tc TrafficShapingController, args []interface{}) interface{} { + if tc == nil { + return nil + } + idx := tc.BoundParamIndex() + if idx < 0 { + idx = len(args) + idx + } + if idx < 0 { + logger.Debugf("The param index in tc(%+v) is invalid for args(%+v)", tc, args) + return nil + } + if idx >= len(args) { + logger.Debugf("The argument doesn't exist for index(%d) of tc(%+v), args: %+v", idx, tc, args) + return nil + } + arg := args[idx] + if arg == nil { + return nil + } + switch arg.(type) { + case bool: + case float32: + n32 := arg.(float32) + n64, err := strconv.ParseFloat(fmt.Sprintf("%.5f", n32), 64) + if err != nil { + return nil + } + arg = n64 + case float64: + n64 := arg.(float64) + n64, err := strconv.ParseFloat(fmt.Sprintf("%.5f", n64), 64) + if err != nil { + return nil + } + arg = n64 + case int: + arg = arg.(int) + case int8: + n := arg.(int8) + arg = int(n) + case int16: + n := arg.(int16) + arg = int(n) + case int32: + n := arg.(int32) + arg = int(n) + case int64: + n := arg.(int64) + arg = int(n) + case uint: + n := arg.(uint) + arg = int(n) + case uint8: + n := arg.(uint8) + arg = int(n) + case uint16: + n := arg.(uint16) + arg = int(n) + case uint32: + n := arg.(uint32) + arg = int(n) + case uint64: + n := arg.(uint64) + arg = int(n) + case string: + default: + // unsupported param kind, direct return pass + return nil + } + return arg +} + +func (s *FreqPramsTrafficSlot) Check(ctx *base.EntryContext) *base.TokenResult { + res := ctx.Resource.Name() + args := ctx.Input.Args + acquire := int64(ctx.Input.AcquireCount) + + result := ctx.RuleCheckResult + tcs := getTrafficControllersFor(res) + if len(tcs) == 0 { + return result + } + + for _, tc := range tcs { + arg := matchArg(tc, args) + if arg == nil { + continue + } + r := canPassCheck(tc, arg, acquire) + if r.Status() == base.ResultStatusBlocked { + return r + } + if r.Status() == base.ResultStatusShouldWait { + if waitMs := r.WaitMs(); waitMs > 0 { + // Handle waiting action. + time.Sleep(time.Duration(waitMs) * time.Millisecond) + } + continue + } + } + return result +} + +func canPassCheck(tc TrafficShapingController, arg interface{}, acquire int64) *base.TokenResult { + return canPassLocalCheck(tc, arg, acquire) +} + +func canPassLocalCheck(tc TrafficShapingController, arg interface{}, acquire int64) *base.TokenResult { + return tc.PerformChecking(arg, acquire) +} diff --git a/core/freq_params_traffic/slot_test.go b/core/freq_params_traffic/slot_test.go new file mode 100644 index 000000000..9e1681ef1 --- /dev/null +++ b/core/freq_params_traffic/slot_test.go @@ -0,0 +1,109 @@ +package freq_params_traffic + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/alibaba/sentinel-golang/core/base" + + "github.com/stretchr/testify/mock" +) + +type TrafficShapingControllerMock struct { + mock.Mock +} + +func (m *TrafficShapingControllerMock) PerformChecking(arg interface{}, acquireCount int64) *base.TokenResult { + retArgs := m.Called(arg, acquireCount) + return retArgs.Get(0).(*base.TokenResult) +} + +func (m *TrafficShapingControllerMock) BoundParamIndex() int { + retArgs := m.Called() + return retArgs.Int(0) +} + +func (m *TrafficShapingControllerMock) BoundMetric() *ParamsMetric { + retArgs := m.Called() + return retArgs.Get(0).(*ParamsMetric) +} + +func (m *TrafficShapingControllerMock) BoundRule() *Rule { + retArgs := m.Called() + return retArgs.Get(0).(*Rule) +} + +func (m *TrafficShapingControllerMock) Replace(r *Rule) { + _ = m.Called(r) + return +} + +func Test_matchArg(t *testing.T) { + t.Run("Test_matchArg", func(t *testing.T) { + + args := make([]interface{}, 10) + args[0] = true + args[1] = false + args[2] = float32(1.2345678) + args[3] = float64(1.23) + args[4] = uint8(66) + args[5] = int32(88) + args[6] = int(6688) + args[7] = uint64(668866) + args[8] = "ximu" + args[9] = int64(-100) + + tcMock := &TrafficShapingControllerMock{} + tcMock.On("BoundParamIndex").Return(0) + ret0 := matchArg(tcMock, args) + assert.True(t, reflect.TypeOf(ret0).Kind() == reflect.Bool && ret0 == true) + + tcMock1 := &TrafficShapingControllerMock{} + tcMock1.On("BoundParamIndex").Return(1) + ret1 := matchArg(tcMock1, args) + assert.True(t, reflect.TypeOf(ret1).Kind() == reflect.Bool && ret1 == false) + + tcMock2 := &TrafficShapingControllerMock{} + tcMock2.On("BoundParamIndex").Return(2) + ret2 := matchArg(tcMock2, args) + assert.True(t, reflect.TypeOf(ret2).Kind() == reflect.Float64 && ret2 == 1.23457) + + tcMock3 := &TrafficShapingControllerMock{} + tcMock3.On("BoundParamIndex").Return(3) + ret3 := matchArg(tcMock3, args) + assert.True(t, reflect.TypeOf(ret3).Kind() == reflect.Float64 && ret3 == 1.23000) + + tcMock4 := &TrafficShapingControllerMock{} + tcMock4.On("BoundParamIndex").Return(4) + ret4 := matchArg(tcMock4, args) + assert.True(t, reflect.TypeOf(ret4).Kind() == reflect.Int && ret4 == 66) + + tcMock5 := &TrafficShapingControllerMock{} + tcMock5.On("BoundParamIndex").Return(5) + ret5 := matchArg(tcMock5, args) + assert.True(t, reflect.TypeOf(ret5).Kind() == reflect.Int && ret5 == 88) + + tcMock6 := &TrafficShapingControllerMock{} + tcMock6.On("BoundParamIndex").Return(6) + ret6 := matchArg(tcMock6, args) + assert.True(t, reflect.TypeOf(ret6).Kind() == reflect.Int && ret6 == 6688) + + tcMock7 := &TrafficShapingControllerMock{} + tcMock7.On("BoundParamIndex").Return(7) + ret7 := matchArg(tcMock7, args) + assert.True(t, reflect.TypeOf(ret7).Kind() == reflect.Int && ret7 == 668866) + + tcMock8 := &TrafficShapingControllerMock{} + tcMock8.On("BoundParamIndex").Return(8) + ret8 := matchArg(tcMock8, args) + assert.True(t, reflect.TypeOf(ret8).Kind() == reflect.String && ret8 == "ximu") + + tcMock9 := &TrafficShapingControllerMock{} + tcMock9.On("BoundParamIndex").Return(9) + ret9 := matchArg(tcMock9, args) + assert.True(t, reflect.TypeOf(ret9).Kind() == reflect.Int && ret9 == -100) + + }) +} diff --git a/core/freq_params_traffic/traffic_shaping.go b/core/freq_params_traffic/traffic_shaping.go new file mode 100644 index 000000000..73eab18b5 --- /dev/null +++ b/core/freq_params_traffic/traffic_shaping.go @@ -0,0 +1,273 @@ +package freq_params_traffic + +import ( + "fmt" + "math" + "runtime" + "sync/atomic" + + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/freq_params_traffic/cache" + "github.com/alibaba/sentinel-golang/util" +) + +type TrafficShapingController interface { + PerformChecking(arg interface{}, acquireCount int64) *base.TokenResult + + BoundParamIndex() int + + BoundMetric() *ParamsMetric + + BoundRule() *Rule +} + +type baseTrafficShapingController struct { + r *Rule + + res string + metricType MetricType + paramIndex int + threshold float64 + specificItems map[interface{}]int64 + durationInSec int64 + + metric *ParamsMetric +} + +func newBaseTrafficShapingControllerWithMetric(r *Rule, metric *ParamsMetric) *baseTrafficShapingController { + size := int(math.Min(float64(ParamsMaxCapacity), float64(ParamsCapacityBase*r.DurationInSec))) + if size <= 0 { + logger.Warnf("The size of cache is not more than 0, ParamsMaxCapacity: %d, ParamsCapacityBase: %d", ParamsMaxCapacity, ParamsCapacityBase) + size = ParamsMaxCapacity + } + specificItems := parseSpecificItems(r.SpecificItems) + return &baseTrafficShapingController{ + r: r, + res: r.Resource, + metricType: r.MetricType, + paramIndex: r.ParamIndex, + threshold: r.Threshold, + specificItems: specificItems, + durationInSec: r.DurationInSec, + metric: metric, + } +} + +func newBaseTrafficShapingController(r *Rule) *baseTrafficShapingController { + size := int(math.Min(float64(ParamsMaxCapacity), float64(ParamsCapacityBase*r.DurationInSec))) + if size <= 0 { + logger.Warnf("The size of cache is not more than 0, ParamsMaxCapacity: %d, ParamsCapacityBase: %d", ParamsMaxCapacity, ParamsCapacityBase) + size = ParamsMaxCapacity + } + specificItems := parseSpecificItems(r.SpecificItems) + return &baseTrafficShapingController{ + r: r, + res: r.Resource, + metricType: r.MetricType, + paramIndex: r.ParamIndex, + threshold: r.Threshold, + specificItems: specificItems, + durationInSec: r.DurationInSec, + metric: &ParamsMetric{ + RuleTimeCounter: cache.NewLRUCacheMap(size), + RuleTokenCounter: cache.NewLRUCacheMap(size), + ConcurrencyCounter: cache.NewLRUCacheMap(ConcurrencyMaxCount), + }, + } +} + +func (c *baseTrafficShapingController) BoundMetric() *ParamsMetric { + return c.metric +} + +func (c *baseTrafficShapingController) performCheckingForConcurrencyMetric(arg interface{}) *base.TokenResult { + specificItem := c.specificItems + initConcurrency := new(int64) + *initConcurrency = 0 + concurrencyPtr := c.metric.ConcurrencyCounter.AddIfAbsent(arg, initConcurrency) + if concurrencyPtr == nil { + // First to access this arg + return base.NewTokenResultPass() + } + concurrency := atomic.LoadInt64(concurrencyPtr) + concurrency++ + if specificConcurrency, existed := specificItem[arg]; existed { + if concurrency <= specificConcurrency { + return base.NewTokenResultPass() + } + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("Frequency params traffic shaping controller, current concurrency: %d, specific concurrency: %d", concurrency, specificConcurrency)) + } + threshold := int64(c.threshold) + if concurrency <= threshold { + return base.NewTokenResultPass() + } + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("Frequency params traffic shaping controller, current concurrency: %d, threshold: %d", concurrency, threshold)) +} + +// rejectTrafficShapingController use Reject strategy +type rejectTrafficShapingController struct { + baseTrafficShapingController + burstCount int64 +} + +// rejectTrafficShapingController use Throttling strategy +type throttlingTrafficShapingController struct { + baseTrafficShapingController + maxQueueingTimeMs int64 +} + +func (c *baseTrafficShapingController) BoundRule() *Rule { + return c.r +} + +func (c *baseTrafficShapingController) BoundParamIndex() int { + return c.paramIndex +} + +func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, acquireCount int64) *base.TokenResult { + metric := c.metric + if metric == nil { + return base.NewTokenResultPass() + } + + if c.metricType == Concurrency { + return c.performCheckingForConcurrencyMetric(arg) + } else if c.metricType > QPS { + return base.NewTokenResultPass() + } + + timeCounter := metric.RuleTimeCounter + tokenCounter := metric.RuleTokenCounter + if timeCounter == nil || tokenCounter == nil { + return base.NewTokenResultPass() + } + + // calculate available token + tokenCount := int64(c.threshold) + val, existed := c.specificItems[arg] + if existed { + tokenCount = val + } + if tokenCount <= 0 { + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, "rejectTrafficShapingController, the setting tokenCount is <= 0") + } + maxCount := tokenCount + c.burstCount + if acquireCount > maxCount { + // return blocked because the acquired number is more than max count of rejectTrafficShapingController + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("rejectTrafficShapingController, the acquired number(%d) is more than max count(%d) of rejectTrafficShapingController", acquireCount, maxCount)) + } + + for { + currentTimeInMs := int64(util.CurrentTimeMillis()) + lastAddTokenTimePtr := timeCounter.AddIfAbsent(arg, ¤tTimeInMs) + if lastAddTokenTimePtr == nil { + // First to fill token, and consume token immediately + leftCount := maxCount - acquireCount + tokenCounter.AddIfAbsent(arg, &leftCount) + return base.NewTokenResultPass() + } + + // Calculate the time duration since last token was added. + passTime := currentTimeInMs - atomic.LoadInt64(lastAddTokenTimePtr) + if passTime > c.durationInSec*1000 { + // Refill the tokens because statistic window has passed. + leftCount := maxCount - acquireCount + oldQpsPtr := tokenCounter.AddIfAbsent(arg, &leftCount) + if oldQpsPtr == nil { + // Might not be accurate here. + atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs) + return base.NewTokenResultPass() + } else { + // refill token + restQps := atomic.LoadInt64(oldQpsPtr) + toAddTokenNum := passTime * tokenCount / (c.durationInSec * 1000) + newQps := int64(0) + if toAddTokenNum+restQps > maxCount { + newQps = maxCount - acquireCount + } else { + newQps = toAddTokenNum + restQps - acquireCount + } + if newQps < 0 { + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("rejectTrafficShapingController, the new QPS after subbing acquire(%d) is less than 0.", acquireCount)) + } + if atomic.CompareAndSwapInt64(oldQpsPtr, restQps, newQps) { + atomic.StoreInt64(lastAddTokenTimePtr, currentTimeInMs) + return base.NewTokenResultPass() + } + runtime.Gosched() + } + } else { + //check whether the rest of token is enough to acquire + oldQpsPtr, found := tokenCounter.Get(arg) + if found { + oldRestToken := atomic.LoadInt64(oldQpsPtr) + if oldRestToken-acquireCount >= 0 { + //update + if atomic.CompareAndSwapInt64(oldQpsPtr, oldRestToken, oldRestToken-acquireCount) { + return base.NewTokenResultPass() + } + } else { + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("rejectTrafficShapingController, the rest token is not enough, oldRestToken: %d, acquire: %d", oldRestToken, acquireCount)) + } + } + runtime.Gosched() + } + } +} + +func (c *throttlingTrafficShapingController) PerformChecking(arg interface{}, acquireCount int64) *base.TokenResult { + metric := c.metric + if metric == nil { + return base.NewTokenResultPass() + } + + if c.metricType == Concurrency { + return c.performCheckingForConcurrencyMetric(arg) + } else if c.metricType > QPS { + return base.NewTokenResultPass() + } + + timeCounter := metric.RuleTimeCounter + tokenCounter := metric.RuleTokenCounter + if timeCounter == nil || tokenCounter == nil { + return base.NewTokenResultPass() + } + + // calculate available token + tokenCount := int64(c.threshold) + val, existed := c.specificItems[arg] + if existed { + tokenCount = val + } + if tokenCount <= 0 { + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, "throttlingTrafficShapingController, the setting tokenCount is <= 0") + } + intervalCostTime := int64(math.Round(float64(acquireCount * c.durationInSec * 1000 / tokenCount))) + for { + currentTimeInMs := int64(util.CurrentTimeMillis()) + lastPassTimePtr := timeCounter.AddIfAbsent(arg, ¤tTimeInMs) + if lastPassTimePtr == nil { + // first access arg + return base.NewTokenResultPass() + } + // load the last pass time + lastPassTime := atomic.LoadInt64(lastPassTimePtr) + // calculate the expected pass time + expectedTime := lastPassTime + intervalCostTime + + if expectedTime <= currentTimeInMs || expectedTime-currentTimeInMs < c.maxQueueingTimeMs { + if atomic.CompareAndSwapInt64(lastPassTimePtr, lastPassTime, currentTimeInMs) { + awaitTime := expectedTime - currentTimeInMs + if awaitTime > 0 { + atomic.StoreInt64(lastPassTimePtr, expectedTime) + return base.NewTokenResultShouldWait(uint64(awaitTime)) + } + return base.NewTokenResultPass() + } else { + runtime.Gosched() + } + } else { + return base.NewTokenResultBlocked(base.BlockTypeFreqParamsFlow, fmt.Sprintf("throttlingTrafficShapingController, current time(%d) is not reaching to expected time(%d)", currentTimeInMs, expectedTime)) + } + } +} diff --git a/core/freq_params_traffic/traffic_shaping_test.go b/core/freq_params_traffic/traffic_shaping_test.go new file mode 100644 index 000000000..8166e7ca5 --- /dev/null +++ b/core/freq_params_traffic/traffic_shaping_test.go @@ -0,0 +1,266 @@ +package freq_params_traffic + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/alibaba/sentinel-golang/util" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +type counterCacheMock struct { + mock.Mock +} + +func (c *counterCacheMock) Add(key interface{}, value *int64) { + c.Called(key, value) + return +} + +func (c *counterCacheMock) AddIfAbsent(key interface{}, value *int64) (priorValue *int64) { + arg := c.Called(key, value) + ret := arg.Get(0) + if ret == nil { + return nil + } + return ret.(*int64) +} + +func (c *counterCacheMock) Get(key interface{}) (value *int64, isFound bool) { + arg := c.Called(key) + val := arg.Get(0) + if val == nil { + return nil, arg.Bool(1) + } + return val.(*int64), arg.Bool(1) +} + +func (c *counterCacheMock) Remove(key interface{}) (isFound bool) { + arg := c.Called(key) + return arg.Bool(0) +} + +func (c *counterCacheMock) Contains(key interface{}) (ok bool) { + arg := c.Called(key) + return arg.Bool(0) +} + +func (c *counterCacheMock) Keys() []interface{} { + arg := c.Called() + return arg.Get(0).([]interface{}) +} + +func (c *counterCacheMock) Len() int { + arg := c.Called() + return arg.Int(0) +} + +func (c *counterCacheMock) Purge() { + _ = c.Called() + return +} + +func Test_baseTrafficShapingController_performCheckingForConcurrencyMetric(t *testing.T) { + t.Run("Test_baseTrafficShapingController_performCheckingForConcurrencyMetric", func(t *testing.T) { + goCounter := &counterCacheMock{} + c := &baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: Concurrency, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 1, + metric: &ParamsMetric{ + RuleTimeCounter: nil, + RuleTokenCounter: nil, + ConcurrencyCounter: goCounter, + }, + } + initConcurrency := new(int64) + *initConcurrency = 50 + + goCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(initConcurrency) + result := c.performCheckingForConcurrencyMetric(666688) + assert.True(t, result.IsPass()) + + *initConcurrency = 101 + result = c.performCheckingForConcurrencyMetric(666688) + assert.True(t, result.IsBlocked()) + + c.specificItems[666688] = 20 + result = c.performCheckingForConcurrencyMetric(666688) + assert.True(t, result.IsBlocked()) + }) +} + +func Test_defaultTrafficShapingController_performChecking(t *testing.T) { + t.Run("Test_defaultTrafficShapingController_performChecking_TimeCounter_Nil", func(t *testing.T) { + timeCounter := &counterCacheMock{} + tokenCounter := &counterCacheMock{} + goCounter := &counterCacheMock{} + c := &rejectTrafficShapingController{ + baseTrafficShapingController: baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: QPS, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 1, + metric: &ParamsMetric{ + RuleTimeCounter: timeCounter, + RuleTokenCounter: tokenCounter, + ConcurrencyCounter: goCounter, + }, + }, + burstCount: 10, + } + arg := 010110 + result := c.PerformChecking(arg, 130) + assert.True(t, result.IsBlocked()) + + lastAddTokenTime := new(int64) + *lastAddTokenTime = 1578416556900 + timeCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Times(1).Return(nil) + tokenCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Times(1).Return(nil) + result = c.PerformChecking(arg, 20) + assert.True(t, result.IsPass()) + }) + + t.Run("Test_defaultTrafficShapingController_performChecking_Sub_Token", func(t *testing.T) { + timeCounter := &counterCacheMock{} + tokenCounter := &counterCacheMock{} + c := &rejectTrafficShapingController{ + baseTrafficShapingController: baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: QPS, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 10, + metric: &ParamsMetric{ + RuleTimeCounter: timeCounter, + RuleTokenCounter: tokenCounter, + ConcurrencyCounter: nil, + }, + }, + burstCount: 10, + } + arg := 010110 + lastAddTokenTime := new(int64) + currentTimeInMs := int64(util.CurrentTimeMillis()) + *lastAddTokenTime = currentTimeInMs - 1000 + timeCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Times(1).Return(lastAddTokenTime) + oldQps := new(int64) + *oldQps = 50 + tokenCounter.On("Get", mock.Anything).Return(oldQps, true).Times(1) + result := c.PerformChecking(arg, 20) + assert.True(t, result.IsPass()) + assert.True(t, atomic.LoadInt64(oldQps) == 30) + }) + + t.Run("Test_defaultTrafficShapingController_performChecking_First_Fill_Token", func(t *testing.T) { + timeCounter := &counterCacheMock{} + tokenCounter := &counterCacheMock{} + c := &rejectTrafficShapingController{ + baseTrafficShapingController: baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: QPS, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 1, + metric: &ParamsMetric{ + RuleTimeCounter: timeCounter, + RuleTokenCounter: tokenCounter, + ConcurrencyCounter: nil, + }, + }, + burstCount: 10, + } + arg := 010110 + lastAddTokenTime := new(int64) + currentTimeInMs := int64(util.CurrentTimeMillis()) + *lastAddTokenTime = currentTimeInMs - 1001 + timeCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(lastAddTokenTime).Times(1) + + tokenCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(nil).Times(1) + time.Sleep(time.Duration(10) * time.Millisecond) + result := c.PerformChecking(arg, 20) + assert.True(t, result.IsPass()) + assert.True(t, *lastAddTokenTime > currentTimeInMs) + }) + + t.Run("Test_defaultTrafficShapingController_performChecking_Refill_Token", func(t *testing.T) { + timeCounter := &counterCacheMock{} + tokenCounter := &counterCacheMock{} + c := &rejectTrafficShapingController{ + baseTrafficShapingController: baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: QPS, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 1, + metric: &ParamsMetric{ + RuleTimeCounter: timeCounter, + RuleTokenCounter: tokenCounter, + ConcurrencyCounter: nil, + }, + }, + burstCount: 10, + } + arg := 010110 + lastAddTokenTime := new(int64) + currentTimeInMs := int64(util.CurrentTimeMillis()) + *lastAddTokenTime = currentTimeInMs - 1001 + timeCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(lastAddTokenTime).Times(1) + + oldQps := new(int64) + *oldQps = 50 + tokenCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(oldQps).Times(1) + time.Sleep(time.Duration(10) * time.Millisecond) + result := c.PerformChecking(arg, 20) + assert.True(t, result.IsPass()) + assert.True(t, atomic.LoadInt64(lastAddTokenTime) > currentTimeInMs) + assert.True(t, atomic.LoadInt64(oldQps) > 30) + }) +} + +func Test_throttlingTrafficShapingController_performChecking(t *testing.T) { + t.Run("Test_throttlingTrafficShapingController_performChecking", func(t *testing.T) { + timeCounter := &counterCacheMock{} + tokenCounter := &counterCacheMock{} + c := &throttlingTrafficShapingController{ + baseTrafficShapingController: baseTrafficShapingController{ + r: nil, + res: "res_a", + metricType: QPS, + paramIndex: 0, + threshold: 100, + specificItems: make(map[interface{}]int64), + durationInSec: 1, + metric: &ParamsMetric{ + RuleTimeCounter: timeCounter, + RuleTokenCounter: tokenCounter, + ConcurrencyCounter: nil, + }, + }, + maxQueueingTimeMs: 10, + } + + arg := 010110 + lastAddTokenTime := new(int64) + currentTimeInMs := int64(util.CurrentTimeMillis()) + *lastAddTokenTime = currentTimeInMs - 201 + timeCounter.On("AddIfAbsent", mock.Anything, mock.Anything).Return(lastAddTokenTime).Times(1) + result := c.PerformChecking(arg, 20) + assert.True(t, result.IsPass()) + }) +} diff --git a/core/stat/base/leap_array.go b/core/stat/base/leap_array.go index 4b2c3a19b..066596ba3 100644 --- a/core/stat/base/leap_array.go +++ b/core/stat/base/leap_array.go @@ -168,7 +168,7 @@ func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*Bucke } else { runtime.Gosched() } - } else if bucketStart < old.BucketStart { + } else if bucketStart < atomic.LoadUint64(&old.BucketStart) { // TODO: reserve for some special case (e.g. when occupying "future" buckets). return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart)) } diff --git a/example/freq_params_traffic/hot-pramas-sentinel.yml b/example/freq_params_traffic/hot-pramas-sentinel.yml new file mode 100644 index 000000000..689e6a0d4 --- /dev/null +++ b/example/freq_params_traffic/hot-pramas-sentinel.yml @@ -0,0 +1,8 @@ +version: "v1" +sentinel: + app: + name: sentinel-go-demo + log: + dir: ./example/ + metric: + maxFileCount: 7 \ No newline at end of file diff --git a/example/freq_params_traffic/hot_params_flow_example.go b/example/freq_params_traffic/hot_params_flow_example.go new file mode 100644 index 000000000..7a995968a --- /dev/null +++ b/example/freq_params_traffic/hot_params_flow_example.go @@ -0,0 +1,131 @@ +package main + +import ( + "fmt" + "log" + "math/rand" + "time" + + sentinel "github.com/alibaba/sentinel-golang/api" + "github.com/alibaba/sentinel-golang/core/base" + "github.com/alibaba/sentinel-golang/core/flow" + "github.com/alibaba/sentinel-golang/core/freq_params_traffic" + "github.com/alibaba/sentinel-golang/core/stat" + "github.com/alibaba/sentinel-golang/core/system" + "github.com/alibaba/sentinel-golang/util" + "github.com/google/uuid" +) + +func main() { + var Resource = "test" + + // We should initialize Sentinel first. + err := sentinel.Init("./hot-pramas-sentinel.yml") + if err != nil { + log.Fatalf("Unexpected error: %+v", err) + } + + _, err = freq_params_traffic.LoadRules([]*freq_params_traffic.Rule{ + { + Id: "a1", + Resource: Resource, + MetricType: freq_params_traffic.Concurrency, + Behavior: freq_params_traffic.Reject, + ParamIndex: 0, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: make(map[freq_params_traffic.SpecificValue]int64), + }, + { + Id: "a2", + Resource: Resource, + MetricType: freq_params_traffic.Concurrency, + Behavior: freq_params_traffic.Reject, + ParamIndex: 1, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: make(map[freq_params_traffic.SpecificValue]int64), + }, + { + Id: "a3", + Resource: Resource, + MetricType: freq_params_traffic.Concurrency, + Behavior: freq_params_traffic.Reject, + ParamIndex: 2, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: make(map[freq_params_traffic.SpecificValue]int64), + }, + { + Id: "a4", + Resource: Resource, + MetricType: freq_params_traffic.Concurrency, + Behavior: freq_params_traffic.Reject, + ParamIndex: 3, + Threshold: 100, + MaxQueueingTimeMs: 0, + BurstCount: 10, + DurationInSec: 1, + SpecificItems: make(map[freq_params_traffic.SpecificValue]int64), + }, + }) + if err != nil { + log.Fatalf("Unexpected error: %+v", err) + return + } + + sc := base.NewSlotChain() + sc.AddStatPrepareSlotLast(&stat.StatNodePrepareSlot{}) + sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{}) + sc.AddRuleCheckSlotLast(&flow.FlowSlot{}) + sc.AddRuleCheckSlotLast(&freq_params_traffic.FreqPramsTrafficSlot{}) + sc.AddStatSlotLast(&stat.StatisticSlot{}) + sc.AddStatSlotLast(&freq_params_traffic.ConcurrencyStatSlot{}) + + for i := 0; i < 100; i++ { + go func() { + for { + e, b := sentinel.Entry(Resource, sentinel.WithTrafficType(base.Inbound), sentinel.WithSlotChain(sc), sentinel.WithArgs(true, rand.Int()%3000, uuid.New().String(), uuid.New().String())) + if b != nil { + // Blocked. We could get the block reason from the BlockError. + time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) + fmt.Println(util.CurrentTimeMillis(), " blocked") + } else if e == nil && b == nil { + fmt.Println("e is ni") + } else { + // Passed, wrap the logic here. + fmt.Println(util.CurrentTimeMillis(), " passed") + time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) + // Be sure the entry is exited finally. + e.Exit() + } + + } + }() + } + + for { + e, b := sentinel.Entry(Resource, sentinel.WithTrafficType(base.Inbound), sentinel.WithSlotChain(sc), sentinel.WithArgs(true, rand.Int()%3000, uuid.New().String(), uuid.New().String())) + if b != nil { + // Blocked. We could get the block reason from the BlockError. + time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) + fmt.Println(util.CurrentTimeMillis(), " blocked") + } else if e == nil && b == nil { + fmt.Println("e is ni") + } else { + // Passed, wrap the logic here. + fmt.Println(util.CurrentTimeMillis(), " passed") + time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) + + // Be sure the entry is exited finally. + e.Exit() + } + + } +} diff --git a/logging/logging.go b/logging/logging.go index ee3ee8c02..66f4ae774 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -54,7 +54,7 @@ func ResetDefaultLogger(log *log.Logger, namespace string) { func NewConsoleLogger(namespace string) *SentinelLogger { return &SentinelLogger{ - log: log.New(os.Stdout, "", log.LstdFlags), + log: log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile), namespace: namespace, } } diff --git a/logging/logging_test.go b/logging/logging_test.go index ae3929cd7..88d7b49a0 100644 --- a/logging/logging_test.go +++ b/logging/logging_test.go @@ -16,7 +16,7 @@ func TestNewSimpleFileLogger(t *testing.T) { if !strings.HasSuffix(tmpDir, string(os.PathSeparator)) { tmpDir = tmpDir + string(os.PathSeparator) } - logger, err := NewSimpleFileLogger(tmpDir+fileName, "test-log", log.LstdFlags) + logger, err := NewSimpleFileLogger(tmpDir+fileName, "test-log", log.LstdFlags|log.LstdFlags) assert.NoError(t, err) logger.Debug("debug info test.") diff --git a/util/atomic.go b/util/atomic.go index ce3c1850a..62eab07a8 100644 --- a/util/atomic.go +++ b/util/atomic.go @@ -3,6 +3,9 @@ package util import "sync/atomic" func IncrementAndGetInt64(v *int64) int64 { + if v == nil { + panic("Nil reference in util.IncrementAndGetInt64") + } old := int64(0) for { old = atomic.LoadInt64(v) @@ -13,6 +16,20 @@ func IncrementAndGetInt64(v *int64) int64 { return old + 1 } +func DecrementAndGetInt64(v *int64) int64 { + if v == nil { + panic("Nil reference in util.DecrementAndGetInt64") + } + old := int64(0) + for { + old = atomic.LoadInt64(v) + if atomic.CompareAndSwapInt64(v, old, old-1) { + break + } + } + return old - 1 +} + type AtomicBool struct { // default 0, means false flag int32