From 602c10d218079fd455df008161832dc494bef3ad Mon Sep 17 00:00:00 2001 From: JmPotato Date: Tue, 15 Aug 2023 11:39:29 +0800 Subject: [PATCH] scheduling: watch the respective scheduler config and trim the prefix (#6955) ref tikv/pd#5839 - Make scheduling service watch the respective scheduler config. - Trim the key prefix inside the watcher. Signed-off-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/mcs/scheduling/server/config/config.go | 22 +++++ pkg/mcs/scheduling/server/config/watcher.go | 90 ++++++++++++++----- pkg/mcs/scheduling/server/rule/watcher.go | 80 ++++++++++------- pkg/mcs/scheduling/server/server.go | 9 +- pkg/storage/endpoint/key_path.go | 17 ++-- .../mcs/scheduling/config_test.go | 52 ++++++++++- .../integrations/mcs/scheduling/rule_test.go | 5 +- 7 files changed, 202 insertions(+), 73 deletions(-) diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index 7839ec7f274..ab26e1add29 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -19,6 +19,7 @@ import ( "os" "path/filepath" "strings" + "sync" "sync/atomic" "time" "unsafe" @@ -192,10 +193,13 @@ func (c *Config) validate() error { // PersistConfig wraps all configurations that need to persist to storage and // allows to access them safely. type PersistConfig struct { + // Store the global configuration that is related to the scheduling. clusterVersion unsafe.Pointer schedule atomic.Value replication atomic.Value storeConfig atomic.Value + // Store the respective configurations for different schedulers. + schedulerConfig sync.Map } // NewPersistConfig creates a new PersistConfig instance. @@ -253,6 +257,24 @@ func (o *PersistConfig) GetStoreConfig() *sc.StoreConfig { return o.storeConfig.Load().(*sc.StoreConfig) } +// SetSchedulerConfig sets the scheduler configuration with the given name. +func (o *PersistConfig) SetSchedulerConfig(name, data string) { + o.schedulerConfig.Store(name, data) +} + +// RemoveSchedulerConfig removes the scheduler configuration with the given name. +func (o *PersistConfig) RemoveSchedulerConfig(name string) { + o.schedulerConfig.Delete(name) +} + +// GetSchedulerConfig returns the scheduler configuration with the given name. +func (o *PersistConfig) GetSchedulerConfig(name string) string { + if v, ok := o.schedulerConfig.Load(name); ok { + return v.(string) + } + return "" +} + // GetMaxReplicas returns the max replicas. func (o *PersistConfig) GetMaxReplicas() int { return int(o.GetReplicationConfig().MaxReplicas) diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 81ec4b62f0c..c9010db69a3 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -17,11 +17,13 @@ package config import ( "context" "encoding/json" + "strings" "sync" "github.com/coreos/go-semver/semver" "github.com/pingcap/log" sc "github.com/tikv/pd/pkg/schedule/config" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" @@ -34,11 +36,20 @@ type Watcher struct { ctx context.Context cancel context.CancelFunc - etcdClient *clientv3.Client - watcher *etcdutil.LoopWatcher + // configPath is the path of the configuration in etcd: + // - Key: /pd/{cluster_id}/config + // - Value: configuration JSON. + configPath string + // schedulerConfigPathPrefix is the path prefix of the scheduler configuration in etcd: + // - Key: /pd/{cluster_id}/scheduler_config/{scheduler_name} + // - Value: configuration JSON. + schedulerConfigPathPrefix string + + etcdClient *clientv3.Client + configWatcher *etcdutil.LoopWatcher + schedulerConfigWatcher *etcdutil.LoopWatcher *PersistConfig - // TODO: watch the scheduler config change. } type persistedConfig struct { @@ -52,19 +63,30 @@ type persistedConfig struct { func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, - // configPath is the path of the configuration in etcd: - // - Key: /pd/{cluster_id}/config - // - Value: configuration JSON. - configPath string, + clusterID uint64, persistConfig *PersistConfig, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) cw := &Watcher{ - ctx: ctx, - cancel: cancel, - etcdClient: etcdClient, - PersistConfig: persistConfig, + ctx: ctx, + cancel: cancel, + configPath: endpoint.ConfigPath(clusterID), + schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID), + etcdClient: etcdClient, + PersistConfig: persistConfig, + } + err := cw.initializeConfigWatcher() + if err != nil { + return nil, err } + err = cw.initializeSchedulerConfigWatcher() + if err != nil { + return nil, err + } + return cw, nil +} + +func (cw *Watcher) initializeConfigWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { cfg := &persistedConfig{} if err := json.Unmarshal(kv.Value, cfg); err != nil { @@ -84,21 +106,41 @@ func NewWatcher( postEventFn := func() error { return nil } - cw.watcher = etcdutil.NewLoopWatcher( - ctx, - &cw.wg, - etcdClient, - "scheduling-config-watcher", - configPath, - putFn, - deleteFn, - postEventFn, + cw.configWatcher = etcdutil.NewLoopWatcher( + cw.ctx, &cw.wg, + cw.etcdClient, + "scheduling-config-watcher", cw.configPath, + putFn, deleteFn, postEventFn, ) - cw.watcher.StartWatchLoop() - if err := cw.watcher.WaitLoad(); err != nil { - return nil, err + cw.configWatcher.StartWatchLoop() + return cw.configWatcher.WaitLoad() +} + +func (cw *Watcher) initializeSchedulerConfigWatcher() error { + prefixToTrim := cw.schedulerConfigPathPrefix + "/" + putFn := func(kv *mvccpb.KeyValue) error { + cw.SetSchedulerConfig( + strings.TrimPrefix(string(kv.Key), prefixToTrim), + string(kv.Value), + ) + return nil } - return cw, nil + deleteFn := func(kv *mvccpb.KeyValue) error { + cw.RemoveSchedulerConfig(strings.TrimPrefix(string(kv.Key), prefixToTrim)) + return nil + } + postEventFn := func() error { + return nil + } + cw.schedulerConfigWatcher = etcdutil.NewLoopWatcher( + cw.ctx, &cw.wg, + cw.etcdClient, + "scheduling-scheduler-config-watcher", cw.schedulerConfigPathPrefix, + putFn, deleteFn, postEventFn, + clientv3.WithPrefix(), + ) + cw.schedulerConfigWatcher.StartWatchLoop() + return cw.schedulerConfigWatcher.WaitLoad() } // Close closes the watcher. diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index c85644ff14f..cf0e1cd8ba1 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -16,6 +16,7 @@ package rule import ( "context" + "strings" "sync" "github.com/tikv/pd/pkg/storage/endpoint" @@ -44,7 +45,7 @@ func (rs *ruleStorage) LoadRules(f func(k, v string)) error { return nil } -// SaveRule stores a rule cfg to the rulesPath. +// SaveRule stores a rule cfg to the rulesPathPrefix. func (rs *ruleStorage) SaveRule(ruleKey string, rule interface{}) error { rs.rules.Store(ruleKey, rule) return nil @@ -104,6 +105,19 @@ type Watcher struct { cancel context.CancelFunc wg sync.WaitGroup + // rulesPathPrefix: + // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} + // - Value: placement.Rule + rulesPathPrefix string + // ruleGroupPathPrefix: + // - Key: /pd/{cluster_id}/rule_group/{group_id} + // - Value: placement.RuleGroup + ruleGroupPathPrefix string + // regionLabelPathPrefix: + // - Key: /pd/{cluster_id}/region_label/{rule_id} + // - Value: labeler.LabelRule + regionLabelPathPrefix string + etcdClient *clientv3.Client ruleStore *ruleStorage @@ -117,47 +131,45 @@ type Watcher struct { func NewWatcher( ctx context.Context, etcdClient *clientv3.Client, - // rulePath: - // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} - // - Value: placement.Rule - // ruleGroupPath: - // - Key: /pd/{cluster_id}/rule_group/{group_id} - // - Value: placement.RuleGroup - // regionLabelPath: - // - Key: /pd/{cluster_id}/region_label/{rule_id} - // - Value: labeler.LabelRule - rulesPath, ruleGroupPath, regionLabelPath string, + clusterID uint64, ) (*Watcher, error) { ctx, cancel := context.WithCancel(ctx) rw := &Watcher{ - ctx: ctx, - cancel: cancel, - etcdClient: etcdClient, - ruleStore: &ruleStorage{}, + ctx: ctx, + cancel: cancel, + rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), + regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), + etcdClient: etcdClient, + ruleStore: &ruleStorage{}, } - err := rw.initializeRuleWatcher(rulesPath) + err := rw.initializeRuleWatcher() if err != nil { return nil, err } - err = rw.initializeGroupWatcher(ruleGroupPath) + err = rw.initializeGroupWatcher() if err != nil { return nil, err } - err = rw.initializeRegionLabelWatcher(regionLabelPath) + err = rw.initializeRegionLabelWatcher() if err != nil { return nil, err } return rw, nil } -func (rw *Watcher) initializeRuleWatcher(rulePath string) error { +func (rw *Watcher) initializeRuleWatcher() error { + prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { // Since the PD API server will validate the rule before saving it to etcd, // so we could directly save the string rule in JSON to the storage here. - return rw.ruleStore.SaveRule(string(kv.Key), string(kv.Value)) + return rw.ruleStore.SaveRule( + strings.TrimPrefix(string(kv.Key), prefixToTrim), + string(kv.Value), + ) } deleteFn := func(kv *mvccpb.KeyValue) error { - return rw.ruleStore.DeleteRule(string(kv.Key)) + return rw.ruleStore.DeleteRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { return nil @@ -165,7 +177,7 @@ func (rw *Watcher) initializeRuleWatcher(rulePath string) error { rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-watcher", rulePath, + "scheduling-rule-watcher", rw.rulesPathPrefix, putFn, deleteFn, postEventFn, clientv3.WithPrefix(), ) @@ -173,12 +185,16 @@ func (rw *Watcher) initializeRuleWatcher(rulePath string) error { return rw.ruleWatcher.WaitLoad() } -func (rw *Watcher) initializeGroupWatcher(ruleGroupPath string) error { +func (rw *Watcher) initializeGroupWatcher() error { + prefixToTrim := rw.ruleGroupPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - return rw.ruleStore.SaveRuleGroup(string(kv.Key), string(kv.Value)) + return rw.ruleStore.SaveRuleGroup( + strings.TrimPrefix(string(kv.Key), prefixToTrim), + string(kv.Value), + ) } deleteFn := func(kv *mvccpb.KeyValue) error { - return rw.ruleStore.DeleteRuleGroup(string(kv.Key)) + return rw.ruleStore.DeleteRuleGroup(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { return nil @@ -186,7 +202,7 @@ func (rw *Watcher) initializeGroupWatcher(ruleGroupPath string) error { rw.groupWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-group-watcher", ruleGroupPath, + "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, putFn, deleteFn, postEventFn, clientv3.WithPrefix(), ) @@ -194,12 +210,16 @@ func (rw *Watcher) initializeGroupWatcher(ruleGroupPath string) error { return rw.groupWatcher.WaitLoad() } -func (rw *Watcher) initializeRegionLabelWatcher(regionLabelPath string) error { +func (rw *Watcher) initializeRegionLabelWatcher() error { + prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - return rw.ruleStore.SaveRegionRule(string(kv.Key), string(kv.Value)) + return rw.ruleStore.SaveRegionRule( + strings.TrimPrefix(string(kv.Key), prefixToTrim), + string(kv.Value), + ) } deleteFn := func(kv *mvccpb.KeyValue) error { - return rw.ruleStore.DeleteRegionRule(string(kv.Key)) + return rw.ruleStore.DeleteRegionRule(strings.TrimPrefix(string(kv.Key), prefixToTrim)) } postEventFn := func() error { return nil @@ -207,7 +227,7 @@ func (rw *Watcher) initializeRegionLabelWatcher(regionLabelPath string) error { rw.labelWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-region-label-watcher", regionLabelPath, + "scheduling-region-label-watcher", rw.regionLabelPathPrefix, putFn, deleteFn, postEventFn, clientv3.WithPrefix(), ) diff --git a/pkg/mcs/scheduling/server/server.go b/pkg/mcs/scheduling/server/server.go index 845dbe38aa5..e55ddd5aca9 100644 --- a/pkg/mcs/scheduling/server/server.go +++ b/pkg/mcs/scheduling/server/server.go @@ -550,18 +550,13 @@ func (s *Server) startServer() (err error) { func (s *Server) startWatcher() (err error) { s.configWatcher, err = config.NewWatcher( - s.ctx, s.etcdClient, - endpoint.ConfigPath(s.clusterID), - s.persistConfig, + s.ctx, s.etcdClient, s.clusterID, s.persistConfig, ) if err != nil { return err } s.ruleWatcher, err = rule.NewWatcher( - s.ctx, s.etcdClient, - endpoint.RulesPath(s.clusterID), - endpoint.RuleGroupPath(s.clusterID), - endpoint.RegionLabelPath(s.clusterID), + s.ctx, s.etcdClient, s.clusterID, ) return err } diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index a463fc8acf6..0e99431044a 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -92,18 +92,23 @@ func ConfigPath(clusterID uint64) string { return path.Join(PDRootPath(clusterID), configPath) } -// RulesPath returns the path to save the placement rules. -func RulesPath(clusterID uint64) string { +// SchedulerConfigPathPrefix returns the path prefix to save the scheduler config. +func SchedulerConfigPathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), customScheduleConfigPath) +} + +// RulesPathPrefix returns the path prefix to save the placement rules. +func RulesPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), rulesPath) } -// RuleGroupPath returns the path to save the placement rule groups. -func RuleGroupPath(clusterID uint64) string { +// RuleGroupPathPrefix returns the path prefix to save the placement rule groups. +func RuleGroupPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), ruleGroupPath) } -// RegionLabelPath returns the path to save the region label. -func RegionLabelPath(clusterID uint64) string { +// RegionLabelPathPrefix returns the path prefix to save the region label. +func RegionLabelPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), regionLabelPath) } diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index e2f2eeacfd1..e1c124b965f 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -22,7 +22,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mcs/scheduling/server/config" sc "github.com/tikv/pd/pkg/schedule/config" - "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/versioninfo" "github.com/tikv/pd/tests" @@ -47,6 +47,7 @@ func TestConfig(t *testing.T) { func (suite *configTestSuite) SetupSuite() { re := suite.Require() + schedulers.Register() var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) suite.cluster, err = tests.NewTestAPICluster(suite.ctx, 1) @@ -56,6 +57,8 @@ func (suite *configTestSuite) SetupSuite() { leaderName := suite.cluster.WaitLeader() suite.pdLeaderServer = suite.cluster.GetServer(leaderName) re.NoError(suite.pdLeaderServer.BootstrapCluster()) + // Force the coordinator to be prepared to initialize the schedulers. + suite.pdLeaderServer.GetRaftCluster().GetCoordinator().GetPrepareChecker().SetPrepared() } func (suite *configTestSuite) TearDownSuite() { @@ -72,7 +75,7 @@ func (suite *configTestSuite) TestConfigWatch() { watcher, err := config.NewWatcher( suite.ctx, suite.pdLeaderServer.GetEtcdClient(), - endpoint.ConfigPath(suite.cluster.GetCluster().GetId()), + suite.cluster.GetCluster().GetId(), config.NewPersistConfig(config.NewConfig()), ) re.NoError(err) @@ -118,3 +121,48 @@ func persistConfig(re *require.Assertions, pdLeaderServer *tests.TestServer) { err := pdLeaderServer.GetPersistOptions().Persist(pdLeaderServer.GetServer().GetStorage()) re.NoError(err) } + +func (suite *configTestSuite) TestSchedulerConfigWatch() { + re := suite.Require() + + // Make sure the config is persisted before the watcher is created. + persistConfig(re, suite.pdLeaderServer) + // Create a config watcher. + watcher, err := config.NewWatcher( + suite.ctx, + suite.pdLeaderServer.GetEtcdClient(), + suite.cluster.GetCluster().GetId(), + config.NewPersistConfig(config.NewConfig()), + ) + re.NoError(err) + // Get all default scheduler names. + var ( + schedulerNames []string + schedulerController = suite.pdLeaderServer.GetRaftCluster().GetCoordinator().GetSchedulersController() + ) + testutil.Eventually(re, func() bool { + schedulerNames = schedulerController.GetSchedulerNames() + return len(schedulerNames) == len(sc.DefaultSchedulers) + }) + // Check all default schedulers' configs. + for _, schedulerName := range schedulerNames { + testutil.Eventually(re, func() bool { + return len(watcher.GetSchedulerConfig(schedulerName)) > 0 + }) + } + // Add a new scheduler. + err = suite.pdLeaderServer.GetServer().GetHandler().AddEvictLeaderScheduler(1) + re.NoError(err) + // Check the new scheduler's config. + testutil.Eventually(re, func() bool { + return len(watcher.GetSchedulerConfig(schedulers.EvictLeaderName)) > 0 + }) + // Remove the scheduler. + err = suite.pdLeaderServer.GetServer().GetHandler().RemoveScheduler(schedulers.EvictLeaderName) + re.NoError(err) + // Check the removed scheduler's config. + testutil.Eventually(re, func() bool { + return len(watcher.GetSchedulerConfig(schedulers.EvictLeaderName)) == 0 + }) + watcher.Close() +} diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index bdffb6b2bb9..68347366378 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -100,13 +100,10 @@ func (suite *ruleTestSuite) TestRuleWatch() { re := suite.Require() // Create a rule watcher. - clusterID := suite.cluster.GetCluster().GetId() watcher, err := rule.NewWatcher( suite.ctx, suite.pdLeaderServer.GetEtcdClient(), - endpoint.RulesPath(clusterID), - endpoint.RuleGroupPath(clusterID), - endpoint.RegionLabelPath(clusterID), + suite.cluster.GetCluster().GetId(), ) re.NoError(err) ruleStorage := watcher.GetRuleStorage()