Skip to content

Commit

Permalink
also return total wait duration in OnRequestWait
Browse files Browse the repository at this point in the history
Signed-off-by: glorv <glorvs@163.com>
  • Loading branch information
glorv committed Nov 30, 2023
1 parent 3191594 commit b19ebbf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 24 deletions.
15 changes: 9 additions & 6 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ const (
// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, uint32, error)
OnRequestWait(ctx context.Context, resourceGroupName string, info RequestInfo) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error)
// OnResponse is used to consume tokens after receiving response.
OnResponse(resourceGroupName string, req RequestInfo, resp ResponseInfo) (*rmpb.Consumption, error)
// IsBackgroundRequest If the resource group has background jobs, we should not record consumption and wait for it.
Expand Down Expand Up @@ -526,10 +526,10 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
func (c *ResourceGroupsController) OnRequestWait(
ctx context.Context, resourceGroupName string, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
return nil, nil, 0, err
return nil, nil, time.Duration(0), 0, err

Check warning on line 532 in client/resource_group/controller/controller.go

View check run for this annotation

Codecov / codecov/patch

client/resource_group/controller/controller.go#L532

Added line #L532 was not covered by tests
}
return gc.onRequestWait(ctx, info)
}
Expand Down Expand Up @@ -1176,7 +1176,7 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 {

func (gc *groupCostController) onRequestWait(
ctx context.Context, info RequestInfo,
) (*rmpb.Consumption, *rmpb.Consumption, uint32, error) {
) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.BeforeKVRequest(delta, info)
Expand All @@ -1185,6 +1185,7 @@ func (gc *groupCostController) onRequestWait(
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
var waitDuration time.Duration

if !gc.burstable.Load() {
var err error
Expand Down Expand Up @@ -1217,6 +1218,7 @@ func (gc *groupCostController) onRequestWait(
}
gc.requestRetryCounter.Inc()
time.Sleep(retryInterval)
waitDuration += retryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
Expand All @@ -1226,9 +1228,10 @@ func (gc *groupCostController) onRequestWait(
failpoint.Inject("triggerUpdate", func() {
gc.lowRUNotifyChan <- struct{}{}
})
return nil, nil, 0, err
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

gc.mu.Lock()
Expand All @@ -1245,7 +1248,7 @@ func (gc *groupCostController) onRequestWait(
*gc.mu.storeCounter[info.StoreID()] = *gc.mu.globalCounter
gc.mu.Unlock()

return delta, penalty, gc.meta.Priority, nil
return delta, penalty, waitDuration, gc.meta.Priority, nil
}

func (gc *groupCostController) onResponse(
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestRequestAndResponseConsumption(t *testing.T) {
kvCalculator := gc.getKVCalculator()
for idx, testCase := range testCases {
caseNum := fmt.Sprintf("case %d", idx)
consumption, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
consumption, _, _, priority, err := gc.onRequestWait(context.TODO(), testCase.req)
re.NoError(err, caseNum)
re.Equal(priority, gc.meta.Priority)
expectedConsumption := &rmpb.Consumption{}
Expand Down
34 changes: 17 additions & 17 deletions tests/integrations/mcs/resourcemanager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
_, _, _, _, err := controller.OnRequestWait(suite.ctx, cas.resourceGroupName, rreq)
re.NoError(err)
_, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, cas.resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(cas.resourceGroupName, rreq, rres)
Expand All @@ -459,7 +459,7 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupController() {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate", "return(true)"))
tcs := tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 900000000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, rg.Name, wreq)
re.Error(err)
time.Sleep(time.Millisecond * 200)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerUpdate"))
Expand Down Expand Up @@ -514,9 +514,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
wreq := tcs.makeWriteRequest()
rres := tcs.makeReadResponse()
wres := tcs.makeWriteResponse()
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
controller.OnResponse(resourceGroupName, rreq, rres)
controller.OnResponse(resourceGroupName, wreq, wres)
Expand Down Expand Up @@ -553,9 +553,9 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
rres := cas.tcs[i].makeReadResponse()
wres := cas.tcs[i].makeWriteResponse()
startTime := time.Now()
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
_, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName, rreq)
re.NoError(err)
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName, wreq)
re.NoError(err)
sum += time.Since(startTime)
controller.OnResponse(resourceGroupName, rreq, rres)
Expand All @@ -573,22 +573,22 @@ func (suite *resourceManagerClientTestSuite) TestSwitchBurst() {
resourceGroupName2 := suite.initGroups[2].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 100000, times: 1, waitDuration: 0}
wreq := tcs.makeWriteRequest()
_, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
_, _, _, _, err := controller.OnRequestWait(suite.ctx, resourceGroupName2, wreq)
re.NoError(err)

re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/acceleratedSpeedTrend", "return(true)"))
resourceGroupName3 := suite.initGroups[3].Name
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 1000, times: 1, waitDuration: 0}
wreq = tcs.makeWriteRequest()
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
re.NoError(err)
time.Sleep(110 * time.Millisecond)
tcs = tokenConsumptionPerSecond{rruTokensAtATime: 1, wruTokensAtATime: 10, times: 1010, waitDuration: 0}
duration := time.Duration(0)
for i := 0; i < tcs.times; i++ {
wreq = tcs.makeWriteRequest()
startTime := time.Now()
_, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
_, _, _, _, err = controller.OnRequestWait(suite.ctx, resourceGroupName3, wreq)
duration += time.Since(startTime)
re.NoError(err)
}
Expand Down Expand Up @@ -637,7 +637,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// init
req := controller.NewTestRequestInfo(false, 0, 2 /* store2 */)
resp := controller.NewTestResponseInfo(0, time.Duration(30), true)
_, penalty, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, _, err := c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -646,7 +646,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {

req = controller.NewTestRequestInfo(true, 60, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -656,7 +656,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// failed request, shouldn't be counted in penalty
req = controller.NewTestRequestInfo(true, 20, 1 /* store1 */)
resp = controller.NewTestResponseInfo(0, time.Duration(0), false)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
re.Equal(penalty.TotalCpuTimeMs, 0.0)
Expand All @@ -666,7 +666,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from same store, should be zero
req1 := controller.NewTestRequestInfo(false, 0, 1 /* store1 */)
resp1 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req1)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req1, resp1)
Expand All @@ -675,7 +675,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from different store, should be non-zero
req2 := controller.NewTestRequestInfo(true, 50, 2 /* store2 */)
resp2 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req2)
re.NoError(err)
re.Equal(penalty.WriteBytes, 60.0)
re.InEpsilon(penalty.TotalCpuTimeMs, 10.0/1000.0/1000.0, 1e-6)
Expand All @@ -685,7 +685,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
// from new store, should be zero
req3 := controller.NewTestRequestInfo(true, 0, 3 /* store3 */)
resp3 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req3)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req3, resp3)
Expand All @@ -695,7 +695,7 @@ func (suite *resourceManagerClientTestSuite) TestResourcePenalty() {
resourceGroupName = groupNames[1]
req4 := controller.NewTestRequestInfo(true, 50, 1 /* store2 */)
resp4 := controller.NewTestResponseInfo(0, time.Duration(10), true)
_, penalty, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
_, penalty, _, _, err = c.OnRequestWait(suite.ctx, resourceGroupName, req4)
re.NoError(err)
re.Equal(penalty.WriteBytes, 0.0)
_, err = c.OnResponse(resourceGroupName, req4, resp4)
Expand Down

0 comments on commit b19ebbf

Please sign in to comment.