diff --git a/client/go.mod b/client/go.mod index 948a5c22b145..54be0c967653 100644 --- a/client/go.mod +++ b/client/go.mod @@ -13,6 +13,7 @@ require ( github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.11.1 github.com/stretchr/testify v1.8.2 + go.uber.org/atomic v1.10.0 go.uber.org/goleak v1.1.11 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230711005742-c3f37128e5a4 @@ -31,7 +32,6 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.26.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect - go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.17.0 // indirect golang.org/x/sys v0.13.0 // indirect diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 16a2525cd0d2..36dd52059f85 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -88,6 +88,9 @@ type Config struct { // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` + + // EnableControllerTrace is to control whether resource control client enable trace. + EnableControllerTrace bool `toml:"enable-controller-trace" json:"enable-controller-trace,string"` } // DefaultConfig returns the default resource manager controller configuration. @@ -96,6 +99,7 @@ func DefaultConfig() *Config { DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), RequestUnit: DefaultRequestUnitConfig(), + EnableControllerTrace: false, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 56d9ef9bd1b5..13fbf2f2e7cd 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -32,6 +32,7 @@ import ( "github.com/prometheus/client_golang/prometheus" pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/errs" + atomicutil "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/exp/slices" ) @@ -54,6 +55,14 @@ const ( lowToken selectType = 1 ) +var enableControllerTrace = atomicutil.NewBool(false) + +func logControllerTrace(msg string, fields ...zap.Field) { + if enableControllerTrace.Load() { + log.Info(msg, fields...) + } +} + // ResourceGroupKVInterceptor is used as quota limit controller for resource group using kv store. type ResourceGroupKVInterceptor interface { // OnRequestWait is used to check whether resource group has enough tokens. It maybe needs to wait some time. @@ -369,6 +378,9 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } copyCfg := *c.ruConfig c.safeRuConfig.Store(©Cfg) + if enableControllerTrace.Load() != config.EnableControllerTrace { + enableControllerTrace.Store(config.EnableControllerTrace) + } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } @@ -505,7 +517,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, c.responseDeadlineCh = c.run.responseDeadline.C } go func() { - log.Debug("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) + logControllerTrace("[resource group controller] send token bucket request", zap.Time("now", now), zap.Any("req", req.Requests), zap.String("source", source)) resp, err := c.provider.AcquireTokenBuckets(ctx, req) latency := time.Since(now) if err != nil { @@ -518,7 +530,7 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } - log.Debug("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) + logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp }() } @@ -603,10 +615,11 @@ type groupCostController struct { calculators []ResourceCalculator handleRespFunc func(*rmpb.TokenBucketResponse) - successfulRequestDuration prometheus.Observer - requestRetryCounter prometheus.Counter - failedRequestCounter prometheus.Counter - tokenRequestCounter prometheus.Counter + successfulRequestDuration prometheus.Observer + failedLimitReserveDuration prometheus.Observer + requestRetryCounter prometheus.Counter + failedRequestCounter prometheus.Counter + tokenRequestCounter prometheus.Counter mu struct { sync.Mutex @@ -696,14 +709,15 @@ func newGroupCostController( return nil, errs.ErrClientResourceGroupConfigUnavailable.FastGenByArgs("not supports the resource type") } gc := &groupCostController{ - meta: group, - name: group.Name, - mainCfg: mainCfg, - mode: group.GetMode(), - successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), - failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), - requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), - tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), + meta: group, + name: group.Name, + mainCfg: mainCfg, + mode: group.GetMode(), + successfulRequestDuration: successfulRequestDuration.WithLabelValues(group.Name), + failedLimitReserveDuration: failedLimitReserveDuration.WithLabelValues(group.Name), + failedRequestCounter: failedRequestCounter.WithLabelValues(group.Name), + requestRetryCounter: requestRetryCounter.WithLabelValues(group.Name), + tokenRequestCounter: resourceGroupTokenRequestCounter.WithLabelValues(group.Name), calculators: []ResourceCalculator{ newKVCalculator(mainCfg), newSQLCalculator(mainCfg), @@ -805,7 +819,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - log.Debug("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -886,7 +900,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() { if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -900,7 +914,7 @@ func (gc *groupCostController) updateAvgRUPerSec() { if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) { continue } - log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) } gc.burstable.Store(isBurstable) } @@ -1204,6 +1218,8 @@ func (gc *groupCostController) onRequestWait( } if d, err = WaitReservations(ctx, now, res); err == nil { break retryLoop + } else if d.Seconds() > 0 { + gc.failedLimitReserveDuration.Observe(d.Seconds()) } case rmpb.GroupMode_RUMode: res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) @@ -1214,6 +1230,8 @@ func (gc *groupCostController) onRequestWait( } if d, err = WaitReservations(ctx, now, res); err == nil { break retryLoop + } else if d.Seconds() > 0 { + gc.failedLimitReserveDuration.Observe(d.Seconds()) } } gc.requestRetryCounter.Inc() diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index f89ab17514c3..b54835b8f3af 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -122,10 +122,11 @@ func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensN // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDurtion time.Duration // This is the Limit at reservation time, it can change later. limit Limit } @@ -301,7 +302,7 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - log.Debug("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) + logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", args.NotifyThreshold), zap.Int64("old-burst", lim.burst)) if args.NewBurst < 0 { lim.last = now lim.tokens = args.NewTokens @@ -317,7 +318,7 @@ func (lim *Limiter) Reconfigure(now time.Time, opt(lim) } lim.maybeNotify() - log.Debug("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) + logControllerTrace("[resource group controller] after reconfigure", zap.Float64("tokens", lim.tokens), zap.Float64("rate", float64(lim.limit)), zap.Float64("notify-threshold", args.NotifyThreshold), zap.Int64("burst", lim.burst)) } // AvailableTokens decreases the amount of tokens currently available. @@ -358,9 +359,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDurtion: waitDuration, } if ok { r.tokens = n @@ -372,7 +374,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur lim.tokens = tokens lim.maybeNotify() } else { - log.Debug("[resource group controller]", zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + logControllerTrace("[resource group controller]", zap.Duration("need-wait-duration", waitDuration), zap.Float64("current-tokens", lim.tokens), zap.Float64("current-rate", float64(lim.limit)), zap.Float64("request-tokens", n), zap.Int64("burst", lim.burst), zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) lim.last = last if lim.limit == 0 { lim.notify() @@ -452,7 +454,7 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() - return 0, errs.ErrClientResourceGroupThrottled + return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index b8b96ae13d62..d963f8305512 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -161,7 +161,7 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t2, 7) checkTokens(re, lim2, t2, 2) d, err := WaitReservations(ctx, t2, []*Reservation{r1, r2}) - re.Equal(d, time.Duration(0)) + re.Equal(d, 4*time.Second) re.Error(err) checkTokens(re, lim1, t3, 13) checkTokens(re, lim2, t3, 3) diff --git a/client/resource_group/controller/metrics.go b/client/resource_group/controller/metrics.go index 68eb26d03121..47e285ad775b 100644 --- a/client/resource_group/controller/metrics.go +++ b/client/resource_group/controller/metrics.go @@ -42,6 +42,15 @@ var ( Help: "Bucketed histogram of wait duration of successful request.", }, []string{resourceGroupNameLabel}) + failedLimitReserveDuration = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: requestSubsystem, + Name: "limit_reserve_time_failed", + Buckets: []float64{.005, .01, .05, .1, .5, 1, 5, 10, 20, 25, 30}, // 0.005 ~ 30 + Help: "Bucketed histogram of wait duration of failed request.", + }, []string{resourceGroupNameLabel}) + failedRequestCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index 10e916128424..3214cc5250f7 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -102,6 +102,9 @@ type ControllerConfig struct { // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` + + // EnableControllerTrace is to control whether resource control client enable trace. + EnableControllerTrace bool `toml:"enable-controller-trace" json:"enable-controller-trace,string"` } // Adjust adjusts the configuration and initializes it with the default value if necessary. diff --git a/tools/pd-ctl/pdctl/command/resource_manager_command.go b/tools/pd-ctl/pdctl/command/resource_manager_command.go new file mode 100644 index 000000000000..ace7d1b1c8c7 --- /dev/null +++ b/tools/pd-ctl/pdctl/command/resource_manager_command.go @@ -0,0 +1,112 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/spf13/cobra" +) + +const ( + resourceManagerPrefix = "resource-manager/api/v1" + // flags + rmConfigController = "config/controller" +) + +// NewResourceManagerCommand return a resource manager subcommand of rootCmd +func NewResourceManagerCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "resource-manager [flags]", + Short: "resource-manager commands", + } + cmd.AddCommand(newConfigCommand()) + return cmd +} + +func newConfigCommand() *cobra.Command { + r := &cobra.Command{ + Use: "config", + Short: "config resource manager", + } + r.AddCommand(newConfigControllerCommand()) + return r +} + +func newConfigControllerCommand() *cobra.Command { + r := &cobra.Command{ + Use: "controller", + Short: "config controller", + } + r.AddCommand(newConfigControllerSetCommand()) + r.AddCommand(newConfigControllerShowCommand()) + return r +} + +func newConfigControllerSetCommand() *cobra.Command { + r := &cobra.Command{ + Use: "set ", + Short: "set controller config", + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 2 { + cmd.Println(cmd.UsageString()) + return + } + + var val interface{} + val, err := strconv.ParseFloat(args[1], 64) + if err != nil { + val = args[1] + } + data := map[string]interface{}{args[0]: val} + jsonData, err := json.Marshal(data) + if err != nil { + cmd.Println(err) + return + } + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodPost, http.Header{}, WithBody(bytes.NewBuffer(jsonData))) + if err != nil { + cmd.PrintErrln("Failed to set the config: ", err) + return + } + cmd.Println(resp) + }, + } + return r +} + +func newConfigControllerShowCommand() *cobra.Command { + r := &cobra.Command{ + Use: "show", + Short: "show controller config", + Run: func(cmd *cobra.Command, args []string) { + if len(args) != 0 { + cmd.Println(cmd.UsageString()) + return + } + resp, err := doRequest(cmd, fmt.Sprintf("%s/%s", resourceManagerPrefix, rmConfigController), http.MethodGet, http.Header{}) + if err != nil { + cmd.Println(err) + return + } + cmd.Println(resp) + }, + } + return r +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index 86494c046eb9..7a3c540b2660 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -67,6 +67,7 @@ func GetRootCmd() *cobra.Command { command.NewUnsafeCommand(), command.NewKeyspaceGroupCommand(), command.NewKeyspaceCommand(), + command.NewResourceManagerCommand(), ) rootCmd.Flags().ParseErrorsWhitelist.UnknownFlags = true