From 5861e9705abfd111e3b2f9a10c696fa86698df7d Mon Sep 17 00:00:00 2001 From: ShuNing Date: Tue, 19 Sep 2023 05:25:40 +0800 Subject: [PATCH] This is an automated cherry-pick of #7042 close tikv/pd#7043 Signed-off-by: ti-chi-bot --- client/go.mod | 5 + client/go.sum | 1 + client/resource_group/controller/config.go | 48 +++-- .../resource_group/controller/controller.go | 173 ++++++++++++++++- client/resource_group/controller/util.go | 68 +++++++ client/resource_group/controller/util_test.go | 51 +++++ client/resource_manager_client.go | 16 +- .../resource_manager/server/apis/v1/api.go | 45 +++++ pkg/mcs/resource_manager/server/config.go | 6 + .../resource_manager/server/config_test.go | 2 + pkg/mcs/resource_manager/server/manager.go | 73 +++++++- pkg/storage/endpoint/resource_group.go | 6 + server/server.go | 5 +- .../resource_manager/resource_manager_test.go | 176 ++++++++++++++++++ 14 files changed, 653 insertions(+), 22 deletions(-) create mode 100644 client/resource_group/controller/util.go create mode 100644 client/resource_group/controller/util_test.go diff --git a/client/go.mod b/client/go.mod index 04508ed8e758..5e5a7fd8cef6 100644 --- a/client/go.mod +++ b/client/go.mod @@ -3,7 +3,12 @@ module github.com/tikv/pd/client go 1.20 require ( +<<<<<<< HEAD github.com/elastic/gosigar v0.14.2 +======= + github.com/BurntSushi/toml v0.3.1 + github.com/cloudfoundry/gosigar v1.3.6 +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) github.com/gogo/protobuf v1.3.2 github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c diff --git a/client/go.sum b/client/go.sum index 86b99315ecc8..8b35ed71bd8d 100644 --- a/client/go.sum +++ b/client/go.sum @@ -1,4 +1,5 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/client/resource_group/controller/config.go b/client/resource_group/controller/config.go index 9c9ff74baa50..f55e031b8d80 100644 --- a/client/resource_group/controller/config.go +++ b/client/resource_group/controller/config.go @@ -52,7 +52,7 @@ const ( // According to the resource control Grafana panel and Prometheus sampling period, the period should be the factor of 15. defaultTargetPeriod = 5 * time.Second // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. - defaultMaxWaitDuration = time.Second + defaultMaxWaitDuration = 30 * time.Second ) const ( @@ -67,23 +67,39 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. +<<<<<<< HEAD defaultDegradedModeWaitDuration = "0s" +======= + defaultDegradedModeWaitDuration = 0 + defaultAvgBatchProportion = 0.7 +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) ) // ControllerConfig is the configuration of the resource manager controller which includes some option for client needed. type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. - DegradedModeWaitDuration string `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` } +<<<<<<< HEAD // DefaultControllerConfig returns the default resource manager controller configuration. func DefaultControllerConfig() *ControllerConfig { return &ControllerConfig{ DegradedModeWaitDuration: defaultDegradedModeWaitDuration, +======= +// DefaultConfig returns the default resource manager controller configuration. +func DefaultConfig() *Config { + return &Config{ + DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration), + LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration), +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) RequestUnit: DefaultRequestUnitConfig(), } } @@ -128,8 +144,10 @@ type Config struct { WriteBytesCost RequestUnit CPUMsCost RequestUnit // The CPU statistics need to distinguish between different environments. - isSingleGroupByKeyspace bool - maxWaitDuration time.Duration + isSingleGroupByKeyspace bool + + // some config for client + LTBMaxWaitDuration time.Duration DegradedModeWaitDuration time.Duration } @@ -140,6 +158,7 @@ func DefaultConfig() *Config { ) } +<<<<<<< HEAD // GenerateConfig generates the configuration by the given request unit configuration. func GenerateConfig(config *ControllerConfig) *Config { cfg := &Config{ @@ -149,12 +168,19 @@ func GenerateConfig(config *ControllerConfig) *Config { WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), maxWaitDuration: defaultMaxWaitDuration, +======= +// GenerateRUConfig generates the configuration by the given request unit configuration. +func GenerateRUConfig(config *Config) *RUConfig { + return &RUConfig{ + ReadBaseCost: RequestUnit(config.RequestUnit.ReadBaseCost), + ReadPerBatchBaseCost: RequestUnit(config.RequestUnit.ReadPerBatchBaseCost), + ReadBytesCost: RequestUnit(config.RequestUnit.ReadCostPerByte), + WriteBaseCost: RequestUnit(config.RequestUnit.WriteBaseCost), + WritePerBatchBaseCost: RequestUnit(config.RequestUnit.WritePerBatchBaseCost), + WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte), + CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost), + LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration, + DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration, +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) } - duration, err := time.ParseDuration(config.DegradedModeWaitDuration) - if err != nil { - cfg.DegradedModeWaitDuration, _ = time.ParseDuration(defaultDegradedModeWaitDuration) - } else { - cfg.DegradedModeWaitDuration = duration - } - return cfg } diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 6a08756e85d8..75a306bac623 100755 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -64,7 +64,15 @@ type ResourceGroupProvider interface { ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) +<<<<<<< HEAD LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]pd.GlobalConfigItem, int64, error) +======= + LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) + + // meta storage client + Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) + Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) } // ResourceControlCreateOption create a ResourceGroupsController with the optional settings. @@ -80,7 +88,11 @@ func EnableSingleGroupByKeyspace() ResourceControlCreateOption { // WithMaxWaitDuration is the option to set the max wait duration for acquiring token buckets. func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption { return func(controller *ResourceGroupsController) { +<<<<<<< HEAD controller.config.maxWaitDuration = d +======= + controller.ruConfig.LTBMaxWaitDuration = d +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) } } @@ -113,6 +125,11 @@ type ResourceGroupsController struct { // Currently, we don't do multiple `AcquireTokenBuckets`` at the same time, so there are no concurrency problems with `currentRequests`. currentRequests []*rmpb.TokenBucketRequest } + + opts []ResourceControlCreateOption + + // a cache for ru config and make concurrency safe. + safeRuConfig atomic.Pointer[RUConfig] } // NewResourceGroupController returns a new ResourceGroupsController which impls ResourceGroupKVInterceptor @@ -130,7 +147,12 @@ func NewResourceGroupController( if requestUnitConfig != nil { controllerConfig.RequestUnit = *requestUnitConfig } +<<<<<<< HEAD config := GenerateConfig(controllerConfig) +======= + + ruConfig := GenerateRUConfig(config) +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) controller := &ResourceGroupsController{ clientUniqueID: clientUniqueID, provider: provider, @@ -138,34 +160,57 @@ func NewResourceGroupController( lowTokenNotifyChan: make(chan struct{}, 1), tokenResponseChan: make(chan []*rmpb.TokenBucketResponse, 1), tokenBucketUpdateChan: make(chan *groupCostController, maxNotificationChanLen), + opts: opts, } for _, opt := range opts { opt(controller) } +<<<<<<< HEAD controller.calculators = []ResourceCalculator{newKVCalculator(controller.config), newSQLCalculator(controller.config)} return controller, nil } func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*ControllerConfig, error) { items, _, err := provider.LoadGlobalConfig(ctx, nil, controllerConfigPath) +======= + log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig)) + controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)} + controller.safeRuConfig.Store(controller.ruConfig) + return controller, nil +} + +func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Config, error) { + resp, err := provider.Get(ctx, []byte(controllerConfigPath)) +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) if err != nil { return nil, err } - if len(items) == 0 { + if len(resp.Kvs) == 0 { log.Warn("[resource group controller] server does not save config, load config failed") return DefaultControllerConfig(), nil } +<<<<<<< HEAD controllerConfig := &ControllerConfig{} err = json.Unmarshal(items[0].PayLoad, controllerConfig) +======= + config := &Config{} + err = json.Unmarshal(resp.Kvs[0].GetValue(), config) +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) if err != nil { return nil, err } return controllerConfig, nil } +<<<<<<< HEAD // GetConfig returns the config of controller. It's only used for test. func (c *ResourceGroupsController) GetConfig() *Config { return c.config +======= +// GetConfig returns the config of controller. +func (c *ResourceGroupsController) GetConfig() *RUConfig { + return c.safeRuConfig.Load() +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) } // Source List @@ -203,8 +248,66 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { stateUpdateTicker = time.NewTicker(time.Millisecond * 100) }) +<<<<<<< HEAD +======= + _, metaRevision, err := c.provider.LoadResourceGroups(ctx) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + resp, err := c.provider.Get(ctx, []byte(controllerConfigPath)) + if err != nil { + log.Warn("load resource group revision failed", zap.Error(err)) + } + cfgRevision := resp.GetHeader().GetRevision() + var watchMetaChannel, watchConfigChannel chan []*meta_storagepb.Event + if !c.ruConfig.isSingleGroupByKeyspace { + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + } + } + + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + } + watchRetryTimer := time.NewTimer(watchRetryInterval) + defer watchRetryTimer.Stop() + +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) for { select { + /* tickers */ + case <-cleanupTicker.C: + c.cleanUpResourceGroup() + case <-stateUpdateTicker.C: + c.executeOnAllGroups((*groupCostController).updateRunState) + c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) + if len(c.run.currentRequests) == 0 { + c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) + } + case <-watchRetryTimer.C: + if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil { + watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group meta failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + } + } + if watchConfigChannel == nil { + watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix()) + if err != nil { + log.Warn("watch resource group config failed", zap.Error(err)) + watchRetryTimer.Reset(watchRetryInterval) + } + } + + case <-emergencyTokenAcquisitionTicker.C: + c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) + /* channels */ case <-c.loopCtx.Done(): resourceGroupStatusGauge.Reset() return @@ -218,6 +321,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { c.handleTokenBucketResponse(resp) } c.run.currentRequests = nil +<<<<<<< HEAD case <-cleanupTicker.C: if err := c.cleanUpResourceGroup(c.loopCtx); err != nil { log.Error("[resource group controller] clean up resource groups failed", zap.Error(err)) @@ -228,6 +332,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if len(c.run.currentRequests) == 0 { c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */) } +======= +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) case <-c.lowTokenNotifyChan: c.executeOnAllGroups((*groupCostController).updateRunState) c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec) @@ -237,8 +343,69 @@ func (c *ResourceGroupsController) Start(ctx context.Context) { if c.run.inDegradedMode { c.executeOnAllGroups((*groupCostController).applyDegradedMode) } +<<<<<<< HEAD case <-emergencyTokenAcquisitionTicker.C: c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition) +======= + case resp, ok := <-watchMetaChannel: + failpoint.Inject("disableWatch", func() { + if c.ruConfig.isSingleGroupByKeyspace { + panic("disableWatch") + } + }) + if !ok { + watchMetaChannel = nil + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + continue + } + for _, item := range resp { + metaRevision = item.Kv.ModRevision + group := &rmpb.ResourceGroup{} + if err := proto.Unmarshal(item.Kv.Value, group); err != nil { + continue + } + switch item.Type { + case meta_storagepb.Event_PUT: + if item, ok := c.groupsController.Load(group.Name); ok { + gc := item.(*groupCostController) + gc.modifyMeta(group) + } + case meta_storagepb.Event_DELETE: + if _, ok := c.groupsController.LoadAndDelete(group.Name); ok { + resourceGroupStatusGauge.DeleteLabelValues(group.Name) + } + } + } + case resp, ok := <-watchConfigChannel: + if !ok { + watchConfigChannel = nil + watchRetryTimer.Reset(watchRetryInterval) + failpoint.Inject("watchStreamError", func() { + watchRetryTimer.Reset(20 * time.Millisecond) + }) + continue + } + for _, item := range resp { + cfgRevision = item.Kv.ModRevision + config := &Config{} + if err := json.Unmarshal(item.Kv.Value, config); err != nil { + continue + } + c.ruConfig = GenerateRUConfig(config) + + // Stay compatible with serverless + for _, opt := range c.opts { + opt(c) + } + copyCfg := *c.ruConfig + c.safeRuConfig.Store(©Cfg) + log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig)) + } + +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) case gc := <-c.tokenBucketUpdateChan: now := gc.run.now go gc.handleTokenBucketUpdateEvent(c.loopCtx, now) @@ -1018,7 +1185,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) for typ, counter := range gc.run.resourceTokens { if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { @@ -1028,7 +1195,7 @@ func (gc *groupCostController) onRequestWait( res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) for typ, counter := range gc.run.requestUnitTokens { if v := getRUValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.maxWaitDuration, now, v)) + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) } } if d, err = WaitReservations(ctx, now, res); err == nil { diff --git a/client/resource_group/controller/util.go b/client/resource_group/controller/util.go new file mode 100644 index 000000000000..e3450e0ae0d1 --- /dev/null +++ b/client/resource_group/controller/util.go @@ -0,0 +1,68 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS,g +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "fmt" + "strconv" + "time" + + "github.com/pingcap/errors" +) + +// Duration is a wrapper of time.Duration for TOML and JSON. +type Duration struct { + time.Duration +} + +// NewDuration creates a Duration from time.Duration. +func NewDuration(duration time.Duration) Duration { + return Duration{Duration: duration} +} + +// MarshalJSON returns the duration as a JSON string. +func (d *Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf(`"%s"`, d.String())), nil +} + +// UnmarshalJSON parses a JSON string into the duration. +func (d *Duration) UnmarshalJSON(text []byte) error { + s, err := strconv.Unquote(string(text)) + if err != nil { + return errors.WithStack(err) + } + duration, err := time.ParseDuration(s) + if err != nil { + return errors.WithStack(err) + } + d.Duration = duration + return nil +} + +// UnmarshalText parses a TOML string into the duration. +func (d *Duration) UnmarshalText(text []byte) error { + var err error + d.Duration, err = time.ParseDuration(string(text)) + return errors.WithStack(err) +} + +// MarshalText returns the duration as a JSON string. +func (d Duration) MarshalText() ([]byte, error) { + return []byte(d.String()), nil +} diff --git a/client/resource_group/controller/util_test.go b/client/resource_group/controller/util_test.go new file mode 100644 index 000000000000..b542e6713dc1 --- /dev/null +++ b/client/resource_group/controller/util_test.go @@ -0,0 +1,51 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "testing" + + "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" +) + +type example struct { + Interval Duration `json:"interval" toml:"interval"` +} + +func TestDurationJSON(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`{"interval":"1h1m1s"}`) + re.NoError(json.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) + + b, err := json.Marshal(example) + re.NoError(err) + re.Equal(string(text), string(b)) +} + +func TestDurationTOML(t *testing.T) { + t.Parallel() + re := require.New(t) + example := &example{} + + text := []byte(`interval = "1h1m1s"`) + re.Nil(toml.Unmarshal(text, example)) + re.Equal(float64(60*60+60+1), example.Interval.Seconds()) +} diff --git a/client/resource_manager_client.go b/client/resource_manager_client.go index 90cfc977acf5..90c8fa8f7c14 100644 --- a/client/resource_manager_client.go +++ b/client/resource_manager_client.go @@ -31,15 +31,25 @@ import ( type actionType int const ( - add actionType = 0 - modify actionType = 1 - groupSettingsPathPrefix = "resource_group/settings" + add actionType = 0 + modify actionType = 1 + groupSettingsPathPrefix = "resource_group/settings" + controllerConfigPathPrefix = "resource_group/controller" // errNotPrimary is returned when the requested server is not primary. errNotPrimary = "not primary" // errNotLeader is returned when the requested server is not pd leader. errNotLeader = "not leader" ) +<<<<<<< HEAD +======= +// GroupSettingsPathPrefixBytes is used to watch or get resource groups. +var GroupSettingsPathPrefixBytes = []byte(groupSettingsPathPrefix) + +// ControllerConfigPathPrefixBytes is used to watch or get controller config. +var ControllerConfigPathPrefixBytes = []byte(controllerConfigPathPrefix) + +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)) // ResourceManagerClient manages resource group info and token request. type ResourceManagerClient interface { ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) diff --git a/pkg/mcs/resource_manager/server/apis/v1/api.go b/pkg/mcs/resource_manager/server/apis/v1/api.go index c6c0fc8e9e1d..7e7d59c1f290 100644 --- a/pkg/mcs/resource_manager/server/apis/v1/api.go +++ b/pkg/mcs/resource_manager/server/apis/v1/api.go @@ -16,7 +16,9 @@ package apis import ( "errors" + "fmt" "net/http" + "reflect" "sync" "github.com/gin-contrib/cors" @@ -29,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi" + "github.com/tikv/pd/pkg/utils/reflectutil" ) // APIPathPrefix is the prefix of the API path. @@ -96,6 +99,8 @@ func (s *Service) RegisterRouter() { configEndpoint.GET("/group/:name", s.getResourceGroup) configEndpoint.GET("/groups", s.getResourceGroupList) configEndpoint.DELETE("/group/:name", s.deleteResourceGroup) + configEndpoint.GET("/controller", s.getControllerConfig) + configEndpoint.POST("/controller", s.setControllerConfig) } func (s *Service) handler() http.Handler { @@ -190,3 +195,43 @@ func (s *Service) deleteResourceGroup(c *gin.Context) { } c.JSON(http.StatusOK, "Success!") } + +// GetControllerConfig +// +// @Tags ResourceManager +// @Summary Get the resource controller config. +// @Success 200 {string} json format of rmserver.ControllerConfig +// @Failure 400 {string} error +// @Router /config/controller [GET] +func (s *Service) getControllerConfig(c *gin.Context) { + config := s.manager.GetControllerConfig() + c.IndentedJSON(http.StatusOK, config) +} + +// SetControllerConfig +// +// @Tags ResourceManager +// @Summary Set the resource controller config. +// @Param config body object true "json params, rmserver.ControllerConfig" +// @Success 200 {string} string "Success!" +// @Failure 400 {string} error +// @Router /config/controller [POST] +func (s *Service) setControllerConfig(c *gin.Context) { + conf := make(map[string]interface{}) + if err := c.ShouldBindJSON(&conf); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + for k, v := range conf { + key := reflectutil.FindJSONFullTagByChildTag(reflect.TypeOf(rmserver.ControllerConfig{}), k) + if key == "" { + c.String(http.StatusBadRequest, fmt.Sprintf("config item %s not found", k)) + return + } + if err := s.manager.UpdateControllerConfigItem(key, v); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } + } + c.String(http.StatusOK, "Success!") +} diff --git a/pkg/mcs/resource_manager/server/config.go b/pkg/mcs/resource_manager/server/config.go index 9f7ee63231b4..d5e9c09602d2 100644 --- a/pkg/mcs/resource_manager/server/config.go +++ b/pkg/mcs/resource_manager/server/config.go @@ -51,6 +51,8 @@ const ( // Because the resource manager has not been deployed in microservice mode, // do not enable this function. defaultDegradedModeWaitDuration = time.Second * 0 + // defaultMaxWaitDuration is the max duration to wait for the token before throwing error. + defaultMaxWaitDuration = 30 * time.Second ) // Config is the configuration for the resource manager. @@ -85,6 +87,9 @@ type ControllerConfig struct { // EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect. DegradedModeWaitDuration typeutil.Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"` + // LTBMaxWaitDuration is the max wait time duration for local token bucket. + LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"` + // RequestUnit is the configuration determines the coefficients of the RRU and WRU cost. // This configuration should be modified carefully. RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"` @@ -98,6 +103,7 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) { rmc.RequestUnit.Adjust() configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration) + configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration) failpoint.Inject("enableDegradedMode", func() { configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second) }) diff --git a/pkg/mcs/resource_manager/server/config_test.go b/pkg/mcs/resource_manager/server/config_test.go index c0cac4da9c0c..dd8dd2d2814f 100644 --- a/pkg/mcs/resource_manager/server/config_test.go +++ b/pkg/mcs/resource_manager/server/config_test.go @@ -27,6 +27,7 @@ func TestControllerConfig(t *testing.T) { re := require.New(t) cfgData := ` [controller] +ltb-max-wait-duration = "60s" degraded-mode-wait-duration = "2s" [controller.request-unit] read-base-cost = 1.0 @@ -42,6 +43,7 @@ read-cpu-ms-cost = 5.0 re.NoError(err) re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2) + re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7) re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7) diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go index 7bb3148b75b8..c7b55b0d9da0 100644 --- a/pkg/mcs/resource_manager/server/manager.go +++ b/pkg/mcs/resource_manager/server/manager.go @@ -19,10 +19,12 @@ import ( "encoding/json" "math" "sort" + "strings" "sync" "time" "github.com/gogo/protobuf/proto" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" rmpb "github.com/pingcap/kvproto/pkg/resource_manager" "github.com/pingcap/log" @@ -30,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/jsonutil" "github.com/tikv/pd/pkg/utils/logutil" "go.uber.org/zap" ) @@ -98,33 +101,54 @@ func (m *Manager) GetBasicServer() bs.Server { } // Init initializes the resource group manager. +<<<<<<< HEAD:pkg/mcs/resource_manager/server/manager.go func (m *Manager) Init(ctx context.Context) { // Todo: If we can modify following configs in the future, we should reload these configs. // Store the controller config into the storage. m.storage.SaveControllerConfig(m.controllerConfig) +======= +func (m *Manager) Init(ctx context.Context) error { + v, err := m.storage.LoadControllerConfig() + if err != nil { + log.Error("resource controller config load failed", zap.Error(err), zap.String("v", v)) + return err + } + if err = json.Unmarshal([]byte(v), &m.controllerConfig); err != nil { + log.Error("un-marshall controller config failed, fallback to default", zap.Error(err), zap.String("v", v)) + } + + // re-save the config to make sure the config has been persisted. + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + return err + } +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)):pkg/mcs/resourcemanager/server/manager.go // Load resource group meta info from storage. m.groups = make(map[string]*ResourceGroup) handler := func(k, v string) { group := &rmpb.ResourceGroup{} if err := proto.Unmarshal([]byte(v), group); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } m.groups[group.Name] = FromProtoResourceGroup(group) } - m.storage.LoadResourceGroupSettings(handler) + if err := m.storage.LoadResourceGroupSettings(handler); err != nil { + return err + } // Load resource group states from storage. tokenHandler := func(k, v string) { tokens := &GroupStates{} if err := json.Unmarshal([]byte(v), tokens); err != nil { - log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + log.Error("failed to parse the resource group state", zap.Error(err), zap.String("k", k), zap.String("v", v)) panic(err) } if group, ok := m.groups[k]; ok { group.SetStatesIntoResourceGroup(tokens) } } - m.storage.LoadResourceGroupStates(tokenHandler) + if err := m.storage.LoadResourceGroupStates(tokenHandler); err != nil { + return err + } // Add default group if it's not inited. if _, ok := m.groups[reservedDefaultGroupName]; !ok { @@ -155,6 +179,47 @@ func (m *Manager) Init(ctx context.Context) { log.Info("resource group manager finishes initialization") } +// UpdateControllerConfigItem updates the controller config item. +func (m *Manager) UpdateControllerConfigItem(key string, value interface{}) error { + kp := strings.Split(key, ".") + if len(kp) == 0 { + return errors.Errorf("invalid key %s", key) + } + m.Lock() + var config interface{} + switch kp[0] { + case "request-unit": + config = &m.controllerConfig.RequestUnit + default: + config = m.controllerConfig + } + updated, found, err := jsonutil.AddKeyValue(config, kp[len(kp)-1], value) + if err != nil { + m.Unlock() + return err + } + + if !found { + m.Unlock() + return errors.Errorf("config item %s not found", key) + } + m.Unlock() + if updated { + if err := m.storage.SaveControllerConfig(m.controllerConfig); err != nil { + log.Error("save controller config failed", zap.Error(err)) + } + log.Info("updated controller config item", zap.String("key", key), zap.Any("value", value)) + } + return nil +} + +// GetControllerConfig returns the controller config. +func (m *Manager) GetControllerConfig() *ControllerConfig { + m.RLock() + defer m.RUnlock() + return m.controllerConfig +} + // AddResourceGroup puts a resource group. // NOTE: AddResourceGroup should also be idempotent because tidb depends // on this retry mechanism. diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index f1b3feb36aab..150ea77a1c7f 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -27,6 +27,7 @@ type ResourceGroupStorage interface { SaveResourceGroupStates(name string, obj interface{}) error DeleteResourceGroupStates(name string) error SaveControllerConfig(config interface{}) error + LoadControllerConfig() (string, error) } var _ ResourceGroupStorage = (*StorageEndpoint)(nil) @@ -65,3 +66,8 @@ func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { func (se *StorageEndpoint) SaveControllerConfig(config interface{}) error { return se.saveJSON(controllerConfigPath, config) } + +// LoadControllerConfig loads the resource controller config from storage. +func (se *StorageEndpoint) LoadControllerConfig() (string, error) { + return se.Load(controllerConfigPath) +} diff --git a/server/server.go b/server/server.go index fa4f329deadb..bcb03632fd9c 100644 --- a/server/server.go +++ b/server/server.go @@ -1582,7 +1582,10 @@ func (s *Server) campaignLeader() { log.Info("triggering the leader callback functions") for _, cb := range s.leaderCallbacks { - cb(ctx) + if err := cb(ctx); err != nil { + log.Error("failed to execute leader callback function", errs.ZapError(err)) + return + } } // Try to create raft cluster. diff --git a/tests/integrations/mcs/resource_manager/resource_manager_test.go b/tests/integrations/mcs/resource_manager/resource_manager_test.go index 99c97cd4d9d5..17c7296bece8 100644 --- a/tests/integrations/mcs/resource_manager/resource_manager_test.go +++ b/tests/integrations/mcs/resource_manager/resource_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "strconv" "strings" @@ -1080,3 +1081,178 @@ func (suite *resourceManagerClientTestSuite) TestRemoveStaleResourceGroup() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/fastCleanup")) controller.Stop() } +<<<<<<< HEAD:tests/integrations/mcs/resource_manager/resource_manager_test.go +======= + +func (suite *resourceManagerClientTestSuite) TestSkipConsumptionForBackgroundJobs() { + re := suite.Require() + cli := suite.client + + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + + cfg := &controller.RequestUnitConfig{ + ReadBaseCost: 1, + ReadCostPerByte: 1, + WriteBaseCost: 1, + WriteCostPerByte: 1, + CPUMsCost: 1, + } + c, _ := controller.NewResourceGroupController(suite.ctx, 1, cli, cfg) + c.Start(suite.ctx) + + resourceGroupName := suite.initGroups[1].Name + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) + // test fallback for nil. + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) + + resourceGroupName = "background_job" + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) + // test fallback for nil. + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) + + // modify `Default` to check fallback. + resp, err := cli.ModifyResourceGroup(suite.ctx, &rmpb.ResourceGroup{ + Name: "default", + Mode: rmpb.GroupMode_RUMode, + RUSettings: &rmpb.GroupRequestUnitSettings{ + RU: &rmpb.TokenBucket{ + Settings: &rmpb.TokenLimitSettings{ + FillRate: 1, + BurstLimit: -1, + }, + Tokens: 1, + }, + }, + BackgroundSettings: &rmpb.BackgroundSettings{JobTypes: []string{"lightning", "ddl"}}, + }) + re.NoError(err) + re.Contains(resp, "Success!") + // wait for watch event modify. + time.Sleep(time.Millisecond * 100) + + resourceGroupName = suite.initGroups[1].Name + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_default")) + // test fallback for `"lightning", "ddl"`. + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "")) + + resourceGroupName = "background_job" + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_br")) + re.True(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_lightning")) + // test fallback for `"lightning", "ddl"`. + re.False(c.IsBackgroundRequest(suite.ctx, resourceGroupName, "internal_ddl")) + + c.Stop() +} + +func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigChanged() { + re := suite.Require() + cli := suite.client + for _, group := range suite.initGroups { + resp, err := cli.AddResourceGroup(suite.ctx, group) + re.NoError(err) + re.Contains(resp, "Success!") + } + c1, err := controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + c1.Start(suite.ctx) + // with client option + c2, err := controller.NewResourceGroupController(suite.ctx, 2, cli, nil, controller.WithMaxWaitDuration(time.Hour)) + re.NoError(err) + c2.Start(suite.ctx) + // helper function for sending HTTP requests and checking responses + sendRequest := func(method, url string, body io.Reader) []byte { + req, err := http.NewRequest(method, url, body) + re.NoError(err) + resp, err := http.DefaultClient.Do(req) + re.NoError(err) + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + re.NoError(err) + if resp.StatusCode != http.StatusOK { + re.Fail(string(bytes)) + } + return bytes + } + + getAddr := func() string { + server := suite.cluster.GetServer(suite.cluster.GetLeader()) + if rand.Intn(100)%2 == 1 { + server = suite.cluster.GetServer(suite.cluster.GetFollower()) + } + return server.GetAddr() + } + + configURL := "/resource-manager/api/v1/config/controller" + waitDuration := 10 * time.Second + readBaseCost := 1.5 + defaultCfg := controller.DefaultConfig() + // failpoint enableDegradedMode will setup and set it be 1s. + defaultCfg.DegradedModeWaitDuration.Duration = time.Second + expectRUCfg := controller.GenerateRUConfig(defaultCfg) + // initial config verification + respString := sendRequest("GET", getAddr()+configURL, nil) + defaultString, err := json.Marshal(defaultCfg) + re.NoError(err) + re.JSONEq(string(respString), string(defaultString)) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + testCases := []struct { + configJSON string + value interface{} + expected func(ruConfig *controller.RUConfig) + }{ + { + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration), + value: waitDuration, + expected: func(ruConfig *controller.RUConfig) { ruConfig.LTBMaxWaitDuration = waitDuration }, + }, + { + configJSON: fmt.Sprintf(`{"read-base-cost": %v}`, readBaseCost), + value: readBaseCost, + expected: func(ruConfig *controller.RUConfig) { ruConfig.ReadBaseCost = controller.RequestUnit(readBaseCost) }, + }, + { + configJSON: fmt.Sprintf(`{"write-base-cost": %v}`, readBaseCost*2), + value: readBaseCost * 2, + expected: func(ruConfig *controller.RUConfig) { ruConfig.WriteBaseCost = controller.RequestUnit(readBaseCost * 2) }, + }, + { + // reset the degraded-mode-wait-duration to default in test. + configJSON: fmt.Sprintf(`{"degraded-mode-wait-duration": "%v"}`, time.Second), + value: time.Second, + expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = time.Second }, + }, + } + // change properties one by one and verify each time + for _, t := range testCases { + sendRequest("POST", getAddr()+configURL, strings.NewReader(t.configJSON)) + time.Sleep(500 * time.Millisecond) + t.expected(expectRUCfg) + re.EqualValues(expectRUCfg, c1.GetConfig()) + + expectRUCfg2 := *expectRUCfg + // always apply the client option + expectRUCfg2.LTBMaxWaitDuration = time.Hour + re.EqualValues(&expectRUCfg2, c2.GetConfig()) + } + // restart c1 + c1.Stop() + c1, err = controller.NewResourceGroupController(suite.ctx, 1, cli, nil) + re.NoError(err) + re.EqualValues(expectRUCfg, c1.GetConfig()) +} +>>>>>>> 54eb4e495 (resource_control: supports dynamically change the controller config (#7042)):tests/integrations/mcs/resourcemanager/resource_manager_test.go