Skip to content

Commit

Permalink
Merge branch 'master' into feature/bucket_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
bufferflies committed Apr 12, 2022
2 parents 655fe6c + b7b785f commit d67e39d
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 57 deletions.
115 changes: 58 additions & 57 deletions server/schedulers/evict_slow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
package schedulers

import (
"strconv"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
Expand All @@ -41,20 +38,6 @@ const (
func init() {
schedule.RegisterSliceDecoderBuilder(EvictSlowStoreType, func(args []string) schedule.ConfigDecoder {
return func(v interface{}) error {
if len(args) != 1 && len(args) != 0 {
return errs.ErrSchedulerConfig.FastGenByArgs("evicted-store")
}
conf, ok := v.(*evictSlowStoreSchedulerConfig)
if !ok {
return errs.ErrScheduleConfigNotExist.FastGenByArgs()
}
if len(args) == 1 {
id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return errs.ErrStrconvParseUint.Wrap(err).FastGenWithCause()
}
conf.EvictedStores = []uint64{id}
}
return nil
}
})
Expand Down Expand Up @@ -89,6 +72,27 @@ func (conf *evictSlowStoreSchedulerConfig) getSchedulerName() string {
return EvictSlowStoreName
}

// evictStore returns the ID of the store currently being evicted, or 0
// when no eviction is in progress.
func (conf *evictSlowStoreSchedulerConfig) evictStore() uint64 {
	if len(conf.EvictedStores) > 0 {
		return conf.EvictedStores[0]
	}
	return 0
}

// setStoreAndPersist records id as the single evicted store and persists
// the updated config, returning any persistence error.
func (conf *evictSlowStoreSchedulerConfig) setStoreAndPersist(id uint64) error {
	evicted := []uint64{id}
	conf.EvictedStores = evicted
	return conf.Persist()
}

// clearAndPersist removes the evicted-store record and persists the change.
// It returns the previously evicted store ID (0 if none) together with any
// error from persisting; nothing is persisted when no store was evicted.
func (conf *evictSlowStoreSchedulerConfig) clearAndPersist() (uint64, error) {
	old := conf.evictStore()
	if old == 0 {
		// Nothing to clear; avoid a needless Persist round-trip.
		return 0, nil
	}
	conf.EvictedStores = []uint64{}
	return old, conf.Persist()
}

type evictSlowStoreScheduler struct {
*BaseScheduler
conf *evictSlowStoreSchedulerConfig
Expand All @@ -114,17 +118,22 @@ func (s *evictSlowStoreScheduler) Prepare(cluster schedule.Cluster) error {
}

// Cleanup releases the scheduler's eviction state when it is removed.
// The unconditional call is safe: cleanupEvictLeader itself is a no-op
// when no store is currently evicted.
func (s *evictSlowStoreScheduler) Cleanup(cluster schedule.Cluster) {
	// NOTE(review): the diff rendering left both the old guarded call and
	// the new unconditional call in this span; only the merged version
	// below should remain, otherwise cleanup runs twice.
	s.cleanupEvictLeader(cluster)
}

// prepareEvictLeader marks the currently evicted store as slow-store-evicted
// in the cluster so leader eviction can begin.
func (s *evictSlowStoreScheduler) prepareEvictLeader(cluster schedule.Cluster) error {
	// Use the evictStore() accessor for consistency with the rest of the
	// file; direct indexing of EvictedStores[0] panics if the slice is
	// empty. Callers invoke this only after setStoreAndPersist, so the
	// value is expected to be non-zero here.
	return cluster.SlowStoreEvicted(s.conf.evictStore())
}

// cleanupEvictLeader clears the persisted eviction record and, if a store
// was actually being evicted, notifies the cluster that it has recovered.
func (s *evictSlowStoreScheduler) cleanupEvictLeader(cluster schedule.Cluster) {
	// NOTE(review): the diff rendering left the removed pre-merge line
	// (cluster.SlowStoreRecovered(s.conf.EvictedStores[0])) concatenated
	// before this body; it must not be kept, or recovery is signalled
	// twice and an empty slice would panic.
	evictSlowStore, err := s.conf.clearAndPersist()
	if err != nil {
		log.Info("evict-slow-store-scheduler persist config failed", zap.Uint64("store-id", evictSlowStore))
	}
	if evictSlowStore == 0 {
		// No store was being evicted; nothing to recover.
		return
	}
	cluster.SlowStoreRecovered(evictSlowStore)
}

func (s *evictSlowStoreScheduler) schedulerEvictLeader(cluster schedule.Cluster) []*operator.Operator {
Expand All @@ -149,9 +158,8 @@ func (s *evictSlowStoreScheduler) Schedule(cluster schedule.Cluster) []*operator
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
var ops []*operator.Operator

evictedStores := s.conf.EvictedStores
if len(evictedStores) != 0 {
store := cluster.GetStore(evictedStores[0])
if s.conf.evictStore() != 0 {
store := cluster.GetStore(s.conf.evictStore())
if store == nil || store.IsRemoved() {
// Previous slow store had been removed, remove the scheduler and check
// slow node next time.
Expand All @@ -163,44 +171,37 @@ func (s *evictSlowStoreScheduler) Schedule(cluster schedule.Cluster) []*operator
} else {
return s.schedulerEvictLeader(cluster)
}
err := s.conf.Persist()
s.cleanupEvictLeader(cluster)
return ops
}

slowStores := make([]*core.StoreInfo, 0)
for _, store := range cluster.GetStores() {
if store.IsRemoved() {
continue
}

if (store.IsPreparing() || store.IsServing()) && store.IsSlow() {
slowStores = append(slowStores, store)
}
}

// If there is only one slow store, evict leaders from that store.
if len(slowStores) == 1 && slowStores[0].GetSlowScore() >= slowStoreEvictThreshold {
store := slowStores[0]
log.Info("detected slow store, start to evict leaders",
zap.Uint64("store-id", store.GetID()))
err := s.conf.setStoreAndPersist(store.GetID())
if err != nil {
log.Info("evict-slow-store-scheduler persist config failed")
return ops
}
// Stop to evict leaders
s.cleanupEvictLeader(cluster)
s.conf.EvictedStores = []uint64{}
} else {
slowStores := make([]*core.StoreInfo, 0)
for _, store := range cluster.GetStores() {
if store.IsRemoved() {
continue
}

if (store.IsPreparing() || store.IsServing()) && store.IsSlow() {
slowStores = append(slowStores, store)
}
}

// If there is only one slow store, evict leaders from that store.
if len(slowStores) == 1 && slowStores[0].GetSlowScore() >= slowStoreEvictThreshold {
store := slowStores[0]
log.Info("detected slow store, start to evict leaders",
zap.Uint64("store-id", store.GetID()))
s.conf.EvictedStores = []uint64{store.GetID()}
err := s.conf.Persist()
if err != nil {
log.Info("evict-slow-store-scheduler persist config failed")
return ops
}
err = s.prepareEvictLeader(cluster)
if err != nil {
log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", store.GetID()))
return ops
}
ops = s.schedulerEvictLeader(cluster)
err = s.prepareEvictLeader(cluster)
if err != nil {
log.Info("prepare for evicting leader failed", zap.Error(err), zap.Uint64("store-id", store.GetID()))
return ops
}
ops = s.schedulerEvictLeader(cluster)
}

return ops
Expand Down
22 changes: 22 additions & 0 deletions server/schedulers/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package schedulers

import (
"context"
"encoding/json"
"strings"
"testing"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -591,4 +593,24 @@ func (s *testEvictSlowStoreSuite) TestEvictSlowStore(c *C) {
c.Check(es.Schedule(tc), IsNil)
op = bs.Schedule(tc)
testutil.CheckTransferLeader(c, op[0], operator.OpLeader, 2, 1)

es2, ok := es.(*evictSlowStoreScheduler)
c.Assert(ok, IsTrue)
c.Assert(es2.conf.evictStore(), Equals, uint64(0))

// check the value from storage.
sches, vs, err := es2.conf.storage.LoadAllScheduleConfig()
c.Assert(err, IsNil)
valueStr := ""
for id, sche := range sches {
if strings.EqualFold(sche, EvictSlowStoreName) {
valueStr = vs[id]
}
}

var persistValue evictSlowStoreSchedulerConfig
err = json.Unmarshal([]byte(valueStr), &persistValue)
c.Assert(err, IsNil)
c.Assert(persistValue.EvictedStores, DeepEquals, es2.conf.EvictedStores)
c.Assert(persistValue.evictStore(), Equals, uint64(0))
}

0 comments on commit d67e39d

Please sign in to comment.