Skip to content

Commit

Permalink
schedulers: fix the grant-leader-scheuler store pause/resume (#7128)
Browse files Browse the repository at this point in the history
ref #5839

The grant-leader-scheduler should also check the store pause/resume after reloading the config.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
JmPotato and ti-chi-bot[bot] authored Sep 21, 2023
1 parent 96ace89 commit e94b4e4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 11 deletions.
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/balance_witness.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type balanceWitnessHandler struct {
config *balanceWitnessSchedulerConfig
}

func newbalanceWitnessHandler(conf *balanceWitnessSchedulerConfig) http.Handler {
func newBalanceWitnessHandler(conf *balanceWitnessSchedulerConfig) http.Handler {
handler := &balanceWitnessHandler{
config: conf,
rd: render.New(render.Options{IndentJSON: true}),
Expand Down Expand Up @@ -161,7 +161,7 @@ func newBalanceWitnessScheduler(opController *operator.Controller, conf *balance
retryQuota: newRetryQuota(),
name: BalanceWitnessName,
conf: conf,
handler: newbalanceWitnessHandler(conf),
handler: newBalanceWitnessHandler(conf),
counter: balanceWitnessCounter,
filterCounter: filter.NewCounter(filter.BalanceWitness.String()),
}
Expand Down
22 changes: 13 additions & 9 deletions pkg/schedule/schedulers/evict_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,21 +218,25 @@ func (s *evictLeaderScheduler) ReloadConfig() error {
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
// Resume and pause the leader transfer for each store.
for id := range s.conf.StoreIDWithRanges {
if _, ok := newCfg.StoreIDWithRanges[id]; ok {
pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}

// pauseAndResumeLeaderTransfer checks the old and new store IDs, and pause or resume the leader transfer.
func pauseAndResumeLeaderTransfer(cluster *core.BasicCluster, old, new map[uint64][]core.KeyRange) {
for id := range old {
if _, ok := new[id]; ok {
continue
}
s.conf.cluster.ResumeLeaderTransfer(id)
cluster.ResumeLeaderTransfer(id)
}
for id := range newCfg.StoreIDWithRanges {
if _, ok := s.conf.StoreIDWithRanges[id]; ok {
for id := range new {
if _, ok := old[id]; ok {
continue
}
s.conf.cluster.PauseLeaderTransfer(id)
cluster.PauseLeaderTransfer(id)
}
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}

func (s *evictLeaderScheduler) Prepare(cluster sche.SchedulerCluster) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/schedulers/grant_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func (s *grantLeaderScheduler) ReloadConfig() error {
if err = DecodeConfig([]byte(cfgData), newCfg); err != nil {
return err
}
pauseAndResumeLeaderTransfer(s.conf.cluster, s.conf.StoreIDWithRanges, newCfg.StoreIDWithRanges)
s.conf.StoreIDWithRanges = newCfg.StoreIDWithRanges
return nil
}
Expand Down

0 comments on commit e94b4e4

Please sign in to comment.