Skip to content

Commit

Permalink
mcs: add a switch to dynamically enable scheduling service (#7595)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
rleungx and ti-chi-bot[bot] committed Jan 17, 2024
1 parent 6472313 commit af84465
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 6 deletions.
18 changes: 18 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value interfa
case "label-property": // TODO: support changing label-property
case "keyspace":
return h.updateKeyspaceConfig(cfg, kp[len(kp)-1], value)
case "micro-service":
return h.updateMicroServiceConfig(cfg, kp[len(kp)-1], value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand All @@ -201,6 +203,22 @@ func (h *confHandler) updateKeyspaceConfig(config *config.Config, key string, va
return err
}

// updateMicroServiceConfig applies a single key/value change to the
// micro-service section of config and, when the value actually changed,
// hands the updated section to the server via SetMicroServiceConfig.
//
// It returns the error from jsonutil.AddKeyValue, a "not found" error when
// key does not match any item of the section, or the error returned by the
// server while applying the new config.
func (h *confHandler) updateMicroServiceConfig(config *config.Config, key string, value interface{}) error {
	changed, exists, err := jsonutil.AddKeyValue(&config.MicroService, key, value)
	switch {
	case err != nil:
		return err
	case !exists:
		return errors.Errorf("config item %s not found", key)
	case !changed:
		// Nothing to apply: the section already holds this value.
		return nil
	}
	return h.svr.SetMicroServiceConfig(config.MicroService)
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value interface{}) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ var once sync.Once
func (c *RaftCluster) checkServices() {
if c.isAPIServiceMode {
servers, err := discovery.Discover(c.etcdClient, strconv.FormatUint(c.clusterID, 10), mcsutils.SchedulingServiceName)
if err != nil || len(servers) == 0 {
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.independentServices.Delete(mcsutils.SchedulingServiceName)
} else {
Expand Down
28 changes: 28 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type Config struct {

Keyspace KeyspaceConfig `toml:"keyspace" json:"keyspace"`

MicroService MicroServiceConfig `toml:"micro-service" json:"micro-service"`

Controller rm.ControllerConfig `toml:"controller" json:"controller"`
}

Expand Down Expand Up @@ -249,6 +251,8 @@ const (
defaultCheckRegionSplitInterval = 50 * time.Millisecond
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
)

// Special keys for Labels
Expand Down Expand Up @@ -461,6 +465,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {

c.Keyspace.adjust(configMetaData.Child("keyspace"))

c.MicroService.adjust(configMetaData.Child("micro-service"))

c.Security.Encryption.Adjust()

if len(c.Log.Format) == 0 {
Expand Down Expand Up @@ -847,6 +853,28 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
}
}

// MicroServiceConfig is the configuration for micro service.
// It is embedded in Config under the "micro-service" TOML/JSON key.
type MicroServiceConfig struct {
// EnableSchedulingFallback is the switch reported by IsSchedulingFallbackEnabled.
// adjust sets it to defaultEnableSchedulingFallback when the
// "enable-scheduling-fallback" key is not defined in the config metadata.
// The ",string" JSON option makes the value encode as a quoted string
// ("true"/"false") rather than a bare JSON boolean.
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
}

// adjust fills in the default for EnableSchedulingFallback when the
// "enable-scheduling-fallback" key is not defined in meta; an explicitly
// defined value is left untouched.
func (c *MicroServiceConfig) adjust(meta *configutil.ConfigMetaData) {
	if meta.IsDefined("enable-scheduling-fallback") {
		return
	}
	c.EnableSchedulingFallback = defaultEnableSchedulingFallback
}

// Clone returns a newly allocated copy of the micro service config, so the
// caller can modify the result without touching the receiver.
func (c *MicroServiceConfig) Clone() *MicroServiceConfig {
	cloned := new(MicroServiceConfig)
	*cloned = *c
	return cloned
}

// IsSchedulingFallbackEnabled reports whether scheduling service fallback to
// the API service is enabled, i.e. it returns the EnableSchedulingFallback field.
func (c *MicroServiceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
14 changes: 14 additions & 0 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PersistOptions struct {
replicationMode atomic.Value
labelProperty atomic.Value
keyspace atomic.Value
microService atomic.Value
storeConfig atomic.Value
clusterVersion unsafe.Pointer
}
Expand All @@ -65,6 +66,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
// storeConfig will be fetched from TiKV later,
// set it to an empty config here first.
o.storeConfig.Store(&sc.StoreConfig{})
Expand Down Expand Up @@ -133,6 +135,16 @@ func (o *PersistOptions) SetKeyspaceConfig(cfg *KeyspaceConfig) {
o.keyspace.Store(cfg)
}

// GetMicroServiceConfig returns the micro service configuration.
// The result is the pointer currently held in the atomic microService value,
// not a copy of it.
func (o *PersistOptions) GetMicroServiceConfig() *MicroServiceConfig {
// The type assertion panics if microService holds no value or a value that
// is not a *MicroServiceConfig.
return o.microService.Load().(*MicroServiceConfig)
}

// SetMicroServiceConfig sets the micro service configuration.
// cfg is stored as given (the pointer is not copied) and replaces whatever
// the atomic microService value held before.
func (o *PersistOptions) SetMicroServiceConfig(cfg *MicroServiceConfig) {
o.microService.Store(cfg)
}

// GetStoreConfig returns the store config.
func (o *PersistOptions) GetStoreConfig() *sc.StoreConfig {
return o.storeConfig.Load().(*sc.StoreConfig)
Expand Down Expand Up @@ -768,6 +780,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
ReplicationMode: *o.GetReplicationModeConfig(),
LabelProperty: o.GetLabelPropertyConfig(),
Keyspace: *o.GetKeyspaceConfig(),
MicroService: *o.GetMicroServiceConfig(),
ClusterVersion: *o.GetClusterVersion(),
},
StoreConfig: *o.GetStoreConfig(),
Expand Down Expand Up @@ -799,6 +812,7 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.replicationMode.Store(&cfg.ReplicationMode)
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microService.Store(&cfg.MicroService)
o.storeConfig.Store(&cfg.StoreConfig)
o.SetClusterVersion(&cfg.ClusterVersion)
}
Expand Down
22 changes: 22 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,7 @@ func (s *Server) GetConfig() *config.Config {
cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
cfg.ReplicationMode = *s.persistOptions.GetReplicationModeConfig()
cfg.Keyspace = *s.persistOptions.GetKeyspaceConfig().Clone()
cfg.MicroService = *s.persistOptions.GetMicroServiceConfig().Clone()
cfg.LabelProperty = s.persistOptions.GetLabelPropertyConfig().Clone()
cfg.ClusterVersion = *s.persistOptions.GetClusterVersion()
if s.storage == nil {
Expand Down Expand Up @@ -990,6 +991,27 @@ func (s *Server) SetKeyspaceConfig(cfg config.KeyspaceConfig) error {
return nil
}

// GetMicroServiceConfig gets the micro service config information.
// It returns a clone of the config held by the persist options, so changes
// made to the result do not affect the server's stored config.
func (s *Server) GetMicroServiceConfig() *config.MicroServiceConfig {
return s.persistOptions.GetMicroServiceConfig().Clone()
}

// SetMicroServiceConfig installs cfg as the micro service config and persists
// the options to storage. If persisting fails, the previously installed config
// is put back, the failure is logged, and the persist error is returned.
func (s *Server) SetMicroServiceConfig(cfg config.MicroServiceConfig) error {
	previous := s.persistOptions.GetMicroServiceConfig()
	s.persistOptions.SetMicroServiceConfig(&cfg)

	err := s.persistOptions.Persist(s.storage)
	if err == nil {
		log.Info("micro service config is updated", zap.Reflect("new", cfg), zap.Reflect("old", previous))
		return nil
	}

	// Persisting failed: restore the in-memory config to what it was before.
	s.persistOptions.SetMicroServiceConfig(previous)
	log.Error("failed to update micro service config",
		zap.Reflect("new", cfg),
		zap.Reflect("old", previous),
		errs.ZapError(err))
	return err
}

// GetScheduleConfig gets the balance config information.
func (s *Server) GetScheduleConfig() *sc.ScheduleConfig {
return s.persistOptions.GetScheduleConfig().Clone()
Expand Down
53 changes: 51 additions & 2 deletions tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,14 @@ func (suite *serverTestSuite) TestForwardStoreHeartbeat() {
})
}

func (suite *serverTestSuite) TestDynamicSwitch() {
func (suite *serverTestSuite) TestSchedulingServiceFallback() {
re := suite.Require()
// API server will execute scheduling jobs since there is no scheduler server.
leaderServer := suite.pdLeader.GetServer()
conf := leaderServer.GetMicroServiceConfig().Clone()
// Change back to the default value.
conf.EnableSchedulingFallback = true
leaderServer.SetMicroServiceConfig(*conf)
// API server will execute scheduling jobs since there is no scheduling server.
testutil.Eventually(re, func() bool {
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
Expand Down Expand Up @@ -241,6 +246,50 @@ func (suite *serverTestSuite) TestDynamicSwitch() {
})
}

// TestDisableSchedulingServiceFallback toggles the enable-scheduling-fallback
// switch on the PD leader and checks, via IsSchedulingControllerRunning, that
// the API server starts and stops its own scheduling accordingly, both before
// and after a scheduling cluster has been started and its primary closed.
//
// Every SetMicroServiceConfig call is asserted with re.NoError so that a
// failed config update fails the test at the call site instead of surfacing
// later as an unexplained Eventually timeout.
func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {
	re := suite.Require()

	// API server will execute scheduling jobs since there is no scheduling server.
	testutil.Eventually(re, func() bool {
		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
	})
	leaderServer := suite.pdLeader.GetServer()
	// After disabling scheduling service fallback, the API server will stop scheduling.
	conf := leaderServer.GetMicroServiceConfig().Clone()
	conf.EnableSchedulingFallback = false
	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
	testutil.Eventually(re, func() bool {
		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
	})
	// Enable scheduling service fallback again, the API server will restart scheduling.
	conf.EnableSchedulingFallback = true
	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
	testutil.Eventually(re, func() bool {
		return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
	})

	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
	re.NoError(err)
	defer tc.Destroy()
	tc.WaitForPrimaryServing(re)
	// After scheduling server is started, API server will not execute scheduling jobs.
	testutil.Eventually(re, func() bool {
		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
	})
	// Scheduling server is responsible for executing scheduling jobs.
	testutil.Eventually(re, func() bool {
		return tc.GetPrimaryServer().GetCluster().IsBackgroundJobsRunning()
	})
	// Disable scheduling service fallback and stop scheduling server. API server won't execute scheduling jobs again.
	conf.EnableSchedulingFallback = false
	re.NoError(leaderServer.SetMicroServiceConfig(*conf))
	tc.GetPrimaryServer().Close()
	// NOTE(review): presumably this pause gives the API server time to notice
	// the closed scheduling server before the negative check below — confirm.
	time.Sleep(time.Second)
	testutil.Eventually(re, func() bool {
		return !suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
	})
}

func (suite *serverTestSuite) TestSchedulerSync() {
re := suite.Require()
tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
Expand Down
2 changes: 1 addition & 1 deletion tests/server/api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ func (suite *operatorTestSuite) checkTransferRegionWithPlacementRule(cluster *te
err := tu.CheckPostJSON(testDialClient, url, reqData, tu.StatusOK(re))
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
tu.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled() == testCase.placementRuleEnable
})
Expand Down
2 changes: 1 addition & 1 deletion tests/server/api/rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,7 +1437,7 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl
err = tu.CheckPostJSON(testDialClient, u, reqData, tu.StatusOK(re))
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
tu.Eventually(re, func() bool {
return !sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled()
})
Expand Down
30 changes: 30 additions & 0 deletions tools/pd-ctl/tests/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,6 +1111,36 @@ func (suite *configTestSuite) checkPDServerConfig(cluster *pdTests.TestCluster)
re.Equal(int(3), conf.FlowRoundByDigit)
}

// TestMicroServiceConfig runs checkMicroServiceConfig through the suite
// environment's RunTestInTwoModes helper.
func (suite *configTestSuite) TestMicroServiceConfig() {
suite.env.RunTestInTwoModes(suite.checkMicroServiceConfig)
}

// checkMicroServiceConfig verifies the enable-scheduling-fallback switch
// through pd-ctl: it must be true both in the `config show all` output and on
// the leader server, and `config set enable-scheduling-fallback false` must
// turn it off on the server.
func (suite *configTestSuite) checkMicroServiceConfig(cluster *pdTests.TestCluster) {
	re := suite.Require()
	leader := cluster.GetLeaderServer()
	addr := leader.GetAddr()
	rootCmd := ctl.GetRootCmd()

	pdTests.MustPutStore(re, cluster, &metapb.Store{
		Id:            1,
		State:         metapb.StoreState_Up,
		LastHeartbeat: time.Now().UnixNano(),
	})
	svr := leader.GetServer()

	// The switch is expected to be on before anything is changed.
	out, err := tests.ExecuteCommand(rootCmd, "-u", addr, "config", "show", "all")
	re.NoError(err)
	var shown config.Config
	re.NoError(json.Unmarshal(out, &shown))
	re.True(svr.GetMicroServiceConfig().EnableSchedulingFallback)
	re.True(shown.MicroService.EnableSchedulingFallback)

	// config set enable-scheduling-fallback <value>
	_, err = tests.ExecuteCommand(rootCmd, "-u", addr, "config", "set", "enable-scheduling-fallback", "false")
	re.NoError(err)
	re.False(svr.GetMicroServiceConfig().EnableSchedulingFallback)
}

func assertBundles(re *require.Assertions, a, b []placement.GroupBundle) {
re.Len(b, len(a))
for i := 0; i < len(a); i++ {
Expand Down
2 changes: 1 addition & 1 deletion tools/pd-ctl/tests/operator/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (suite *operatorTestSuite) checkOperator(cluster *pdTests.TestCluster) {
_, err = tests.ExecuteCommand(cmd, "config", "set", "enable-placement-rules", "true")
re.NoError(err)
if sche := cluster.GetSchedulingPrimaryServer(); sche != nil {
// wait for the scheduler server to update the config
// wait for the scheduling server to update the config
testutil.Eventually(re, func() bool {
return sche.GetCluster().GetCheckerConfig().IsPlacementRulesEnabled()
})
Expand Down

0 comments on commit af84465

Please sign in to comment.