Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split isolation pkg to implement concurrency traffic control #247

Merged
merged 2 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -30,6 +31,7 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})
sc.AddRuleCheckSlotLast(&system.AdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.Slot{})
sc.AddRuleCheckSlotLast(&isolation.Slot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.Slot{})
sc.AddRuleCheckSlotLast(&hotspot.Slot{})
sc.AddStatSlotLast(&stat.Slot{})
Expand Down
3 changes: 3 additions & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type BlockType uint8
const (
BlockTypeUnknown BlockType = iota
BlockTypeFlow
BlockTypeIsolation
BlockTypeCircuitBreaking
BlockTypeSystemFlow
BlockTypeHotSpotParamFlow
Expand All @@ -20,6 +21,8 @@ func (t BlockType) String() string {
return "Unknown"
case BlockTypeFlow:
return "FlowControl"
case BlockTypeIsolation:
return "BlockTypeIsolation"
case BlockTypeCircuitBreaking:
return "CircuitBreaking"
case BlockTypeSystemFlow:
Expand Down
3 changes: 3 additions & 0 deletions core/isolation/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package isolation implements the concurrency traffic control.

package isolation
45 changes: 45 additions & 0 deletions core/isolation/rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package isolation

import (
"encoding/json"
"fmt"
)

// MetricType represents the target metric type.
type MetricType int32

const (
// Concurrency represents concurrency count.
Concurrency MetricType = iota
)

func (s MetricType) String() string {
switch s {
case Concurrency:
return "Concurrency"
default:
return "Undefined"
}
}

// Rule describes the concurrency num control, that is similar to semaphore
type Rule struct {
// ID represents the unique ID of the rule (optional).
ID string `json:"id,omitempty"`
Resource string `json:"resource"`
MetricType MetricType `json:"metricType"`
Threshold uint32 `json:"threshold"`
}

func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("{Id=%s, Resource=%s, MetricType=%s, Threshold=%d}", r.ID, r.Resource, r.MetricType.String(), r.Threshold)
}
return string(b)
}

func (r *Rule) ResourceName() string {
return r.Resource
}
147 changes: 147 additions & 0 deletions core/isolation/rule_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package isolation

import (
"encoding/json"
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

var (
ruleMap = make(map[string][]*Rule)
rwMux = &sync.RWMutex{}
)

// LoadRules loads the given isolation rules to the rule manager, while all previous rules will be replaced.
func LoadRules(rules []*Rule) (updated bool, err error) {
updated = true
err = nil
louyuting marked this conversation as resolved.
Show resolved Hide resolved

m := make(map[string][]*Rule)
for _, r := range rules {
if e := IsValid(r); e != nil {
logging.Error(e, "invalid isolation rule.", "rule", r)
continue
}
resRules, ok := m[r.Resource]
if !ok {
resRules = make([]*Rule, 0, 1)
}
m[r.Resource] = append(resRules, r)
}

start := util.CurrentTimeNano()
rwMux.Lock()
defer func() {
rwMux.Unlock()
logging.Debug("time statistic(ns) for updating isolation rule", "timeCost", util.CurrentTimeNano()-start)
logRuleUpdate(m)
}()
ruleMap = m
return
}

// ClearRules clears all the rules in isolation module.
func ClearRules() error {
_, err := LoadRules(nil)
return err
}

// GetRules returns all the rules based on copy.
// It doesn't take effect for isolation module if user changes the rule.
func GetRules() []Rule {
louyuting marked this conversation as resolved.
Show resolved Hide resolved
rules := getRules()
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

// GetRulesOfResource returns specific resource's rules based on copy.
// It doesn't take effect for isolation module if user changes the rule.
func GetRulesOfResource(res string) []Rule {
rules := getRulesOfResource(res)
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

// getRules returns all the rules。Any changes of rules take effect for isolation module
// getRules is an internal interface.
func getRules() []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

return rulesFrom(ruleMap)
}

// getRulesOfResource returns specific resource's rules。Any changes of rules take effect for isolation module
// getRulesOfResource is an internal interface.
func getRulesOfResource(res string) []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

resRules, exist := ruleMap[res]
if !exist {
return nil
}
ret := make([]*Rule, 0, len(resRules))
louyuting marked this conversation as resolved.
Show resolved Hide resolved
for _, r := range resRules {
ret = append(ret, r)
}
return ret
}

func rulesFrom(m map[string][]*Rule) []*Rule {
rules := make([]*Rule, 0)
if len(m) == 0 {
return rules
}
for _, rs := range m {
for _, r := range rs {
if r != nil {
rules = append(rules, r)
}
}
}
return rules
}

func logRuleUpdate(m map[string][]*Rule) {
bs, err := json.Marshal(rulesFrom(m))
if err != nil {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded")
}
} else {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded", "rules", string(bs))
}
}
}

// IsValidRule checks whether the given Rule is valid.
func IsValid(r *Rule) error {
if r == nil {
return errors.New("nil isolation rule")
}
if len(r.Resource) == 0 {
return errors.New("empty resource of isolation rule")
}
if r.MetricType != Concurrency {
return errors.Errorf("unsupported metric type: %d", r.MetricType)
}
if r.Threshold == 0 {
return errors.New("zero threshold")
}
return nil
}
39 changes: 39 additions & 0 deletions core/isolation/rule_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package isolation

import (
"testing"

"github.com/alibaba/sentinel-golang/logging"
"github.com/stretchr/testify/assert"
)

func TestLoadRules(t *testing.T) {
t.Run("TestLoadRules_1", func(t *testing.T) {
logging.SetGlobalLoggerLevel(logging.DebugLevel)
r1 := &Rule{
Resource: "abc1",
MetricType: Concurrency,
Threshold: 100,
}
r2 := &Rule{
Resource: "abc2",
MetricType: Concurrency,
Threshold: 200,
}
r3 := &Rule{
Resource: "abc3",
MetricType: MetricType(1),
Threshold: 200,
}
_, err := LoadRules([]*Rule{r1, r2, r3})
assert.True(t, err == nil)
assert.True(t, len(ruleMap) == 2)
assert.True(t, len(ruleMap["abc1"]) == 1)
assert.True(t, ruleMap["abc1"][0] == r1)
assert.True(t, len(ruleMap["abc2"]) == 1)
assert.True(t, ruleMap["abc2"][0] == r2)

err = ClearRules()
assert.True(t, err == nil)
})
}
47 changes: 47 additions & 0 deletions core/isolation/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package isolation

import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

type Slot struct {
}

func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
resource := ctx.Resource.Name()
result := ctx.RuleCheckResult
if len(resource) == 0 {
return result
}
if passed, rule, snapshot := checkPass(ctx); !passed {
if result == nil {
result = base.NewTokenResultBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
} else {
result.ResetToBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
}
}
return result
}

func checkPass(ctx *base.EntryContext) (bool, *Rule, uint32) {
statNode := ctx.StatNode
acquireCount := ctx.Input.AcquireCount
curCount := uint32(0)
for _, rule := range getRulesOfResource(ctx.Resource.Name()) {
threshold := rule.Threshold
if rule.MetricType == Concurrency {
if cur := statNode.CurrentGoroutineNum(); cur >= 0 {
curCount = uint32(cur)
} else {
curCount = 0
logging.Error(errors.New("negative concurrency"), "", "rule", rule)
}
if curCount+acquireCount > threshold {
return false, rule, curCount
}
}
}
return true, nil, curCount
louyuting marked this conversation as resolved.
Show resolved Hide resolved
}
54 changes: 54 additions & 0 deletions example/isolation/concurrency_limitation_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"math/rand"
"os"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/logging"
)

func main() {
cfg := config.NewDefaultConfig()
cfg.Sentinel.Log.Logger = logging.NewConsoleLogger()
cfg.Sentinel.Log.Metric.FlushIntervalSec = 0
cfg.Sentinel.Stat.System.CollectIntervalMs = 0
err := sentinel.InitWithConfig(cfg)
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}
logging.SetGlobalLoggerLevel(logging.DebugLevel)
ch := make(chan struct{})

r1 := &isolation.Rule{
Resource: "abc",
MetricType: isolation.Concurrency,
Threshold: 12,
}
_, err = isolation.LoadRules([]*isolation.Rule{r1})
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}

for i := 0; i < 15; i++ {
go func() {
for {
e, b := sentinel.Entry("abc", sentinel.WithAcquireCount(1))
if b != nil {
logging.Info("blocked", "reason", b.BlockType().String(), "rule", b.TriggeredRule(), "snapshot", b.TriggeredValue())
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
logging.Info("passed")
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
e.Exit()
}
}
}()
}
<-ch
}