Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: replace MaxToken by BurstLimit #5931

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pkg/mcs/resource_manager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ResourceGroup struct {
Mode rmpb.GroupMode `json:"mode"`
// RU settings
RUSettings *RequestUnitSettings `json:"r_u_settings,omitempty"`
// Native resource settings
// raw resource settings
RawResourceSettings *RawResourceSettings `json:"raw_resource_settings,omitempty"`
}

Expand Down Expand Up @@ -90,7 +90,7 @@ func (rg *ResourceGroup) CheckAndInit() error {
rg.RUSettings = &RequestUnitSettings{}
}
if rg.RawResourceSettings != nil {
return errors.New("invalid resource group settings, RU mode should not set resource settings")
return errors.New("invalid resource group settings, RU mode should not set raw resource settings")
}
}
if rg.Mode == rmpb.GroupMode_RawMode {
Expand Down Expand Up @@ -167,7 +167,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
func (rg *ResourceGroup) RequestRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
if rg.RUSettings == nil || rg.RUSettings.RU.Settings == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check whether rg.RUSettings.RU is nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rg.RUSettings.RU is not a pointer

return nil
}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs)
Expand All @@ -184,7 +184,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
Name: rg.Name,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RU: rg.RUSettings.RU.TokenBucket,
RU: rg.RUSettings.RU.GetTokenBucket(),
},
}
return group
Expand All @@ -193,9 +193,9 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
Name: rg.Name,
Mode: rmpb.GroupMode_RawMode,
RawResourceSettings: &rmpb.GroupRawResourceSettings{
Cpu: rg.RawResourceSettings.CPU.TokenBucket,
IoRead: rg.RawResourceSettings.IOReadBandwidth.TokenBucket,
IoWrite: rg.RawResourceSettings.IOWriteBandwidth.TokenBucket,
Cpu: rg.RawResourceSettings.CPU.GetTokenBucket(),
IoRead: rg.RawResourceSettings.IOReadBandwidth.GetTokenBucket(),
IoWrite: rg.RawResourceSettings.IOWriteBandwidth.GetTokenBucket(),
},
}
return group
Expand Down
14 changes: 7 additions & 7 deletions pkg/mcs/resource_manager/server/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ func TestPatchResourceGroup(t *testing.T) {
expectJSONString string
}{
{`{"name":"test", "mode":1, "r_u_settings": {"r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
`{"name":"test","mode":1,"r_u_settings":{"ru":{"settings":{"fill_rate":200000},"state":{"initialized":false}}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_u":{"settings":{"fill_rate": 200000, "burst_limit": -1}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"ru":{"settings":{"fill_rate":200000,"burst_limit":-1},"state":{"initialized":false}}}}`},
}

for _, ca := range testCaseRU {
Expand All @@ -43,11 +43,11 @@ func TestPatchResourceGroup(t *testing.T) {
expectJSONString string
}{
{`{"name":"test", "mode":2, "raw_resource_settings": {"cpu":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`},
{`{"name":"test", "mode":2, "raw_resource_settings": {"io_read":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_write_bandwidth":{"initialized":false}}}`},
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"settings":{"fill_rate":200000},"state":{"initialized":false}},"io_read_bandwidth":{"state":{"initialized":false}},"io_write_bandwidth":{"state":{"initialized":false}}}}`},
{`{"name":"test", "mode":2, "raw_resource_settings": {"io_read":{"settings":{"fill_rate": 200000,"burst_limit":1000000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"state":{"initialized":false}},"io_read_bandwidth":{"settings":{"fill_rate":200000,"burst_limit":1000000},"state":{"initialized":false}},"io_write_bandwidth":{"state":{"initialized":false}}}}`},
{`{"name":"test", "mode":2, "raw_resource_settings": {"io_write":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
`{"name":"test","mode":2,"raw_resource_settings":{"cpu":{"state":{"initialized":false}},"io_read_bandwidth":{"state":{"initialized":false}},"io_write_bandwidth":{"settings":{"fill_rate":200000},"state":{"initialized":false}}}}`},
}

for _, ca := range testCaseResource {
Expand Down
73 changes: 47 additions & 26 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
const (
defaultRefillRate = 10000
defaultInitialTokens = 10 * 10000
defaultMaxTokens = 1e7
)

const (
Expand All @@ -34,41 +33,60 @@ const (
)

// GroupTokenBucket is a token bucket for a resource group.
// TODO: statistics consumption @JmPotato
// Now we don't save consumption in `GroupTokenBucket`, only statistics it in prometheus.
type GroupTokenBucket struct {
*rmpb.TokenBucket `json:"token_bucket,omitempty"`
// Settings is the setting of TokenBucket.
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within a unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within a unlimited capacity).
// - If b > 0, that means the limiter is limited capacity. (current not used).
// MaxTokens limits the number of tokens that can be accumulated
MaxTokens float64 `json:"max_tokens,omitempty"`
Settings *rmpb.TokenLimitSettings `json:"settings,omitempty"`
GroupTokenBucketState `json:"state,omitempty"`
}

Consumption *rmpb.Consumption `json:"consumption,omitempty"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// GroupTokenBucketState is the running state of TokenBucket.
type GroupTokenBucketState struct {
Tokens float64 `json:"tokens,omitempty"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
}

// NewGroupTokenBucket returns a new GroupTokenBucket
func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket {
if tokenBucket == nil || tokenBucket.Settings == nil {
return GroupTokenBucket{}
}
return GroupTokenBucket{
TokenBucket: tokenBucket,
MaxTokens: defaultMaxTokens,
Settings: tokenBucket.Settings,
GroupTokenBucketState: GroupTokenBucketState{
Tokens: tokenBucket.Tokens,
},
}
}

// GetTokenBucket returns the grpc protoc struct of GroupTokenBucket.
func (t *GroupTokenBucket) GetTokenBucket() *rmpb.TokenBucket {
if t.Settings == nil {
return nil
}
return &rmpb.TokenBucket{
Settings: t.Settings,
Tokens: t.Tokens,
}
}

// patch patches the token bucket settings.
func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) {
if settings == nil {
func (t *GroupTokenBucket) patch(tb *rmpb.TokenBucket) {
if tb == nil {
return
}
tb := proto.Clone(t.TokenBucket).(*rmpb.TokenBucket)
if settings.GetSettings() != nil {
if tb == nil {
tb = &rmpb.TokenBucket{}
}
tb.Settings = settings.GetSettings()
if setting := proto.Clone(tb.GetSettings()).(*rmpb.TokenLimitSettings); setting != nil {
t.Settings = setting
}

// the settings in token is delta of the last update and now.
tb.Tokens += settings.GetTokens()
t.TokenBucket = tb
t.Tokens += tb.GetTokens()
}

// init initializes the group token bucket.
Expand All @@ -79,10 +97,6 @@ func (t *GroupTokenBucket) init(now time.Time) {
if t.Tokens < defaultInitialTokens {
t.Tokens = defaultInitialTokens
}
// TODO: If we support init or modify MaxTokens in the future, we can move following code.
if t.Tokens > t.MaxTokens {
t.MaxTokens = t.Tokens
}
t.LastUpdate = &now
t.Initialized = true
}
Expand All @@ -97,13 +111,20 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
t.Tokens += float64(t.Settings.FillRate) * delta.Seconds()
t.LastUpdate = &now
}
if t.Tokens > t.MaxTokens {
t.Tokens = t.MaxTokens
}
if t.Settings.BurstLimit != 0 {
if burst := float64(t.Settings.BurstLimit); t.Tokens > burst {
t.Tokens = burst
}
}

var res rmpb.TokenBucket
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: t.GetSettings().GetBurstLimit()}
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: t.Settings.GetBurstLimit()}
// If BurstLimit is -1, just return.
if res.Settings.BurstLimit < 0 {
res.Tokens = neededTokens
return &res, 0
}
// FillRate is used for the token server unavailable in abnormal situation.
if neededTokens <= 0 {
return &res, 0
Expand Down
8 changes: 4 additions & 4 deletions tests/mcs/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
modifySettings func(*rmpb.ResourceGroup)
}{
{"test1", rmpb.GroupMode_RUMode, true, true,
`{"name":"test1","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":10000}},"initialized":false}}}`,
`{"name":"test1","mode":1,"r_u_settings":{"ru":{"settings":{"fill_rate":10000},"state":{"initialized":false}}}}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -449,7 +449,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},

{"test2", rmpb.GroupMode_RUMode, true, true,
`{"name":"test2","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":20000}},"initialized":false}}}`,
`{"name":"test2","mode":1,"r_u_settings":{"ru":{"settings":{"fill_rate":20000},"state":{"initialized":false}}}}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -461,7 +461,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},
},
{"test2", rmpb.GroupMode_RUMode, false, true,
`{"name":"test2","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":30000}},"initialized":false}}}`,
`{"name":"test2","mode":1,"r_u_settings":{"ru":{"settings":{"fill_rate":30000},"state":{"initialized":false}}}}`,
func(gs *rmpb.ResourceGroup) {
gs.RUSettings = &rmpb.GroupRequestUnitSettings{
RU: &rmpb.TokenBucket{
Expand All @@ -485,7 +485,7 @@ func (suite *resourceManagerClientTestSuite) TestBasicResourceGroupCURD() {
},
},
{"test3", rmpb.GroupMode_RawMode, false, true,
`{"name":"test3","mode":2,"raw_resource_settings":{"cpu":{"token_bucket":{"settings":{"fill_rate":1000000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`,
`{"name":"test3","mode":2,"raw_resource_settings":{"cpu":{"settings":{"fill_rate":1000000},"state":{"initialized":false}},"io_read_bandwidth":{"state":{"initialized":false}},"io_write_bandwidth":{"state":{"initialized":false}}}}`,
func(gs *rmpb.ResourceGroup) {
gs.RawResourceSettings = &rmpb.GroupRawResourceSettings{
Cpu: &rmpb.TokenBucket{
Expand Down