Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

frequency parameters traffic control #119

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ func WithArgs(args ...interface{}) EntryOption {
}
}

// WithSlotChain sets the slot chain.
func WithSlotChain(chain *base.SlotChain) EntryOption {
return func(opts *EntryOptions) {
opts.slotChain = chain
}
}

// WithAttachment set the resource entry with the given k-v pair
func WithAttachment(key interface{}, value interface{}) EntryOption {
return func(opts *EntryOptions) {
Expand Down
3 changes: 3 additions & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/freq_params_traffic"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -30,8 +31,10 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddRuleCheckSlotLast(&system.SystemAdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.FlowSlot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.CircuitBreakerSlot{})
sc.AddRuleCheckSlotLast(&freq_params_traffic.FreqPramsTrafficSlot{})
sc.AddStatSlotLast(&stat.StatisticSlot{})
sc.AddStatSlotLast(&log.LogSlot{})
sc.AddStatSlotLast(&circuitbreaker.MetricStatSlot{})
sc.AddStatSlotLast(&freq_params_traffic.ConcurrencyStatSlot{})
return sc
}
1 change: 1 addition & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
BlockTypeFlow
BlockTypeCircuitBreaking
BlockTypeSystemFlow
BlockTypeFreqParamsFlow
)

func (t BlockType) String() string {
Expand Down
2 changes: 1 addition & 1 deletion core/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func reconfigureRecordLogger(logBaseDir string, withPid bool) error {
}

// Note: not thread-safe!
logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags), logging.DefaultNamespace)
logging.ResetDefaultLogger(log.New(logFile, "", log.LstdFlags|log.Lshortfile), logging.DefaultNamespace)
fmt.Println("INFO: log base directory is: " + logDir)

return nil
Expand Down
32 changes: 32 additions & 0 deletions core/freq_params_traffic/cache/concurrent_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cache

// ConcurrentCounterCache cache the hotspot parameter
type ConcurrentCounterCache interface {
// Add add a value to the cache,
// Updates the "recently used"-ness of the key.
Add(key interface{}, value *int64)

// If the key is not existed in the cache, adds a value to the cache then return nil. And updates the "recently used"-ness of the key
// If the key is already existed in the cache, do nothing and return the prior value
AddIfAbsent(key interface{}, value *int64) (priorValue *int64)

// Get returns key's value from the cache and updates the "recently used"-ness of the key.
Get(key interface{}) (value *int64, isFound bool)

// Remove removes a key from the cache.
// Return true if the key was contained.
Remove(key interface{}) (isFound bool)

// Contains checks if a key exists in cache
// Without updating the recent-ness.
Contains(key interface{}) (ok bool)

// Keys returns a slice of the keys in the cache, from oldest to newest.
Keys() []interface{}

// Len returns the number of items in the cache.
Len() int

// Purge clears all cache entries.
Purge()
}
88 changes: 88 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cache

import (
"sync"
)

// LruCacheMap use LRU strategy to cache the most frequently accessed hotspot parameter
type LruCacheMap struct {
// Not thread safe
lru *LRU
lock *sync.RWMutex
}

func (c *LruCacheMap) Add(key interface{}, value *int64) {
c.lock.Lock()
defer c.lock.Unlock()

c.lru.Add(key, value)
return
}

func (c *LruCacheMap) AddIfAbsent(key interface{}, value *int64) (priorValue *int64) {
c.lock.Lock()
defer c.lock.Unlock()
val := c.lru.AddIfAbsent(key, value)
if val == nil {
return nil
}
priorValue = val.(*int64)
return
}

func (c *LruCacheMap) Get(key interface{}) (value *int64, isFound bool) {
c.lock.Lock()
defer c.lock.Unlock()

val, found := c.lru.Get(key)
if found {
return val.(*int64), true
}
return nil, false
}

func (c *LruCacheMap) Remove(key interface{}) (isFound bool) {
c.lock.Lock()
defer c.lock.Unlock()

return c.lru.Remove(key)
}

func (c *LruCacheMap) Contains(key interface{}) (ok bool) {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Contains(key)
}

func (c *LruCacheMap) Keys() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Keys()
}

func (c *LruCacheMap) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()

return c.lru.Len()
}

func (c *LruCacheMap) Purge() {
c.lock.Lock()
defer c.lock.Unlock()

c.lru.Purge()
}

func NewLRUCacheMap(size int) ConcurrentCounterCache {
lru, err := NewLRU(size, nil)
if err != nil {
return nil
}
return &LruCacheMap{
lru: lru,
lock: new(sync.RWMutex),
}
}
63 changes: 63 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package cache

import (
"strconv"
"testing"
)

const CacheSize = 50000

func Benchmark_LRU_AddIfAbsent(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 1000; j <= 1001; j++ {
newVal := new(int64)
*newVal = int64(j)
prior := c.AddIfAbsent(strconv.Itoa(j), newVal)
if *prior != int64(j) {
b.Fatal("error!")
}
}
}
}

func Benchmark_LRU_Add(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := CacheSize - 100; j <= CacheSize-99; j++ {
newVal := new(int64)
*newVal = int64(j)
c.Add(strconv.Itoa(j), newVal)
}
}
}

func Benchmark_LRU_Get(b *testing.B) {
c := NewLRUCacheMap(CacheSize)
for a := 1; a <= CacheSize; a++ {
val := new(int64)
*val = int64(a)
c.Add(strconv.Itoa(a), val)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := CacheSize - 100; j <= CacheSize-99; j++ {
val, found := c.Get(strconv.Itoa(j))
if *val != int64(j) || !found {
b.Fatal("error")
}
}
}
}
96 changes: 96 additions & 0 deletions core/freq_params_traffic/cache/concurrent_lru_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package cache

import (
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func Test_concurrentLruCounterCacheMap_Add_Get(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add_Get", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)
val, found := c.Get("1")
assert.True(t, found && *val == 1)
})
}

func Test_concurrentLruCounterCacheMap_AddIfAbsent(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_AddIfAbsent", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 99; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
v := int64(100)
prior := c.AddIfAbsent("100", &v)
assert.True(t, prior == nil)
v2 := int64(1000)
prior2 := c.AddIfAbsent("100", &v2)
assert.True(t, *prior2 == 100)
})
}

func Test_concurrentLruCounterCacheMap_Contains(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Contains", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Contains("100") == true)
assert.True(t, c.Contains("1") == true)
assert.True(t, c.Contains("101") == false)

val := int64(101)
c.Add(strconv.Itoa(int(val)), &val)
assert.True(t, c.Contains("1") == false)
})
}

func Test_concurrentLruCounterCacheMap_Keys(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, len(c.Keys()) == 100)
assert.True(t, c.Keys()[0] == "1")
assert.True(t, c.Keys()[99] == "100")
})
}

func Test_concurrentLruCounterCacheMap_Purge(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)
c.Purge()
assert.True(t, c.Len() == 0)
})
}

func Test_concurrentLruCounterCacheMap_Remove(t *testing.T) {
t.Run("Test_concurrentLruCounterCacheMap_Add", func(t *testing.T) {
c := NewLRUCacheMap(100)
for i := 1; i <= 100; i++ {
val := int64(i)
c.Add(strconv.Itoa(i), &val)
}
assert.True(t, c.Len() == 100)

c.Remove("100")
assert.True(t, c.Len() == 99)
val, existed := c.Get("100")
assert.True(t, existed == false && val == nil)
})
}
Loading