From ca83b1dc15983b3907445a5f2d50ac351aa48414 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 29 Dec 2022 20:12:54 +0800 Subject: [PATCH 01/10] impl token request Signed-off-by: Cabinfever_B --- .../resource_manager/server/grpc_service.go | 51 ++++++++++- .../resource_manager/server/token_bukets.go | 85 +++++++++++++++++-- pkg/mcs/resource_manager/server/types.go | 61 +++++++++---- 3 files changed, 174 insertions(+), 23 deletions(-) diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index a0f9c1c5c38..51a9ff405db 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -16,12 +16,16 @@ package server import ( "context" + "io" "net/http" + "time" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/mcs/registry" "github.com/tikv/pd/server" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -125,5 +129,50 @@ func (s *Service) ModifyResourceGroup(ctx context.Context, req *rmpb.PutResource // AcquireTokenBuckets implements ResourceManagerServer.AcquireTokenBuckets. func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBucketsServer) error { - return errors.New("Not implemented") + for { + select { + case <-s.ctx.Done(): + return errors.New("server closed") + default: + } + request, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + targetPeriodMs := request.GetTargetRequestPeriodMs() + resps := &rmpb.TokenBucketsResponse{} + for _, req := range request.Requests { + rg := s.manager.GetResourceGroup(req.ResourceGroupName) + if rg == nil { + return errors.New("resource group not found") + } + now := time.Now() + resp := &rmpb.TokenBucketResponse{ + ResourceGroupName: rg.Name, + } + switch rg.Mode { + case rmpb.GroupMode_RUMode: + for _, re := range req.GetRuItems().GetRequestRU() { + switch re.Type { + case rmpb.RequestUnitType_RRU: + rg.UpdateRRU(now) + tokens := rg.RequestRRU(float64(re.Value), targetPeriodMs) + resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) + case rmpb.RequestUnitType_WRU: + rg.UpdateWRU(now) + tokens := rg.RequestWRU(float64(re.Value), targetPeriodMs) + resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) + } + } + case rmpb.GroupMode_NativeMode: + return errors.New("not supports the resource type") + } + log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName)) + resps.Responses = append(resps.Responses, resp) + } + stream.Send(resps) + } } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 00d4a3fe04f..d8b85c52dce 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -25,12 +25,30 @@ const defaultRefillRate = 10000 const defaultInitialTokens = 10 * 10000 +const defaultMaxTokens = 1e7 + +const defaultLoanMaxPeriod = 10 * time.Second + +var loanReserveRatio float64 = 0.05 + // GroupTokenBucket is a token bucket for a resource group. +// TODO: statistics Consumption type GroupTokenBucket struct { *rmpb.TokenBucket `json:"token_bucket,omitempty"` Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` LastUpdate *time.Time `json:"last_update,omitempty"` Initialized bool `json:"initialized"` + LoanExpireTime *time.Time `json:"loan_time,omitempty"` + LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"` + MaxTokens float64 `json:"max_tokens,omitempty"` +} + +func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket { + return GroupTokenBucket{ + TokenBucket: tokenBucket, + MaxTokens: defaultMaxTokens, + LoanMaxPeriod: defaultLoanMaxPeriod, + } } // patch patches the token bucket settings. @@ -51,8 +69,8 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { t.TokenBucket = tb } -// Update updates the token bucket. -func (t *GroupTokenBucket) Update(now time.Time) { +// update updates the token bucket. +func (t *GroupTokenBucket) update(now time.Time) { if !t.Initialized { t.Settings.Fillrate = defaultRefillRate t.Tokens = defaultInitialTokens @@ -66,12 +84,65 @@ func (t *GroupTokenBucket) Update(now time.Time) { t.Tokens += float64(t.Settings.Fillrate) * delta.Seconds() t.LastUpdate = &now } + if t.Tokens >= 0 { + t.LoanExpireTime = nil + } + if t.Tokens > defaultMaxTokens { + t.Tokens = defaultMaxTokens + } + } -// Request requests tokens from the token bucket. -func (t *GroupTokenBucket) Request( +// request requests tokens from the token bucket. +func (t *GroupTokenBucket) request( neededTokens float64, targetPeriodMs uint64, -) *rmpb.TokenBucket { - // TODO: Implement the token bucket algorithm. - return nil +) (*rmpb.TokenBucket, int64) { + var res rmpb.TokenBucket + res.Settings = &rmpb.TokenLimitSettings{} + // TODO: consider the shares for dispatch the fill rate + res.Settings.Fillrate = 0 + + if neededTokens <= 0 { + return &res, 0 + } + + if t.Tokens >= neededTokens { + t.Tokens -= neededTokens + // granted the total request tokens + res.Tokens = neededTokens + return &res, 0 + } + + var grantedTokens float64 + if t.Tokens > 0 { + grantedTokens = t.Tokens + t.Tokens = 0 + neededTokens -= grantedTokens + } + + now := time.Now() + var periodFilled float64 + var trickleTime int64 = int64(targetPeriodMs) + if t.LoanExpireTime != nil && t.LoanExpireTime.After(now) { + duration := t.LoanExpireTime.Sub(now) + periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * duration.Seconds() + trickleTime = duration.Milliseconds() + } else { + now.Add(t.LoanMaxPeriod) + t.LoanExpireTime = &now + periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() + } + periodFilled += t.Tokens + if periodFilled <= float64(t.Settings.Fillrate)*loanReserveRatio { + periodFilled = float64(t.Settings.Fillrate) * loanReserveRatio + } + if periodFilled >= neededTokens { + grantedTokens += neededTokens + t.Tokens -= neededTokens + } else { + grantedTokens += periodFilled + t.Tokens -= periodFilled + } + res.Tokens = grantedTokens + return &res, trickleTime } diff --git a/pkg/mcs/resource_manager/server/types.go b/pkg/mcs/resource_manager/server/types.go index 72686ad4c94..2f6968fe196 100644 --- a/pkg/mcs/resource_manager/server/types.go +++ b/pkg/mcs/resource_manager/server/types.go @@ -18,6 +18,7 @@ package server import ( "encoding/json" "sync" + "time" "github.com/pingcap/errors" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" @@ -146,27 +147,17 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { case rmpb.GroupMode_RUMode: if settings := group.GetSettings().GetRUSettings(); settings != nil { ruSettings = &RequestUnitSettings{ - RRU: GroupTokenBucket{ - TokenBucket: settings.GetRRU(), - }, - WRU: GroupTokenBucket{ - TokenBucket: settings.GetWRU(), - }, + RRU: NewGroupTokenBucket(settings.GetRRU()), + WRU: NewGroupTokenBucket(settings.GetWRU()), } rg.RUSettings = ruSettings } case rmpb.GroupMode_NativeMode: if settings := group.GetSettings().GetResourceSettings(); settings != nil { resourceSettings = &NativeResourceSettings{ - CPU: GroupTokenBucket{ - TokenBucket: settings.GetCpu(), - }, - IOReadBandwidth: GroupTokenBucket{ - TokenBucket: settings.GetIoRead(), - }, - IOWriteBandwidth: GroupTokenBucket{ - TokenBucket: settings.GetIoWrite(), - }, + CPU: NewGroupTokenBucket(settings.GetCpu()), + IOReadBandwidth: NewGroupTokenBucket(settings.GetIoRead()), + IOWriteBandwidth: NewGroupTokenBucket(settings.GetIoWrite()), } rg.ResourceSettings = resourceSettings } @@ -174,6 +165,46 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { return rg } +// UpdateRRU updates the RRU of the resource group. +func (rg *ResourceGroup) UpdateRRU(now time.Time) { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings != nil { + rg.RUSettings.RRU.update(now) + } +} + +// UpdateWRU updates the WRU of the resource group. +func (rg *ResourceGroup) UpdateWRU(now time.Time) { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings != nil { + rg.RUSettings.WRU.update(now) + } +} + +// RequestRRU requests the RRU of the resource group. +func (rg *ResourceGroup) RequestRRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings == nil { + return nil + } + tb, trickleTimeMs := rg.RUSettings.RRU.request(neededTokens, targetPeriodMs) + return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} +} + +// RequestWRU requests the WRU of the resource group. +func (rg *ResourceGroup) RequestWRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { + rg.Lock() + defer rg.Unlock() + if rg.RUSettings == nil { + return nil + } + tb, trickleTimeMs := rg.RUSettings.WRU.request(neededTokens, targetPeriodMs) + return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} +} + // IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup. func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup { rg.RLock() From 321c2ad17abdce0fff9698bb8deeb67bfc6043c8 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 3 Jan 2023 17:29:35 +0800 Subject: [PATCH 02/10] impl token request and fix update bug Signed-off-by: Cabinfever_B --- .../resource_manager/server/grpc_service.go | 2 +- .../resource_manager/server/token_bukets.go | 20 ++++++++----------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index 51a9ff405db..9b5d4e395d3 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -159,7 +159,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu switch re.Type { case rmpb.RequestUnitType_RRU: rg.UpdateRRU(now) - tokens := rg.RequestRRU(float64(re.Value), targetPeriodMs) + tokens := rg.RequestRRU(re.Value, targetPeriodMs) resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) case rmpb.RequestUnitType_WRU: rg.UpdateWRU(now) diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index d8b85c52dce..1cf3a84781a 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -21,8 +21,6 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" ) -const defaultRefillRate = 10000 - const defaultInitialTokens = 10 * 10000 const defaultMaxTokens = 1e7 @@ -72,8 +70,9 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { // update updates the token bucket. func (t *GroupTokenBucket) update(now time.Time) { if !t.Initialized { - t.Settings.Fillrate = defaultRefillRate - t.Tokens = defaultInitialTokens + if t.Tokens < defaultInitialTokens { + t.Tokens = defaultInitialTokens + } t.LastUpdate = &now t.Initialized = true return @@ -90,7 +89,6 @@ func (t *GroupTokenBucket) update(now time.Time) { if t.Tokens > defaultMaxTokens { t.Tokens = defaultMaxTokens } - } // request requests tokens from the token bucket. @@ -101,7 +99,6 @@ func (t *GroupTokenBucket) request( res.Settings = &rmpb.TokenLimitSettings{} // TODO: consider the shares for dispatch the fill rate res.Settings.Fillrate = 0 - if neededTokens <= 0 { return &res, 0 } @@ -120,16 +117,15 @@ func (t *GroupTokenBucket) request( neededTokens -= grantedTokens } - now := time.Now() var periodFilled float64 - var trickleTime int64 = int64(targetPeriodMs) - if t.LoanExpireTime != nil && t.LoanExpireTime.After(now) { - duration := t.LoanExpireTime.Sub(now) + var trickleTime = int64(targetPeriodMs) + if t.LoanExpireTime != nil && t.LoanExpireTime.After(*t.LastUpdate) { + duration := t.LoanExpireTime.Sub(*t.LastUpdate) periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * duration.Seconds() trickleTime = duration.Milliseconds() } else { - now.Add(t.LoanMaxPeriod) - t.LoanExpireTime = &now + et := t.LastUpdate.Add(t.LoanMaxPeriod) + t.LoanExpireTime = &et periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() } periodFilled += t.Tokens From 0fc2ca5ae7fc41841c3b63036b9d0a60b61b1c6c Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 3 Jan 2023 17:30:22 +0800 Subject: [PATCH 03/10] add test Signed-off-by: Cabinfever_B impl token request and fix update bug Signed-off-by: Cabinfever_B --- .../resource_manager/server/grpc_service.go | 2 +- .../server/token_buckets_test.go | 85 +++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) create mode 100644 pkg/mcs/resource_manager/server/token_buckets_test.go diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index 9b5d4e395d3..4ad156ecb40 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -163,7 +163,7 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) case rmpb.RequestUnitType_WRU: rg.UpdateWRU(now) - tokens := rg.RequestWRU(float64(re.Value), targetPeriodMs) + tokens := rg.RequestWRU(re.Value, targetPeriodMs) resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) } } diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go new file mode 100644 index 00000000000..d4cb97561a0 --- /dev/null +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -0,0 +1,85 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "math" + "testing" + "time" + + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/stretchr/testify/require" +) + +func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { + re := require.New(t) + tbSetting := &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + Fillrate: 2000, + BurstLimit: 20000000, + }, + } + + tb := NewGroupTokenBucket(tbSetting) + time1 := time.Now() + tb.update(time1) + re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7) + re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) + + tbSetting = &rmpb.TokenBucket{ + Tokens: -100000, + Settings: &rmpb.TokenLimitSettings{ + Fillrate: 1000, + BurstLimit: 10000000, + }, + } + tb.patch(tbSetting) + + time2 := time.Now() + tb.update(time2) + re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.Fillrate)+1e7) + re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) +} + +func TestGroupTokenBucketRequest(t *testing.T) { + re := require.New(t) + tbSetting := &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + Fillrate: 2000, + BurstLimit: 20000000, + }, + } + + gtb := NewGroupTokenBucket(tbSetting) + time1 := time.Now() + gtb.update(time1) + tb, trickle := gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7) + re.Equal(trickle, int64(0)) + tb, trickle = gtb.request(101000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + re.Equal(*gtb.LoanExpireTime, time1.Add(gtb.LoanMaxPeriod)) + time2 := time.Now() + gtb.update(time2) + tb, trickle = gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-19000*(1-loanReserveRatio)), time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*(1-loanReserveRatio)*float64(tb.Settings.Fillrate)+1e7) + re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) + tb, trickle = gtb.request(2000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(tb.Tokens, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*loanReserveRatio*float64(tb.Settings.Fillrate)+1e7) + re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) +} From ef257250c688ab28303aa8b5b85c4dc1850826ff Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 5 Jan 2023 16:48:51 +0800 Subject: [PATCH 04/10] address comment Signed-off-by: Cabinfever_B --- go.mod | 2 +- go.sum | 2 + .../resource_manager/server/grpc_service.go | 8 ++- pkg/mcs/resource_manager/server/manager.go | 2 +- .../server/token_buckets_test.go | 16 ++--- .../resource_manager/server/token_bukets.go | 60 ++++++++++++------- pkg/mcs/resource_manager/server/types.go | 42 ++++++------- pkg/mcs/resource_manager/server/types_test.go | 39 ++++++------ 8 files changed, 94 insertions(+), 77 deletions(-) diff --git a/go.mod b/go.mod index 23cf2ecbf39..c03b04729ce 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26 + github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20221201151320-ea3ee6971f2e diff --git a/go.sum b/go.sum index 19c5daac87f..cbc37e05fc2 100644 --- a/go.sum +++ b/go.sum @@ -367,6 +367,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26 h1:Tw4afZ2Tyr8iT8Oln6/szMjh5IDs+GtlnLsDo/Y2HEE= github.com/pingcap/kvproto v0.0.0-20221221093947-0a9b14f1fc26/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1 h1:jw4NjEiCleRJPPpHM7K6l8OKzOjnZAj62eKteCAY6ro= +github.com/pingcap/kvproto v0.0.0-20230105060948-64890fa4f6c1/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index 4ad156ecb40..0de75b0ab0a 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -147,7 +147,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu for _, req := range request.Requests { rg := s.manager.GetResourceGroup(req.ResourceGroupName) if rg == nil { - return errors.New("resource group not found") + log.Warn("resource group not found", zap.String("resource-group", req.ResourceGroupName)) + continue } now := time.Now() resp := &rmpb.TokenBucketResponse{ @@ -167,8 +168,9 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) } } - case rmpb.GroupMode_NativeMode: - return errors.New("not supports the resource type") + case rmpb.GroupMode_RawMode: + log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)])) + continue } log.Debug("finish token request from", zap.String("resource group", req.ResourceGroupName)) resps.Responses = append(resps.Responses, resp) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 5faad4f5f07..ef6fc958649 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -87,7 +87,7 @@ func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error { return errors.New("not exists the group") } newGroup := curGroup.Copy() - err := newGroup.PatchSettings(group.GetSettings()) + err := newGroup.PatchSettings(group) if err != nil { return err } diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go index d4cb97561a0..47e7166d7c7 100644 --- a/pkg/mcs/resource_manager/server/token_buckets_test.go +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -28,7 +28,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { tbSetting := &rmpb.TokenBucket{ Tokens: 200000, Settings: &rmpb.TokenLimitSettings{ - Fillrate: 2000, + FillRate: 2000, BurstLimit: 20000000, }, } @@ -37,12 +37,12 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { time1 := time.Now() tb.update(time1) re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7) - re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) + re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) tbSetting = &rmpb.TokenBucket{ Tokens: -100000, Settings: &rmpb.TokenLimitSettings{ - Fillrate: 1000, + FillRate: 1000, BurstLimit: 10000000, }, } @@ -50,8 +50,8 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { time2 := time.Now() tb.update(time2) - re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.Fillrate)+1e7) - re.Equal(tbSetting.Settings.Fillrate, tb.Settings.Fillrate) + re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7) + re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) } func TestGroupTokenBucketRequest(t *testing.T) { @@ -59,7 +59,7 @@ func TestGroupTokenBucketRequest(t *testing.T) { tbSetting := &rmpb.TokenBucket{ Tokens: 200000, Settings: &rmpb.TokenLimitSettings{ - Fillrate: 2000, + FillRate: 2000, BurstLimit: 20000000, }, } @@ -77,9 +77,9 @@ func TestGroupTokenBucketRequest(t *testing.T) { time2 := time.Now() gtb.update(time2) tb, trickle = gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-19000*(1-loanReserveRatio)), time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*(1-loanReserveRatio)*float64(tb.Settings.Fillrate)+1e7) + re.LessOrEqual(math.Abs(tb.Tokens-19000*(1-loanReserveRatio)), time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*(1-loanReserveRatio)*float64(tb.Settings.FillRate)+1e7) re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) tb, trickle = gtb.request(2000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(tb.Tokens, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*loanReserveRatio*float64(tb.Settings.Fillrate)+1e7) + re.LessOrEqual(tb.Tokens, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*loanReserveRatio*float64(tb.Settings.FillRate)+1e7) re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 1cf3a84781a..caba0633d5c 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -21,6 +21,8 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" ) +const defaultRefillRate = 10000 + const defaultInitialTokens = 10 * 10000 const defaultMaxTokens = 1e7 @@ -33,14 +35,18 @@ var loanReserveRatio float64 = 0.05 // TODO: statistics Consumption type GroupTokenBucket struct { *rmpb.TokenBucket `json:"token_bucket,omitempty"` - Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` - LastUpdate *time.Time `json:"last_update,omitempty"` - Initialized bool `json:"initialized"` - LoanExpireTime *time.Time `json:"loan_time,omitempty"` - LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"` - MaxTokens float64 `json:"max_tokens,omitempty"` + // LoanMaxPeriod represents the maximum loan period, which together with the fill rate determines the loan amount + LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"` + // MaxTokens limits the number of tokens that can be accumulated + MaxTokens float64 `json:"max_tokens,omitempty"` + + Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` + LastUpdate *time.Time `json:"last_update,omitempty"` + Initialized bool `json:"initialized"` + LoanExpireTime *time.Time `json:"loan_time,omitempty"` } +// NewGroupTokenBucket returns a new GroupTokenBucket func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket { return GroupTokenBucket{ TokenBucket: tokenBucket, @@ -70,6 +76,9 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { // update updates the token bucket. func (t *GroupTokenBucket) update(now time.Time) { if !t.Initialized { + if t.Settings.FillRate == 0 { + t.Settings.FillRate = defaultRefillRate + } if t.Tokens < defaultInitialTokens { t.Tokens = defaultInitialTokens } @@ -80,14 +89,14 @@ func (t *GroupTokenBucket) update(now time.Time) { delta := now.Sub(*t.LastUpdate) if delta > 0 { - t.Tokens += float64(t.Settings.Fillrate) * delta.Seconds() + t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() t.LastUpdate = &now } if t.Tokens >= 0 { t.LoanExpireTime = nil } - if t.Tokens > defaultMaxTokens { - t.Tokens = defaultMaxTokens + if t.Tokens > t.MaxTokens { + t.Tokens = t.MaxTokens } } @@ -97,12 +106,13 @@ func (t *GroupTokenBucket) request( ) (*rmpb.TokenBucket, int64) { var res rmpb.TokenBucket res.Settings = &rmpb.TokenLimitSettings{} - // TODO: consider the shares for dispatch the fill rate - res.Settings.Fillrate = 0 + // FillRate is used for the token server unavailable in abnormal situation. + res.Settings.FillRate = 0 if neededTokens <= 0 { return &res, 0 } + // If the current tokens can directly meet the requirement, returns the need token if t.Tokens >= neededTokens { t.Tokens -= neededTokens // granted the total request tokens @@ -110,6 +120,7 @@ func (t *GroupTokenBucket) request( return &res, 0 } + // Firstly allocate the remaining tokens var grantedTokens float64 if t.Tokens > 0 { grantedTokens = t.Tokens @@ -117,20 +128,27 @@ func (t *GroupTokenBucket) request( neededTokens -= grantedTokens } + // Consider using a loan to get tokens var periodFilled float64 - var trickleTime = int64(targetPeriodMs) - if t.LoanExpireTime != nil && t.LoanExpireTime.After(*t.LastUpdate) { - duration := t.LoanExpireTime.Sub(*t.LastUpdate) - periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * duration.Seconds() - trickleTime = duration.Milliseconds() - } else { + var trickleTime = time.Duration(targetPeriodMs) * time.Millisecond + // If the loan has been used, and within the expiration date of the loan, + // We calculate `periodFilled` which is the number of tokens that can be allocated + // according to fillrate, remaining loan time and retention ratio + if t.LoanExpireTime != nil { + if t.LoanExpireTime.After(*t.LastUpdate) { + duration := t.LoanExpireTime.Sub(*t.LastUpdate) + periodFilled = float64(t.Settings.FillRate) * (1 - loanReserveRatio) * duration.Seconds() + trickleTime = duration + } + } else { // Apply for a loan et := t.LastUpdate.Add(t.LoanMaxPeriod) t.LoanExpireTime = &et - periodFilled = float64(t.Settings.Fillrate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() + periodFilled = float64(t.Settings.FillRate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() } + // have to deduct what resource group already owe periodFilled += t.Tokens - if periodFilled <= float64(t.Settings.Fillrate)*loanReserveRatio { - periodFilled = float64(t.Settings.Fillrate) * loanReserveRatio + if periodFilled <= float64(t.Settings.FillRate)*loanReserveRatio*trickleTime.Seconds() { + periodFilled = float64(t.Settings.FillRate) * loanReserveRatio * trickleTime.Seconds() } if periodFilled >= neededTokens { grantedTokens += neededTokens @@ -140,5 +158,5 @@ func (t *GroupTokenBucket) request( t.Tokens -= periodFilled } res.Tokens = grantedTokens - return &res, trickleTime + return &res, trickleTime.Milliseconds() } diff --git a/pkg/mcs/resource_manager/server/types.go b/pkg/mcs/resource_manager/server/types.go index 2f6968fe196..515b555bff5 100644 --- a/pkg/mcs/resource_manager/server/types.go +++ b/pkg/mcs/resource_manager/server/types.go @@ -82,7 +82,7 @@ func (rg *ResourceGroup) CheckAndInit() error { if len(rg.Name) == 0 || len(rg.Name) > 32 { return errors.New("invalid resource group name, the length should be in [1,32]") } - if rg.Mode != rmpb.GroupMode_RUMode && rg.Mode != rmpb.GroupMode_NativeMode { + if rg.Mode != rmpb.GroupMode_RUMode && rg.Mode != rmpb.GroupMode_RawMode { return errors.New("invalid resource group mode") } if rg.Mode == rmpb.GroupMode_RUMode { @@ -93,7 +93,7 @@ func (rg *ResourceGroup) CheckAndInit() error { return errors.New("invalid resource group settings, RU mode should not set resource settings") } } - if rg.Mode == rmpb.GroupMode_NativeMode { + if rg.Mode == rmpb.GroupMode_RawMode { if rg.ResourceSettings == nil { rg.ResourceSettings = &NativeResourceSettings{} } @@ -107,7 +107,7 @@ func (rg *ResourceGroup) CheckAndInit() error { // PatchSettings patches the resource group settings. // Only used to patch the resource group when updating. // Note: the tokens is the delta value to patch. -func (rg *ResourceGroup) PatchSettings(groupSettings *rmpb.GroupSettings) error { +func (rg *ResourceGroup) PatchSettings(groupSettings *rmpb.ResourceGroup) error { rg.Lock() defer rg.Unlock() if groupSettings.GetMode() != rg.Mode { @@ -120,7 +120,7 @@ func (rg *ResourceGroup) PatchSettings(groupSettings *rmpb.GroupSettings) error } rg.RUSettings.RRU.patch(groupSettings.GetRUSettings().GetRRU()) rg.RUSettings.WRU.patch(groupSettings.GetRUSettings().GetWRU()) - case rmpb.GroupMode_NativeMode: + case rmpb.GroupMode_RawMode: if groupSettings.GetResourceSettings() == nil { return errors.New("invalid resource group settings, native mode should set resource settings") } @@ -141,19 +141,19 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { rg := &ResourceGroup{ Name: group.Name, - Mode: group.Settings.Mode, + Mode: group.Mode, } - switch group.GetSettings().GetMode() { + switch group.GetMode() { case rmpb.GroupMode_RUMode: - if settings := group.GetSettings().GetRUSettings(); settings != nil { + if settings := group.GetRUSettings(); settings != nil { ruSettings = &RequestUnitSettings{ RRU: NewGroupTokenBucket(settings.GetRRU()), WRU: NewGroupTokenBucket(settings.GetWRU()), } rg.RUSettings = ruSettings } - case rmpb.GroupMode_NativeMode: - if settings := group.GetSettings().GetResourceSettings(); settings != nil { + case rmpb.GroupMode_RawMode: + if settings := group.GetResourceSettings(); settings != nil { resourceSettings = &NativeResourceSettings{ CPU: NewGroupTokenBucket(settings.GetCpu()), IOReadBandwidth: NewGroupTokenBucket(settings.GetIoRead()), @@ -213,25 +213,21 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup { case rmpb.GroupMode_RUMode: // RU mode group := &rmpb.ResourceGroup{ Name: rg.Name, - Settings: &rmpb.GroupSettings{ - Mode: rmpb.GroupMode_RUMode, - RUSettings: &rmpb.GroupRequestUnitSettings{ - RRU: rg.RUSettings.RRU.TokenBucket, - WRU: rg.RUSettings.WRU.TokenBucket, - }, + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RRU: rg.RUSettings.RRU.TokenBucket, + WRU: rg.RUSettings.WRU.TokenBucket, }, } return group - case rmpb.GroupMode_NativeMode: // Native mode + case rmpb.GroupMode_RawMode: // Raw mode group := &rmpb.ResourceGroup{ Name: rg.Name, - Settings: &rmpb.GroupSettings{ - Mode: rmpb.GroupMode_NativeMode, - ResourceSettings: &rmpb.GroupResourceSettings{ - Cpu: rg.ResourceSettings.CPU.TokenBucket, - IoRead: rg.ResourceSettings.IOReadBandwidth.TokenBucket, - IoWrite: rg.ResourceSettings.IOWriteBandwidth.TokenBucket, - }, + Mode: rmpb.GroupMode_RawMode, + ResourceSettings: &rmpb.GroupResourceSettings{ + Cpu: rg.ResourceSettings.CPU.TokenBucket, + IoRead: rg.ResourceSettings.IOReadBandwidth.TokenBucket, + IoWrite: rg.ResourceSettings.IOWriteBandwidth.TokenBucket, }, } return group diff --git a/pkg/mcs/resource_manager/server/types_test.go b/pkg/mcs/resource_manager/server/types_test.go index 1e22559f591..ed7750217a2 100644 --- a/pkg/mcs/resource_manager/server/types_test.go +++ b/pkg/mcs/resource_manager/server/types_test.go @@ -17,21 +17,21 @@ func TestPatchResourceGroup(t *testing.T) { patchJSONString string expectJSONString string }{ - {`{"mode":0, "r_u_settings": {"r_r_u":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, - {`{"mode":0, "r_u_settings": {"w_r_u":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false}}}`}, - {`{"mode":0, "r_u_settings": {"w_r_u":{"settings":{"fillrate": 200000, "burst": 100000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false}}}`}, - {`{"mode":0, "r_u_settings": {"r_r_u":{"settings":{"fillrate": 200000, "burst": 100000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, - {`{"mode":0, "r_u_settings": {"r_r_u":{"settings":{"fillrate": 200000, "burst": 100000}}, "w_r_u":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false},"wru":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false}}}`}, + {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, + {`{"name":"test", "mode":0, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, + {`{"name":"test", "mode":0, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`, + `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, + {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`, + `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, + {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}, "w_r_u":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, } for _, ca := range testCaseRU { rg := rg1.Copy() - patch := &rmpb.GroupSettings{} + patch := &rmpb.ResourceGroup{} err := json.Unmarshal([]byte(ca.patchJSONString), patch) re.NoError(err) err = rg.PatchSettings(patch) @@ -41,24 +41,23 @@ func TestPatchResourceGroup(t *testing.T) { re.Equal(ca.expectJSONString, string(res)) } - rg2 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_NativeMode} + rg2 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RawMode} err = rg2.CheckAndInit() re.NoError(err) testCaseResource := []struct { patchJSONString string expectJSONString string }{ - {`{"mode":1, "resource_settings": {"cpu":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, - {`{"mode":1, "resource_settings": {"io_read":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, - {`{"mode":1, "resource_settings": {"io_write":{"settings":{"fillrate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"token_bucket":{"settings":{"fillrate":200000}},"initialized":false}}}`}, + {`{"name":"test", "mode":1, "resource_settings": {"cpu":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":1,"resource_settings":{"cpu":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, + {`{"name":"test", "mode":1, "resource_settings": {"io_read":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, + {`{"name":"test", "mode":1, "resource_settings": {"io_write":{"settings":{"fill_rate": 200000}}}}`, + `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, } - for _, ca := range testCaseResource { rg := rg2.Copy() - patch := &rmpb.GroupSettings{} + patch := &rmpb.ResourceGroup{} err := json.Unmarshal([]byte(ca.patchJSONString), patch) re.NoError(err) err = rg.PatchSettings(patch) From 6a13caba65fb089f4d41b6038fdfa8020de52013 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Thu, 5 Jan 2023 18:48:55 +0800 Subject: [PATCH 05/10] address comment Signed-off-by: Cabinfever_B --- .../server/token_buckets_test.go | 21 ++++--- .../resource_manager/server/token_bukets.go | 60 +++++++------------ 2 files changed, 34 insertions(+), 47 deletions(-) diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go index 47e7166d7c7..f897c589d7c 100644 --- a/pkg/mcs/resource_manager/server/token_buckets_test.go +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -73,13 +73,18 @@ func TestGroupTokenBucketRequest(t *testing.T) { tb, trickle = gtb.request(101000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - re.Equal(*gtb.LoanExpireTime, time1.Add(gtb.LoanMaxPeriod)) - time2 := time.Now() + tb, trickle = gtb.request(17000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-17000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(4000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-4000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(19000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-18000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + time2 := time1.Add(10 * time.Second) gtb.update(time2) - tb, trickle = gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-19000*(1-loanReserveRatio)), time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*(1-loanReserveRatio)*float64(tb.Settings.FillRate)+1e7) - re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) - tb, trickle = gtb.request(2000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(tb.Tokens, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Seconds()*loanReserveRatio*float64(tb.Settings.FillRate)+1e7) - re.Equal(trickle, time1.Add(gtb.LoanMaxPeriod).Sub(time2).Milliseconds()) + tb, trickle = gtb.request(20000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-20000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index caba0633d5c..bcf4356e83c 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -15,6 +15,8 @@ package server import ( + "fmt" + "math" "time" "github.com/gogo/protobuf/proto" @@ -27,31 +29,25 @@ const defaultInitialTokens = 10 * 10000 const defaultMaxTokens = 1e7 -const defaultLoanMaxPeriod = 10 * time.Second - -var loanReserveRatio float64 = 0.05 +var reserveRatio float64 = 0.05 // GroupTokenBucket is a token bucket for a resource group. // TODO: statistics Consumption type GroupTokenBucket struct { *rmpb.TokenBucket `json:"token_bucket,omitempty"` - // LoanMaxPeriod represents the maximum loan period, which together with the fill rate determines the loan amount - LoanMaxPeriod time.Duration `json:"loan_max_perio,omitempty"` // MaxTokens limits the number of tokens that can be accumulated MaxTokens float64 `json:"max_tokens,omitempty"` - Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` - LastUpdate *time.Time `json:"last_update,omitempty"` - Initialized bool `json:"initialized"` - LoanExpireTime *time.Time `json:"loan_time,omitempty"` + Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` + LastUpdate *time.Time `json:"last_update,omitempty"` + Initialized bool `json:"initialized"` } // NewGroupTokenBucket returns a new GroupTokenBucket func NewGroupTokenBucket(tokenBucket *rmpb.TokenBucket) GroupTokenBucket { return GroupTokenBucket{ - TokenBucket: tokenBucket, - MaxTokens: defaultMaxTokens, - LoanMaxPeriod: defaultLoanMaxPeriod, + TokenBucket: tokenBucket, + MaxTokens: defaultMaxTokens, } } @@ -92,9 +88,6 @@ func (t *GroupTokenBucket) update(now time.Time) { t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() t.LastUpdate = &now } - if t.Tokens >= 0 { - t.LoanExpireTime = nil - } if t.Tokens > t.MaxTokens { t.Tokens = t.MaxTokens } @@ -124,39 +117,28 @@ func (t *GroupTokenBucket) request( var grantedTokens float64 if t.Tokens > 0 { grantedTokens = t.Tokens - t.Tokens = 0 neededTokens -= grantedTokens } - // Consider using a loan to get tokens - var periodFilled float64 var trickleTime = time.Duration(targetPeriodMs) * time.Millisecond - // If the loan has been used, and within the expiration date of the loan, - // We calculate `periodFilled` which is the number of tokens that can be allocated - // according to fillrate, remaining loan time and retention ratio - if t.LoanExpireTime != nil { - if t.LoanExpireTime.After(*t.LastUpdate) { - duration := t.LoanExpireTime.Sub(*t.LastUpdate) - periodFilled = float64(t.Settings.FillRate) * (1 - loanReserveRatio) * duration.Seconds() - trickleTime = duration + availableRate := float64(t.Settings.FillRate) + if debt := -t.Tokens; debt > 0 { + fmt.Println(debt) + debt -= float64(t.Settings.FillRate) * trickleTime.Seconds() + if debt > 0 { + debtRate := debt / float64(targetPeriodMs/1000) + availableRate -= debtRate + availableRate = math.Max(availableRate, reserveRatio*float64(t.Settings.FillRate)) } - } else { // Apply for a loan - et := t.LastUpdate.Add(t.LoanMaxPeriod) - t.LoanExpireTime = &et - periodFilled = float64(t.Settings.FillRate) * (1 - loanReserveRatio) * t.LoanMaxPeriod.Seconds() - } - // have to deduct what resource group already owe - periodFilled += t.Tokens - if periodFilled <= float64(t.Settings.FillRate)*loanReserveRatio*trickleTime.Seconds() { - periodFilled = float64(t.Settings.FillRate) * loanReserveRatio * trickleTime.Seconds() } - if periodFilled >= neededTokens { + + consumptionDuration := time.Duration(float64(time.Second) * (neededTokens / availableRate)) + if consumptionDuration <= trickleTime { grantedTokens += neededTokens - t.Tokens -= neededTokens } else { - grantedTokens += periodFilled - t.Tokens -= periodFilled + grantedTokens += availableRate * trickleTime.Seconds() } + t.Tokens -= grantedTokens res.Tokens = grantedTokens return &res, trickleTime.Milliseconds() } From 887d22157d29a5e39fe6684659877a003a77b174 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Fri, 6 Jan 2023 17:19:50 +0800 Subject: [PATCH 06/10] merge master Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/server/types_test.go | 69 ------------------- 1 file changed, 69 deletions(-) delete mode 100644 pkg/mcs/resource_manager/server/types_test.go diff --git a/pkg/mcs/resource_manager/server/types_test.go b/pkg/mcs/resource_manager/server/types_test.go deleted file mode 100644 index ed7750217a2..00000000000 --- a/pkg/mcs/resource_manager/server/types_test.go +++ /dev/null @@ -1,69 +0,0 @@ -package server - -import ( - "encoding/json" - "testing" - - rmpb "github.com/pingcap/kvproto/pkg/resource_manager" - "github.com/stretchr/testify/require" -) - -func TestPatchResourceGroup(t *testing.T) { - re := require.New(t) - rg1 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RUMode} - err := rg1.CheckAndInit() - re.NoError(err) - testCaseRU := []struct { - patchJSONString string - expectJSONString string - }{ - {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, - {`{"name":"test", "mode":0, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, - {`{"name":"test", "mode":0, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, - {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`}, - {`{"name":"test", "mode":0, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}, "w_r_u":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":0,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, - } - - for _, ca := range testCaseRU { - rg := rg1.Copy() - patch := &rmpb.ResourceGroup{} - err := json.Unmarshal([]byte(ca.patchJSONString), patch) - re.NoError(err) - err = rg.PatchSettings(patch) - re.NoError(err) - res, err := json.Marshal(rg) - re.NoError(err) - re.Equal(ca.expectJSONString, string(res)) - } - - rg2 := &ResourceGroup{Name: "test", Mode: rmpb.GroupMode_RawMode} - err = rg2.CheckAndInit() - re.NoError(err) - testCaseResource := []struct { - patchJSONString string - expectJSONString string - }{ - {`{"name":"test", "mode":1, "resource_settings": {"cpu":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, - {`{"name":"test", "mode":1, "resource_settings": {"io_read":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"io_write_bandwidth":{"initialized":false}}}`}, - {`{"name":"test", "mode":1, "resource_settings": {"io_write":{"settings":{"fill_rate": 200000}}}}`, - `{"name":"test","mode":1,"resource_settings":{"cpu":{"initialized":false},"io_read_bandwidth":{"initialized":false},"io_write_bandwidth":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`}, - } - for _, ca := range testCaseResource { - rg := rg2.Copy() - patch := &rmpb.ResourceGroup{} - err := json.Unmarshal([]byte(ca.patchJSONString), patch) - re.NoError(err) - err = rg.PatchSettings(patch) - re.NoError(err) - res, err := json.Marshal(rg) - re.NoError(err) - re.Equal(ca.expectJSONString, string(res)) - } -} From d5aa646b6a886fbca213c85ae27964798db8c581 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Mon, 9 Jan 2023 15:50:45 +0800 Subject: [PATCH 07/10] address comment Signed-off-by: Cabinfever_B --- .../resource_manager/server/token_bukets.go | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index bcf4356e83c..280859d412d 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -15,7 +15,6 @@ package server import ( - "fmt" "math" "time" @@ -25,9 +24,10 @@ import ( const defaultRefillRate = 10000 -const defaultInitialTokens = 10 * 10000 - -const defaultMaxTokens = 1e7 +const ( + defaultInitialTokens = 10 * 10000 + defaultMaxTokens = 1e7 +) var reserveRatio float64 = 0.05 @@ -78,6 +78,10 @@ func (t *GroupTokenBucket) update(now time.Time) { if t.Tokens < defaultInitialTokens { t.Tokens = defaultInitialTokens } + // TODO: If we support init or modify MaxTokens in the future, we can move following code. + if t.Tokens > t.MaxTokens { + t.MaxTokens = t.Tokens + } t.LastUpdate = &now t.Initialized = true return @@ -100,7 +104,6 @@ func (t *GroupTokenBucket) request( var res rmpb.TokenBucket res.Settings = &rmpb.TokenLimitSettings{} // FillRate is used for the token server unavailable in abnormal situation. - res.Settings.FillRate = 0 if neededTokens <= 0 { return &res, 0 } @@ -122,8 +125,20 @@ func (t *GroupTokenBucket) request( var trickleTime = time.Duration(targetPeriodMs) * time.Millisecond availableRate := float64(t.Settings.FillRate) + // When there are debt, the allotment will match the fill rate. + // We will have a threshold, beyond which the token allocation will be a minimum. + // the current threshold is fill rate * target period * 2. + // | + // fill rate |· · · · · · · · · + // | · + // | · + // | · + // | · + // reserve rate | · · · · + // | + // rate 0 ----------------------------------------------- + // debt period token 2*period token if debt := -t.Tokens; debt > 0 { - fmt.Println(debt) debt -= float64(t.Settings.FillRate) * trickleTime.Seconds() if debt > 0 { debtRate := debt / float64(targetPeriodMs/1000) From 5fb09c9cefcdbe6175ecac0f4fe63420a6b3f385 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 10 Jan 2023 02:42:10 +0800 Subject: [PATCH 08/10] address comment Signed-off-by: Cabinfever_B --- .../resource_manager/server/grpc_service.go | 6 +- .../resource_manager/server/resource_group.go | 26 +--- .../server/token_buckets_test.go | 18 ++- .../resource_manager/server/token_bukets.go | 135 ++++++++++-------- 4 files changed, 93 insertions(+), 92 deletions(-) diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index 0de75b0ab0a..69c0de2ef2c 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -159,12 +159,10 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu for _, re := range req.GetRuItems().GetRequestRU() { switch re.Type { case rmpb.RequestUnitType_RRU: - rg.UpdateRRU(now) - tokens := rg.RequestRRU(re.Value, targetPeriodMs) + tokens := rg.RequestRRU(now, re.Value, targetPeriodMs) resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) case rmpb.RequestUnitType_WRU: - rg.UpdateWRU(now) - tokens := rg.RequestWRU(re.Value, targetPeriodMs) + tokens := rg.RequestWRU(now, re.Value, targetPeriodMs) resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) } } diff --git a/pkg/mcs/resource_manager/server/resource_group.go b/pkg/mcs/resource_manager/server/resource_group.go index 86d1e83fc54..4a711f3596a 100644 --- a/pkg/mcs/resource_manager/server/resource_group.go +++ b/pkg/mcs/resource_manager/server/resource_group.go @@ -172,43 +172,25 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup { return rg } -// UpdateRRU updates the RRU of the resource group. -func (rg *ResourceGroup) UpdateRRU(now time.Time) { - rg.Lock() - defer rg.Unlock() - if rg.RUSettings != nil { - rg.RUSettings.RRU.update(now) - } -} - -// UpdateWRU updates the WRU of the resource group. -func (rg *ResourceGroup) UpdateWRU(now time.Time) { - rg.Lock() - defer rg.Unlock() - if rg.RUSettings != nil { - rg.RUSettings.WRU.update(now) - } -} - // RequestRRU requests the RRU of the resource group. -func (rg *ResourceGroup) RequestRRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { +func (rg *ResourceGroup) RequestRRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { rg.Lock() defer rg.Unlock() if rg.RUSettings == nil { return nil } - tb, trickleTimeMs := rg.RUSettings.RRU.request(neededTokens, targetPeriodMs) + tb, trickleTimeMs := rg.RUSettings.RRU.request(now, neededTokens, targetPeriodMs) return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} } // RequestWRU requests the WRU of the resource group. -func (rg *ResourceGroup) RequestWRU(neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { +func (rg *ResourceGroup) RequestWRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket { rg.Lock() defer rg.Unlock() if rg.RUSettings == nil { return nil } - tb, trickleTimeMs := rg.RUSettings.WRU.request(neededTokens, targetPeriodMs) + tb, trickleTimeMs := rg.RUSettings.WRU.request(now, neededTokens, targetPeriodMs) return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs} } diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go index f897c589d7c..3ca21c0cdc4 100644 --- a/pkg/mcs/resource_manager/server/token_buckets_test.go +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -35,7 +35,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { tb := NewGroupTokenBucket(tbSetting) time1 := time.Now() - tb.update(time1) + tb.request(time1, 0, 0) re.LessOrEqual(math.Abs(tbSetting.Tokens-tb.Tokens), 1e-7) re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) @@ -49,7 +49,7 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { tb.patch(tbSetting) time2 := time.Now() - tb.update(time2) + tb.request(time2, 0, 0) re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7) re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) } @@ -66,25 +66,23 @@ func TestGroupTokenBucketRequest(t *testing.T) { gtb := NewGroupTokenBucket(tbSetting) time1 := time.Now() - gtb.update(time1) - tb, trickle := gtb.request(100000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7) re.Equal(trickle, int64(0)) - tb, trickle = gtb.request(101000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(17000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle = gtb.request(time1, 17000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-17000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(4000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle = gtb.request(time1, 4000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-4000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(19000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle = gtb.request(time1, 19000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-18000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) time2 := time1.Add(10 * time.Second) - gtb.update(time2) - tb, trickle = gtb.request(20000, uint64(time.Second)*10/uint64(time.Millisecond)) + tb, trickle = gtb.request(time2, 20000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-20000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 280859d412d..21830e58c46 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -15,7 +15,6 @@ package server import ( - "math" "time" "github.com/gogo/protobuf/proto" @@ -69,38 +68,37 @@ func (t *GroupTokenBucket) patch(settings *rmpb.TokenBucket) { t.TokenBucket = tb } -// update updates the token bucket. -func (t *GroupTokenBucket) update(now time.Time) { - if !t.Initialized { - if t.Settings.FillRate == 0 { - t.Settings.FillRate = defaultRefillRate - } - if t.Tokens < defaultInitialTokens { - t.Tokens = defaultInitialTokens - } - // TODO: If we support init or modify MaxTokens in the future, we can move following code. - if t.Tokens > t.MaxTokens { - t.MaxTokens = t.Tokens - } - t.LastUpdate = &now - t.Initialized = true - return +// init initializes the group token bucket. +func (t *GroupTokenBucket) init(now time.Time) { + if t.Settings.FillRate == 0 { + t.Settings.FillRate = defaultRefillRate } - - delta := now.Sub(*t.LastUpdate) - if delta > 0 { - t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() - t.LastUpdate = &now + if t.Tokens < defaultInitialTokens { + t.Tokens = defaultInitialTokens } + // TODO: If we support init or modify MaxTokens in the future, we can move following code. if t.Tokens > t.MaxTokens { - t.Tokens = t.MaxTokens + t.MaxTokens = t.Tokens } + t.LastUpdate = &now + t.Initialized = true } -// request requests tokens from the token bucket. -func (t *GroupTokenBucket) request( - neededTokens float64, targetPeriodMs uint64, -) (*rmpb.TokenBucket, int64) { +// request requests tokens from the group token bucket. +func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPeriodMs uint64) (*rmpb.TokenBucket, int64) { + if !t.Initialized { + t.init(now) + } else { + delta := now.Sub(*t.LastUpdate) + if delta > 0 { + t.Tokens += float64(t.Settings.FillRate) * delta.Seconds() + t.LastUpdate = &now + } + if t.Tokens > t.MaxTokens { + t.Tokens = t.MaxTokens + } + } + var res rmpb.TokenBucket res.Settings = &rmpb.TokenLimitSettings{} // FillRate is used for the token server unavailable in abnormal situation. @@ -121,39 +119,64 @@ func (t *GroupTokenBucket) request( if t.Tokens > 0 { grantedTokens = t.Tokens neededTokens -= grantedTokens + t.Tokens = 0 } - var trickleTime = time.Duration(targetPeriodMs) * time.Millisecond - availableRate := float64(t.Settings.FillRate) - // When there are debt, the allotment will match the fill rate. - // We will have a threshold, beyond which the token allocation will be a minimum. - // the current threshold is fill rate * target period * 2. - // | - // fill rate |· · · · · · · · · - // | · - // | · - // | · - // | · - // reserve rate | · · · · - // | - // rate 0 ----------------------------------------------- - // debt period token 2*period token - if debt := -t.Tokens; debt > 0 { - debt -= float64(t.Settings.FillRate) * trickleTime.Seconds() - if debt > 0 { - debtRate := debt / float64(targetPeriodMs/1000) - availableRate -= debtRate - availableRate = math.Max(availableRate, reserveRatio*float64(t.Settings.FillRate)) + var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond + var trickleTime = 0. + + // When there are loan, the allotment will match the fill rate. + // We will have k threshold, beyond which the token allocation will be a minimum. + // The threshold unit is `fill rate * target period`. + // | + // k*fill_rate |* * * * * * * + // | * + // *** | * + // | * + // | * + // fill_rate | * + // reserve_rate | * + // | + // grant_rate 0 ------------------------------------------------------------------------------------ + // loan *** k*period_token (k+k-1)*period_token *** (k+k+1...+1)*period_token + k := 3 + p := make([]float64, k) + p[0] = float64(k) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() + for i := 1; i < k; i++ { + p[i] = float64(k-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1] + } + for i := 0; i < k && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ { + loan := -t.Tokens + if loan > p[i] { + continue + } + roundReserveTokens := p[i] - loan + fillRate := float64(k-i) * float64(t.Settings.FillRate) + if roundReserveTokens > neededTokens { + t.Tokens -= neededTokens + grantedTokens += neededTokens + neededTokens = 0 + } else { + roundReserveTime := roundReserveTokens / fillRate + if roundReserveTime+trickleTime >= targetPeriodTime.Seconds() { + roundTokens := (targetPeriodTime.Seconds() - trickleTime) * fillRate + neededTokens -= roundTokens + t.Tokens -= roundTokens + grantedTokens += roundTokens + trickleTime = targetPeriodTime.Seconds() + } else { + grantedTokens += roundReserveTokens + neededTokens -= roundReserveTokens + t.Tokens -= roundReserveTokens + trickleTime += roundReserveTime + } } } - - consumptionDuration := time.Duration(float64(time.Second) * (neededTokens / availableRate)) - if consumptionDuration <= trickleTime { - grantedTokens += neededTokens - } else { - grantedTokens += availableRate * trickleTime.Seconds() + if grantedTokens < reserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() { + res.Tokens -= reserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() - grantedTokens + grantedTokens = reserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() } - t.Tokens -= grantedTokens + res.Tokens = grantedTokens - return &res, trickleTime.Milliseconds() + return &res, targetPeriodTime.Milliseconds() } From fc04c447f66d241d35a7a739bb546332ebafe2a4 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 10 Jan 2023 14:17:22 +0800 Subject: [PATCH 09/10] address comment Signed-off-by: Cabinfever_B --- .../server/token_buckets_test.go | 18 +++++++----- .../resource_manager/server/token_bukets.go | 28 +++++++++---------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/pkg/mcs/resource_manager/server/token_buckets_test.go b/pkg/mcs/resource_manager/server/token_buckets_test.go index 3ca21c0cdc4..a7ecbe81d77 100644 --- a/pkg/mcs/resource_manager/server/token_buckets_test.go +++ b/pkg/mcs/resource_manager/server/token_buckets_test.go @@ -69,20 +69,24 @@ func TestGroupTokenBucketRequest(t *testing.T) { tb, trickle := gtb.request(time1, 100000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-100000), 1e-7) re.Equal(trickle, int64(0)) + // need to lend token tb, trickle = gtb.request(time1, 101000, uint64(time.Second)*10/uint64(time.Millisecond)) re.LessOrEqual(math.Abs(tb.Tokens-101000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(time1, 17000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-17000), 1e-7) + tb, trickle = gtb.request(time1, 35000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-35000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(time1, 4000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-4000), 1e-7) + tb, trickle = gtb.request(time1, 60000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-22000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) - tb, trickle = gtb.request(time1, 19000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-18000), 1e-7) + tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-2000), 1e-7) + re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) + tb, trickle = gtb.request(time1, 3000, uint64(time.Second)*10/uint64(time.Millisecond)) + re.LessOrEqual(math.Abs(tb.Tokens-1000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) time2 := time1.Add(10 * time.Second) tb, trickle = gtb.request(time2, 20000, uint64(time.Second)*10/uint64(time.Millisecond)) - re.LessOrEqual(math.Abs(tb.Tokens-20000), 1e-7) + re.LessOrEqual(math.Abs(tb.Tokens-19000), 1e-7) re.Equal(trickle, int64(time.Second)*10/int64(time.Millisecond)) } diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 21830e58c46..0cfd3253d13 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -21,14 +21,16 @@ import ( rmpb "github.com/pingcap/kvproto/pkg/resource_manager" ) -const defaultRefillRate = 10000 - const ( + defaultRefillRate = 10000 defaultInitialTokens = 10 * 10000 defaultMaxTokens = 1e7 ) -var reserveRatio float64 = 0.05 +const ( + defaultReserveRatio = 0.05 + defaultLoanCoefficient = 2 +) // GroupTokenBucket is a token bucket for a resource group. // TODO: statistics Consumption @@ -139,19 +141,18 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe // | // grant_rate 0 ------------------------------------------------------------------------------------ // loan *** k*period_token (k+k-1)*period_token *** (k+k+1...+1)*period_token - k := 3 - p := make([]float64, k) - p[0] = float64(k) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() - for i := 1; i < k; i++ { - p[i] = float64(k-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1] + p := make([]float64, defaultLoanCoefficient) + p[0] = float64(defaultLoanCoefficient) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() + for i := 1; i < defaultLoanCoefficient; i++ { + p[i] = float64(defaultLoanCoefficient-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1] } - for i := 0; i < k && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ { + for i := 0; i < defaultLoanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ { loan := -t.Tokens if loan > p[i] { continue } roundReserveTokens := p[i] - loan - fillRate := float64(k-i) * float64(t.Settings.FillRate) + fillRate := float64(defaultLoanCoefficient-i) * float64(t.Settings.FillRate) if roundReserveTokens > neededTokens { t.Tokens -= neededTokens grantedTokens += neededTokens @@ -172,11 +173,10 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe } } } - if grantedTokens < reserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() { - res.Tokens -= reserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() - grantedTokens - grantedTokens = reserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() + if grantedTokens < defaultReserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() { + t.Tokens -= defaultReserveRatio*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() - grantedTokens + grantedTokens = defaultReserveRatio * float64(t.Settings.FillRate) * targetPeriodTime.Seconds() } - res.Tokens = grantedTokens return &res, targetPeriodTime.Milliseconds() } From 3896b95fdc99ef7c024312d3e692237dbd75b639 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 10 Jan 2023 18:06:11 +0800 Subject: [PATCH 10/10] address comment Signed-off-by: Cabinfever_B --- pkg/mcs/resource_manager/server/grpc_service.go | 8 ++++---- pkg/mcs/resource_manager/server/token_bukets.go | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pkg/mcs/resource_manager/server/grpc_service.go b/pkg/mcs/resource_manager/server/grpc_service.go index 69c0de2ef2c..a2c827de334 100644 --- a/pkg/mcs/resource_manager/server/grpc_service.go +++ b/pkg/mcs/resource_manager/server/grpc_service.go @@ -156,15 +156,15 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu } switch rg.Mode { case rmpb.GroupMode_RUMode: + var tokens *rmpb.GrantedRUTokenBucket for _, re := range req.GetRuItems().GetRequestRU() { switch re.Type { case rmpb.RequestUnitType_RRU: - tokens := rg.RequestRRU(now, re.Value, targetPeriodMs) - resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) + tokens = rg.RequestRRU(now, re.Value, targetPeriodMs) case rmpb.RequestUnitType_WRU: - tokens := rg.RequestWRU(now, re.Value, targetPeriodMs) - resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) + tokens = rg.RequestWRU(now, re.Value, targetPeriodMs) } + resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens) } case rmpb.GroupMode_RawMode: log.Warn("not supports the resource type", zap.String("resource-group", req.ResourceGroupName), zap.String("mode", rmpb.GroupMode_name[int32(rmpb.GroupMode_RawMode)])) diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go index 0cfd3253d13..4f6d294a82e 100644 --- a/pkg/mcs/resource_manager/server/token_bukets.go +++ b/pkg/mcs/resource_manager/server/token_bukets.go @@ -33,15 +33,15 @@ const ( ) // GroupTokenBucket is a token bucket for a resource group. -// TODO: statistics Consumption +// TODO: statistics consumption @JmPotato type GroupTokenBucket struct { *rmpb.TokenBucket `json:"token_bucket,omitempty"` // MaxTokens limits the number of tokens that can be accumulated MaxTokens float64 `json:"max_tokens,omitempty"` - Consumption *rmpb.TokenBucketsRequest `json:"consumption,omitempty"` - LastUpdate *time.Time `json:"last_update,omitempty"` - Initialized bool `json:"initialized"` + Consumption *rmpb.Consumption `json:"consumption,omitempty"` + LastUpdate *time.Time `json:"last_update,omitempty"` + Initialized bool `json:"initialized"` } // NewGroupTokenBucket returns a new GroupTokenBucket