Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[177] Add "warm-up" control behavior support #190

Merged
merged 13 commits into from
Aug 21, 2020
1 change: 1 addition & 0 deletions core/base/stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (

type ReadStat interface {
GetQPS(event MetricEvent) float64
GetPreviousQPS(event MetricEvent) float64
GetSum(event MetricEvent) int64

MinRT() float64
Expand Down
5 changes: 5 additions & 0 deletions core/base/stat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ func (m *StatNodeMock) GetQPS(event MetricEvent) float64 {
return float64(args.Int(0))
}

func (m *StatNodeMock) GetPreviousQPS(event MetricEvent) float64 {
args := m.Called(event)
return float64(args.Int(0))
binbin0325 marked this conversation as resolved.
Show resolved Hide resolved
}

func (m *StatNodeMock) GetMaxAvg(event MetricEvent) float64 {
args := m.Called(event)
return float64(args.Int(0))
Expand Down
1 change: 1 addition & 0 deletions core/config/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ const (
DefaultMetricLogSingleFileMaxSize uint64 = 1024 * 1024 * 50
DefaultMetricLogMaxFileAmount uint32 = 8
DefaultSystemStatCollectIntervalMs uint32 = 1000
DefaultWarmUpColdFactor uint32 = 3
)
5 changes: 3 additions & 2 deletions core/flow/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ type FlowRule struct {
WarmUpPeriodSec uint32
MaxQueueingTimeMs uint32
// ClusterMode indicates whether the rule is for cluster flow control or local.
ClusterMode bool
ClusterConfig ClusterRuleConfig
ClusterMode bool
ClusterConfig ClusterRuleConfig
WarmUpColdFactor uint32
}

func (f *FlowRule) String() string {
Expand Down
3 changes: 3 additions & 0 deletions core/flow/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ func init() {
tcGenFuncMap[Throttling] = func(rule *FlowRule) *TrafficShapingController {
return NewTrafficShapingController(NewDefaultTrafficShapingCalculator(rule.Count), NewThrottlingChecker(rule.MaxQueueingTimeMs), rule)
}
tcGenFuncMap[WarmUp] = func(rule *FlowRule) *TrafficShapingController {
return NewTrafficShapingController(NewWarmUpTrafficShapingCalculator(rule.WarmUpPeriodSec, rule.WarmUpColdFactor, rule.Count), NewDefaultTrafficShapingChecker(rule), rule)
}
}

func logRuleUpdate(m TrafficControllerMap) {
Expand Down
101 changes: 101 additions & 0 deletions core/flow/warm_up.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package flow
binbin0325 marked this conversation as resolved.
Show resolved Hide resolved

import (
"math"
"sync/atomic"

"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/config"
"github.com/alibaba/sentinel-golang/util"
)

type WarmUpTrafficShapingCalculator struct {
threshold float64
warmUpPeriodInSec uint32
coldFactor uint32
warningToken uint64
maxToken uint64
slope float64
storedTokens *uint64
lastFilledTime *uint64
}

func NewWarmUpTrafficShapingCalculator(warmUpPeriodInSec, warmUpColdFactor uint32, threshold float64) *WarmUpTrafficShapingCalculator {
if warmUpColdFactor <= 1 {
warmUpColdFactor = config.DefaultWarmUpColdFactor
}

warningToken := uint64((float64(warmUpPeriodInSec) * threshold) / float64(warmUpColdFactor-1))

maxToken := warningToken + uint64(2*float64(warmUpPeriodInSec)*threshold/float64(1.0+warmUpColdFactor))

slope := float64(warmUpColdFactor-1.0) / threshold / float64(maxToken-warningToken)

warmUpTrafficShapingCalculator := &WarmUpTrafficShapingCalculator{
warmUpPeriodInSec: warmUpPeriodInSec,
coldFactor: warmUpColdFactor,
warningToken: warningToken,
maxToken: maxToken,
slope: slope,
threshold: threshold,
storedTokens: new(uint64),
lastFilledTime: new(uint64),
}

return warmUpTrafficShapingCalculator
}

func (d *WarmUpTrafficShapingCalculator) CalculateAllowedTokens(node base.StatNode, acquireCount uint32, flag int32) float64 {
previousQps := node.GetPreviousQPS(base.MetricEventPass)
d.syncToken(previousQps)

restToken := atomic.LoadUint64(d.storedTokens)
if restToken >= d.warningToken {
aboveToken := restToken - d.warningToken
warningQps := math.Nextafter(1.0/(float64(aboveToken)*d.slope+1.0/d.threshold), math.MaxFloat64)
return warningQps
} else {
return d.threshold
}
}

func (d *WarmUpTrafficShapingCalculator) syncToken(passQps float64) {
currentTime := util.CurrentTimeMillis()
currentTime = currentTime - currentTime%1000

oldLastFillTime := atomic.LoadUint64(d.lastFilledTime)
if currentTime <= oldLastFillTime {
return
}

oldValue := atomic.LoadUint64(d.storedTokens)
newValue := d.coolDownTokens(currentTime, passQps)

if atomic.CompareAndSwapUint64(d.storedTokens, oldValue, newValue) {
if currentValue := atomic.AddUint64(d.storedTokens, uint64(0-passQps)); currentValue < 0 {
atomic.StoreUint64(d.storedTokens, 0)
}
atomic.StoreUint64(d.lastFilledTime, currentTime)
}
}

func (d *WarmUpTrafficShapingCalculator) coolDownTokens(currentTime uint64, passQps float64) uint64 {
oldValue := atomic.LoadUint64(d.storedTokens)
newValue := oldValue

// Prerequisites for adding a token:
// When token consumption is much lower than the warning line
if oldValue < d.warningToken {
newValue = uint64(float64(oldValue) + (float64(currentTime)-float64(atomic.LoadUint64(d.lastFilledTime)))*d.threshold/1000)
} else if oldValue > d.warningToken {
if passQps < float64(uint32(d.threshold)/d.coldFactor) {
newValue = uint64(float64(oldValue) + float64(currentTime-atomic.LoadUint64(d.lastFilledTime))*d.threshold/1000)
}
}

if newValue <= d.maxToken {
return newValue
} else {
return d.maxToken
}
}
4 changes: 4 additions & 0 deletions core/stat/base/sliding_window_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ func (m *SlidingWindowMetric) GetQPS(event base.MetricEvent) float64 {
return m.getQPSWithTime(util.CurrentTimeMillis(), event)
}

func (m *SlidingWindowMetric) GetPreviousQPS(event base.MetricEvent) float64 {
return m.getQPSWithTime(util.CurrentTimeMillis()-uint64(m.bucketLengthInMs), event)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial design of the method is for getting the metric of last second (i.e. elapsed, "stable" second). Here we got the latest 1 sec (with current bucket, i.e. unstable bucket). We may discuss it later.

}

func (m *SlidingWindowMetric) getQPSWithTime(now uint64, event base.MetricEvent) float64 {
return float64(m.getSumWithTime(now, event)) / m.getIntervalInSecond()
}
Expand Down
4 changes: 4 additions & 0 deletions core/stat/base_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ func (n *BaseStatNode) GetQPS(event base.MetricEvent) float64 {
return n.metric.GetQPS(event)
}

func (n *BaseStatNode) GetPreviousQPS(event base.MetricEvent) float64 {
return n.metric.GetPreviousQPS(event)
}

func (n *BaseStatNode) GetSum(event base.MetricEvent) int64 {
return n.metric.GetSum(event)
}
Expand Down
94 changes: 94 additions & 0 deletions example/warm_up/qps_warm_up_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package main

import (
"fmt"
"log"
"math/rand"
"sync/atomic"
"time"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/flow"
"github.com/alibaba/sentinel-golang/util"
)

type Counter struct {
pass *int64
block *int64
total *int64
}

var routineCount = 30

func main() {
counter := Counter{pass: new(int64), block: new(int64), total: new(int64)}
// We should initialize Sentinel first.
err := sentinel.InitDefault()
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
}

_, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "some-test",
MetricType: flow.QPS,
Count: 100,
ControlBehavior: flow.WarmUp,
WarmUpPeriodSec: 10,
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
go timerTask(&counter)
ch := make(chan struct{})
//warmUp task
for i := 0; i < 3; i++ {
go Task(&counter)
}
time.Sleep(3 * time.Second)
//sentinel task
for i := 0; i < routineCount; i++ {
go Task(&counter)
}
<-ch
}

func Task(counter *Counter) {
for {
atomic.AddInt64(counter.total, 1)
e, b := sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
if b != nil {
atomic.AddInt64(counter.block, 1)
} else {
// Be sure the entry is exited finally.
e.Exit()
atomic.AddInt64(counter.pass, 1)
}
time.Sleep(time.Duration(rand.Uint64()%50) * time.Millisecond)
}
}

func timerTask(counter *Counter) {
fmt.Println("begin to statistic!!!")
var (
oldTotal, oldPass, oldBlock int64
)
for {
time.Sleep(1 * time.Second)
globalTotal := atomic.LoadInt64(counter.total)
oneSecondTotal := globalTotal - oldTotal
oldTotal = globalTotal

globalPass := atomic.LoadInt64(counter.pass)
oneSecondPass := globalPass - oldPass
oldPass = globalPass

globalBlock := atomic.LoadInt64(counter.block)
oneSecondBlock := globalBlock - oldBlock
oldBlock = globalBlock
fmt.Println(util.CurrentTimeMillis()/1000, "total:", oneSecondTotal, " pass:", oneSecondPass, " block:", oneSecondBlock)
}
}
36 changes: 36 additions & 0 deletions example/warm_up/qps_warm_up_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package main

import (
"log"
"testing"

"github.com/alibaba/sentinel-golang/core/flow"

sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
)

func Benchmark_qps(b *testing.B) {
binbin0325 marked this conversation as resolved.
Show resolved Hide resolved
// We should initialize Sentinel first.
err := sentinel.InitDefault()
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
}

_, err = flow.LoadRules([]*flow.FlowRule{
{
Resource: "some-test",
MetricType: flow.QPS,
Count: 100,
ControlBehavior: flow.WarmUp,
WarmUpPeriodSec: 10,
},
})
if err != nil {
log.Fatalf("Unexpected error: %+v", err)
return
}
for i := 0; i < b.N; i++ {
sentinel.Entry("some-test", sentinel.WithTrafficType(base.Inbound))
binbin0325 marked this conversation as resolved.
Show resolved Hide resolved
}
}
7 changes: 7 additions & 0 deletions example/warm_up/sentinel.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: "v1"
sentinel:
app:
name: sentinel-go-demo
log:
metric:
maxFileCount: 7