diff --git a/pkg/mcs/resourcemanager/server/token_buckets.go b/pkg/mcs/resourcemanager/server/token_buckets.go index 88e1be02942..1de00350bca 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets.go +++ b/pkg/mcs/resourcemanager/server/token_buckets.go @@ -292,7 +292,7 @@ func (gtb *GroupTokenBucket) init(now time.Time, clientID uint64) { if gtb.Settings.FillRate == 0 { gtb.Settings.FillRate = defaultRefillRate } - if gtb.Tokens < defaultInitialTokens { + if gtb.Tokens < defaultInitialTokens && gtb.Settings.BurstLimit > 0 { gtb.Tokens = defaultInitialTokens } // init slot @@ -311,21 +311,23 @@ func (gtb *GroupTokenBucket) updateTokens(now time.Time, burstLimit int64, clien var elapseTokens float64 if !gtb.Initialized { gtb.init(now, clientUniqueID) - } else if delta := now.Sub(*gtb.LastUpdate); delta > 0 { - elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens - gtb.lastBurstTokens = 0 - gtb.Tokens += elapseTokens - gtb.LastUpdate = &now + } else if burst := float64(burstLimit); burst > 0 { + if delta := now.Sub(*gtb.LastUpdate); delta > 0 { + elapseTokens = float64(gtb.Settings.GetFillRate())*delta.Seconds() + gtb.lastBurstTokens + gtb.lastBurstTokens = 0 + gtb.Tokens += elapseTokens + } + if gtb.Tokens > burst { + elapseTokens -= gtb.Tokens - burst + gtb.Tokens = burst + } } + gtb.LastUpdate = &now // Reloan when setting changed if gtb.settingChanged && gtb.Tokens <= 0 { elapseTokens = 0 gtb.resetLoan() } - if burst := float64(burstLimit); burst > 0 && gtb.Tokens > burst { - elapseTokens -= gtb.Tokens - burst - gtb.Tokens = burst - } // Balance each slots. gtb.balanceSlotTokens(clientUniqueID, gtb.Settings, consumptionToken, elapseTokens) } diff --git a/pkg/mcs/resourcemanager/server/token_buckets_test.go b/pkg/mcs/resourcemanager/server/token_buckets_test.go index a7d3b9e3bad..1e14d9eca6a 100644 --- a/pkg/mcs/resourcemanager/server/token_buckets_test.go +++ b/pkg/mcs/resourcemanager/server/token_buckets_test.go @@ -48,11 +48,40 @@ func TestGroupTokenBucketUpdateAndPatch(t *testing.T) { }, } tb.patch(tbSetting) - + time.Sleep(10 * time.Millisecond) time2 := time.Now() tb.request(time2, 0, 0, clientUniqueID) re.LessOrEqual(math.Abs(100000-tb.Tokens), time2.Sub(time1).Seconds()*float64(tbSetting.Settings.FillRate)+1e7) re.Equal(tbSetting.Settings.FillRate, tb.Settings.FillRate) + + tbSetting = &rmpb.TokenBucket{ + Tokens: 0, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: -1, + }, + } + tb = NewGroupTokenBucket(tbSetting) + tb.request(time2, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7) + time3 := time.Now() + tb.request(time3, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens), 1e-7) + + tbSetting = &rmpb.TokenBucket{ + Tokens: 200000, + Settings: &rmpb.TokenLimitSettings{ + FillRate: 2000, + BurstLimit: -1, + }, + } + tb = NewGroupTokenBucket(tbSetting) + tb.request(time3, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7) + time.Sleep(10 * time.Millisecond) + time4 := time.Now() + tb.request(time4, 0, 0, clientUniqueID) + re.LessOrEqual(math.Abs(tbSetting.Tokens-200000), 1e-7) } func TestGroupTokenBucketRequest(t *testing.T) {