Skip to content

Commit

Permalink
frequency parameters flow control implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
louyuting committed May 23, 2020
1 parent 46b892a commit aa358bc
Show file tree
Hide file tree
Showing 25 changed files with 2,408 additions and 4 deletions.
7 changes: 7 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func WithArgs(args ...interface{}) EntryOption {
}
}

// WithSlotChain sets the slot chain.
func WithSlotChain(chain *base.SlotChain) EntryOption {
return func(opts *EntryOptions) {
opts.slotChain = chain
}
}

// WithAttachment set the resource entry with the given k-v pair
func WithAttachment(key interface{}, value interface{}) EntryOption {
return func(opts *EntryOptions) {
Expand Down
3 changes: 3 additions & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/freq_params_traffic"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -30,8 +31,10 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.FlowSlot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.CircuitBreakerSlot{})
sc.AddRuleCheckSlotLast(&freq_params_traffic.FreqPramsTrafficSlot{})
sc.AddStatSlotLast(&stat.StatisticSlot{})
sc.AddStatSlotLast(&log.LogSlot{})
sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{})
sc.AddStatSlotLast(&freq_params_traffic.ConcurrencyStatSlot{})
return sc
}
1 change: 1 addition & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
BlockTypeFlow
BlockTypeCircuitBreaking
BlockTypeSystemFlow
BlockTypeFreqParamsFlow
)

func (t BlockType) String() string {
Expand Down
2 changes: 1 addition & 1 deletion core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func reconfigureRecordLogger(logBaseDir string, withPid bool) error {
}

// Note: not thread-safe!
logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags), logging.DefaultNamespace)
logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags|log.Lshortfile), logging.DefaultNamespace)
fmt.Println("INFO: log base directory is: " + logDir)

return nil
Expand Down
32 changes: 32 additions & 0 deletions core/freq_params_traffic/cache/concurrent_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cache

// ConcurrentCounterCache cache the hotspot parameter
type ConcurrentCounterCache interface {
// Add add a value to the cache,
// Updates the "recently used"-ness of the key.
Add(key interface{}, value *int64)

// If the key is not existed in the cache, adds a value to the cache then return nil. And updates the "recently used"-ness of the key
// If the key is already existed in the cache, do nothing and return the prior value
AddIfAbsent(key interface{}, value *int64) (priorValue *int64)

// Get returns key's value from the cache and updates the "recently used"-ness of the key.
Get(key interface{}) (value *int64, isFound bool)

// Remove removes a key from the cache.
// Return true if the key was contained.
Remove(key interface{}) (isFound bool)

// Contains checks if a key exists in cache
// Without updating the recent-ness.
Contains(key interface{}) (ok bool)

// Keys returns a slice of the keys in the cache, from oldest to newest.
Keys() []interface{}

// Len returns the number of items in the cache.
Len() int

// Purge clears all cache entries.
Purge()
}
88 changes: 88 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache

import (
"sync"
)

// LruCacheMap use LRU strategy to cache the most frequently accessed hotspot parameter
type LruCacheMap struct {
// Not thread safe
lru *LRU
lock *sync.RWMutex
}

func (c *LruCacheMap) Add(key interface{}, value *int64) {
c.lock.Lock()
defer c.lock.Unlock()

c.lru.Add(key, value)
return
}

func (c *LruCacheMap) AddIfAbsent(key interface{}, value *int64) (priorValue *int64) {
c.lock.Lock()
defer c.lock.Unlock()
val := c.lru.AddIfAbsent(key, value)
if val == nil {
return nil
}
priorValue = val.(*int64)
return
}

func (c *LruCacheMap) Get(key interface{}) (value *int64, isFound bool) {
c.lock.Lock()
defer c.lock.Unlock()

val, found := c.lru.Get(key)
if found {
return val.(*int64), true
}
return nil, false
}

func (c *LruCacheMap) Remove(key interface{}) (isFound bool) {
c.lock.Lock()
defer c.lock.Unlock()

return c.lru.Remove(key)
}

func (c *LruCacheMap) Contains(key interface{}) (ok bool) {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Contains(key)
}

func (c *LruCacheMap) Keys() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Keys()
}

func (c *LruCacheMap) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Len()
}

func (c *LruCacheMap) Purge() {
c.lock.Lock()
defer c.lock.Unlock()

c.lru.Purge()
}

func NewLRUCacheMap(size int) ConcurrentCounterCache {
lru, err := NewLRU(size, nil)
if err != nil {
return nil
}
return &LruCacheMap{
lru: lru,
lock: new(sync.RWMutex),
}
}
63 changes: 63 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cache

import (
"strconv"
"testing"
)

const CacheSize = 50000

func Benchmark_LRU_AddIfAbsent(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 1000; j <= 1001; j++ {
newVal := new(int64)
*newVal = int64(j)
prior := c.AddIfAbsent(strconv.Itoa(j), newVal)
if *prior != int64(j) {
b.Fatal("error!")
}
}
}
}

func Benchmark_LRU_Add(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := CacheSize - 100; j <= CacheSize-99; j++ {
newVal := new(int64)
*newVal = int64(j)
c.Add(strconv.Itoa(j), newVal)
}
}
}

func Benchmark_LRU_Get(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := CacheSize - 100; j <= CacheSize-99; j++ {
val, found := c.Get(strconv.Itoa(j))
if *val != int64(j) || !found {
b.Fatal("error")
}
}
}
}
96 changes: 96 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package cache

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_concurrentLruCounterCacheMap_Add_Get(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add_Get", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)
val, found := c.Get("1")
assert.True(t, found && *val == 1)
})
}

func Test_concurrentLruCounterCacheMap_AddIfAbsent(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_AddIfAbsent", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 99; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
v := int64(100)
prior := c.AddIfAbsent("100", &v)
assert.True(t, prior == nil)
v2 := int64(1000)
prior2 := c.AddIfAbsent("100", &v2)
assert.True(t, *prior2 == 100)
})
}

func Test_concurrentLruCounterCacheMap_Contains(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Contains", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Contains("100") == true)
assert.True(t, c.Contains("1") == true)
assert.True(t, c.Contains("101") == false)

val := int64(101)
c.Add(strconv.Itoa(int(val)), &val)
assert.True(t, c.Contains("1") == false)
})
}

func Test_concurrentLruCounterCacheMap_Keys(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, len(c.Keys()) == 100)
assert.True(t, c.Keys()[0] == "1")
assert.True(t, c.Keys()[99] == "100")
})
}

func Test_concurrentLruCounterCacheMap_Purge(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)
c.Purge()
assert.True(t, c.Len() == 0)
})
}

func Test_concurrentLruCounterCacheMap_Remove(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)

c.Remove("100")
assert.True(t, c.Len() == 99)
val, existed := c.Get("100")
assert.True(t, existed == false && val == nil)
})
}
Loading

0 comments on commit aa358bc

Please sign in to comment.