Skip to content

Commit

Permalink
resource-manager: improve trace logs, ctl and metrics
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Dec 7, 2023
1 parent ad232d1 commit b477971
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 30 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4
Expand All @@ -31,7 +32,6 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

// EnableControllerTrace is to control whether resource control client enable trace.
EnableControllerTrace bool `toml:"enable-controller-trace" json:"enable-controller-trace,string"`
}

// DefaultConfig returns the default resource manager controller configuration.
Expand All @@ -96,6 +99,7 @@ func DefaultConfig() *Config {
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
EnableControllerTrace: false,
}
}

Expand Down
52 changes: 35 additions & 17 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
Expand All @@ -54,6 +55,14 @@ const (
lowToken selectType = 1
)

var enableControllerTrace = atomicutil.NewBool(false)

func logControllerTrace(msg string, fields ...zap.Field) {
if enableControllerTrace.Load() {
log.Info(msg, fields...)
}
}

// ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store.
type ResourceGroupKVInterceptor interface {
// OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time.
Expand Down Expand Up @@ -369,6 +378,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
if enableControllerTrace.Load() != config.EnableControllerTrace {
enableControllerTrace.Store(config.EnableControllerTrace)
}
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

Expand Down Expand Up @@ -505,7 +517,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
c.responseDeadlineCh = c.run.responseDeadline.C
}
go func() {
log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
logControllerTrace("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source))
resp, err := c.provider.AcquireTokenBuckets(ctx, req)
latency := time.Since(now)
if err != nil {
Expand All @@ -518,7 +530,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context,
} else {
successfulTokenRequestDuration.Observe(latency.Seconds())
}
log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency))
c.tokenResponseChan <- resp
}()
}
Expand Down Expand Up @@ -603,10 +615,11 @@ type groupCostController struct {
calculators []ResourceCalculator
handleRespFunc func(*rmpb.TokenBucketResponse)

successfulRequestDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter
successfulRequestDuration prometheus.Observer
failedLimitReserveDuration prometheus.Observer
requestRetryCounter prometheus.Counter
failedRequestCounter prometheus.Counter
tokenRequestCounter prometheus.Counter

mu struct {
sync.Mutex
Expand Down Expand Up @@ -696,14 +709,15 @@ func newGroupCostController(
return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type")
}
gc := &groupCostController{
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
meta: group,
name: group.Name,
mainCfg: mainCfg,
mode: group.GetMode(),
successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name),
failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name),
failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name),
requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name),
tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name),
calculators: []ResourceCalculator{
newKVCalculator(mainCfg),
newSQLCalculator(mainCfg),
Expand Down Expand Up @@ -805,7 +819,7 @@ func (gc *groupCostController) updateRunState() {
}
*gc.run.consumption = *gc.mu.consumption
gc.mu.Unlock()
log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption))
gc.run.now = newTime
}

Expand Down Expand Up @@ -886,7 +900,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() {
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand All @@ -900,7 +914,7 @@ func (gc *groupCostController) updateAvgRUPerSec() {
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}
Expand Down Expand Up @@ -1204,6 +1218,8 @@ func (gc *groupCostController) onRequestWait(
}
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
} else if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
}
case rmpb.GroupMode_RUMode:
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
Expand All @@ -1214,6 +1230,8 @@ func (gc *groupCostController) onRequestWait(
}
if d, err = WaitReservations(ctx, now, res); err == nil {
break retryLoop
} else if d.Seconds() > 0 {
gc.failedLimitReserveDuration.Observe(d.Seconds())
}
}
gc.requestRetryCounter.Inc()
Expand Down
24 changes: 13 additions & 11 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,11 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
ok bool
lim *Limiter
tokens float64
timeToAct time.Time
needWaitDurtion time.Duration
// This is the Limit at reservation time, it can change later.
limit Limit
}
Expand Down Expand Up @@ -301,7 +302,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
) {
lim.mu.Lock()
defer lim.mu.Unlock()
log.Debug("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst))
if args.NewBurst < 0 {
lim.last = now
lim.tokens = args.NewTokens
Expand All @@ -317,7 +318,7 @@ func (lim *Limiter) Reconfigure(now time.Time,
opt(lim)
}
lim.maybeNotify()
log.Debug("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst))
}

// AvailableTokens decreases the amount of tokens currently available.
Expand Down Expand Up @@ -358,9 +359,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur

// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
ok: ok,
lim: lim,
limit: lim.limit,
needWaitDurtion: waitDuration,
}
if ok {
r.tokens = n
Expand All @@ -372,7 +374,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
lim.tokens = tokens
lim.maybeNotify()
} else {
log.Debug("[resource group controller]", zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
logControllerTrace("[resource group controller]", zap.Duration("need-wait-duration", waitDuration), zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes))
lim.last = last
if lim.limit == 0 {
lim.notify()
Expand Down Expand Up @@ -452,7 +454,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv
for _, res := range reservations {
if !res.ok {
cancel()
return 0, errs.ErrClientResourceGroupThrottled
return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled
}
delay := res.DelayFrom(now)
if delay > longestDelayDuration {
Expand Down
2 changes: 1 addition & 1 deletion client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func TestCancel(t *testing.T) {
checkTokens(re, lim1, t2, 7)
checkTokens(re, lim2, t2, 2)
d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2})
re.Equal(d, time.Duration(0))
re.Equal(d, 4*time.Second)
re.Error(err)
checkTokens(re, lim1, t3, 13)
checkTokens(re, lim2, t3, 3)
Expand Down
9 changes: 9 additions & 0 deletions client/resource_group/controller/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ var (
Help: "Bucketed histogram of wait duration of successful request.",
}, []string{resourceGroupNameLabel})

failedLimitReserveDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: requestSubsystem,
Name: "limit_reserve_time_failed",
Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30
Help: "Bucketed histogram of wait duration of failed request.",
}, []string{resourceGroupNameLabel})

failedRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/resourcemanager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type ControllerConfig struct {
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`

// EnableControllerTrace is to control whether resource control client enable trace.
EnableControllerTrace bool `toml:"enable-controller-trace" json:"enable-controller-trace,string"`
}

// Adjust adjusts the configuration and initializes it with the default value if necessary.
Expand Down
112 changes: 112 additions & 0 deletions tools/pd-ctl/pdctl/command/resource_manager_command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package command

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"strconv"

"github.com/spf13/cobra"
)

const (
resourceManagerPrefix = "resource-manager/api/v1"
// flags
rmConfigController = "config/controller"
)

// NewResourceManagerCommand return a resource manager subcommand of rootCmd
func NewResourceManagerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "resource-manager <command> [flags]",
Short: "resource-manager commands",
}
cmd.AddCommand(newConfigCommand())
return cmd
}

func newConfigCommand() *cobra.Command {
r := &cobra.Command{
Use: "config",
Short: "config resource manager",
}
r.AddCommand(newConfigControllerCommand())
return r
}

func newConfigControllerCommand() *cobra.Command {
r := &cobra.Command{
Use: "controller",
Short: "config controller",
}
r.AddCommand(newConfigControllerSetCommand())
r.AddCommand(newConfigControllerShowCommand())
return r
}

func newConfigControllerSetCommand() *cobra.Command {
r := &cobra.Command{
Use: "set <key> <value>",
Short: "set controller config",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 2 {
cmd.Println(cmd.UsageString())
return
}

var val interface{}
val, err := strconv.ParseFloat(args[1], 64)
if err != nil {
val = args[1]
}
data := map[string]interface{}{args[0]: val}
jsonData, err := json.Marshal(data)
if err != nil {
cmd.Println(err)
return
}
resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodPost, http.Header{}, WithBody(bytes.NewBuffer(jsonData)))
if err != nil {
cmd.PrintErrln("Failed to set the config: ", err)
return
}
cmd.Println(resp)
},
}
return r
}

func newConfigControllerShowCommand() *cobra.Command {
r := &cobra.Command{
Use: "show",
Short: "show controller config",
Run: func(cmd *cobra.Command, args []string) {
if len(args) != 0 {
cmd.Println(cmd.UsageString())
return
}
resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodGet, http.Header{})
if err != nil {
cmd.Println(err)
return
}
cmd.Println(resp)
},
}
return r
}
1 change: 1 addition & 0 deletions tools/pd-ctl/pdctl/ctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func GetRootCmd() *cobra.Command {
command.NewUnsafeCommand(),
command.NewKeyspaceGroupCommand(),
command.NewKeyspaceCommand(),
command.NewResourceManagerCommand(),
)

rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true
Expand Down

0 comments on commit b477971

Please sign in to comment.