Skip to content

Commit

Permalink
controller: fix error retry and add more metrics (#8219) (#8233)
Browse files Browse the repository at this point in the history
close #8217

controller: fix error retry and add more metrics

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ShuNing <nolouch@gmail.com>
Co-authored-by: nolouch <nolouch@gmail.com>
  • Loading branch information
ti-chi-bot and nolouch authored May 31, 2024
1 parent 514d183 commit cff590e
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 30 deletions.
71 changes: 45 additions & 26 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
gc.metrics.tokenRequestCounter.Inc()
}
return true
})
Expand Down Expand Up @@ -614,13 +614,9 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
// metrics
metrics *groupMetricsCollection
mu struct {
sync.Mutex
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
Expand Down Expand Up @@ -667,6 +663,30 @@ type groupCostController struct {
tombstone bool
}

type groupMetricsCollection struct {
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
}

func initMetrics(oldName, name string) *groupMetricsCollection {
const (
otherType = "others"
throttledType = "throttled"
)
return &groupMetricsCollection{
successfulRequestDuration: successfulRequestDuration.WithLabelValues(oldName, name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(oldName, name),
failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(oldName, name, otherType),
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
}
}

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

Expand Down Expand Up @@ -707,16 +727,13 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
ms := initMetrics(group.Name, group.Name)
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name, group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name, group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name, group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name, group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name, group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
metrics: ms,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -771,7 +788,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand All @@ -785,7 +802,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand Down Expand Up @@ -1215,7 +1232,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
Expand All @@ -1225,18 +1242,20 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.requestRetryCounter.Inc()
gc.metrics.requestRetryCounter.Inc()
time.Sleep(retryInterval)
waitDuration += retryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
Expand All @@ -1246,7 +1265,7 @@ func (gc *groupCostController) onRequestWait(
})
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

Expand Down
15 changes: 15 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
Expand Down Expand Up @@ -117,3 +118,17 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
}
// The group is throttled
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
27 changes: 26 additions & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -81,6 +82,15 @@ type Limiter struct {
isLowProcess bool
// remainingNotifyTimes is used to limit notify when the speed limit is already set.
remainingNotifyTimes int
name string

// metrics
metrics *limiterMetricsCollection
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
}

// Limit returns the maximum overall event rate.
Expand All @@ -106,15 +116,19 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
lim := &Limiter{
name: name,
limit: Limit(cfg.NewRate),
last: now,
tokens: cfg.NewTokens,
burst: cfg.NewBurst,
notifyThreshold: cfg.NotifyThreshold,
lowTokensNotifyChan: lowTokensNotifyChan,
}
lim.metrics = &limiterMetricsCollection{
lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name),
}
log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
return lim
}
Expand Down Expand Up @@ -224,6 +238,14 @@ func (lim *Limiter) SetupNotificationThreshold(now time.Time, threshold float64)
lim.notifyThreshold = threshold
}

// SetName sets the name of the limiter.
func (lim *Limiter) SetName(name string) *Limiter {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.name = name
return lim
}

// notify tries to send a non-blocking notification on notifyCh and disables
// further notifications (until the next Reconfigure or StartNotification).
func (lim *Limiter) notify() {
Expand All @@ -234,6 +256,9 @@ func (lim *Limiter) notify() {
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
default:
}
}
Expand Down
18 changes: 15 additions & 3 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
// TODO: remove old label in 8.x
resourceGroupNameLabel = "name"
newResourceGroupNameLabel = "resource_group"

errType = "type"
)

var (
Expand All @@ -40,7 +42,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "success",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -49,7 +51,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -59,7 +61,7 @@ var (
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -73,6 +75,7 @@ var (
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 8s
Name: "duration",
Help: "Bucketed histogram of latency(s) of token request.",
}, []string{"type"})
Expand All @@ -84,6 +87,14 @@ var (
Name: "resource_group",
Help: "Counter of token request by every resource group.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

lowTokenRequestNotifyCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "low_token_notified",
Help: "Counter of low token request.",
}, []string{newResourceGroupNameLabel})
)

var (
Expand All @@ -100,4 +111,5 @@ func init() {
prometheus.MustRegister(requestRetryCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
prometheus.MustRegister(lowTokenRequestNotifyCounter)
}

0 comments on commit cff590e

Please sign in to comment.