diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go
index a7fab470e72..d1ca9e4892a 100644
--- a/server/schedule/filter/filters.go
+++ b/server/schedule/filter/filters.go
@@ -453,7 +453,7 @@ func (f *StoreStateFilter) Source(opts opt.Options, store *core.StoreInfo) bool
 
 // Target returns true when the store can be selected as the schedule
 // target.
-func (f StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
+func (f *StoreStateFilter) Target(opts opt.Options, store *core.StoreInfo) bool {
 	if store.IsTombstone() {
 		f.Reason = "tombstone"
 		return false
diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 36d33bdbc97..168378542d1 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -70,6 +70,7 @@ type balanceLeaderSchedulerConfig struct {
 
 type balanceLeaderScheduler struct {
 	*BaseScheduler
+	*retryQuota
 	conf         *balanceLeaderSchedulerConfig
 	opController *schedule.OperatorController
 	filters      []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *
 
 	s := &balanceLeaderScheduler{
 		BaseScheduler: base,
+		retryQuota:    newRetryQuota(balanceLeaderRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
 		conf:          conf,
 		opController:  opController,
 		counter:       balanceLeaderCounter,
@@ -164,12 +166,15 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
 			sourceStoreLabel := strconv.FormatUint(sourceID, 10)
 			sourceAddress := source.GetAddress()
 			l.counter.WithLabelValues("high-score", sourceAddress, sourceStoreLabel).Inc()
-			for j := 0; j < balanceLeaderRetryLimit; j++ {
+			retryLimit := l.retryQuota.GetLimit(source)
+			for j := 0; j < retryLimit; j++ {
 				if ops := l.transferLeaderOut(cluster, source); len(ops) > 0 {
+					l.retryQuota.ResetLimit(source)
 					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", sourceAddress, sourceStoreLabel))
 					return ops
 				}
 			}
+			l.Attenuate(source)
 			log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", sourceID))
 		}
 		if i < len(targets) {
@@ -179,16 +184,19 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
 			targetStoreLabel := strconv.FormatUint(targetID, 10)
 			targetAddress := target.GetAddress()
 			l.counter.WithLabelValues("low-score", targetAddress, targetStoreLabel).Inc()
-
-			for j := 0; j < balanceLeaderRetryLimit; j++ {
+			retryLimit := l.retryQuota.GetLimit(target)
+			for j := 0; j < retryLimit; j++ {
 				if ops := l.transferLeaderIn(cluster, target); len(ops) > 0 {
+					l.retryQuota.ResetLimit(target)
 					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", targetAddress, targetStoreLabel))
 					return ops
 				}
 			}
+			l.Attenuate(target)
 			log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", targetID))
 		}
 	}
+	l.retryQuota.GC(append(sources, targets...))
 	return nil
 }
 
diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go
index 0947b012391..0e29136c88f 100644
--- a/server/schedulers/balance_region.go
+++ b/server/schedulers/balance_region.go
@@ -71,6 +71,7 @@ type balanceRegionSchedulerConfig struct {
 
 type balanceRegionScheduler struct {
 	*BaseScheduler
+	*retryQuota
 	conf         *balanceRegionSchedulerConfig
 	opController *schedule.OperatorController
 	filters      []filter.Filter
@@ -83,6 +84,7 @@ func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *
 	base := NewBaseScheduler(opController)
 	scheduler := &balanceRegionScheduler{
 		BaseScheduler: base,
+		retryQuota:    newRetryQuota(balanceRegionRetryLimit, defaultMinRetryLimit, defaultRetryQuotaAttenuation),
 		conf:          conf,
 		opController:  opController,
 		counter:       balanceRegionCounter,
@@ -148,8 +150,8 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
 	})
 	for _, source := range stores {
 		sourceID := source.GetID()
-
-		for i := 0; i < balanceRegionRetryLimit; i++ {
+		retryLimit := s.retryQuota.GetLimit(source)
+		for i := 0; i < retryLimit; i++ {
 			// Priority pick the region that has a pending peer.
 			// Pending region may means the disk is overload, remove the pending region firstly.
 			region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster))
@@ -186,11 +188,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
 
 			oldPeer := region.GetStorePeer(sourceID)
 			if op := s.transferPeer(cluster, region, oldPeer); op != nil {
+				s.retryQuota.ResetLimit(source)
 				op.Counters = append(op.Counters, schedulerCounter.WithLabelValues(s.GetName(), "new-operator"))
 				return []*operator.Operator{op}
 			}
 		}
+		s.retryQuota.Attenuate(source)
 	}
+	s.retryQuota.GC(stores)
 	return nil
 }
 
diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go
index 46ff8ed5040..b0932c577e3 100644
--- a/server/schedulers/utils.go
+++ b/server/schedulers/utils.go
@@ -31,9 +31,11 @@ import (
 
 const (
 	// adjustRatio is used to adjust TolerantSizeRatio according to region count.
-	adjustRatio             float64 = 0.005
-	leaderTolerantSizeRatio float64 = 5.0
-	minTolerantSizeRatio    float64 = 1.0
+	adjustRatio                  float64 = 0.005
+	leaderTolerantSizeRatio      float64 = 5.0
+	minTolerantSizeRatio         float64 = 1.0
+	defaultMinRetryLimit                 = 1
+	defaultRetryQuotaAttenuation         = 2
 )
 
 func minUint64(a, b uint64) uint64 {
@@ -356,3 +358,53 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
 		Stats: peers,
 	}
 }
+
+type retryQuota struct {
+	initialLimit int
+	minLimit     int
+	attenuation  int
+
+	limits map[uint64]int
+}
+
+func newRetryQuota(initialLimit, minLimit, attenuation int) *retryQuota {
+	return &retryQuota{
+		initialLimit: initialLimit,
+		minLimit:     minLimit,
+		attenuation:  attenuation,
+		limits:       make(map[uint64]int),
+	}
+}
+
+func (q *retryQuota) GetLimit(store *core.StoreInfo) int {
+	id := store.GetID()
+	if limit, ok := q.limits[id]; ok {
+		return limit
+	}
+	q.limits[id] = q.initialLimit
+	return q.initialLimit
+}
+
+func (q *retryQuota) ResetLimit(store *core.StoreInfo) {
+	q.limits[store.GetID()] = q.initialLimit
+}
+
+func (q *retryQuota) Attenuate(store *core.StoreInfo) {
+	newLimit := q.GetLimit(store) / q.attenuation
+	if newLimit < q.minLimit {
+		newLimit = q.minLimit
+	}
+	q.limits[store.GetID()] = newLimit
+}
+
+func (q *retryQuota) GC(keepStores []*core.StoreInfo) {
+	set := make(map[uint64]struct{}, len(keepStores))
+	for _, store := range keepStores {
+		set[store.GetID()] = struct{}{}
+	}
+	for id := range q.limits {
+		if _, ok := set[id]; !ok {
+			delete(q.limits, id)
+		}
+	}
+}
diff --git a/server/schedulers/utils_test.go b/server/schedulers/utils_test.go
index 49c86b3b782..d379334a86a 100644
--- a/server/schedulers/utils_test.go
+++ b/server/schedulers/utils_test.go
@@ -18,6 +18,8 @@ import (
 	"time"
 
 	. "github.com/pingcap/check"
+	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/tikv/pd/server/core"
 )
 
 const (
@@ -50,3 +52,35 @@ func (s *testMinMaxSuite) TestMinDuration(c *C) {
 	c.Assert(minDuration(time.Second, time.Minute), Equals, time.Second)
 	c.Assert(minDuration(time.Second, time.Second), Equals, time.Second)
 }
+
+var _ = Suite(&testUtilsSuite{})
+
+type testUtilsSuite struct{}
+
+func (s *testUtilsSuite) TestRetryQuota(c *C) {
+	q := newRetryQuota(10, 1, 2)
+	store1 := core.NewStoreInfo(&metapb.Store{Id: 1})
+	store2 := core.NewStoreInfo(&metapb.Store{Id: 2})
+	keepStores := []*core.StoreInfo{store1}
+
+	// test GetLimit
+	c.Assert(q.GetLimit(store1), Equals, 10)
+
+	// test Attenuate
+	for _, expected := range []int{5, 2, 1, 1, 1} {
+		q.Attenuate(store1)
+		c.Assert(q.GetLimit(store1), Equals, expected)
+	}
+
+	// test GC
+	c.Assert(q.GetLimit(store2), Equals, 10)
+	q.Attenuate(store2)
+	c.Assert(q.GetLimit(store2), Equals, 5)
+	q.GC(keepStores)
+	c.Assert(q.GetLimit(store1), Equals, 1)
+	c.Assert(q.GetLimit(store2), Equals, 10)
+
+	// test ResetLimit
+	q.ResetLimit(store1)
+	c.Assert(q.GetLimit(store1), Equals, 10)
+}