Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

resource_manager: unify RRU and WRU into RU for token limit #5888

Merged
merged 9 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ require (
go.uber.org/zap v1.20.0
google.golang.org/grpc v1.51.0
)

replace github.com/pingcap/kvproto => github.com/CabinfeverB/kvproto v0.0.0-20230130100540-c8385fbaf594

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update with new kvproto master

4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/CabinfeverB/kvproto v0.0.0-20230130100540-c8385fbaf594 h1:ptejjtrEOMQPaZf9syyMo+almWZCUagXxy1rQJnpbic=
github.com/CabinfeverB/kvproto v0.0.0-20230130100540-c8385fbaf594/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down Expand Up @@ -561,8 +563,6 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934 h1:LB+BrfyO5fsz5pwN3V4HvTrpZTAmsjB4VkCEBLbjYUw=
github.com/pingcap/kvproto v0.0.0-20230119031034-25f1909b7934/go.mod h1:+on3Lfk/fb1lXkud3XvskJumhSIEEgN2TTbMObUlrxE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,4 @@ replace google.golang.org/grpc v1.51.0 => google.golang.org/grpc v1.26.0
// kvproto at the same time. You can run `go mod tidy` to make it replaced with go-mod style specification.
// After the PR to kvproto is merged, remember to comment this out and run `go mod tidy`.
// replace github.com/pingcap/kvproto => github.com/$YourPrivateRepo $YourPrivateBranch
replace github.com/pingcap/kvproto => github.com/CabinfeverB/kvproto v0.0.0-20230130100540-c8385fbaf594
887 changes: 877 additions & 10 deletions go.sum

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions pkg/mcs/resource_manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (gc *groupCostController) initRunState() {
switch gc.mode {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitList {
for typ := range requestUnitLimitTypeList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
Expand All @@ -444,7 +444,7 @@ func (gc *groupCostController) initRunState() {
}
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceList {
for typ := range requestResourceLimitTypeList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
Expand Down Expand Up @@ -563,13 +563,13 @@ func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool
func (gc *groupCostController) shouldReportConsumption() bool {
switch gc.Mode {
case rmpb.GroupMode_RUMode:
for typ := range requestUnitList {
for typ := range requestUnitLimitTypeList {
if getRUValueFromConsumption(gc.run.consumption, typ)-getRUValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold {
return true
}
}
case rmpb.GroupMode_RawMode:
for typ := range requestResourceList {
for typ := range requestResourceLimitTypeList {
if getRawResourceValueFromConsumption(gc.run.consumption, typ)-getRawResourceValueFromConsumption(gc.run.lastRequestConsumption, typ) >= consumptionsReportingThreshold {
return true
}
Expand Down Expand Up @@ -675,7 +675,7 @@ func (gc *groupCostController) collectRequestAndConsumption(low bool) *rmpb.Toke
selected := !low
switch gc.mode {
case rmpb.GroupMode_RawMode:
requests := make([]*rmpb.RawResourceItem, 0, len(requestResourceList))
requests := make([]*rmpb.RawResourceItem, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if low && counter.limiter.IsLowTokens() {
selected = true
Expand All @@ -692,7 +692,7 @@ func (gc *groupCostController) collectRequestAndConsumption(low bool) *rmpb.Toke
},
}
case rmpb.GroupMode_RUMode:
requests := make([]*rmpb.RequestUnitItem, 0, len(requestUnitList))
requests := make([]*rmpb.RequestUnitItem, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if low && counter.limiter.IsLowTokens() {
selected = true
Expand Down Expand Up @@ -747,7 +747,7 @@ retryLoop:
for i := 0; i < maxRetry; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceList))
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
Expand All @@ -757,7 +757,7 @@ retryLoop:
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitList))
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestGroupControlBurstable(t *testing.T) {
Name: "test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: &rmpb.TokenBucket{
RU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 1000,
},
Expand Down
7 changes: 3 additions & 4 deletions pkg/mcs/resource_manager/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import (
)

var (
requestUnitList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
rmpb.RequestUnitType_RRU: {},
rmpb.RequestUnitType_WRU: {},
requestUnitLimitTypeList map[rmpb.RequestUnitType]struct{} = map[rmpb.RequestUnitType]struct{}{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this map now? If not we can remove for range where it is called.

Copy link
Member

@JmPotato JmPotato Feb 1, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the proto keeps RequestUnitType (though it only has 1 type), I think it's reasonable to keep a list here also.

rmpb.RequestUnitType_RU: {},
}
requestResourceList map[rmpb.RawResourceType]struct{} = map[rmpb.RawResourceType]struct{}{
requestResourceLimitTypeList map[rmpb.RawResourceType]struct{} = map[rmpb.RawResourceType]struct{}{
rmpb.RawResourceType_IOReadFlow: {},
rmpb.RawResourceType_IOWriteFlow: {},
rmpb.RawResourceType_CPU: {},
Expand Down
18 changes: 14 additions & 4 deletions pkg/mcs/resource_manager/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,33 @@ func (kc *KVCalculator) BeforeKVRequest(consumption *rmpb.Consumption, req Reque
// Write bytes are knowable in advance, so we can calculate the WRU cost here.
writeBytes := float64(req.WriteBytes())
consumption.WriteBytes += writeBytes
consumption.WRU += float64(kc.WriteBaseCost) + float64(kc.WriteBytesCost)*writeBytes
wru := float64(kc.WriteBaseCost) + float64(kc.WriteBytesCost)*writeBytes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why create a new variable?

consumption.WRU += wru
consumption.RU += wru
} else {
consumption.KvReadRpcCount += 1
// Read bytes could not be known before the request is executed,
// so we only add the base cost here.
consumption.RRU += float64(kc.ReadBaseCost)
consumption.RU += float64(kc.ReadBaseCost)
}
}

// AfterKVRequest ...
func (kc *KVCalculator) AfterKVRequest(consumption *rmpb.Consumption, req RequestInfo, res ResponseInfo) {
rru := 0.
// For now, we can only collect the KV CPU cost for a read request.
if !req.IsWrite() {
kvCPUMs := float64(res.KVCPUMs())
consumption.TotalCpuTimeMs += kvCPUMs
consumption.RRU += float64(kc.ReadCPUMsCost) * kvCPUMs
rru += float64(kc.ReadCPUMsCost) * kvCPUMs
}
// A write request may also read data, which should be counted into the RRU cost.
readBytes := float64(res.ReadBytes())
consumption.ReadBytes += readBytes
consumption.RRU += float64(kc.ReadBytesCost) * readBytes
rru += float64(kc.ReadBytesCost) * readBytes
consumption.RRU += rru
consumption.RU += rru
}

// SQLCalculator is used to calculate the SQL-side consumption.
Expand Down Expand Up @@ -124,8 +130,10 @@ func (dsc *SQLCalculator) AfterKVRequest(consumption *rmpb.Consumption, req Requ
func getRUValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RequestUnitType) float64 {
switch typ {
case 0:
return custom.RRU
return custom.RU
case 1:
return custom.RRU
case 2:
return custom.WRU
}
return 0
Expand All @@ -144,6 +152,7 @@ func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawRe
}

func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RU += custom2.RU
custom1.RRU += custom2.RRU
custom1.WRU += custom2.WRU
custom1.ReadBytes += custom2.ReadBytes
Expand All @@ -155,6 +164,7 @@ func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
}

func sub(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RU -= custom2.RU
custom1.RRU -= custom2.RRU
custom1.WRU -= custom2.WRU
custom1.ReadBytes -= custom2.ReadBytes
Expand Down
7 changes: 2 additions & 5 deletions pkg/mcs/resource_manager/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,8 @@ func (s *Service) AcquireTokenBuckets(stream rmpb.ResourceManager_AcquireTokenBu
case rmpb.GroupMode_RUMode:
var tokens *rmpb.GrantedRUTokenBucket
for _, re := range req.GetRuItems().GetRequestRU() {
switch re.Type {
case rmpb.RequestUnitType_RRU:
tokens = rg.RequestRRU(now, re.Value, targetPeriodMs)
case rmpb.RequestUnitType_WRU:
tokens = rg.RequestWRU(now, re.Value, targetPeriodMs)
if re.Type == rmpb.RequestUnitType_RU {
tokens = rg.RequestRU(now, re.Value, targetPeriodMs)
}
resp.GrantedRUTokens = append(resp.GrantedRUTokens, tokens)
}
Expand Down
31 changes: 8 additions & 23 deletions pkg/mcs/resource_manager/server/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@ type ResourceGroup struct {

// RequestUnitSettings is the definition of the RU settings.
type RequestUnitSettings struct {
RRU GroupTokenBucket `json:"rru,omitempty"`
WRU GroupTokenBucket `json:"wru,omitempty"`
RU GroupTokenBucket `json:"ru,omitempty"`
}

// RawResourceSettings is the definition of the native resource settings.
Expand Down Expand Up @@ -119,8 +118,7 @@ func (rg *ResourceGroup) PatchSettings(metaGroup *rmpb.ResourceGroup) error {
if metaGroup.GetRUSettings() == nil {
return errors.New("invalid resource group settings, RU mode should set RU settings")
}
rg.RUSettings.RRU.patch(metaGroup.GetRUSettings().GetRRU())
rg.RUSettings.WRU.patch(metaGroup.GetRUSettings().GetWRU())
rg.RUSettings.RU.patch(metaGroup.GetRUSettings().GetRU())
case rmpb.GroupMode_RawMode:
if metaGroup.GetRawResourceSettings() == nil {
return errors.New("invalid resource group settings, raw mode should set resource settings")
Expand Down Expand Up @@ -148,8 +146,7 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
case rmpb.GroupMode_RUMode:
if settings := group.GetRUSettings(); settings != nil {
ruSettings = &RequestUnitSettings{
RRU: NewGroupTokenBucket(settings.GetRRU()),
WRU: NewGroupTokenBucket(settings.GetWRU()),
RU: NewGroupTokenBucket(settings.GetRU()),
}
rg.RUSettings = ruSettings
}
Expand All @@ -166,26 +163,15 @@ func FromProtoResourceGroup(group *rmpb.ResourceGroup) *ResourceGroup {
return rg
}

// RequestRRU requests the RRU of the resource group.
func (rg *ResourceGroup) RequestRRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
// RequestRU requests the RU of the resource group.
func (rg *ResourceGroup) RequestRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.RRU.request(now, neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_RRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// RequestWRU requests the WRU of the resource group.
func (rg *ResourceGroup) RequestWRU(now time.Time, neededTokens float64, targetPeriodMs uint64) *rmpb.GrantedRUTokenBucket {
rg.Lock()
defer rg.Unlock()
if rg.RUSettings == nil {
return nil
}
tb, trickleTimeMs := rg.RUSettings.WRU.request(now, neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{Type: rmpb.RequestUnitType_WRU, GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
tb, trickleTimeMs := rg.RUSettings.RU.request(now, neededTokens, targetPeriodMs)
return &rmpb.GrantedRUTokenBucket{GrantedTokens: tb, TrickleTimeMs: trickleTimeMs}
}

// IntoProtoResourceGroup converts a ResourceGroup to a rmpb.ResourceGroup.
Expand All @@ -198,8 +184,7 @@ func (rg *ResourceGroup) IntoProtoResourceGroup() *rmpb.ResourceGroup {
Name: rg.Name,
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: rg.RUSettings.RRU.TokenBucket,
WRU: rg.RUSettings.WRU.TokenBucket,
RU: rg.RUSettings.RU.TokenBucket,
},
}
return group
Expand Down
14 changes: 4 additions & 10 deletions pkg/mcs/resource_manager/server/resource_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@ func TestPatchResourceGroup(t *testing.T) {
patchJSONString string
expectJSONString string
}{
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"w_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}, "w_r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"rru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false},"wru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_u":{"settings":{"fill_rate": 200000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
{`{"name":"test", "mode":1, "r_u_settings": {"r_u":{"settings":{"fill_rate": 200000, "burst": 100000}}}}`,
`{"name":"test","mode":1,"r_u_settings":{"ru":{"token_bucket":{"settings":{"fill_rate":200000}},"initialized":false}}}`},
}

for _, ca := range testCaseRU {
Expand Down
2 changes: 2 additions & 0 deletions tests/msc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -162,5 +162,7 @@ replace (
github.com/tikv/pd/client => ../../client
)

replace github.com/pingcap/kvproto => github.com/CabinfeverB/kvproto v0.0.0-20230130100540-c8385fbaf594

// reset grpc and protobuf deps in order to import client and server at the same time
replace google.golang.org/grpc v1.51.0 => google.golang.org/grpc v1.26.0
Loading