Skip to content

Commit

Permalink
scheduler: dynamically adjust the retry limit according to the operator (tikv#4007) (tikv#4043)
Browse files Browse the repository at this point in the history

* scheduler: dynamically adjust the retry limit according to the operator (tikv#4007)

Signed-off-by: HunDunDM <hundundm@gmail.com>
  • Loading branch information
HunDunDM authored Aug 27, 2021
1 parent 05b9d6d commit d67fc3d
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 9 deletions.
2 changes: 1 addition & 1 deletion server/schedule/filter/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ func (f *StoreStateFilter) Source(opts opt.Options, store *core.StoreInfo) bool

// Target returns true when the store can be selected as the schedule
// target.
func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
func (f *StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
if store.IsTombstone() {
f.Reason = "tombstone"
return false
Expand Down
14 changes: 11 additions & 3 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {

type balanceLeaderScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceLeaderSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
Expand All @@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *

s := &balanceLeaderScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceLeaderCounter,
Expand Down Expand Up @@ -164,12 +166,15 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
sourceStoreLabel := strconv.FormatUint(sourceID, 10)
sourceAddress := source.GetAddress()
l.counter.WithLabelValues("high-score", sourceAddress, sourceStoreLabel).Inc()
for j := 0; j < balanceLeaderRetryLimit; j++ {
retryLimit := l.retryQuota.GetLimit(source)
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderOut(cluster, source); len(ops) > 0 {
l.retryQuota.ResetLimit(source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", sourceAddress, sourceStoreLabel))
return ops
}
}
l.Attenuate(source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", sourceID))
}
if i < len(targets) {
Expand All @@ -179,16 +184,19 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
targetStoreLabel := strconv.FormatUint(targetID, 10)
targetAddress := target.GetAddress()
l.counter.WithLabelValues("low-score", targetAddress, targetStoreLabel).Inc()

for j := 0; j < balanceLeaderRetryLimit; j++ {
retryLimit := l.retryQuota.GetLimit(target)
for j := 0; j < retryLimit; j++ {
if ops := l.transferLeaderIn(cluster, target); len(ops) > 0 {
l.retryQuota.ResetLimit(target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", targetAddress, targetStoreLabel))
return ops
}
}
l.Attenuate(target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", targetID))
}
}
l.retryQuota.GC(append(sources, targets...))
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions server/schedulers/balance_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type balanceRegionSchedulerConfig struct {

type balanceRegionScheduler struct {
*BaseScheduler
*retryQuota
conf *balanceRegionSchedulerConfig
opController *schedule.OperatorController
filters []filter.Filter
Expand All @@ -83,6 +84,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
base := NewBaseScheduler(opController)
scheduler := &balanceRegionScheduler{
BaseScheduler: base,
retryQuota: newRetryQuota(balanceRegionRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
conf: conf,
opController: opController,
counter: balanceRegionCounter,
Expand Down Expand Up @@ -148,8 +150,8 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
})
for _, source := range stores {
sourceID := source.GetID()

for i := 0; i < balanceRegionRetryLimit; i++ {
retryLimit := s.retryQuota.GetLimit(source)
for i := 0; i < retryLimit; i++ {
// Priority pick the region that has a pending peer.
// Pending region may means the disk is overload, remove the pending region firstly.
region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster))
Expand Down Expand Up @@ -186,11 +188,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera

oldPeer := region.GetStorePeer(sourceID)
if op := s.transferPeer(cluster, region, oldPeer); op != nil {
s.retryQuota.ResetLimit(source)
op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
return []*operator.Operator{op}
}
}
s.retryQuota.Attenuate(source)
}
s.retryQuota.GC(stores)
return nil
}

Expand Down
58 changes: 55 additions & 3 deletions server/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (

const (
// adjustRatio is used to adjust TolerantSizeRatio according to region count.
adjustRatio float64 = 0.005
leaderTolerantSizeRatio float64 = 5.0
minTolerantSizeRatio float64 = 1.0
adjustRatio float64 = 0.005
leaderTolerantSizeRatio float64 = 5.0
minTolerantSizeRatio float64 = 1.0
defaultMinRetryLimit = 1
defaultRetryQuotaAttenuation = 2
)

func minUint64(a, b uint64) uint64 {
Expand Down Expand Up @@ -356,3 +358,53 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
Stats: peers,
}
}

// retryQuota dynamically adjusts the per-store retry limit used by the
// balance schedulers: a store whose scheduling attempts keep failing has
// its limit attenuated, and a store that produces an operator is reset.
//
// NOTE(review): access to limits is not synchronized — assumes a single
// goroutine calls GetLimit/ResetLimit/Attenuate/GC; confirm Schedule is
// never invoked concurrently for one scheduler.
type retryQuota struct {
	initialLimit int // limit seeded on first use and restored by ResetLimit
	minLimit     int // floor below which Attenuate never drops a limit
	attenuation  int // divisor applied to a store's limit on each failed round

	limits map[uint64]int // current retry limit, keyed by store ID
}

// newRetryQuota builds a retryQuota whose stores start at initialLimit,
// are divided by attenuation on each failed round, and never fall below
// minLimit.
func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
	q := retryQuota{
		initialLimit: initialLimit,
		minLimit:     minLimit,
		attenuation:  attenuation,
		limits:       map[uint64]int{},
	}
	return &q
}

// GetLimit returns the current retry limit for store, lazily seeding the
// map with the initial limit the first time the store is seen.
func (q *retryQuota) GetLimit(store *core.StoreInfo) int {
	limit, ok := q.limits[store.GetID()]
	if !ok {
		limit = q.initialLimit
		q.limits[store.GetID()] = limit
	}
	return limit
}

// ResetLimit restores store's retry limit to the initial value; callers
// invoke it once the store has successfully produced an operator.
func (q *retryQuota) ResetLimit(store *core.StoreInfo) {
	id := store.GetID()
	q.limits[id] = q.initialLimit
}

// Attenuate divides store's retry limit by the attenuation factor,
// clamping the result at minLimit so the store is never starved entirely.
func (q *retryQuota) Attenuate(store *core.StoreInfo) {
	reduced := q.GetLimit(store) / q.attenuation
	if reduced < q.minLimit {
		reduced = q.minLimit
	}
	q.limits[store.GetID()] = reduced
}

// GC drops the limit entries of every store absent from keepStores, so
// the map does not grow without bound as stores come and go.
// Deleting entries while ranging over the map is safe per the Go spec.
func (q *retryQuota) GC(keepStores []*core.StoreInfo) {
	keep := make(map[uint64]struct{}, len(keepStores))
	for _, s := range keepStores {
		keep[s.GetID()] = struct{}{}
	}
	for id := range q.limits {
		if _, live := keep[id]; !live {
			delete(q.limits, id)
		}
	}
}
34 changes: 34 additions & 0 deletions server/schedulers/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/server/core"
)

const (
Expand Down Expand Up @@ -50,3 +52,35 @@ func (s *testMinMaxSuite) TestMinDuration(c *C) {
c.Assert(minDuration(time.Second, time.Minute), Equals, time.Second)
c.Assert(minDuration(time.Second, time.Second), Equals, time.Second)
}

var _ = Suite(&testUtilsSuite{})

type testUtilsSuite struct{}

// TestRetryQuota exercises the lazy seeding, attenuation floor, GC and
// reset behavior of retryQuota.
func (s *testUtilsSuite) TestRetryQuota(c *C) {
	q := newRetryQuota(10, 1, 2)
	store1 := core.NewStoreInfo(&metapb.Store{Id: 1})
	store2 := core.NewStoreInfo(&metapb.Store{Id: 2})
	keepStores := []*core.StoreInfo{store1}

	// First access lazily seeds the initial limit.
	c.Assert(q.GetLimit(store1), Equals, 10)

	// Repeated attenuation divides by 2 down to the floor of 1.
	expectedLimits := []int{5, 2, 1, 1, 1}
	for _, want := range expectedLimits {
		q.Attenuate(store1)
		c.Assert(q.GetLimit(store1), Equals, want)
	}

	// GC removes store2's entry; its next access re-seeds the default,
	// while store1 (kept) retains its attenuated limit.
	c.Assert(q.GetLimit(store2), Equals, 10)
	q.Attenuate(store2)
	c.Assert(q.GetLimit(store2), Equals, 5)
	q.GC(keepStores)
	c.Assert(q.GetLimit(store1), Equals, 1)
	c.Assert(q.GetLimit(store2), Equals, 10)

	// ResetLimit restores the initial quota.
	q.ResetLimit(store1)
	c.Assert(q.GetLimit(store1), Equals, 10)
}

0 comments on commit d67fc3d

Please sign in to comment.