Skip to content

Commit

Permalink
mcs: fix scheduler memory sync in api server (tikv#7389)
Browse files Browse the repository at this point in the history
close tikv#7388

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and rleungx committed Dec 1, 2023
1 parent ce5ae42 commit 0604d49
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 21 deletions.
3 changes: 2 additions & 1 deletion pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error {
prefixToTrim := cw.schedulerConfigPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
name := strings.TrimPrefix(string(kv.Key), prefixToTrim)
log.Info("update scheduler config", zap.String("name", string(kv.Value)))
log.Info("update scheduler config", zap.String("name", name),
zap.String("value", string(kv.Value)))
err := cw.storage.SaveSchedulerConfig(name, kv.Value)
if err != nil {
log.Warn("failed to save scheduler config",
Expand Down
8 changes: 4 additions & 4 deletions pkg/schedule/schedulers/base_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (s *BaseScheduler) GetNextInterval(interval time.Duration) time.Duration {
return intervalGrow(interval, MaxScheduleInterval, exponentialGrowth)
}

// Prepare does some prepare work
func (s *BaseScheduler) Prepare(cluster sche.SchedulerCluster) error { return nil }
// PrepareConfig does some prepare work about config.
func (s *BaseScheduler) PrepareConfig(cluster sche.SchedulerCluster) error { return nil }

// Cleanup does some cleanup work
func (s *BaseScheduler) Cleanup(cluster sche.SchedulerCluster) {}
// CleanConfig does some cleanup work about config.
func (s *BaseScheduler) CleanConfig(cluster sche.SchedulerCluster) {}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint6
}
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -251,7 +251,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWithRanges {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,15 @@ func (s *evictSlowStoreScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowStoreScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictSlowStoreScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
evictStore := s.conf.evictStore()
if evictStore != 0 {
return cluster.SlowStoreEvicted(evictStore)
}
return nil
}

func (s *evictSlowStoreScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictSlowStoreScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictStore())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}

func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,15 +139,15 @@ func (s *evictSlowTrendScheduler) EncodeConfig() ([]byte, error) {
return EncodeConfig(s.conf)
}

func (s *evictSlowTrendScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictSlowTrendScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
evictedStoreID := s.conf.evictedStore()
if evictedStoreID == 0 {
return nil
}
return cluster.SlowTrendEvicted(evictedStoreID)
}

func (s *evictSlowTrendScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictSlowTrendScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.cleanupEvictLeader(cluster)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/evict_slow_trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ func (suite *evictSlowTrendTestSuite) TestEvictSlowTrendPrepare() {
suite.True(ok)
suite.Zero(es2.conf.evictedStore())
// prepare with no evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)

es2.conf.setStoreAndPersist(1)
suite.Equal(uint64(1), es2.conf.evictedStore())
// prepare with evict store.
suite.es.Prepare(suite.tc)
suite.es.PrepareConfig(suite.tc)
}
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
return nil
}

func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *grantLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -209,7 +209,7 @@ func (s *grantLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *grantLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *grantLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWithRanges {
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Scheduler interface {
ReloadConfig() error
GetMinInterval() time.Duration
GetNextInterval(interval time.Duration) time.Duration
Prepare(cluster sche.SchedulerCluster) error
Cleanup(cluster sche.SchedulerCluster)
PrepareConfig(cluster sche.SchedulerCluster) error
CleanConfig(cluster sche.SchedulerCluster)
Schedule(cluster sche.SchedulerCluster, dryRun bool) ([]*operator.Operator, []plan.Plan)
IsScheduleAllowed(cluster sche.SchedulerCluster) bool
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/scheduler_example/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *evictLeaderScheduler) EncodeConfig() ([]byte, error) {
return schedulers.EncodeConfig(s.conf)
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
func (s *evictLeaderScheduler) PrepareConfig(cluster sche.SchedulerCluster) error {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
var res error
Expand All @@ -198,7 +198,7 @@ func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
return res
}

func (s *evictLeaderScheduler) Cleanup(cluster sche.SchedulerCluster) {
func (s *evictLeaderScheduler) CleanConfig(cluster sche.SchedulerCluster) {
s.conf.mu.RLock()
defer s.conf.mu.RUnlock()
for id := range s.conf.StoreIDWitRanges {
Expand Down

0 comments on commit 0604d49

Please sign in to comment.