Skip to content

Commit

Permalink
controller: return the consumption info on RU request/response (#6125)
Browse files Browse the repository at this point in the history
ref #5851, ref pingcap/tidb#41972

- Return the consumption info on RU request/response.
- Remove some unnecessary `goto`s.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Mar 13, 2023
1 parent 5bdfa71 commit 3e3ae55
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 115 deletions.
125 changes: 67 additions & 58 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) error
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, error)
// OnResponse is used to consume tokens after receiving response
OnResponse(ctx context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
}

// ResourceGroupProvider provides some api to interact with resource manager server。
Expand Down Expand Up @@ -402,24 +402,24 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (err error) {
) (*rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
return err
return nil, err
}
return gc.onRequestWait(ctx, info)
}

// OnResponse is used to consume tokens after receiving response
func (c *ResourceGroupsController) OnResponse(_ context.Context, resourceGroupName string, req RequestInfo, resp ResponseInfo) error {
if tmp, ok := c.groupsController.Load(resourceGroupName); ok {
gc := tmp.(*groupCostController)
gc.onResponse(req, resp)
} else {
func (c *ResourceGroupsController) OnResponse(
resourceGroupName string, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
tmp, ok := c.groupsController.Load(resourceGroupName)
if !ok {
log.Warn("[resource group controller] resource group name does not exist", zap.String("resourceGroupName", resourceGroupName))
return &rmpb.Consumption{}, nil
}

return nil
return tmp.(*groupCostController).onResponse(req, resp)
}

type groupCostController struct {
Expand Down Expand Up @@ -872,78 +872,77 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (err error) {
) (*rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
}
now := time.Now()
if gc.burstable.Load() {
goto ret
}
// retry
retryLoop:
for i := 0; i < maxRetry; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
if !gc.burstable.Load() {
var err error
now := time.Now()
retryLoop:
for i := 0; i < maxRetry; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
}
}
}
if err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
if err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, defaultMaxWaitDuration, now, v))
}
}
if err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
}
if err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return nil, err
}
time.Sleep(100 * time.Millisecond)
}
if err != nil {
return err
}
ret:
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
return nil
return delta, nil
}

func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
func (gc *groupCostController) onResponse(
req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
if gc.burstable.Load() {
goto ret
}
switch gc.mode {
case rmpb.GroupMode_RawMode:
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
if !gc.burstable.Load() {
switch gc.mode {
case rmpb.GroupMode_RawMode:
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
}
}
}
case rmpb.GroupMode_RUMode:
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
case rmpb.GroupMode_RUMode:
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
counter.limiter.RemoveTokens(time.Now(), v)
}
}
}
}
ret:
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
return delta, nil
}

// CheckResourceGroupExist checks if groupsController map {rg.name -> resource group controller}
Expand All @@ -952,3 +951,13 @@ func (c *ResourceGroupsController) CheckResourceGroupExist(name string) bool {
_, ok := c.groupsController.Load(name)
return ok
}

// This is used for test only.
func (gc *groupCostController) getKVCalculator() *KVCalculator {
for _, calc := range gc.calculators {
if kvCalc, ok := calc.(*KVCalculator); ok {
return kvCalc
}
}
return nil
}
62 changes: 60 additions & 2 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
package controller

import (
"context"
"fmt"
"testing"
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
func createTestGroupCostController(re *require.Assertions) *groupCostController {
group := &rmpb.ResourceGroup{
Name: "test",
Mode: rmpb.GroupMode_RUMode,
Expand All @@ -43,6 +44,12 @@ func TestGroupControlBurstable(t *testing.T) {
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2)
re.NoError(err)
return gc
}

func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
Expand All @@ -54,3 +61,54 @@ func TestGroupControlBurstable(t *testing.T) {
gc.updateAvgRequestResourcePerSec()
re.Equal(gc.burstable.Load(), true)
}

func TestRequestAndResponseConsumption(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
}{
// Write request
{
req: &TestRequestInfo{
isWrite: true,
writeBytes: 100,
},
resp: &TestResponseInfo{
readBytes: 100,
succeed: true,
},
},
// Read request
{
req: &TestRequestInfo{
isWrite: false,
writeBytes: 0,
},
resp: &TestResponseInfo{
readBytes: 100,
kvCPU: 100 * time.Millisecond,
succeed: true,
},
},
}
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
expectedConsumption := &rmpb.Consumption{}
if testCase.req.IsWrite() {
kvCalculator.calculateWriteCost(expectedConsumption, testCase.req)
re.Equal(expectedConsumption.WRU, consumption.WRU)
}
consumption, err = gc.onResponse(testCase.req, testCase.resp)
re.NoError(err, caseNum)
kvCalculator.calculateReadCost(expectedConsumption, testCase.resp)
kvCalculator.calculateCPUCost(expectedConsumption, testCase.resp)
re.Equal(expectedConsumption.RRU, consumption.RRU, caseNum)
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}
75 changes: 75 additions & 0 deletions client/resource_group/controller/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,g
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import "time"

// TestRequestInfo is used to test the request info interface.
type TestRequestInfo struct {
isWrite bool
writeBytes uint64
}

// NewTestRequestInfo creates a new TestRequestInfo.
func NewTestRequestInfo(isWrite bool, writeBytes uint64) *TestRequestInfo {
return &TestRequestInfo{
isWrite: isWrite,
writeBytes: writeBytes,
}
}

// IsWrite implements the RequestInfo interface.
func (tri *TestRequestInfo) IsWrite() bool {
return tri.isWrite
}

// WriteBytes implements the RequestInfo interface.
func (tri *TestRequestInfo) WriteBytes() uint64 {
return tri.writeBytes
}

// TestResponseInfo is used to test the response info interface.
type TestResponseInfo struct {
readBytes uint64
kvCPU time.Duration
succeed bool
}

func NewTestResponseInfo(readBytes uint64, kvCPU time.Duration, succeed bool) *TestResponseInfo {
return &TestResponseInfo{
readBytes: readBytes,
kvCPU: kvCPU,
succeed: succeed,
}
}

// ReadBytes implements the ResponseInfo interface.
func (tri *TestResponseInfo) ReadBytes() uint64 {
return tri.readBytes
}

// KVCPU implements the ResponseInfo interface.
func (tri *TestResponseInfo) KVCPU() time.Duration {
return tri.kvCPU
}

// Succeed implements the ResponseInfo interface.
func (tri *TestResponseInfo) Succeed() bool {
return tri.succeed
}
Loading

0 comments on commit 3e3ae55

Please sign in to comment.