support changing batch for slow score scheduler
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Dec 10, 2024
1 parent 1e76110 commit f318c85
Showing 3 changed files with 105 additions and 4 deletions.
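In short: the evict-slow-store scheduler used to evict leaders in a fixed batch of EvictLeaderBatchSize; this commit turns the batch size into a persisted `batch` field on the scheduler config. The field defaults to EvictLeaderBatchSize, accepts values in [1, 10] through the config HTTP handler, and is backfilled with the default when an older persisted config (which predates the field) is reloaded.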
34 changes: 30 additions & 4 deletions pkg/schedule/schedulers/evict_slow_store.go
@@ -47,6 +47,7 @@ type evictSlowStoreSchedulerConfig struct {
     // Duration gap for recovering the candidate, unit: s.
     RecoveryDurationGap uint64   `json:"recovery-duration"`
     EvictedStores       []uint64 `json:"evict-stores"`
+    Batch               int      `json:"batch"`
 }
 
 func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig {
@@ -55,6 +56,7 @@ func initEvictSlowStoreSchedulerConfig() *evictSlowStoreSchedulerConfig {
         lastSlowStoreCaptureTS: time.Time{},
         RecoveryDurationGap:    defaultRecoveryDurationGap,
         EvictedStores:          make([]uint64, 0),
+        Batch:                  EvictLeaderBatchSize,
     }
 }
 
@@ -63,6 +65,7 @@ func (conf *evictSlowStoreSchedulerConfig) clone() *evictSlowStoreSchedulerConfig {
     defer conf.RUnlock()
     return &evictSlowStoreSchedulerConfig{
         RecoveryDurationGap: conf.RecoveryDurationGap,
+        Batch:               conf.Batch,
     }
 }
 
@@ -79,8 +82,10 @@ func (conf *evictSlowStoreSchedulerConfig) getKeyRangesByID(id uint64) []core.KeyRange {
     return []core.KeyRange{core.NewKeyRange("", "")}
 }
 
-func (*evictSlowStoreSchedulerConfig) getBatch() int {
-    return EvictLeaderBatchSize
+func (conf *evictSlowStoreSchedulerConfig) getBatch() int {
+    conf.RLock()
+    defer conf.RUnlock()
+    return conf.Batch
 }
 
 func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 {
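Note on the getBatch change above: the old getter returned a compile-time constant and needed no synchronization, while the new one reads a mutable config field, so it takes the read lock to stay consistent with updateConfig, which mutates Batch under the write lock.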
@@ -143,22 +148,39 @@ func (handler *evictSlowStoreHandler) updateConfig(w http.ResponseWriter, r *http.Request) {
         return
     }
     recoveryDurationGapFloat, ok := input["recovery-duration"].(float64)
-    if !ok {
+    if input["recovery-duration"] != nil && !ok {
         handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'recovery-duration'").Error())
         return
     }
 
+    batch := handler.config.getBatch()
+    batchFloat, ok := input["batch"].(float64)
+    if input["batch"] != nil && !ok {
+        handler.rd.JSON(w, http.StatusInternalServerError, errors.New("invalid argument for 'batch'").Error())
+        return
+    }
+    if ok {
+        if batchFloat < 1 || batchFloat > 10 {
+            handler.rd.JSON(w, http.StatusBadRequest, "batch is invalid, it should be in [1, 10]")
+            return
+        }
+        batch = (int)(batchFloat)
+    }
+
     handler.config.Lock()
     defer handler.config.Unlock()
     prevRecoveryDurationGap := handler.config.RecoveryDurationGap
+    prevBatch := handler.config.Batch
     recoveryDurationGap := uint64(recoveryDurationGapFloat)
     handler.config.RecoveryDurationGap = recoveryDurationGap
+    handler.config.Batch = batch
     if err := handler.config.save(); err != nil {
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
         handler.config.RecoveryDurationGap = prevRecoveryDurationGap
+        handler.config.Batch = prevBatch
         return
     }
-    log.Info("evict-slow-store-scheduler update 'recovery-duration' - unit: s", zap.Uint64("prev", prevRecoveryDurationGap), zap.Uint64("cur", recoveryDurationGap))
+    log.Info("evict-slow-store-scheduler update config", zap.Uint64("prev-recovery-duration", prevRecoveryDurationGap), zap.Uint64("cur-recovery-duration", recoveryDurationGap), zap.Int("prev-batch", prevBatch), zap.Int("cur-batch", batch))
     handler.rd.JSON(w, http.StatusOK, "Config updated.")
 }

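Each input key is type-checked only when present, but as written the handler applies recovery-duration unconditionally (an absent key yields 0), so a cautious client sends both fields together. Below is a minimal client-side sketch, not part of the commit; the endpoint URL is an assumption modeled on PD's scheduler-config routes, so verify the real path against the PD API reference.

package main

import (
    "bytes"
    "fmt"
    "net/http"
)

func main() {
    // Send both fields: batch must be in [1, 10], and recovery-duration
    // (seconds) is applied unconditionally by the handler shown above.
    payload := []byte(`{"recovery-duration": 300, "batch": 5}`)
    // Assumed route; check the PD API reference for the actual one.
    url := "http://127.0.0.1:2379/pd/api/v1/scheduler-config/evict-slow-store-scheduler/config"
    resp, err := http.Post(url, "application/json", bytes.NewReader(payload))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status) // expect 200 with body "Config updated."
}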
@@ -192,6 +214,9 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
     if err := s.conf.load(newCfg); err != nil {
         return err
     }
+    if newCfg.Batch == 0 {
+        newCfg.Batch = EvictLeaderBatchSize
+    }
     old := make(map[uint64]struct{})
     for _, id := range s.conf.EvictedStores {
         old[id] = struct{}{}
@@ -203,6 +228,7 @@ func (s *evictSlowStoreScheduler) ReloadConfig() error {
     pauseAndResumeLeaderTransfer(s.conf.cluster, constant.In, old, new)
     s.conf.RecoveryDurationGap = newCfg.RecoveryDurationGap
     s.conf.EvictedStores = newCfg.EvictedStores
+    s.conf.Batch = newCfg.Batch
     return nil
 }
 
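The Batch == 0 backfill in ReloadConfig covers configs persisted before this field existed: assuming the JSON persistence implied by the struct tags, a stored blob with no "batch" key unmarshals to the zero value. A standalone illustration (not from the commit):

package main

import (
    "encoding/json"
    "fmt"
)

type cfg struct {
    RecoveryDurationGap uint64 `json:"recovery-duration"`
    Batch               int    `json:"batch"`
}

func main() {
    // A config saved by a version that predates the Batch field.
    old := []byte(`{"recovery-duration": 300}`)
    var c cfg
    if err := json.Unmarshal(old, &c); err != nil {
        panic(err)
    }
    fmt.Println(c.Batch) // 0 — hence the fallback to EvictLeaderBatchSize
}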
61 changes: 61 additions & 0 deletions pkg/schedule/schedulers/evict_slow_store_test.go
@@ -19,6 +19,7 @@ import (
     "testing"
 
     "github.com/pingcap/failpoint"
+    "github.com/stretchr/testify/require"
     "github.com/stretchr/testify/suite"
     "github.com/tikv/pd/pkg/core"
     "github.com/tikv/pd/pkg/mock/mockcluster"
@@ -144,3 +145,63 @@ func (suite *evictSlowStoreTestSuite) TestEvictSlowStorePersistFail() {
     ops, _ = suite.es.Schedule(suite.tc, false)
     re.NotEmpty(ops)
 }
+
+func TestEvictSlowStoreBatch(t *testing.T) {
+    re := require.New(t)
+    cancel, _, tc, oc := prepareSchedulersTest()
+    defer cancel()
+
+    // Add stores
+    tc.AddLeaderStore(1, 0)
+    tc.AddLeaderStore(2, 0)
+    tc.AddLeaderStore(3, 0)
+    // Add regions with leader in store 1
+    for i := 0; i < 10000; i++ {
+        tc.AddLeaderRegion(uint64(i), 1, 2)
+    }
+
+    storage := storage.NewStorageWithMemoryBackend()
+    es, err := CreateScheduler(types.EvictSlowStoreScheduler, oc, storage, ConfigSliceDecoder(types.EvictSlowStoreScheduler, []string{}), nil)
+    re.NoError(err)
+    re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap", "return(true)"))
+    storeInfo := tc.GetStore(1)
+    newStoreInfo := storeInfo.Clone(func(store *core.StoreInfo) {
+        store.GetStoreStats().SlowScore = 100
+    })
+    tc.PutStore(newStoreInfo)
+    re.True(es.IsScheduleAllowed(tc))
+    // Add evict leader scheduler to store 1
+    ops, _ := es.Schedule(tc, false)
+    re.Len(ops, 3)
+    operatorutil.CheckMultiTargetTransferLeader(re, ops[0], operator.OpLeader, 1, []uint64{2})
+    re.Equal(types.EvictSlowStoreScheduler.String(), ops[0].Desc())
+
+    es.(*evictSlowStoreScheduler).conf.Batch = 5
+    re.NoError(es.(*evictSlowStoreScheduler).conf.save())
+    ops, _ = es.Schedule(tc, false)
+    re.Len(ops, 5)
+
+    newStoreInfo = storeInfo.Clone(func(store *core.StoreInfo) {
+        store.GetStoreStats().SlowScore = 0
+    })
+
+    tc.PutStore(newStoreInfo)
+    // no slow store need to evict.
+    ops, _ = es.Schedule(tc, false)
+    re.Empty(ops)
+
+    es2, ok := es.(*evictSlowStoreScheduler)
+    re.True(ok)
+    re.Zero(es2.conf.evictStore())
+
+    // check the value from storage.
+    var persistValue evictSlowStoreSchedulerConfig
+    err = es2.conf.load(&persistValue)
+    re.NoError(err)
+
+    re.Equal(es2.conf.EvictedStores, persistValue.EvictedStores)
+    re.Zero(persistValue.evictStore())
+    re.True(persistValue.readyForRecovery())
+    re.Equal(5, persistValue.Batch)
+    re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/schedule/schedulers/transientRecoveryGap"))
+}
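The test pins down the batch semantics end to end: with the default config a single Schedule call yields 3 operators (EvictLeaderBatchSize), after setting conf.Batch = 5 and saving it yields 5, and the value round-trips through the persisted config (persistValue.Batch).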
14 changes: 14 additions & 0 deletions tools/pd-ctl/tests/scheduler/scheduler_test.go
@@ -564,6 +564,20 @@ func (suite *schedulerTestSuite) checkSchedulerConfig(cluster *pdTests.TestCluster) {
     })
     echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "balance-leader-scheduler"}, nil)
     re.Contains(echo, "Success!")
+
+    // test evict slow store scheduler
+    echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-slow-store-scheduler"}, nil)
+    re.Contains(echo, "Success!")
+    conf = make(map[string]any)
+    conf1 = make(map[string]any)
+    mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-store-scheduler", "show"}, &conf)
+    re.Equal(3., conf["batch"])
+    echo = mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-store-scheduler", "set", "batch", "10"}, nil)
+    re.Contains(echo, "Success!")
+    testutil.Eventually(re, func() bool {
+        mustExec(re, cmd, []string{"-u", pdAddr, "scheduler", "config", "evict-slow-store-scheduler"}, &conf1)
+        return conf1["batch"] == 10.
+    })
 }
 
 func (suite *schedulerTestSuite) TestGrantHotRegionScheduler() {
