Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: hotspot rule add paramKey; #376

Merged
merged 5 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions core/hotspot/concurrency_stat_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@ func (s *ConcurrencyStatSlot) Order() uint32 {

func (c *ConcurrencyStatSlot) OnEntryPassed(ctx *base.EntryContext) {
res := ctx.Resource.Name()
args := ctx.Input.Args
tcs := getTrafficControllersFor(res)
for _, tc := range tcs {
if tc.BoundRule().MetricType != Concurrency {
continue
}
arg := matchArg(tc, args)
arg := tc.ExtractArgs(ctx)
if arg == nil {
continue
}
Expand All @@ -72,13 +71,12 @@ func (c *ConcurrencyStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError

func (c *ConcurrencyStatSlot) OnCompleted(ctx *base.EntryContext) {
res := ctx.Resource.Name()
args := ctx.Input.Args
tcs := getTrafficControllersFor(res)
for _, tc := range tcs {
if tc.BoundRule().MetricType != Concurrency {
continue
}
arg := matchArg(tc, args)
arg := tc.ExtractArgs(ctx)
if arg == nil {
continue
}
Expand Down
10 changes: 7 additions & 3 deletions core/hotspot/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ type Rule struct {
// if ParamIndex is great than or equals to zero, ParamIndex means the <ParamIndex>-th parameter
// if ParamIndex is the negative, ParamIndex means the reversed <ParamIndex>-th parameter
ParamIndex int `json:"paramIndex"`
// ParamKey is the key in EntryContext.Input.Attachments map.
// ParamKey can be used as a supplement to ParamIndex to facilitate rules to quickly obtain parameter from a large number of parameters
// ParamKey is mutually exclusive with ParamIndex, ParamKey has the higher priority than ParamIndex
ParamKey string `json:"paramKey"`
cafra marked this conversation as resolved.
Show resolved Hide resolved
// Threshold is the threshold to trigger rejection
Threshold int64 `json:"threshold"`
// MaxQueueingTimeMs only takes effect when ControlBehavior is Throttling and MetricType is QPS
Expand All @@ -100,8 +104,8 @@ func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("{Id:%s, Resource:%s, MetricType:%+v, ControlBehavior:%+v, ParamIndex:%d, Threshold:%d, MaxQueueingTimeMs:%d, BurstCount:%d, DurationInSec:%d, ParamsMaxCapacity:%d, SpecificItems:%+v}",
r.ID, r.Resource, r.MetricType, r.ControlBehavior, r.ParamIndex, r.Threshold, r.MaxQueueingTimeMs, r.BurstCount, r.DurationInSec, r.ParamsMaxCapacity, r.SpecificItems)
return fmt.Sprintf("{Id:%s, Resource:%s, MetricType:%+v, ControlBehavior:%+v, ParamIndex:%d, ParamKey:%s, Threshold:%d, MaxQueueingTimeMs:%d, BurstCount:%d, DurationInSec:%d, ParamsMaxCapacity:%d, SpecificItems:%+v}",
r.ID, r.Resource, r.MetricType, r.ControlBehavior, r.ParamIndex, r.ParamKey, r.Threshold, r.MaxQueueingTimeMs, r.BurstCount, r.DurationInSec, r.ParamsMaxCapacity, r.SpecificItems)
}
return string(b)
}
Expand All @@ -116,7 +120,7 @@ func (r *Rule) IsStatReusable(newRule *Rule) bool {

// Equals checks whether current rule is consistent with the given rule.
func (r *Rule) Equals(newRule *Rule) bool {
baseCheck := r.Resource == newRule.Resource && r.MetricType == newRule.MetricType && r.ControlBehavior == newRule.ControlBehavior && r.ParamsMaxCapacity == newRule.ParamsMaxCapacity && r.ParamIndex == newRule.ParamIndex && r.Threshold == newRule.Threshold && r.DurationInSec == newRule.DurationInSec && reflect.DeepEqual(r.SpecificItems, newRule.SpecificItems)
baseCheck := r.Resource == newRule.Resource && r.MetricType == newRule.MetricType && r.ControlBehavior == newRule.ControlBehavior && r.ParamsMaxCapacity == newRule.ParamsMaxCapacity && r.ParamIndex == newRule.ParamIndex && r.ParamKey == newRule.ParamKey && r.Threshold == newRule.Threshold && r.DurationInSec == newRule.DurationInSec && reflect.DeepEqual(r.SpecificItems, newRule.SpecificItems)
if !baseCheck {
return false
}
Expand Down
6 changes: 3 additions & 3 deletions core/hotspot/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,12 @@ func IsValidRule(rule *Rule) error {
if rule.ControlBehavior < 0 {
return errors.New("invalid control strategy")
}
if rule.ParamIndex < 0 {
return errors.New("invalid param index")
}
if rule.MetricType == QPS && rule.DurationInSec <= 0 {
return errors.New("invalid duration")
}
if rule.ParamIndex > 0 && rule.ParamKey != "" {
louyuting marked this conversation as resolved.
Show resolved Hide resolved
return errors.New("invalid param index and param key are mutually exclusive")
}
return checkControlBehaviorField(rule)
}

Expand Down
5 changes: 4 additions & 1 deletion core/hotspot/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func Test_Rule_String(t *testing.T) {
MetricType: Concurrency,
ControlBehavior: Reject,
ParamIndex: 0,
ParamKey: "key",
Threshold: 110.0,
MaxQueueingTimeMs: 5,
BurstCount: 10,
DurationInSec: 1,
ParamsMaxCapacity: 10000,
SpecificItems: specific,
}
assert.True(t, fmt.Sprintf("%+v", []*Rule{r}) == "[{Id:abc, Resource:abc, MetricType:Concurrency, ControlBehavior:Reject, ParamIndex:0, Threshold:110, MaxQueueingTimeMs:5, BurstCount:10, DurationInSec:1, ParamsMaxCapacity:10000, SpecificItems:map[1123:3 sss:1]}]")
assert.True(t, fmt.Sprintf("%+v", []*Rule{r}) == "[{Id:abc, Resource:abc, MetricType:Concurrency, ControlBehavior:Reject, ParamIndex:0, ParamKey:key, Threshold:110, MaxQueueingTimeMs:5, BurstCount:10, DurationInSec:1, ParamsMaxCapacity:10000, SpecificItems:map[1123:3 sss:1]}]")
})
}

Expand All @@ -61,6 +62,7 @@ func Test_Rule_Equals(t *testing.T) {
MetricType: Concurrency,
ControlBehavior: Reject,
ParamIndex: 0,
ParamKey: "testKey",
Threshold: 110.0,
MaxQueueingTimeMs: 5,
BurstCount: 10,
Expand All @@ -78,6 +80,7 @@ func Test_Rule_Equals(t *testing.T) {
MetricType: Concurrency,
ControlBehavior: Reject,
ParamIndex: 0,
ParamKey: "testKey",
Threshold: 110.0,
MaxQueueingTimeMs: 5,
BurstCount: 10,
Expand Down
29 changes: 1 addition & 28 deletions core/hotspot/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package hotspot

import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
)

Expand All @@ -40,40 +39,14 @@ func (s *Slot) Order() uint32 {
return RuleCheckSlotOrder
}

// matchArg matches the arg from args based on TrafficShapingController
// return nil if match failed.
func matchArg(tc TrafficShapingController, args []interface{}) interface{} {
if tc == nil {
return nil
}
idx := tc.BoundParamIndex()
if idx < 0 {
idx = len(args) + idx
}
if idx < 0 {
if logging.DebugEnabled() {
logging.Debug("[Slot matchArg] The param index of hotspot traffic shaping controller is invalid", "args", args, "paramIndex", tc.BoundParamIndex())
}
return nil
}
if idx >= len(args) {
if logging.DebugEnabled() {
logging.Debug("[Slot matchArg] The argument in index doesn't exist", "args", args, "paramIndex", tc.BoundParamIndex())
}
return nil
}
return args[idx]
}

func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
res := ctx.Resource.Name()
args := ctx.Input.Args
batch := int64(ctx.Input.BatchCount)

result := ctx.RuleCheckResult
tcs := getTrafficControllersFor(res)
for _, tc := range tcs {
arg := matchArg(tc, args)
arg := tc.ExtractArgs(ctx)
if arg == nil {
continue
}
Expand Down
57 changes: 4 additions & 53 deletions core/hotspot/slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
package hotspot

import (
"reflect"
"testing"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

Expand Down Expand Up @@ -52,53 +48,8 @@ func (m *TrafficShapingControllerMock) Replace(r *Rule) {
return
}

func Test_matchArg(t *testing.T) {
t.Run("Test_matchArg", func(t *testing.T) {
args := make([]interface{}, 10)
args[0] = 1
args[1] = 2
args[2] = 3
args[3] = 4
args[4] = 5
args[5] = 6
args[6] = 7
args[7] = 8
args[8] = 9
args[9] = 10

tcMock := &TrafficShapingControllerMock{}
tcMock.On("BoundParamIndex").Return(0)
ret0 := matchArg(tcMock, args)
assert.True(t, reflect.DeepEqual(ret0, 1))

tcMock1 := &TrafficShapingControllerMock{}
tcMock1.On("BoundParamIndex").Return(5)
ret1 := matchArg(tcMock1, args)
assert.True(t, reflect.DeepEqual(ret1, 6))

tcMock2 := &TrafficShapingControllerMock{}
tcMock2.On("BoundParamIndex").Return(9)
ret2 := matchArg(tcMock2, args)
assert.True(t, reflect.DeepEqual(ret2, 10))

tcMock3 := &TrafficShapingControllerMock{}
tcMock3.On("BoundParamIndex").Return(-1)
ret3 := matchArg(tcMock3, args)
assert.True(t, reflect.DeepEqual(ret3, 10))

tcMock4 := &TrafficShapingControllerMock{}
tcMock4.On("BoundParamIndex").Return(-10)
ret4 := matchArg(tcMock4, args)
assert.True(t, reflect.DeepEqual(ret4, 1))

tcMock5 := &TrafficShapingControllerMock{}
tcMock5.On("BoundParamIndex").Return(10)
ret5 := matchArg(tcMock5, args)
assert.True(t, ret5 == nil)

tcMock6 := &TrafficShapingControllerMock{}
tcMock6.On("BoundParamIndex").Return(-11)
ret6 := matchArg(tcMock6, args)
assert.True(t, ret6 == nil)
})
func (m *TrafficShapingControllerMock) ExtractArgs(ctx *base.EntryContext) []interface{} {
_ = m.Called()
ret := []interface{}{ctx.Input.Args[m.BoundParamIndex()]}
return ret
}
70 changes: 70 additions & 0 deletions core/hotspot/traffic_shaping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type TrafficShapingController interface {

BoundParamIndex() int

ExtractArgs(ctx *base.EntryContext) interface{}

BoundMetric() *ParamsMetric

BoundRule() *Rule
Expand All @@ -44,6 +46,7 @@ type baseTrafficShapingController struct {
res string
metricType MetricType
paramIndex int
paramKey string
threshold int64
specificItems map[interface{}]int64
durationInSec int64
Expand All @@ -60,6 +63,7 @@ func newBaseTrafficShapingControllerWithMetric(r *Rule, metric *ParamsMetric) *b
res: r.Resource,
metricType: r.MetricType,
paramIndex: r.ParamIndex,
paramKey: r.ParamKey,
threshold: r.Threshold,
specificItems: r.SpecificItems,
durationInSec: r.DurationInSec,
Expand Down Expand Up @@ -155,6 +159,72 @@ func (c *baseTrafficShapingController) BoundParamIndex() int {
return c.paramIndex
}

// ExtractArgs matches the arg from ctx based on TrafficShapingController
// return nil if match failed.
func (c *baseTrafficShapingController) ExtractArgs(ctx *base.EntryContext) (value interface{}) {
if c == nil {
return nil
}
value = c.extractAttachmentArgs(ctx)
if value != nil {
return
}
value = c.extractArgs(ctx)
if value != nil {
return
}
return
}
func (c *baseTrafficShapingController) extractArgs(ctx *base.EntryContext) interface{} {
args := ctx.Input.Args
idx := c.BoundParamIndex()
if idx < 0 {
idx = len(args) + idx
}
if idx < 0 {
if logging.DebugEnabled() {
logging.Debug("[extractArgs] The param index of hotspot traffic shaping controller is invalid",
"args", args, "paramIndex", c.BoundParamIndex())
}
return nil
}
if idx >= len(args) {
if logging.DebugEnabled() {
logging.Debug("[extractArgs] The argument in index doesn't exist",
"args", args, "paramIndex", c.BoundParamIndex())
}
return nil
}
return args[idx]
}
func (c *baseTrafficShapingController) extractAttachmentArgs(ctx *base.EntryContext) interface{} {
attachments := ctx.Input.Attachments

if attachments == nil {
if logging.DebugEnabled() {
logging.Debug("[paramKey] The attachments of ctx is nil",
"args", attachments, "paramKey", c.paramKey)
}
return nil
}
if c.paramKey == "" {
if logging.DebugEnabled() {
logging.Debug("[paramKey] The param key is nil",
"args", attachments, "paramKey", c.paramKey)
}
return nil
}
arg, ok := attachments[c.paramKey]
if !ok {
if logging.DebugEnabled() {
logging.Debug("[paramKey] extracted data does not exist",
"args", attachments, "paramKey", c.paramKey)
}
}

return arg
}

func (c *rejectTrafficShapingController) PerformChecking(arg interface{}, batchCount int64) *base.TokenResult {
metric := c.metric
if metric == nil {
Expand Down
53 changes: 53 additions & 0 deletions core/hotspot/traffic_shaping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package hotspot

import (
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/util"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -321,3 +323,54 @@ func Test_newBaseTrafficShapingController(t *testing.T) {
assert.True(t, tc.metric.RuleTokenCounter.Len() == ParamsMaxCapacity)
})
}

func Test_baseTrafficShapingController_ExtractArgs(t *testing.T) {
t.Run("Test_baseTrafficShapingController_ExtractArgs", func(t *testing.T) {

c := &baseTrafficShapingController{}

args := make([]interface{}, 10)
attachments := make(map[interface{}]interface{})
ctx := base.NewEmptyEntryContext()
ctx.Input = &base.SentinelInput{
BatchCount: 1,
Flag: 0,
Args: args,
Attachments: attachments,
}
// no data
ret := c.ExtractArgs(ctx)
assert.Nil(t, ret)

// set data
args[0] = 1
args[1] = 2
value1 := "v1"
attachments["test1"] = value1

// set index or key
// exist
c.paramIndex = 0
c.paramKey = "test1"
ret = c.ExtractArgs(ctx)
assert.True(t, reflect.DeepEqual(ret, value1), ret)

// part exist 1
c.paramIndex = 10
c.paramKey = "test1"
ret = c.ExtractArgs(ctx)
assert.True(t, reflect.DeepEqual(ret, value1), ret)

// part exist 2
c.paramIndex = 1
c.paramKey = "test2"
ret = c.ExtractArgs(ctx)
assert.True(t, reflect.DeepEqual(ret, 2), ret)

// not exist
c.paramIndex = 3
c.paramKey = "test2"
ret = c.ExtractArgs(ctx)
assert.Nil(t, ret)
})
}
Loading