controller: fix error retry and add more metrics
Signed-off-by: nolouch <nolouch@gmail.com>
nolouch committed May 27, 2024
1 parent 4cd42b3 commit 4fe26f4
Showing 4 changed files with 104 additions and 29 deletions.
71 changes: 45 additions & 26 deletions client/resource_group/controller/controller.go
@@ -515,7 +515,7 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex
request := gc.collectRequestAndConsumption(typ)
if request != nil {
c.run.currentRequests = append(c.run.currentRequests, request)
gc.tokenRequestCounter.Inc()
gc.metrics.tokenRequestCounter.Inc()
}
return true
})
@@ -632,13 +632,9 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
// metrics
metrics *groupMetricsCollection
mu struct {
sync.Mutex
consumption *rmpb.Consumption
storeCounter map[uint64]*rmpb.Consumption
@@ -685,6 +681,30 @@ type groupCostController struct {
tombstone bool
}

// groupMetricsCollection is a collection of metrics for a resource group.
type groupMetricsCollection struct {
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounterWithOthers prometheus.Counter
failedRequestCounterWithThrottled prometheus.Counter
tokenRequestCounter prometheus.Counter
}

// initMetrics binds the metric label values for a resource group once, so the
// hot path can update pre-resolved counters and observers directly.
func initMetrics(oldName, name string) *groupMetricsCollection {
const (
otherType = "others"
throttledType = "throttled"
)
return &groupMetricsCollection{
successfulRequestDuration: successfulRequestDuration.WithLabelValues(oldName, name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(oldName, name),
failedRequestCounterWithOthers: failedRequestCounter.WithLabelValues(oldName, name, otherType),
failedRequestCounterWithThrottled: failedRequestCounter.WithLabelValues(oldName, name, throttledType),
requestRetryCounter: requestRetryCounter.WithLabelValues(oldName, name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(oldName, name),
}
}
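
The label values (the old "name" label, the new "resource_group" label, and the error type) are resolved once here, so the hot path only touches pre-bound counters. A minimal standalone sketch of the same pattern — the registry, metric, and group names below are illustrative, not from this commit:

package main

import "github.com/prometheus/client_golang/prometheus"

var failCounter = prometheus.NewCounterVec(
	prometheus.CounterOpts{Name: "request_fail", Help: "Counter of failed request."},
	[]string{"name", "resource_group", "type"},
)

type boundMetrics struct {
	throttled prometheus.Counter
	others    prometheus.Counter
}

func bind(group string) *boundMetrics {
	// WithLabelValues resolves each child once; Inc() afterwards is a plain atomic add.
	return &boundMetrics{
		throttled: failCounter.WithLabelValues(group, group, "throttled"),
		others:    failCounter.WithLabelValues(group, group, "others"),
	}
}

func main() {
	prometheus.MustRegister(failCounter)
	m := bind("rg1")
	m.throttled.Inc() // hot path: no label lookup per request
}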

type tokenCounter struct {
getTokenBucketFunc func() *rmpb.TokenBucket

@@ -725,16 +745,13 @@ func newGroupCostController(
default:
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("unsupported resource type")
}
ms := initMetrics(group.Name, group.Name)
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name, group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name, group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name, group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name, group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name, group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
metrics: ms,
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
@@ -789,7 +806,7 @@ func (gc *groupCostController) initRunState() {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(now, cfgFunc(getRUTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan).SetName(gc.name).SetupMetrics()
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
@@ -803,7 +820,7 @@
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan)
limiter := NewLimiterWithCfg(now, cfgFunc(getRawResourceTokenBucketSetting(gc.meta, typ)), gc.lowRUNotifyChan).SetName(gc.name).SetupMetrics()

[Codecov] client/resource_group/controller/controller.go#L823: added line not covered by tests
counter := &tokenCounter{
limiter: limiter,
avgRUPerSec: 0,
@@ -1233,7 +1250,7 @@ func (gc *groupCostController) onRequestWait(
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {

[Codecov] client/resource_group/controller/controller.go#L1253: added line not covered by tests
break retryLoop
}
case rmpb.GroupMode_RUMode:
@@ -1243,18 +1260,20 @@
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
break retryLoop
}
}
gc.requestRetryCounter.Inc()
gc.metrics.requestRetryCounter.Inc()
time.Sleep(gc.mainCfg.WaitRetryInterval)
waitDuration += gc.mainCfg.WaitRetryInterval
}
if err != nil {
gc.failedRequestCounter.Inc()
if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()

[Codecov] client/resource_group/controller/controller.go#L1276: added line not covered by tests
}
gc.mu.Lock()
sub(gc.mu.consumption, delta)
@@ -1264,7 +1283,7 @@
})
return nil, nil, waitDuration, 0, err
}
gc.successfulRequestDuration.Observe(d.Seconds())
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

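Taken together, the changes to onRequestWait adjust the retry policy: the loop now exits immediately on success or on any non-throttled error, and only ErrClientResourceGroupThrottled is retried until the retry budget runs out, after which failures are counted under separate "throttled"/"others" labels. A minimal sketch of that policy — not the real signature — assuming errs is github.com/tikv/pd/client/errs and wait stands in for WaitReservations:

// waitWithRetry retries only throttled errors; any other error (or success)
// ends the loop at once, since retrying cannot fix it.
func waitWithRetry(wait func() (time.Duration, error), maxRetry int, interval time.Duration) (time.Duration, error) {
	var waited time.Duration
	var err error
	for i := 0; i < maxRetry; i++ {
		var d time.Duration
		if d, err = wait(); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) {
			return waited + d, err
		}
		time.Sleep(interval)
		waited += interval
	}
	return waited, err
}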
15 changes: 15 additions & 0 deletions client/resource_group/controller/controller_test.go
@@ -26,6 +26,7 @@ import (

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/errs"
)

func createTestGroupCostController(re *require.Assertions) *groupCostController {
@@ -117,3 +118,17 @@ func TestRequestAndResponseConsumption(t *testing.T) {
re.Equal(expectedConsumption.TotalCpuTimeMs, consumption.TotalCpuTimeMs, caseNum)
}
}

func TestResourceGroupThrottledError(t *testing.T) {
re := require.New(t)
gc := createTestGroupCostController(re)
gc.initRunState()
req := &TestRequestInfo{
isWrite: true,
writeBytes: 10000000,
}
// Such a large write should exhaust the group's tokens and be throttled.
_, _, _, _, err := gc.onRequestWait(context.TODO(), req)
re.Error(err)
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
}
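
Callers can use the same helper to tell throttling apart from other failures; a small caller-side sketch (the function name is illustrative, not from this commit):

// isThrottled reports whether err is the throttled error that onRequestWait
// surfaces once its retries are exhausted.
func isThrottled(err error) bool {
	return err != nil && errs.ErrClientResourceGroupThrottled.Equal(err)
}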
30 changes: 30 additions & 0 deletions client/resource_group/controller/limiter.go
@@ -26,6 +26,7 @@ import (
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/errs"
"go.uber.org/zap"
)
@@ -81,6 +82,15 @@ type Limiter struct {
isLowProcess bool
// remainingNotifyTimes is used to limit notify when the speed limit is already set.
remainingNotifyTimes int
name string

// metrics
metrics *limiterMetricsCollection
}

// limiterMetricsCollection is a collection of metrics for a limiter.
type limiterMetricsCollection struct {
lowTokenNotifyCounter prometheus.Counter
}

// Limit returns the maximum overall event rate.
@@ -224,6 +234,23 @@ func (lim *Limiter) SetupNotificationThreshold(threshold float64) {
lim.notifyThreshold = threshold
}

// SetName sets the name of the limiter.
func (lim *Limiter) SetName(name string) *Limiter {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.name = name
return lim
}

// SetupMetrics initializes the limiter's metrics; call it after SetName so
// the name label has already been set.
func (lim *Limiter) SetupMetrics() *Limiter {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.metrics = &limiterMetricsCollection{
lowTokenNotifyCounter: lowTokenRequestNotifyCounter.WithLabelValues(lim.name),
}
return lim
}
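
Both setters return the limiter, so construction chains in a single expression exactly as initRunState does above; a usage sketch with abbreviated, hypothetical arguments:

// SetName must come first: SetupMetrics reads lim.name for the label value.
limiter := NewLimiterWithCfg(now, cfg, lowTokenNotifyChan).
	SetName("rg1").
	SetupMetrics()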

// notify tries to send a non-blocking notification on notifyCh and disables
// further notifications (until the next Reconfigure or StartNotification).
func (lim *Limiter) notify() {
@@ -234,6 +261,9 @@ func (lim *Limiter) notify() {
lim.isLowProcess = true
select {
case lim.lowTokensNotifyChan <- struct{}{}:
if lim.metrics != nil {
lim.metrics.lowTokenNotifyCounter.Inc()
}
default:
}
}
17 changes: 14 additions & 3 deletions client/resource_group/controller/metrics.go
@@ -24,6 +24,8 @@ const (
// TODO: remove old label in 8.x
resourceGroupNameLabel = "name"
newResourceGroupNameLabel = "resource_group"

errType = "type"
)

var (
@@ -40,7 +42,7 @@
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "success",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600}, // 0.0005 ~ 1h
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

@@ -49,7 +51,7 @@
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Buckets: []float64{0.0005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30, 60, 600, 1800, 3600, 86400}, // 0.0005 ~ 24h
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

@@ -59,7 +61,7 @@
Subsystem: requestSubsystem,
Name: "fail",
Help: "Counter of failed request.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel, errType})

requestRetryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
@@ -73,6 +75,7 @@
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Buckets: prometheus.ExponentialBuckets(0.001, 2, 13), // 1ms ~ 4.096s
Name: "duration",
Help: "Bucketed histogram of latency(s) of token request.",
}, []string{"type"})
Expand All @@ -84,6 +87,14 @@ var (
Name: "resource_group",
Help: "Counter of token request by every resource group.",
}, []string{resourceGroupNameLabel, newResourceGroupNameLabel})

lowTokenRequestNotifyCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: tokenRequestSubsystem,
Name: "low_token_notified",
Help: "Counter of low token request.",
}, []string{newResourceGroupNameLabel})
)
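
The exponential buckets added for token-request duration are easy to sanity-check in isolation; a tiny standalone sketch:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Prints 13 bounds doubling from 1ms: 0.001, 0.002, ... 4.096; anything
	// slower falls into the implicit +Inf bucket.
	fmt.Println(prometheus.ExponentialBuckets(0.001, 2, 13))
}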

var (
