mcs: fix scheduler memory sync in api server (#7389)
close #7388

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
lhy1024 and ti-chi-bot[bot] authored Nov 20, 2023
1 parent 1e1817d commit 5a4e9ef
Showing 12 changed files with 121 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
@@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
 	prefixToTrim := cw.schedulerConfigPathPrefix + "/"
 	putFn := func(kv *mvccpb.KeyValue) error {
 		name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
-		log.Info("update scheduler config", zap.String("name", string(kv.Value)))
+		log.Info("update scheduler config", zap.String("name", name),
+			zap.String("value", string(kv.Value)))
 		err := cw.storage.SaveSchedulerConfig(name, kv.Value)
 		if err != nil {
 			log.Warn("failed to save scheduler config",
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/base_scheduler.go
@@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
 	return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
 }

-// Prepare does some prepare work
-func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil }
+// PrepareConfig does some prepare work about config.
+func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil }

-// Cleanup does some cleanup work
-func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {}
+// CleanConfig does some cleanup work about config.
+func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_leader.go
@@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6
 	}
 }

-func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
+func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	var res error
@@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
 	return res
 }

-func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
+func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	for id := range s.conf.StoreIDWithRanges {
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -189,15 +189,15 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
 	return EncodeConfig(s.conf)
 }

-func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error {
+func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	evictStore := s.conf.evictStore()
 	if evictStore != 0 {
 		return cluster.SlowStoreEvicted(evictStore)
 	}
 	return nil
 }

-func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) {
+func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.cleanupEvictLeader(cluster)
 }

4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store_test.go
@@ -123,13 +123,13 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {
 	suite.True(ok)
 	suite.Zero(es2.conf.evictStore())
 	// prepare with no evict store.
-	suite.es.Prepare(suite.tc)
+	suite.es.PrepareConfig(suite.tc)

 	es2.conf.setStoreAndPersist(1)
 	suite.Equal(uint64(1), es2.conf.evictStore())
 	suite.False(es2.conf.readyForRecovery())
 	// prepare with evict store.
-	suite.es.Prepare(suite.tc)
+	suite.es.PrepareConfig(suite.tc)
 }

 func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() {
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend.go
@@ -270,15 +270,15 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) {
 	return EncodeConfig(s.conf)
 }

-func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error {
+func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	evictedStoreID := s.conf.evictedStore()
 	if evictedStoreID == 0 {
 		return nil
 	}
 	return cluster.SlowTrendEvicted(evictedStoreID)
 }

-func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) {
+func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.cleanupEvictLeader(cluster)
 }

4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend_test.go
@@ -255,10 +255,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() {
 	suite.True(ok)
 	suite.Zero(es2.conf.evictedStore())
 	// prepare with no evict store.
-	suite.es.Prepare(suite.tc)
+	suite.es.PrepareConfig(suite.tc)

 	es2.conf.setStoreAndPersist(1)
 	suite.Equal(uint64(1), es2.conf.evictedStore())
 	// prepare with evict store.
-	suite.es.Prepare(suite.tc)
+	suite.es.PrepareConfig(suite.tc)
 }
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/grant_leader.go
@@ -197,7 +197,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
 	return nil
 }

-func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
+func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	var res error
@@ -209,7 +209,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
 	return res
 }

-func (s *grantLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
+func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	for id := range s.conf.StoreIDWithRanges {
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler.go
@@ -42,8 +42,8 @@ type Scheduler interface {
 	ReloadConfig() error
 	GetMinInterval() time.Duration
 	GetNextInterval(interval time.Duration) time.Duration
-	Prepare(cluster sche.SchedulerCluster) error
-	Cleanup(cluster sche.SchedulerCluster)
+	PrepareConfig(cluster sche.SchedulerCluster) error
+	CleanConfig(cluster sche.SchedulerCluster)
 	Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan)
 	IsScheduleAllowed(cluster sche.SchedulerCluster) bool
 }
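
For readers unfamiliar with these hooks, here is a minimal, self-contained sketch (not part of this commit) of how a scheduler can satisfy the renamed methods. The local SchedulerCluster, baseScheduler, and evictSlowStore types are stand-ins invented for illustration; SlowStoreRecovered is an assumed counterpart to the SlowStoreEvicted call shown in the evict-slow-store diff above.

package main

import "fmt"

// SchedulerCluster is a stand-in for pd's sche.SchedulerCluster, reduced to
// the two calls used below. SlowStoreEvicted appears in the diff above;
// SlowStoreRecovered is assumed here as the matching release call.
type SchedulerCluster interface {
	SlowStoreEvicted(storeID uint64) error
	SlowStoreRecovered(storeID uint64)
}

// baseScheduler mirrors BaseScheduler's no-op defaults after the rename.
type baseScheduler struct{}

func (baseScheduler) PrepareConfig(SchedulerCluster) error { return nil }
func (baseScheduler) CleanConfig(SchedulerCluster)         {}

// evictSlowStore mirrors the evict-slow-store pattern: re-apply the persisted
// eviction in PrepareConfig, release it in CleanConfig.
type evictSlowStore struct {
	baseScheduler
	evictedStore uint64 // 0 means no store is currently evicted
}

func (s *evictSlowStore) PrepareConfig(c SchedulerCluster) error {
	if s.evictedStore != 0 {
		return c.SlowStoreEvicted(s.evictedStore)
	}
	return nil
}

func (s *evictSlowStore) CleanConfig(c SchedulerCluster) {
	if s.evictedStore != 0 {
		c.SlowStoreRecovered(s.evictedStore)
	}
}

func main() {
	fmt.Println("PrepareConfig/CleanConfig sketch")
}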
8 changes: 5 additions & 3 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -156,7 +156,8 @@ func (c *Controller) AddSchedulerHandler(scheduler Scheduler, args ...string) er
 		return err
 	}
 	c.cluster.GetSchedulerConfig().AddSchedulerCfg(scheduler.GetType(), args)
-	return nil
+	err := scheduler.PrepareConfig(c.cluster)
+	return err
 }

 // RemoveSchedulerHandler removes the HTTP handler for a scheduler.
@@ -183,6 +184,7 @@ func (c *Controller) RemoveSchedulerHandler(name string) error {
 		return err
 	}

+	s.(Scheduler).CleanConfig(c.cluster)
 	delete(c.schedulerHandlers, name)

 	return nil
@@ -198,7 +200,7 @@ func (c *Controller) AddScheduler(scheduler Scheduler, args ...string) error {
 	}

 	s := NewScheduleController(c.ctx, c.cluster, c.opController, scheduler)
-	if err := s.Scheduler.Prepare(c.cluster); err != nil {
+	if err := s.Scheduler.PrepareConfig(c.cluster); err != nil {
 		return err
 	}

@@ -343,7 +345,7 @@ func (c *Controller) IsSchedulerExisted(name string) (bool, error) {
 func (c *Controller) runScheduler(s *ScheduleController) {
 	defer logutil.LogPanic()
 	defer c.wg.Done()
-	defer s.Scheduler.Cleanup(c.cluster)
+	defer s.Scheduler.CleanConfig(c.cluster)

 	ticker := time.NewTicker(s.GetInterval())
 	defer ticker.Stop()
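
The controller changes above pair the hooks on both registration paths: PrepareConfig when a scheduler or its HTTP handler is added, CleanConfig when it is removed or when runScheduler exits. A compressed, hypothetical sketch of that add/remove symmetry (miniController, cluster, and noopScheduler are invented stand-ins, not pd types):

package main

import "fmt"

type cluster struct{}

type scheduler interface {
	PrepareConfig(*cluster) error
	CleanConfig(*cluster)
}

// miniController is a hypothetical stand-in for pd's schedulers.Controller,
// keeping only the handler map relevant to this fix.
type miniController struct {
	cluster  *cluster
	handlers map[string]scheduler
}

// Mirrors the fixed AddSchedulerHandler: once the handler is registered,
// PrepareConfig is called so config-side state is set up even when the
// scheduler only serves requests on this node.
func (c *miniController) AddSchedulerHandler(name string, s scheduler) error {
	c.handlers[name] = s
	return s.PrepareConfig(c.cluster)
}

// Mirrors the fixed RemoveSchedulerHandler: CleanConfig releases that state
// before the handler is dropped.
func (c *miniController) RemoveSchedulerHandler(name string) {
	if s, ok := c.handlers[name]; ok {
		s.CleanConfig(c.cluster)
		delete(c.handlers, name)
	}
}

type noopScheduler struct{}

func (noopScheduler) PrepareConfig(*cluster) error { return nil }
func (noopScheduler) CleanConfig(*cluster)         {}

func main() {
	c := &miniController{cluster: &cluster{}, handlers: map[string]scheduler{}}
	if err := c.AddSchedulerHandler("evict-leader-scheduler", noopScheduler{}); err != nil {
		fmt.Println("prepare failed:", err)
		return
	}
	c.RemoveSchedulerHandler("evict-leader-scheduler")
	fmt.Println("handler add/remove paired with PrepareConfig/CleanConfig")
}

The handler-only path is the one the commit title associates with the api server, which appears to be the memory-sync gap this change closes.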
4 changes: 2 additions & 2 deletions plugin/scheduler_example/evict_leader.go
@@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
 	return schedulers.EncodeConfig(s.conf)
 }

-func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
+func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	var res error
@@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
 	return res
 }

-func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
+func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
 	s.conf.mu.RLock()
 	defer s.conf.mu.RUnlock()
 	for id := range s.conf.StoreIDWitRanges {
