diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index ba34a60ab2e..83e43e43f53 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -41,9 +41,9 @@ const ( // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. - OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) error + OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error) // OnResponse is used to consume tokens after receiving response - OnResponse(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error + OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error) } // ResourceGroupProvider provides some api to interact with resource manager server。 @@ -402,24 +402,24 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs wait some time. func (c *ResourceGroupsController) OnRequestWait( ctx context.Context, resourceGroupName string, info RequestInfo, -) (err error) { +) (*rmpb.Consumption, error) { gc, err := c.tryGetResourceGroup(ctx, resourceGroupName) if err != nil { - return err + return nil, err } return gc.onRequestWait(ctx, info) } // OnResponse is used to consume tokens after receiving response -func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error { - if tmp, ok := c.groupsController.Load(resourceGroupName); ok { - gc := tmp.(*groupCostController) - gc.onResponse(req, resp) - } else { +func (c *ResourceGroupsController) OnResponse( + resourceGroupName string, req RequestInfo, resp ResponseInfo, +) (*rmpb.Consumption, error) { + tmp, ok := c.groupsController.Load(resourceGroupName) + if !ok { log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName)) + return &rmpb.Consumption{}, nil } - - return nil + return tmp.(*groupCostController).onResponse(req, resp) } type groupCostController struct { @@ -872,78 +872,77 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { func (gc *groupCostController) onRequestWait( ctx context.Context, info RequestInfo, -) (err error) { +) (*rmpb.Consumption, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.BeforeKVRequest(delta, info) } - now := time.Now() - if gc.burstable.Load() { - goto ret - } - // retry -retryLoop: - for i := 0; i < maxRetry; i++ { - switch gc.mode { - case rmpb.GroupMode_RawMode: - res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) - for typ, counter := range gc.run.resourceTokens { - if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v)) + if !gc.burstable.Load() { + var err error + now := time.Now() + retryLoop: + for i := 0; i < maxRetry; i++ { + switch gc.mode { + case rmpb.GroupMode_RawMode: + res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) + for typ, counter := range gc.run.resourceTokens { + if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { + res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v)) + } } - } - if err = WaitReservations(ctx, now, res); err == nil { - break retryLoop - } - case rmpb.GroupMode_RUMode: - res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) - for typ, counter := range gc.run.requestUnitTokens { - if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v)) + if err = WaitReservations(ctx, now, res); err == nil { + break retryLoop + } + case rmpb.GroupMode_RUMode: + res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) + for typ, counter := range gc.run.requestUnitTokens { + if v := getRUValueFromConsumption(delta, typ); v > 0 { + res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v)) + } + } + if err = WaitReservations(ctx, now, res); err == nil { + break retryLoop } } - if err = WaitReservations(ctx, now, res); err == nil { - break retryLoop - } + time.Sleep(100 * time.Millisecond) + } + if err != nil { + return nil, err } - time.Sleep(100 * time.Millisecond) - } - if err != nil { - return err } -ret: gc.mu.Lock() add(gc.mu.consumption, delta) gc.mu.Unlock() - return nil + return delta, nil } -func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) { +func (gc *groupCostController) onResponse( + req RequestInfo, resp ResponseInfo, +) (*rmpb.Consumption, error) { delta := &rmpb.Consumption{} for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } - if gc.burstable.Load() { - goto ret - } - switch gc.mode { - case rmpb.GroupMode_RawMode: - for typ, counter := range gc.run.resourceTokens { - if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - counter.limiter.RemoveTokens(time.Now(), v) + if !gc.burstable.Load() { + switch gc.mode { + case rmpb.GroupMode_RawMode: + for typ, counter := range gc.run.resourceTokens { + if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { + counter.limiter.RemoveTokens(time.Now(), v) + } } - } - case rmpb.GroupMode_RUMode: - for typ, counter := range gc.run.requestUnitTokens { - if v := getRUValueFromConsumption(delta, typ); v > 0 { - counter.limiter.RemoveTokens(time.Now(), v) + case rmpb.GroupMode_RUMode: + for typ, counter := range gc.run.requestUnitTokens { + if v := getRUValueFromConsumption(delta, typ); v > 0 { + counter.limiter.RemoveTokens(time.Now(), v) + } } } } -ret: gc.mu.Lock() add(gc.mu.consumption, delta) gc.mu.Unlock() + return delta, nil } // CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller} @@ -952,3 +951,13 @@ func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool { _, ok := c.groupsController.Load(name) return ok } + +// This is used for test only. +func (gc *groupCostController) getKVCalculator() *KVCalculator { + for _, calc := range gc.calculators { + if kvCalc, ok := calc.(*KVCalculator); ok { + return kvCalc + } + } + return nil +} diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 9424aec1f11..5bde4bca2c1 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -19,6 +19,8 @@ package controller import ( + "context" + "fmt" "testing" "time" @@ -26,8 +28,7 @@ import ( "github.com/stretchr/testify/require" ) -func TestGroupControlBurstable(t *testing.T) { - re := require.New(t) +func createTestGroupCostController(re *require.Assertions) *groupCostController { group := &rmpb.ResourceGroup{ Name: "test", Mode: rmpb.GroupMode_RUMode, @@ -43,6 +44,12 @@ func TestGroupControlBurstable(t *testing.T) { ch2 := make(chan *groupCostController) gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2) re.NoError(err) + return gc +} + +func TestGroupControlBurstable(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) gc.initRunState() args := tokenBucketReconfigureArgs{ NewRate: 1000, @@ -54,3 +61,54 @@ func TestGroupControlBurstable(t *testing.T) { gc.updateAvgRequestResourcePerSec() re.Equal(gc.burstable.Load(), true) } + +func TestRequestAndResponseConsumption(t *testing.T) { + re := require.New(t) + gc := createTestGroupCostController(re) + gc.initRunState() + testCases := []struct { + req *TestRequestInfo + resp *TestResponseInfo + }{ + // Write request + { + req: &TestRequestInfo{ + isWrite: true, + writeBytes: 100, + }, + resp: &TestResponseInfo{ + readBytes: 100, + succeed: true, + }, + }, + // Read request + { + req: &TestRequestInfo{ + isWrite: false, + writeBytes: 0, + }, + resp: &TestResponseInfo{ + readBytes: 100, + kvCPU: 100 * time.Millisecond, + succeed: true, + }, + }, + } + kvCalculator := gc.getKVCalculator() + for idx, testCase := range testCases { + caseNum := fmt.Sprintf("case %d", idx) + consumption, err := gc.onRequestWait(context.TODO(), testCase.req) + re.NoError(err, caseNum) + expectedConsumption := &rmpb.Consumption{} + if testCase.req.IsWrite() { + kvCalculator.calculateWriteCost(expectedConsumption, testCase.req) + re.Equal(expectedConsumption.WRU, consumption.WRU) + } + consumption, err = gc.onResponse(testCase.req, testCase.resp) + re.NoError(err, caseNum) + kvCalculator.calculateReadCost(expectedConsumption, testCase.resp) + kvCalculator.calculateCPUCost(expectedConsumption, testCase.resp) + re.Equal(expectedConsumption.RRU, consumption.RRU, caseNum) + re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum) + } +} diff --git a/client/resource_group/controller/testutil.go b/client/resource_group/controller/testutil.go new file mode 100644 index 00000000000..8b510621a52 --- /dev/null +++ b/client/resource_group/controller/testutil.go @@ -0,0 +1,75 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import "time" + +// TestRequestInfo is used to test the request info interface. +type TestRequestInfo struct { + isWrite bool + writeBytes uint64 +} + +// NewTestRequestInfo creates a new TestRequestInfo. +func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo { + return &TestRequestInfo{ + isWrite: isWrite, + writeBytes: writeBytes, + } +} + +// IsWrite implements the RequestInfo interface. +func (tri *TestRequestInfo) IsWrite() bool { + return tri.isWrite +} + +// WriteBytes implements the RequestInfo interface. +func (tri *TestRequestInfo) WriteBytes() uint64 { + return tri.writeBytes +} + +// TestResponseInfo is used to test the response info interface. +type TestResponseInfo struct { + readBytes uint64 + kvCPU time.Duration + succeed bool +} + +func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo { + return &TestResponseInfo{ + readBytes: readBytes, + kvCPU: kvCPU, + succeed: succeed, + } +} + +// ReadBytes implements the ResponseInfo interface. +func (tri *TestResponseInfo) ReadBytes() uint64 { + return tri.readBytes +} + +// KVCPU implements the ResponseInfo interface. +func (tri *TestResponseInfo) KVCPU() time.Duration { + return tri.kvCPU +} + +// Succeed implements the ResponseInfo interface. +func (tri *TestResponseInfo) Succeed() bool { + return tri.succeed +} diff --git a/tests/mcs/resource_manager/resource_manager_test.go b/tests/mcs/resource_manager/resource_manager_test.go index e10002eefc7..8bd35617a0d 100644 --- a/tests/mcs/resource_manager/resource_manager_test.go +++ b/tests/mcs/resource_manager/resource_manager_test.go @@ -216,37 +216,6 @@ func (suite *resourceManagerClientTestSuite) TestWatchResourceGroup() { const buffDuration = time.Millisecond * 300 -type testRequestInfo struct { - isWrite bool - writeBytes uint64 -} - -func (ti *testRequestInfo) IsWrite() bool { - return ti.isWrite -} - -func (ti *testRequestInfo) WriteBytes() uint64 { - return ti.writeBytes -} - -type testResponseInfo struct { - cpu time.Duration - readBytes uint64 - succeed bool -} - -func (tri *testResponseInfo) ReadBytes() uint64 { - return tri.readBytes -} - -func (tri *testResponseInfo) KVCPU() time.Duration { - return tri.cpu -} - -func (tri *testResponseInfo) Succeed() bool { - return tri.succeed -} - type tokenConsumptionPerSecond struct { rruTokensAtATime float64 wruTokensAtATime float64 @@ -254,33 +223,28 @@ type tokenConsumptionPerSecond struct { waitDuration time.Duration } -func (t tokenConsumptionPerSecond) makeReadRequest() *testRequestInfo { - return &testRequestInfo{ - isWrite: false, - writeBytes: 0, - } +func (t tokenConsumptionPerSecond) makeReadRequest() *controller.TestRequestInfo { + return controller.NewTestRequestInfo(false, 0) } -func (t tokenConsumptionPerSecond) makeWriteRequest() *testRequestInfo { - return &testRequestInfo{ - isWrite: true, - writeBytes: uint64(t.wruTokensAtATime - 1), - } +func (t tokenConsumptionPerSecond) makeWriteRequest() *controller.TestRequestInfo { + return controller.NewTestRequestInfo(true, uint64(t.wruTokensAtATime-1)) } -func (t tokenConsumptionPerSecond) makeReadResponse() *testResponseInfo { - return &testResponseInfo{ - readBytes: uint64((t.rruTokensAtATime - 1) / 2), - cpu: time.Duration(t.rruTokensAtATime/2) * time.Millisecond, - } +func (t tokenConsumptionPerSecond) makeReadResponse() *controller.TestResponseInfo { + return controller.NewTestResponseInfo( + uint64((t.rruTokensAtATime-1)/2), + time.Duration(t.rruTokensAtATime/2)*time.Millisecond, + false, + ) } -func (t tokenConsumptionPerSecond) makeWriteResponse() *testResponseInfo { - return &testResponseInfo{ - readBytes: 0, - cpu: time.Duration(0), - succeed: true, - } +func (t tokenConsumptionPerSecond) makeWriteResponse() *controller.TestResponseInfo { + return controller.NewTestResponseInfo( + 0, + time.Duration(0), + true, + ) } func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { @@ -345,8 +309,8 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() { controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq) controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq) sum += time.Since(startTime) - controller.OnResponse(suite.ctx, cas.resourceGroupName, rreq, rres) - controller.OnResponse(suite.ctx, cas.resourceGroupName, wreq, wres) + controller.OnResponse(cas.resourceGroupName, rreq, rres) + controller.OnResponse(cas.resourceGroupName, wreq, wres) time.Sleep(1000 * time.Microsecond) } re.LessOrEqual(sum, buffDuration+cas.tcs[i].waitDuration) @@ -767,7 +731,7 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { rres := testConfig.tcs.makeReadResponse() for j := 0; j < testConfig.times; j++ { controller.OnRequestWait(suite.ctx, suite.initGroups[0].Name, rreq) - controller.OnResponse(suite.ctx, suite.initGroups[0].Name, rreq, rres) + controller.OnResponse(suite.initGroups[0].Name, rreq, rres) time.Sleep(100 * time.Microsecond) } time.Sleep(1 * time.Second)