Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: fix error retry and add more metrics #8219

Merged
merged 4 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
gc.metrics.tokenRequestCounter.Inc()
}
return true
})
Expand Down Expand Up @@ -632,13 +632,9 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
// metrics
metrics *groupMetricsCollection
mu struct {
sync.Mutex
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
Expand Down Expand Up @@ -685,6 +681,30 @@ type groupCostController struct {
tombstone bool
}

type groupMetricsCollection struct {
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
}

func initMetrics(oldName, name string) *groupMetricsCollection {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will we remove oldName?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe 8.5

const (
otherType = "others"
throttledType = "throttled"
)
return &groupMetricsCollection{
successfulRequestDuration: successfulRequestDuration.WithLabelValues(oldName, name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(oldName, name),
failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(oldName, name, otherType),
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
}
}

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

Expand Down Expand Up @@ -725,16 +745,13 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
ms := initMetrics(group.Name, group.Name)
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name, group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name, group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name, group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name, group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name, group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
metrics: ms,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -789,7 +806,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand All @@ -803,7 +820,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(gc.name, now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
Expand Down Expand Up @@ -1233,7 +1250,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
case rmpb.GroupMode_RUMode:
Expand All @@ -1243,18 +1260,20 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.requestRetryCounter.Inc()
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
Expand All @@ -1264,7 +1283,7 @@ func (gc *groupCostController) onRequestWait(
})
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

Expand Down
15 changes: 15 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
Expand Down Expand Up @@ -117,3 +118,17 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
}
// The group is throttled
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
26 changes: 25 additions & 1 deletion client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -81,6 +82,15 @@ type Limiter struct {
isLowProcess bool
// remainingNotifyTimes is used to limit notify when the speed limit is already set.
remainingNotifyTimes int
name string

// metrics
metrics *limiterMetricsCollection
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
}

// Limit returns the maximum overall event rate.
Expand All @@ -106,7 +116,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify

// NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter {
lim := &Limiter{
limit: Limit(cfg.NewRate),
nolouch marked this conversation as resolved.
Show resolved Hide resolved
last: now,
Expand All @@ -115,6 +125,9 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
notifyThreshold: cfg.NotifyThreshold,
lowTokensNotifyChan: lowTokensNotifyChan,
}
lim.metrics = &limiterMetricsCollection{
lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name),
}
log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
return lim
}
Expand Down Expand Up @@ -224,6 +237,14 @@ func (lim *Limiter) SetupNotificationThreshold(threshold float64) {
lim.notifyThreshold = threshold
}

// SetName sets the name of the limiter.
func (lim *Limiter) SetName(name string) *Limiter {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.name = name
return lim
}

// notify tries to send a non-blocking notification on notifyCh and disables
// further notifications (until the next Reconfigure or StartNotification).
func (lim *Limiter) notify() {
Expand All @@ -234,6 +255,9 @@ func (lim *Limiter) notify() {
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
default:
}
}
Expand Down
18 changes: 15 additions & 3 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
// TODO: remove old label in 8.x
resourceGroupNameLabel = "name"
newResourceGroupNameLabel = "resource_group"

errType = "type"
)

var (
Expand All @@ -40,7 +42,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "success",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -49,7 +51,7 @@ var (
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

Expand All @@ -59,7 +61,7 @@ var (
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -73,6 +75,7 @@ var (
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 8s
Name: "duration",
Help: "Bucketed histogram of latency(s) of token request.",
}, []string{"type"})
Expand All @@ -84,6 +87,14 @@ var (
Name: "resource_group",
Help: "Counter of token request by every resource group.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

lowTokenRequestNotifyCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "low_token_notified",
Help: "Counter of low token request.",
}, []string{newResourceGroupNameLabel})
)

var (
Expand All @@ -100,4 +111,5 @@ func init() {
prometheus.MustRegister(requestRetryCounter)
prometheus.MustRegister(tokenRequestDuration)
prometheus.MustRegister(resourceGroupTokenRequestCounter)
prometheus.MustRegister(lowTokenRequestNotifyCounter)
}
Loading