diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index ffc360c385c..96c783455bb 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -52,6 +52,12 @@ const ( defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second + // defaultWaitRetryTimes is the times to retry when waiting for the token. + defaultWaitRetryTimes = 20 + // defaultWaitRetryInterval is the interval to retry when waiting for the token. + defaultWaitRetryInterval = 50 * time.Millisecond ) const ( @@ -73,18 +79,36 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. - defaultDegradedModeWaitDuration = 0 + defaultDegradedModeWaitDuration = time.Duration(0) defaultAvgBatchProportion = 0.7 ) -// Config is the configuration of the resource manager controller which includes some option for client needed. -type Config struct { +// TokenRPCParams is the parameters for local bucket RPC. +type TokenRPCParams struct { + // WaitRetryInterval is the interval to retry when waiting for the token. + WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"` + + // WaitRetryTimes is the times to retry when waiting for the token. + WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"` +} + +// LocalBucketConfig is the configuration for local bucket. not export to server side. +type LocalBucketConfig struct { + TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"` +} + +// BaseConfig is the configuration of the resource manager controller which includes some option for client needed. +// TODO: unified the configuration for client and server, server side in pkg/mcs/resourcemanger/config.go. +type BaseConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` // LTBMaxWaitDuration is the max wait time duration for local token bucket. LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -93,13 +117,43 @@ type Config struct { EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"` } +// Config is the configuration of the resource manager controller. +type Config struct { + BaseConfig + LocalBucketConfig +} + +// Adjust adjusts the configuration. +func (c *Config) Adjust() { + // valid the configuration, TODO: separately add the valid function. + if c.BaseConfig.LTBMaxWaitDuration.Duration == 0 { + c.BaseConfig.LTBMaxWaitDuration = NewDuration(defaultMaxWaitDuration) + } + if c.LocalBucketConfig.WaitRetryInterval.Duration == 0 { + c.LocalBucketConfig.WaitRetryInterval = NewDuration(defaultWaitRetryInterval) + } + // adjust the client settings. calculate the retry times. + if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes { + c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration) + } +} + // DefaultConfig returns the default resource manager controller configuration. func DefaultConfig() *Config { return &Config{ - DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), - LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), - RequestUnit: DefaultRequestUnitConfig(), - EnableControllerTraceLog: false, + BaseConfig: BaseConfig{ + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + RequestUnit: DefaultRequestUnitConfig(), + EnableControllerTraceLog: false, + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), + LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay), + }, + LocalBucketConfig: LocalBucketConfig{ + TokenRPCParams: TokenRPCParams{ + WaitRetryInterval: NewDuration(defaultWaitRetryInterval), + WaitRetryTimes: defaultWaitRetryTimes, + }, + }, } } @@ -155,6 +209,8 @@ type RUConfig struct { // some config for client LTBMaxWaitDuration time.Duration + WaitRetryInterval time.Duration + WaitRetryTimes int DegradedModeWaitDuration time.Duration } @@ -176,6 +232,8 @@ func GenerateRUConfig(config *Config) *RUConfig { WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + WaitRetryInterval: config.WaitRetryInterval.Duration, + WaitRetryTimes: config.WaitRetryTimes, DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, } } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index f86b0fa3c79..a95296cb34f 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -38,14 +38,12 @@ import ( ) const ( - controllerConfigPath = "resource_group/controller" - maxRetry = 10 - retryInterval = 50 * time.Millisecond - maxNotificationChanLen = 200 - needTokensAmplification = 1.1 - trickleReserveDuration = 1250 * time.Millisecond - - watchRetryInterval = 30 * time.Second + controllerConfigPath = "resource_group/controller" + maxNotificationChanLen = 200 + needTokensAmplification = 1.1 + trickleReserveDuration = 1250 * time.Millisecond + slowNotifyFilterDuration = 10 * time.Millisecond + watchRetryInterval = 30 * time.Second ) type selectType int @@ -104,6 +102,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { } } +// WithWaitRetryInterval is the option to set the retry interval when waiting for the token. +func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryInterval = d + } +} + +// WithWaitRetryTimes is the option to set the times to retry when waiting for the token. +func WithWaitRetryTimes(times int) ResourceControlCreateOption { + return func(controller *ResourceGroupsController) { + controller.ruConfig.WaitRetryTimes = times + } +} + var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil) // ResourceGroupsController implements ResourceGroupKVInterceptor. @@ -119,7 +131,7 @@ type ResourceGroupsController struct { calculators []ResourceCalculator // When a signal is received, it means the number of available token is low. - lowTokenNotifyChan chan struct{} + lowTokenNotifyChan chan notifyMsg // When a token bucket response received from server, it will be sent to the channel. tokenResponseChan chan []*rmpb.TokenBucketResponse // When the token bucket of a resource group is updated, it will be sent to the channel. @@ -161,7 +173,7 @@ func NewResourceGroupController( clientUniqueID: clientUniqueID, provider: provider, ruConfig: ruConfig, - lowTokenNotifyChan: make(chan struct{}, 1), + lowTokenNotifyChan: make(chan notifyMsg, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), opts: opts, @@ -172,6 +184,7 @@ func NewResourceGroupController( log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} controller.safeRuConfig.Store(controller.ruConfig) + enableControllerTraceLog.Store(config.EnableControllerTraceLog) return controller, nil } @@ -180,12 +193,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con if err != nil { return nil, err } + config := DefaultConfig() + defer config.Adjust() kvs := resp.GetKvs() if len(kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") - return DefaultConfig(), nil + return config, nil } - config := &Config{} err = json.Unmarshal(kvs[0].GetValue(), config) if err != nil { return nil, err @@ -267,7 +281,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */, notifyMsg{}) } case <-watchRetryTimer.C: if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { @@ -288,7 +302,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { watchRetryTimer.Reset(watchRetryInterval) } } - case <-emergencyTokenAcquisitionTicker.C: c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) /* channels */ @@ -305,11 +318,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil - case <-c.lowTokenNotifyChan: + case notifyMsg := <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) if len(c.run.currentRequests) == 0 { - c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */) + c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg) } if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) @@ -366,10 +379,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } for _, item := range resp { cfgRevision = item.Kv.ModRevision - config := &Config{} + config := DefaultConfig() if err := json.Unmarshal(item.Kv.Value, config); err != nil { continue } + config.Adjust() c.ruConfig = GenerateRUConfig(config) // Stay compatible with serverless @@ -383,7 +397,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { } log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) } - case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -489,7 +502,7 @@ func (c *ResourceGroupsController) handleTokenBucketResponse(resp []*rmpb.TokenB } } -func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType) { +func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Context, source string, typ selectType, notifyMsg notifyMsg) { c.run.currentRequests = make([]*rmpb.TokenBucketRequest, 0) c.groupsController.Range(func(name, value any) bool { gc := value.(*groupCostController) @@ -501,11 +514,11 @@ func (c *ResourceGroupsController) collectTokenBucketRequests(ctx context.Contex return true }) if len(c.run.currentRequests) > 0 { - c.sendTokenBucketRequests(ctx, c.run.currentRequests, source) + c.sendTokenBucketRequests(ctx, c.run.currentRequests, source, notifyMsg) } } -func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string) { +func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, requests []*rmpb.TokenBucketRequest, source string, notifyMsg notifyMsg) { now := time.Now() req := &rmpb.TokenBucketsRequest{ Requests: requests, @@ -523,13 +536,16 @@ func (c *ResourceGroupsController) sendTokenBucketRequests(ctx context.Context, if err != nil { // Don't log any errors caused by the stopper canceling the context. if !errors.ErrorEqual(err, context.Canceled) { - log.L().Sugar().Infof("[resource group controller] token bucket rpc error: %v", err) + log.Error("[resource group controller] token bucket rpc error", zap.Error(err)) } resp = nil failedTokenRequestDuration.Observe(latency.Seconds()) } else { successfulTokenRequestDuration.Observe(latency.Seconds()) } + if !notifyMsg.startTime.IsZero() && time.Since(notifyMsg.startTime) > slowNotifyFilterDuration { + log.Warn("[resource group controller] slow token bucket request", zap.String("source", source), zap.Duration("cost", time.Since(notifyMsg.startTime))) + } logControllerTrace("[resource group controller] token bucket response", zap.Time("now", time.Now()), zap.Any("resp", resp), zap.String("source", source), zap.Duration("latency", latency)) c.tokenResponseChan <- resp }() @@ -625,7 +641,7 @@ type groupCostController struct { // fast path to make once token limit with un-limit burst. burstable *atomic.Bool - lowRUNotifyChan chan<- struct{} + lowRUNotifyChan chan<- notifyMsg tokenBucketUpdateChan chan<- *groupCostController // run contains the state that is updated by the main loop. @@ -715,7 +731,7 @@ type tokenCounter struct { func newGroupCostController( group *rmpb.ResourceGroup, mainCfg *RUConfig, - lowRUNotifyChan chan struct{}, + lowRUNotifyChan chan notifyMsg, tokenBucketUpdateChan chan *groupCostController, ) (*groupCostController, error) { switch group.Mode { @@ -834,7 +850,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - logControllerTrace("[resource group controller] update run state", zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption)) gc.run.now = newTime } @@ -1034,7 +1050,7 @@ func (gc *groupCostController) applyBasicConfigForRUTokenCounters() { cfg.NewRate = 99999999 }) counter.limiter.Reconfigure(gc.run.now, cfg, resetLowProcess()) - log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("resource-group", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) + log.Info("[resource group controller] resource token bucket enter degraded mode", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)])) } } @@ -1088,6 +1104,9 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket timerDuration = (trickleDuration + trickleReserveDuration) / 2 } counter.notify.mu.Lock() + if counter.notify.setupNotificationTimer != nil { + counter.notify.setupNotificationTimer.Stop() + } counter.notify.setupNotificationTimer = time.NewTimer(timerDuration) counter.notify.setupNotificationCh = counter.notify.setupNotificationTimer.C counter.notify.setupNotificationThreshold = 1 @@ -1222,7 +1241,7 @@ func (gc *groupCostController) onRequestWait( var i int var d time.Duration retryLoop: - for i = 0; i < maxRetry; i++ { + for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ { switch gc.mode { case rmpb.GroupMode_RawMode: res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) @@ -1246,8 +1265,8 @@ func (gc *groupCostController) onRequestWait( } } gc.metrics.requestRetryCounter.Inc() - time.Sleep(retryInterval) - waitDuration += retryInterval + time.Sleep(gc.mainCfg.WaitRetryInterval) + waitDuration += gc.mainCfg.WaitRetryInterval } if err != nil { if errs.ErrClientResourceGroupThrottled.Equal(err) { @@ -1260,7 +1279,7 @@ func (gc *groupCostController) onRequestWait( sub(gc.mu.consumption, delta) gc.mu.Unlock() failpoint.Inject("triggerUpdate", func() { - gc.lowRUNotifyChan <- struct{}{} + gc.lowRUNotifyChan <- notifyMsg{} }) return nil, nil, waitDuration, 0, err } diff --git a/client/resource_group/controller/controller_test.go b/client/resource_group/controller/controller_test.go index 9111dda852b..117a0bc4ece 100644 --- a/client/resource_group/controller/controller_test.go +++ b/client/resource_group/controller/controller_test.go @@ -45,7 +45,7 @@ func createTestGroupCostController(re *require.Assertions) *groupCostController JobTypes: []string{"lightning", "br"}, }, } - ch1 := make(chan struct{}) + ch1 := make(chan notifyMsg) ch2 := make(chan *groupCostController) gc, err := newGroupCostController(group, DefaultRUConfig(), ch1, ch2) re.NoError(err) diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index b2e2a03de70..0c164d4fc02 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -75,7 +75,7 @@ type Limiter struct { // last is the last time the limiter's tokens field was updated last time.Time notifyThreshold float64 - lowTokensNotifyChan chan<- struct{} + lowTokensNotifyChan chan<- notifyMsg // To prevent too many chan sent, the notifyThreshold is set to 0 after notify. // So the notifyThreshold cannot show whether the limiter is in the low token state, // isLowProcess is used to check it. @@ -88,6 +88,11 @@ type Limiter struct { metrics *limiterMetricsCollection } +// notifyMsg is a message to notify the low token state. +type notifyMsg struct { + startTime time.Time +} + // limiterMetricsCollection is a collection of metrics for a limiter. type limiterMetricsCollection struct { lowTokenNotifyCounter prometheus.Counter @@ -102,7 +107,7 @@ func (lim *Limiter) Limit() Limit { // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ limit: r, last: now, @@ -116,7 +121,7 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify // NewLimiterWithCfg returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. -func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- struct{}) *Limiter { +func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan<- notifyMsg) *Limiter { lim := &Limiter{ name: name, limit: Limit(cfg.NewRate), @@ -136,13 +141,14 @@ func NewLimiterWithCfg(name string, now time.Time, cfg tokenBucketReconfigureArg // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. // A Reservation may be canceled, which may enable the Limiter to permit additional events. type Reservation struct { - ok bool - lim *Limiter - tokens float64 - timeToAct time.Time - needWaitDurtion time.Duration + ok bool + lim *Limiter + tokens float64 + timeToAct time.Time + needWaitDuration time.Duration // This is the Limit at reservation time, it can change later. limit Limit + err error } // OK returns whether the limiter can provide the requested number of tokens @@ -217,7 +223,8 @@ func (lim *Limiter) Reserve(ctx context.Context, waitDuration time.Duration, now select { case <-ctx.Done(): return &Reservation{ - ok: false, + ok: false, + err: ctx.Err(), } default: } @@ -254,7 +261,7 @@ func (lim *Limiter) notify() { lim.notifyThreshold = 0 lim.isLowProcess = true select { - case lim.lowTokensNotifyChan <- struct{}{}: + case lim.lowTokensNotifyChan <- notifyMsg{startTime: time.Now()}: if lim.metrics != nil { lim.metrics.lowTokenNotifyCounter.Inc() } @@ -386,10 +393,10 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur // Prepare reservation r := Reservation{ - ok: ok, - lim: lim, - limit: lim.limit, - needWaitDurtion: waitDuration, + ok: ok, + lim: lim, + limit: lim.limit, + needWaitDuration: waitDuration, } if ok { r.tokens = n @@ -412,7 +419,8 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur zap.Float64("notify-threshold", lim.notifyThreshold), zap.Bool("is-low-process", lim.isLowProcess), zap.Int64("burst", lim.burst), - zap.Int("remaining-notify-times", lim.remainingNotifyTimes)) + zap.Int("remaining-notify-times", lim.remainingNotifyTimes), + zap.String("name", lim.name)) } lim.last = last if lim.limit == 0 { @@ -493,7 +501,10 @@ func WaitReservations(ctx context.Context, now time.Time, reservations []*Reserv for _, res := range reservations { if !res.ok { cancel() - return res.needWaitDurtion, errs.ErrClientResourceGroupThrottled + if res.err != nil { + return res.needWaitDuration, res.err + } + return res.needWaitDuration, errs.ErrClientResourceGroupThrottled } delay := res.DelayFrom(now) if delay > longestDelayDuration { diff --git a/client/resource_group/controller/limiter_test.go b/client/resource_group/controller/limiter_test.go index d963f830551..155bb3c5a4e 100644 --- a/client/resource_group/controller/limiter_test.go +++ b/client/resource_group/controller/limiter_test.go @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo } func TestSimpleReserve(t *testing.T) { - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 3, t1, true}) runReserveMax(t, lim, request{t0, 3, t4, true}) @@ -103,7 +103,7 @@ func TestSimpleReserve(t *testing.T) { func TestReconfig(t *testing.T) { re := require.New(t) - lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1)) + lim := NewLimiter(t0, 1, 0, 2, make(chan notifyMsg, 1)) runReserveMax(t, lim, request{t0, 4, t2, true}) args := tokenBucketReconfigureArgs{ @@ -126,7 +126,7 @@ func TestReconfig(t *testing.T) { } func TestNotify(t *testing.T) { - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim := NewLimiter(t0, 1, 0, 0, nc) args := tokenBucketReconfigureArgs{ @@ -147,7 +147,7 @@ func TestCancel(t *testing.T) { ctx := context.Background() ctx1, cancel1 := context.WithDeadline(ctx, t2) re := require.New(t) - nc := make(chan struct{}, 1) + nc := make(chan notifyMsg, 1) lim1 := NewLimiter(t0, 1, 0, 10, nc) lim2 := NewLimiter(t0, 1, 0, 0, nc) @@ -185,3 +185,16 @@ func TestCancel(t *testing.T) { checkTokens(re, lim1, t5, 15) checkTokens(re, lim2, t5, 5) } + +func TestCancelErrorOfReservation(t *testing.T) { + re := require.New(t) + nc := make(chan notifyMsg, 1) + lim := NewLimiter(t0, 10, 0, 10, nc) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + r := lim.Reserve(ctx, InfDuration, t0, 5) + d, err := WaitReservations(context.Background(), t0, []*Reservation{r}) + re.Equal(0*time.Second, d) + re.Error(err) + re.Contains(err.Error(), "context canceled") +} diff --git a/pkg/mcs/resourcemanager/server/config.go b/pkg/mcs/resourcemanager/server/config.go index d280799f2c5..5d203f59748 100644 --- a/pkg/mcs/resourcemanager/server/config.go +++ b/pkg/mcs/resourcemanager/server/config.go @@ -59,6 +59,8 @@ const ( defaultDegradedModeWaitDuration = time.Second * 0 // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. defaultMaxWaitDuration = 30 * time.Second + // defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + defaultLTBTokenRPCMaxDelay = 1 * time.Second ) // Config is the configuration for the resource manager. @@ -99,6 +101,9 @@ type ControllerConfig struct { // LTBMaxWaitDuration is the max wait time duration for local token bucket. LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC. + LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -112,10 +117,16 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { if rmc == nil { return } - rmc.RequestUnit.Adjust() - - configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) - configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) + rmc.RequestUnit.Adjust(meta.Child("request-unit")) + if !meta.IsDefined("degraded-mode-wait-duration") { + configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + } + if !meta.IsDefined("ltb-max-wait-duration") { + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) + } + if !meta.IsDefined("ltb-token-rpc-max-delay") { + configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay) + } failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) @@ -145,7 +156,7 @@ type RequestUnitConfig struct { } // Adjust adjusts the configuration and initializes it with the default value if necessary. -func (ruc *RequestUnitConfig) Adjust() { +func (ruc *RequestUnitConfig) Adjust(_ *configutil.ConfigMetaData) { if ruc == nil { return } @@ -202,11 +213,11 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error { configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr") configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr") - return c.Adjust(meta, false) + return c.Adjust(meta) } // Adjust is used to adjust the resource manager configurations. -func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error { +func (c *Config) Adjust(meta *toml.MetaData) error { configMetaData := configutil.NewConfigMetadata(meta) if err := configMetaData.CheckUndecoded(); err != nil { c.WarningMsgs = append(c.WarningMsgs, err.Error()) diff --git a/pkg/mcs/resourcemanager/server/config_test.go b/pkg/mcs/resourcemanager/server/config_test.go index dd8dd2d2814..ae9dfc2cad3 100644 --- a/pkg/mcs/resourcemanager/server/config_test.go +++ b/pkg/mcs/resourcemanager/server/config_test.go @@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) { cfgData := ` [controller] ltb-max-wait-duration = "60s" +ltb-token-rpc-max-delay = "500ms" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -39,11 +40,12 @@ read-cpu-ms-cost = 5.0 cfg := NewConfig() meta, err := toml.Decode(cfgData, &cfg) re.NoError(err) - err = cfg.Adjust(&meta, false) + err = cfg.Adjust(&meta) re.NoError(err) - re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) - re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) + re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration) + re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration) + re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/tests/integrations/mcs/resourcemanager/resource_manager_test.go b/tests/integrations/mcs/resourcemanager/resource_manager_test.go index 41b637aba51..8ee41df1453 100644 --- a/tests/integrations/mcs/resourcemanager/resource_manager_test.go +++ b/tests/integrations/mcs/resourcemanager/resource_manager_test.go @@ -33,6 +33,7 @@ import ( "github.com/tikv/pd/client/resource_group/controller" "github.com/tikv/pd/pkg/mcs/resourcemanager/server" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/tests" "go.uber.org/goleak" @@ -1362,16 +1363,24 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh configURL := "/resource-manager/api/v1/config/controller" waitDuration := 10 * time.Second + tokenRPCMaxDelay := 2 * time.Second readBaseCost := 1.5 defaultCfg := controller.DefaultConfig() - // failpoint enableDegradedMode will setup and set it be 1s. - defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectCfg := server.ControllerConfig{ + // failpoint enableDegradedMode will setup and set it be 1s. + DegradedModeWaitDuration: typeutil.NewDuration(time.Second), + LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration), + LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay), + RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit), + EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog, + } expectRUCfg := controller.GenerateRUConfig(defaultCfg) + expectRUCfg.DegradedModeWaitDuration = time.Second // initial config verification respString := sendRequest("GET", getAddr()+configURL, nil) - defaultString, err := json.Marshal(defaultCfg) + expectStr, err := json.Marshal(expectCfg) re.NoError(err) - re.JSONEq(string(respString), string(defaultString)) + re.JSONEq(string(respString), string(expectStr)) re.EqualValues(expectRUCfg, c1.GetConfig()) testCases := []struct { @@ -1384,6 +1393,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh value: waitDuration, expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, }, + { + configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { + ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval) + }, + }, { configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), value: waitDuration, diff --git a/tests/pdctl/resourcemanager/resource_manager_command_test.go b/tests/pdctl/resourcemanager/resource_manager_command_test.go index ad43e0abca9..6396e436a8b 100644 --- a/tests/pdctl/resourcemanager/resource_manager_command_test.go +++ b/tests/pdctl/resourcemanager/resource_manager_command_test.go @@ -56,7 +56,7 @@ func (s *testResourceManagerSuite) TearDownSuite() { } func (s *testResourceManagerSuite) TestConfigController() { - expectCfg := server.ControllerConfig{} + expectCfg := server.Config{} expectCfg.Adjust(nil) // Show controller config checkShow := func() { @@ -67,7 +67,7 @@ func (s *testResourceManagerSuite) TestConfigController() { actualCfg := server.ControllerConfig{} err = json.Unmarshal(output, &actualCfg) s.Nil(err) - s.Equal(expectCfg, actualCfg) + s.Equal(expectCfg.Controller, actualCfg) } // Check default config @@ -78,20 +78,20 @@ func (s *testResourceManagerSuite) TestConfigController() { output, err := pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour} + expectCfg.Controller.LTBMaxWaitDuration = typeutil.Duration{Duration: 1 * time.Hour} checkShow() args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "enable-controller-trace-log", "true"} output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.EnableControllerTraceLog = true + expectCfg.Controller.EnableControllerTraceLog = true checkShow() args = []string{"-u", s.pdAddr, "resource-manager", "config", "controller", "set", "write-base-cost", "2"} output, err = pdctl.ExecuteCommand(pdctlCmd.GetRootCmd(), args...) s.Nil(err) s.Contains(string(output), "Success!") - expectCfg.RequestUnit.WriteBaseCost = 2 + expectCfg.Controller.RequestUnit.WriteBaseCost = 2 checkShow() }