resource_control: supports dynamically change the controller config #7042

Merged
merged 11 commits on Sep 18, 2023
1 change: 1 addition & 0 deletions client/go.mod
@@ -3,6 +3,7 @@ module github.com/tikv/pd/client
go 1.21

require (
github.com/BurntSushi/toml v0.3.1
github.com/cloudfoundry/gosigar v1.3.6
github.com/gogo/protobuf v1.3.2
github.com/opentracing/opentracing-go v1.2.0
1 change: 1 addition & 0 deletions client/go.sum
@@ -1,4 +1,5 @@
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
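The new BurntSushi/toml dependency lines up with the `toml:"..."` tags added to the controller `Config` below: duration values can now be written as strings in a TOML file. As a rough illustration (not code from this PR — the `duration` wrapper and `controllerConfig` struct here are stand-ins), BurntSushi/toml decodes such strings through `encoding.TextUnmarshaler`:

```go
package main

import (
	"fmt"
	"time"

	"github.com/BurntSushi/toml"
)

// duration is an illustrative wrapper; the PR's real type is named Duration
// and lives in the controller package.
type duration struct{ time.Duration }

// UnmarshalText implements encoding.TextUnmarshaler, which BurntSushi/toml
// consults for string values such as "30s".
func (d *duration) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(string(text))
	d.Duration = v
	return err
}

// controllerConfig mirrors the toml tags added to Config in this diff.
type controllerConfig struct {
	DegradedModeWaitDuration duration `toml:"degraded-mode-wait-duration"`
	LTBMaxWaitDuration       duration `toml:"ltb-max-wait-duration"`
}

func main() {
	var cfg controllerConfig
	doc := "degraded-mode-wait-duration = \"0s\"\nltb-max-wait-duration = \"30s\"\n"
	if _, err := toml.Decode(doc, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.DegradedModeWaitDuration.Duration, cfg.LTBMaxWaitDuration.Duration) // 0s 30s
}
```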
44 changes: 22 additions & 22 deletions client/resource_group/controller/config.go
@@ -51,7 +51,7 @@ const (
// According to the resource control Grafana panel and Prometheus sampling period, the period should be a factor of 15.
defaultTargetPeriod = 5 * time.Second
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
defaultMaxWaitDuration = time.Second
defaultMaxWaitDuration = 30 * time.Second
)

const (
@@ -73,14 +73,17 @@ const (

// Because the resource manager has not been deployed in microservice mode,
// do not enable this function.
defaultDegradedModeWaitDuration = "0s"
defaultDegradedModeWaitDuration = 0
defaultAvgBatchProportion = 0.7
)

// Config is the configuration of the resource manager controller which includes some option for client needed.
type Config struct {
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`

// LTBMaxWaitDuration is the max wait time duration for local token bucket.
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
@@ -90,7 +93,8 @@ type Config struct {
// DefaultConfig returns the default resource manager controller configuration.
func DefaultConfig() *Config {
return &Config{
DegradedModeWaitDuration: defaultDegradedModeWaitDuration,
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
RequestUnit: DefaultRequestUnitConfig(),
}
}
@@ -143,8 +147,10 @@ type RUConfig struct {
WriteBytesCost RequestUnit
CPUMsCost RequestUnit
// The CPU statistics need to distinguish between different environments.
isSingleGroupByKeyspace bool
maxWaitDuration time.Duration
isSingleGroupByKeyspace bool

// client-side wait durations copied from Config
LTBMaxWaitDuration time.Duration
DegradedModeWaitDuration time.Duration
}

@@ -157,21 +163,15 @@ func DefaultRUConfig() *RUConfig {

// GenerateRUConfig generates the configuration by the given request unit configuration.
func GenerateRUConfig(config *Config) *RUConfig {
cfg := &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
maxWaitDuration: defaultMaxWaitDuration,
}
duration, err := time.ParseDuration(config.DegradedModeWaitDuration)
if err != nil {
cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration)
} else {
cfg.DegradedModeWaitDuration = duration
return &RUConfig{
ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost),
ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost),
ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte),
WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost),
WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost),
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
}
return cfg
}
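The `Duration` type and `NewDuration` used above are referenced but not defined in this diff. Since the dynamic config is distributed as JSON through the meta storage (see `json.Unmarshal` in controller.go below), a minimal sketch of the JSON half of such a wrapper, assuming durations travel as strings like "30s":

```go
// Sketch only: the real wrapper lives elsewhere in the client. It shows the
// behavior this diff relies on — Duration embeds time.Duration so fields like
// config.LTBMaxWaitDuration.Duration are directly accessible, and it
// round-trips through JSON as a human-readable string.
package controller

import (
	"encoding/json"
	"time"
)

// Duration embeds time.Duration for direct access in GenerateRUConfig.
type Duration struct {
	time.Duration
}

// NewDuration wraps a time.Duration, matching the DefaultConfig usage above.
func NewDuration(d time.Duration) Duration {
	return Duration{Duration: d}
}

// MarshalJSON encodes the duration as a string, e.g. "30s".
func (d Duration) MarshalJSON() ([]byte, error) {
	return json.Marshal(d.Duration.String())
}

// UnmarshalJSON accepts a string produced by MarshalJSON.
func (d *Duration) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	v, err := time.ParseDuration(s)
	if err != nil {
		return err
	}
	d.Duration = v
	return nil
}
```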
123 changes: 91 additions & 32 deletions client/resource_group/controller/controller.go
@@ -71,9 +71,11 @@ type ResourceGroupProvider interface {
ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error)
DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error)
AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error)
LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error)
LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error)

// meta storage client
Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error)
Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error)
}

// ResourceControlCreateOption create a ResourceGroupsController with the optional settings.
@@ -89,7 +91,7 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption {
// WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets.
func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
return func(controller *ResourceGroupsController) {
controller.ruConfig.maxWaitDuration = d
controller.ruConfig.LTBMaxWaitDuration = d
}
}

@@ -122,6 +124,11 @@ type ResourceGroupsController struct {
// Currently, we don't do multiple `AcquireTokenBuckets` at the same time, so there are no concurrency problems with `currentRequests`.
currentRequests []*rmpb.TokenBucketRequest
}

opts []ResourceControlCreateOption

// a concurrency-safe cache of the RU config.
safeRuConfig atomic.Pointer[RUConfig]
}

// NewResourceGroupController returns a new ResourceGroupsController which implements ResourceGroupKVInterceptor
@@ -139,7 +146,7 @@ func NewResourceGroupController(
if requestUnitConfig != nil {
config.RequestUnit = *requestUnitConfig
}
log.Info("load resource controller config", zap.Reflect("config", config))

ruConfig := GenerateRUConfig(config)
controller := &ResourceGroupsController{
clientUniqueID: clientUniqueID,
Expand All @@ -148,34 +155,37 @@ func NewResourceGroupController(
lowTokenNotifyChan: make(chan struct{}, 1),
tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1),
tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen),
opts: opts,
}
for _, opt := range opts {
opt(controller)
}
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
controller.safeRuConfig.Store(controller.ruConfig)
return controller, nil
}

func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) {
items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath)
resp, err := provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
return nil, err
}
if len(items) == 0 {
if len(resp.Kvs) == 0 {
log.Warn("[resource group controller] server does not save config, load config failed")
return DefaultConfig(), nil
}
config := &Config{}
err = json.Unmarshal(items[0].PayLoad, config)
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
if err != nil {
return nil, err
}
return config, nil
}

// GetConfig returns the config of controller. It's only used for test.
// GetConfig returns the config of controller.
func (c *ResourceGroupsController) GetConfig() *RUConfig {
return c.ruConfig
return c.safeRuConfig.Load()
}

// Source List
@@ -213,22 +223,63 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
stateUpdateTicker = time.NewTicker(time.Millisecond * 100)
})

_, revision, err := c.provider.LoadResourceGroups(ctx)
_, metaRevision, err := c.provider.LoadResourceGroups(ctx)
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
resp, err := c.provider.Get(ctx, []byte(controllerConfigPath))
if err != nil {
log.Warn("load resource group revision failed", zap.Error(err))
}
var watchChannel chan []*meta_storagepb.Event
cfgRevision := resp.GetHeader().GetRevision()
var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event
if !c.ruConfig.isSingleGroupByKeyspace {
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
}
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
if err == nil || c.ruConfig.isSingleGroupByKeyspace {
watchRetryTimer.Stop()

watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
}
watchRetryTimer := time.NewTimer(watchRetryInterval)
defer watchRetryTimer.Stop()

for {
select {
/* tickers */
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-watchRetryTimer.C:
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group meta failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
}
}
if watchConfigChannel == nil {
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
if err != nil {
log.Warn("watch resource group config failed", zap.Error(err))
watchRetryTimer.Reset(watchRetryInterval)
}
}

case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
/* channels */
case <-c.loopCtx.Done():
resourceGroupStatusGauge.Reset()
return
@@ -242,14 +293,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
c.handleTokenBucketResponse(resp)
}
c.run.currentRequests = nil
case <-cleanupTicker.C:
c.cleanUpResourceGroup()
case <-stateUpdateTicker.C:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
if len(c.run.currentRequests) == 0 {
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
}
case <-c.lowTokenNotifyChan:
c.executeOnAllGroups((*groupCostController).updateRunState)
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
@@ -259,24 +302,22 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
if c.run.inDegradedMode {
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
}
case <-emergencyTokenAcquisitionTicker.C:
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
case resp, ok := <-watchChannel:
case resp, ok := <-watchMetaChannel:
failpoint.Inject("disableWatch", func() {
if c.ruConfig.isSingleGroupByKeyspace {
panic("disableWatch")
}
})
if !ok {
watchChannel = nil
watchMetaChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
revision = item.Kv.ModRevision
metaRevision = item.Kv.ModRevision
group := &rmpb.ResourceGroup{}
if err := proto.Unmarshal(item.Kv.Value, group); err != nil {
continue
@@ -293,14 +334,32 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
}
}
}
case <-watchRetryTimer.C:
watchChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(revision), pd.WithPrefix())
if err != nil {
case resp, ok := <-watchConfigChannel:
if !ok {
watchConfigChannel = nil
watchRetryTimer.Reset(watchRetryInterval)
failpoint.Inject("watchStreamError", func() {
watchRetryTimer.Reset(20 * time.Millisecond)
})
continue
}
for _, item := range resp {
cfgRevision = item.Kv.ModRevision
config := &Config{}
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
continue
}
c.ruConfig = GenerateRUConfig(config)

// Stay compatible with serverless
for _, opt := range c.opts {
opt(c)
}
copyCfg := *c.ruConfig
c.safeRuConfig.Store(&copyCfg)
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
}

case gc := <-c.tokenBucketUpdateChan:
now := gc.run.now
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
@@ -1127,7 +1186,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
for typ, counter := range gc.run.resourceTokens {
if v := getRawResourceValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
@@ -1137,7 +1196,7 @@ func (gc *groupCostController) onRequestWait(
res := make([]*Reservation, 0, len(requestUnitLimitTypeList))
for typ, counter := range gc.run.requestUnitTokens {
if v := getRUValueFromConsumption(delta, typ); v > 0 {
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v))
res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v))
}
}
if d, err = WaitReservations(ctx, now, res); err == nil {
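Taken together, the `Start` loop changes implement a watch-and-republish pattern: decode the new JSON payload from `watchConfigChannel`, rebuild the derived `RUConfig`, re-apply the client options, and publish the result through `safeRuConfig` so that concurrent readers such as `onRequestWait` never observe a half-updated struct. A condensed, self-contained sketch of that pattern (`reload` and the trimmed structs are illustrative, not the PR's actual API):

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync/atomic"
	"time"
)

// RUConfig is trimmed to one field for the sketch.
type RUConfig struct {
	LTBMaxWaitDuration time.Duration
}

type Controller struct {
	safeRuConfig atomic.Pointer[RUConfig]
}

// reload mimics the watchConfigChannel branch: parse the new payload,
// rebuild the derived config, then publish it atomically.
func (c *Controller) reload(payload []byte) error {
	var raw struct {
		LTBMaxWaitDuration string `json:"ltb-max-wait-duration"`
	}
	if err := json.Unmarshal(payload, &raw); err != nil {
		return err
	}
	d, err := time.ParseDuration(raw.LTBMaxWaitDuration)
	if err != nil {
		return err
	}
	// Store a fresh pointer; readers calling Load() see either the old or
	// the new config, never a partial write.
	c.safeRuConfig.Store(&RUConfig{LTBMaxWaitDuration: d})
	return nil
}

func main() {
	c := &Controller{}
	c.safeRuConfig.Store(&RUConfig{LTBMaxWaitDuration: 30 * time.Second})
	if err := c.reload([]byte(`{"ltb-max-wait-duration":"5s"}`)); err != nil {
		panic(err)
	}
	fmt.Println(c.safeRuConfig.Load().LTBMaxWaitDuration) // 5s
}
```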