Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

split isolation pkg to implement concurrency traffic control #247

Merged
merged 2 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/slot_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/alibaba/sentinel-golang/core/circuitbreaker"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/core/hotspot"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/core/log"
"github.com/alibaba/sentinel-golang/core/stat"
"github.com/alibaba/sentinel-golang/core/system"
Expand All @@ -30,6 +31,7 @@ func BuildDefaultSlotChain() *base.SlotChain {
sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})
sc.AddRuleCheckSlotLast(&system.AdaptiveSlot{})
sc.AddRuleCheckSlotLast(&flow.Slot{})
sc.AddRuleCheckSlotLast(&isolation.Slot{})
sc.AddRuleCheckSlotLast(&circuitbreaker.Slot{})
sc.AddRuleCheckSlotLast(&hotspot.Slot{})
sc.AddStatSlotLast(&stat.Slot{})
Expand Down
3 changes: 3 additions & 0 deletions core/base/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type BlockType uint8
const (
BlockTypeUnknown BlockType = iota
BlockTypeFlow
BlockTypeIsolation
BlockTypeCircuitBreaking
BlockTypeSystemFlow
BlockTypeHotSpotParamFlow
Expand All @@ -20,6 +21,8 @@ func (t BlockType) String() string {
return "Unknown"
case BlockTypeFlow:
return "FlowControl"
case BlockTypeIsolation:
return "BlockTypeIsolation"
case BlockTypeCircuitBreaking:
return "CircuitBreaking"
case BlockTypeSystemFlow:
Expand Down
45 changes: 45 additions & 0 deletions core/isolation/rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package isolation

import (
"encoding/json"
"fmt"
)

// MetricType represents the target metric type.
type MetricType int32

const (
// Concurrency represents concurrency count.
Concurrency MetricType = iota
)

func (s MetricType) String() string {
switch s {
case Concurrency:
return "Concurrency"
default:
return "Undefined"
}
}

// Rule describes the concurrency num control, that is similar to semaphore
type Rule struct {
// ID represents the unique ID of the rule (optional).
ID string `json:"id,omitempty"`
Resource string `json:"resource"`
MetricType MetricType `json:"metricType"`
Threshold uint32 `json:"threshold"`
}

func (r *Rule) String() string {
b, err := json.Marshal(r)
if err != nil {
// Return the fallback string
return fmt.Sprintf("{Id=%s, Resource=%s, MetricType=%s, Threshold=%d}", r.ID, r.Resource, r.MetricType.String(), r.Threshold)
}
return string(b)
}

func (r *Rule) ResourceName() string {
return r.Resource
}
136 changes: 136 additions & 0 deletions core/isolation/rule_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package isolation

import (
"encoding/json"
"sync"

"github.com/alibaba/sentinel-golang/logging"
"github.com/alibaba/sentinel-golang/util"
"github.com/pkg/errors"
)

var (
ruleMap = make(map[string][]*Rule)
rwMux = &sync.RWMutex{}
)

func LoadRules(rules []*Rule) (updated bool, err error) {
updated = true
err = nil
louyuting marked this conversation as resolved.
Show resolved Hide resolved

m := make(map[string][]*Rule)
for _, r := range rules {
if e := IsValid(r); e != nil {
logging.Error(e, "invalid isolation rule.", "rule", r)
continue
}
resRules, ok := m[r.Resource]
if !ok {
resRules = make([]*Rule, 0, 1)
}
m[r.Resource] = append(resRules, r)
}

start := util.CurrentTimeNano()
rwMux.Lock()
defer func() {
rwMux.Unlock()
logging.Debug("time statistic(ns) for updating isolation rule", "timeCost", util.CurrentTimeNano()-start)
logRuleUpdate(m)
}()
ruleMap = m
return
}

func ClearRules() error {
_, err := LoadRules(nil)
return err
}

func GetRules() []Rule {
louyuting marked this conversation as resolved.
Show resolved Hide resolved
rules := getRules()
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

func GetRulesOfResource(res string) []Rule {
rules := getRulesOfResource(res)
ret := make([]Rule, 0, len(rules))
for _, rule := range rules {
ret = append(ret, *rule)
}
return ret
}

func getRules() []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

return rulesFrom(ruleMap)
}

func getRulesOfResource(res string) []*Rule {
rwMux.RLock()
defer rwMux.RUnlock()

resRules, exist := ruleMap[res]
if !exist {
return nil
}
ret := make([]*Rule, 0, len(resRules))
louyuting marked this conversation as resolved.
Show resolved Hide resolved
for _, r := range resRules {
ret = append(ret, r)
}
return ret
}

func rulesFrom(m map[string][]*Rule) []*Rule {
rules := make([]*Rule, 0)
if len(m) == 0 {
return rules
}
for _, rs := range m {
for _, r := range rs {
if r != nil {
rules = append(rules, r)
}
}
}
return rules
}

func logRuleUpdate(m map[string][]*Rule) {
bs, err := json.Marshal(rulesFrom(m))
if err != nil {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded")
}
} else {
if len(m) == 0 {
logging.Info("[IsolationRuleManager] Isolation rules were cleared")
} else {
logging.Info("[IsolationRuleManager] Isolation rules were loaded", "rules", string(bs))
}
}
}

func IsValid(r *Rule) error {
if r == nil {
return errors.New("nil isolation rule")
}
if len(r.Resource) == 0 {
return errors.New("empty resource of isolation rule")
}
if r.MetricType != Concurrency {
return errors.Errorf("unsupported metric type: %d", r.MetricType)
}
if r.Threshold == 0 {
return errors.New("zero threshold")
}
return nil
}
39 changes: 39 additions & 0 deletions core/isolation/rule_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package isolation

import (
"testing"

"github.com/alibaba/sentinel-golang/logging"
"github.com/stretchr/testify/assert"
)

func TestLoadRules(t *testing.T) {
t.Run("TestLoadRules_1", func(t *testing.T) {
logging.SetGlobalLoggerLevel(logging.DebugLevel)
r1 := &Rule{
Resource: "abc1",
MetricType: Concurrency,
Threshold: 100,
}
r2 := &Rule{
Resource: "abc2",
MetricType: Concurrency,
Threshold: 200,
}
r3 := &Rule{
Resource: "abc3",
MetricType: MetricType(1),
Threshold: 200,
}
_, err := LoadRules([]*Rule{r1, r2, r3})
assert.True(t, err == nil)
assert.True(t, len(ruleMap) == 2)
assert.True(t, len(ruleMap["abc1"]) == 1)
assert.True(t, ruleMap["abc1"][0] == r1)
assert.True(t, len(ruleMap["abc2"]) == 1)
assert.True(t, ruleMap["abc2"][0] == r2)

err = ClearRules()
assert.True(t, err == nil)
})
}
47 changes: 47 additions & 0 deletions core/isolation/slot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package isolation

import (
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/logging"
"github.com/pkg/errors"
)

type Slot struct {
}

func (s *Slot) Check(ctx *base.EntryContext) *base.TokenResult {
resource := ctx.Resource.Name()
result := ctx.RuleCheckResult
if len(resource) == 0 {
return result
}
if passed, rule, snapshot := checkPass(ctx); !passed {
if result == nil {
result = base.NewTokenResultBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
} else {
result.ResetToBlockedWithCause(base.BlockTypeIsolation, "", rule, snapshot)
}
}
return result
}

func checkPass(ctx *base.EntryContext) (bool, *Rule, uint32) {
statNode := ctx.StatNode
acquireCount := ctx.Input.AcquireCount
curCount := uint32(0)
for _, rule := range getRulesOfResource(ctx.Resource.Name()) {
threshold := rule.Threshold
if rule.MetricType == Concurrency {
if cur := statNode.CurrentGoroutineNum(); cur >= 0 {
curCount = uint32(cur)
} else {
curCount = 0
logging.Error(errors.New("negative concurrency"), "", "rule", rule)
}
if curCount+acquireCount > threshold {
return false, rule, curCount
}
}
}
return true, nil, curCount
louyuting marked this conversation as resolved.
Show resolved Hide resolved
}
54 changes: 54 additions & 0 deletions example/isolation/concurrency_limitation_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package main

import (
"math/rand"
"os"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/core/isolation"
"github.com/alibaba/sentinel-golang/logging"
)

func main() {
cfg := config.NewDefaultConfig()
cfg.Sentinel.Log.Logger = logging.NewConsoleLogger()
cfg.Sentinel.Log.Metric.FlushIntervalSec = 0
cfg.Sentinel.Stat.System.CollectIntervalMs = 0
err := sentinel.InitWithConfig(cfg)
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}
logging.SetGlobalLoggerLevel(logging.DebugLevel)
ch := make(chan struct{})

r1 := &isolation.Rule{
Resource: "abc",
MetricType: isolation.Concurrency,
Threshold: 12,
}
_, err = isolation.LoadRules([]*isolation.Rule{r1})
if err != nil {
logging.Error(err, "fail")
os.Exit(1)
}

for i := 0; i < 15; i++ {
go func() {
for {
e, b := sentinel.Entry("abc", sentinel.WithAcquireCount(1))
if b != nil {
logging.Info("blocked", "reason", b.BlockType().String(), "rule", b.TriggeredRule(), "snapshot", b.TriggeredValue())
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
} else {
logging.Info("passed")
time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond)
e.Exit()
}
}
}()
}
<-ch
}