Skip to content

Commit

Permalink
resource_manager/client: introduce RU and Request metrics (#6170)
Browse files Browse the repository at this point in the history
close #6136

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>
  • Loading branch information
CabinfeverB and ti-chi-bot committed Mar 22, 2023
1 parent f062cfa commit 0eb5850
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 16 deletions.
40 changes: 33 additions & 7 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
Expand Down Expand Up @@ -192,6 +193,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
for {
select {
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
case <-c.responseDeadlineCh:
c.run.inDegradedMode = true
Expand Down Expand Up @@ -257,7 +259,8 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
return gc, nil
}
// Initialize the resource group controller.
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan)
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.tokenBucketUpdateChan,
successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter.WithLabelValues(group.Name), resourceGroupTokenRequestCounter.WithLabelValues(group.Name))
if err != nil {
return nil, err
}
Expand All @@ -266,6 +269,7 @@ func (c *ResourceGroupsController) tryGetResourceGroup(ctx context.Context, name
// Check again to prevent initializing the same resource group concurrently.
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
if !loaded {
resourceGroupStatusGauge.WithLabelValues(name).Set(1)
log.Info("[resource group controller] create resource group cost controller", zap.String("name", group.GetName()))
}
return tmp.(*groupCostController), nil
Expand All @@ -284,6 +288,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err
resourceGroupName := key.(string)
if _, ok := latestGroups[resourceGroupName]; !ok {
c.groupsController.Delete(key)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName)
return true
}

Expand All @@ -295,6 +300,7 @@ func (c *ResourceGroupsController) cleanUpResourceGroup(ctx context.Context) err
if equalRU(latestConsumption, *gc.run.consumption) {
if gc.tombstone {
c.groupsController.Delete(resourceGroupName)
resourceGroupStatusGauge.DeleteLabelValues(resourceGroupName)
return true
}
gc.tombstone = true
Expand Down Expand Up @@ -361,6 +367,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
}
gc.tokenRequestCounter.Inc()
return true
})
if len(c.run.currentRequests) > 0 {
Expand All @@ -381,14 +388,18 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
go func() {
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
latency := time.Since(now)
if err != nil {
// Don't log any errors caused by the stopper canceling the context.
if !errors.ErrorEqual(err, context.Canceled) {
log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err)
}
resp = nil
failedTokenRequestDuration.Observe(latency.Seconds())
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", time.Since(now)))
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
}
Expand All @@ -399,6 +410,7 @@ func (c *ResourceGroupsController) OnRequestWait(
) (*rmpb.Consumption, error) {
gc, err := c.tryGetResourceGroup(ctx, resourceGroupName)
if err != nil {
failedRequestCounter.WithLabelValues(resourceGroupName).Inc()
return nil, err
}
return gc.onRequestWait(ctx, info)
Expand All @@ -424,6 +436,10 @@ type groupCostController struct {

handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
sync.Mutex
consumption *rmpb.Consumption
Expand Down Expand Up @@ -498,6 +514,8 @@ func newGroupCostController(
mainCfg *Config,
lowRUNotifyChan chan struct{},
tokenBucketUpdateChan chan *groupCostController,
successfulRequestDuration prometheus.Observer,
failedRequestCounter, tokenRequestCounter prometheus.Counter,
) (*groupCostController, error) {
switch group.Mode {
case rmpb.GroupMode_RUMode:
Expand All @@ -509,8 +527,11 @@ func newGroupCostController(
}

gc := &groupCostController{
ResourceGroup: group,
mainCfg: mainCfg,
ResourceGroup: group,
mainCfg: mainCfg,
successfulRequestDuration: successfulRequestDuration,
failedRequestCounter: failedRequestCounter,
tokenRequestCounter: tokenRequestCounter,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -955,8 +976,10 @@ func (gc *groupCostController) onRequestWait(
if !gc.burstable.Load() {
var err error
now := time.Now()
var i int
var d time.Duration
retryLoop:
for i := 0; i < maxRetry; i++ {
for i = 0; i < maxRetry; i++ {
switch gc.mode {
case rmpb.GroupMode_RawMode:
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
Expand All @@ -965,7 +988,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
}
}
if err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
case rmpb.GroupMode_RUMode:
Expand All @@ -975,14 +998,17 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
}
}
if err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
}
}
time.Sleep(100 * time.Millisecond)
}
if err != nil {
gc.failedRequestCounter.Inc()
return nil, err
} else {
gc.successfulRequestDuration.Observe(d.Seconds())
}
}
gc.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController
}
ch1 := make(chan struct{})
ch2 := make(chan *groupCostController)
gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2)
gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2, successfulRequestDuration.WithLabelValues(group.Name), failedRequestCounter.WithLabelValues(group.Name), resourceGroupTokenRequestCounter.WithLabelValues(group.Name))
re.NoError(err)
return gc
}
Expand Down
12 changes: 6 additions & 6 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,9 @@ func (limit Limit) tokensFromDuration(d time.Duration) float64 {

// WaitReservations is used to process a series of reservations
// so that all limiter tokens are returned if one reservation fails
func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) error {
func WaitReservations(ctx context.Context, now time.Time, reservations []*Reservation) (time.Duration, error) {
if len(reservations) == 0 {
return nil
return 0, nil
}
cancel := func() {
for _, res := range reservations {
Expand All @@ -422,27 +422,27 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return errs.ErrClientResourceGroupThrottled
return 0, errs.ErrClientResourceGroupThrottled
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
longestDelayDuration = delay
}
}
if longestDelayDuration <= 0 {
return nil
return 0, nil
}
t := time.NewTimer(longestDelayDuration)
defer t.Stop()

select {
case <-t.C:
// We can proceed.
return nil
return longestDelayDuration, nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
cancel()
return ctx.Err()
return 0, ctx.Err()
}
}
5 changes: 3 additions & 2 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func TestCancel(t *testing.T) {
r2 := lim2.Reserve(ctx1, InfDuration, t1, 5)
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(d, time.Duration(0))
re.Error(err)
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
Expand All @@ -166,7 +167,7 @@ func TestCancel(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
err := WaitReservations(ctx2, t3, []*Reservation{r1, r2})
_, err := WaitReservations(ctx2, t3, []*Reservation{r1, r2})
re.Error(err)
wg.Done()
}()
Expand Down
82 changes: 82 additions & 0 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import "github.com/prometheus/client_golang/prometheus"

const (
namespace = "resource_manager_client"
requestSubsystem = "request"
tokenRequestSubsystem = "token_request"

resourceGroupNameLabel = "name"
)

var (
resourceGroupStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: "resource_group",
Name: "status",
Help: "Status of the resource group.",
}, []string{resourceGroupNameLabel})

successfulRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "success",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8), // 0.001 ~ 40.96
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel})

failedRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel})

tokenRequestDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "duration",
Help: "Bucketed histogram of latency(s) of token request.",
}, []string{"type"})

resourceGroupTokenRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "resource_group",
Help: "Counter of token request by every resource group.",
}, []string{resourceGroupNameLabel})
)

var (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
failedTokenRequestDuration = tokenRequestDuration.WithLabelValues("fail")
successfulTokenRequestDuration = tokenRequestDuration.WithLabelValues("success")
)

func init() {
prometheus.MustRegister(resourceGroupStatusGauge)
prometheus.MustRegister(successfulRequestDuration)
prometheus.MustRegister(failedRequestCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
}

0 comments on commit 0eb5850

Please sign in to comment.