Skip to content

Commit

Permalink
resource_control: supports dynamically change the controller config (t…
Browse files Browse the repository at this point in the history
…ikv#7042)

close tikv#7043

resource_control: supports dynamically change the controller config
- supports dynamically changing the controller config
- export the `maxWaitDuration` for the local bucket limiter.

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

fix

Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Dec 6, 2023
1 parent 3846eac commit 22543de
Show file tree
Hide file tree
Showing 16 changed files with 478 additions and 85 deletions.
1 change: 1 addition & 0 deletions client/go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
Expand Down
40 changes: 20 additions & 20 deletions client/resource_group/controller/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
Expand All @@ -67,13 +67,16 @@ const (

// Because the resource manager has not been deployed in microservice mode,
// do not enable this function.
defaultDegradedModeWaitDuration = "0s"
defaultDegradedModeWaitDuration = 0
)

// Config is the configuration of the resource manager controller which includes some option for client needed.
type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
Expand All @@ -83,7 +86,8 @@ type Config struct {
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
Expand Down Expand Up @@ -128,8 +132,10 @@ type RUConfig struct {
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// some config for client
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

Expand All @@ -142,19 +148,13 @@ func DefaultRUConfig() *RUConfig {

// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
cfg := &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration)
} else {
cfg.DegradedModeWaitDuration = duration
return &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
return cfg
}
123 changes: 91 additions & 32 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ type ResourceGroupProvider interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)

// meta storage client
Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error)
}

// ResourceControlCreateOption create a ResourceGroupsController with the optional settings.
Expand All @@ -85,7 +87,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption {
// WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
return func(controller *ResourceGroupsController) {
controller.ruConfig.maxWaitDuration = d
controller.ruConfig.LTBMaxWaitDuration = d
}
}

Expand Down Expand Up @@ -118,6 +120,11 @@ type ResourceGroupsController struct {
// Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
}

opts []ResourceControlCreateOption

// a cache for ru config and make concurrency safe.
safeRuConfig atomic.Pointer[RUConfig]
}

// NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor
Expand All @@ -135,7 +142,7 @@ func NewResourceGroupController(
if requestUnitConfig != nil {
config.RequestUnit = *requestUnitConfig
}
log.Info("load resource controller config", zap.Reflect("config", config))

ruConfig := GenerateRUConfig(config)
controller := &ResourceGroupsController{
clientUniqueID: clientUniqueID,
Expand All @@ -144,34 +151,37 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
}
for _, opt := range opts {
opt(controller)
}
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
controller.safeRuConfig.Store(controller.ruConfig)
return controller, nil
}

func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) {
items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath)
resp, err := provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
return nil, err
}
if len(items) == 0 {
if len(resp.Kvs) == 0 {
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
err = json.Unmarshal(items[0].PayLoad, config)
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
if err != nil {
return nil, err
}
return config, nil
}

// GetConfig returns the config of controller. It's only used for test.
// GetConfig returns the config of controller.
func (c *ResourceGroupsController) GetConfig() *RUConfig {
return c.ruConfig
return c.safeRuConfig.Load()
}

// Source List
Expand Down Expand Up @@ -209,22 +219,63 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourceGroups(ctx)
_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
resp, err := c.provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
var watchChannel chan []*meta_storagepb.Event
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil || c.ruConfig.isSingleGroupByKeyspace {
watchRetryTimer.Stop()

watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
defer watchRetryTimer.Stop()

for {
select {
/* tickers */
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
}
}

case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
/* channels */
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
Expand All @@ -238,14 +289,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
Expand All @@ -255,24 +298,22 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
case resp, ok := <-watchMetaChannel:
failpoint.Inject("disableWatch", func() {
if c.ruConfig.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchMetaChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
Expand All @@ -289,14 +330,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
config := &Config{}
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue
}
c.ruConfig = GenerateRUConfig(config)

// Stay compatible with serverless
for _, opt := range c.opts {
opt(c)
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
Expand Down Expand Up @@ -1089,7 +1148,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand All @@ -1099,7 +1158,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
Expand Down
Loading

0 comments on commit 22543de

Please sign in to comment.