diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index bb26805a818..7265a375848 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -855,9 +855,9 @@ func (s *scheduleController) Schedule() []*operator.Operator {
 		}
 		cacheCluster := newCacheCluster(s.cluster)
 		// If we have schedule, reset interval to the minimal interval.
-		if op := s.Scheduler.Schedule(cacheCluster); op != nil {
+		if ops := s.Scheduler.Schedule(cacheCluster); len(ops) > 0 {
 			s.nextInterval = s.Scheduler.GetMinInterval()
-			return op
+			return ops
 		}
 	}
 	s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
diff --git a/server/cluster/coordinator_test.go b/server/cluster/coordinator_test.go
index 5d917780ace..c9a6b5d3a5b 100644
--- a/server/cluster/coordinator_test.go
+++ b/server/cluster/coordinator_test.go
@@ -1006,7 +1006,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
 		if time.Since(start) > time.Second {
 			break
 		}
-		c.Assert(ops, IsNil)
+		c.Assert(ops, HasLen, 0)
 	}
 
 	// reset all stores' limit
@@ -1024,7 +1024,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
 	// sleep 1 seconds to make sure that the token is filled up
 	time.Sleep(time.Second)
 	for i := 0; i < 100; i++ {
-		c.Assert(lb.Schedule(tc), NotNil)
+		c.Assert(len(lb.Schedule(tc)), Greater, 0)
 	}
 }
 
@@ -1052,10 +1052,10 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
 	c.Assert(oc.AddOperator(op2), IsTrue)
 	op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3})
 	c.Assert(oc.AddOperator(op3), IsFalse)
-	c.Assert(lb.Schedule(tc), IsNil)
+	c.Assert(lb.Schedule(tc), HasLen, 0)
 	// sleep 2 seconds to make sure that token is filled up
 	time.Sleep(2 * time.Second)
-	c.Assert(lb.Schedule(tc), NotNil)
+	c.Assert(len(lb.Schedule(tc)), Greater, 0)
 }
 
 func (s *testOperatorControllerSuite) TestDownStoreLimit(c *C) {
@@ -1146,7 +1146,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {
 
 	for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) {
 		c.Assert(sc.GetInterval(), Equals, i)
-		c.Assert(sc.Schedule(), IsNil)
+		c.Assert(sc.Schedule(), HasLen, 0)
 	}
 	// limit = 2
 	lb.limit = 2
@@ -1227,7 +1227,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
 	for _, n := range idleSeconds {
 		sc.nextInterval = schedulers.MinScheduleInterval
 		for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
-			c.Assert(sc.Schedule(), IsNil)
+			c.Assert(sc.Schedule(), HasLen, 0)
 		}
 		c.Assert(sc.GetInterval(), Less, time.Second*time.Duration(n/2))
 	}
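Note on the assertion churn in these tests: `Schedule` can now hand back an allocated but empty batch, which a nil check would misread. The check library's `HasLen` only inspects length, so it accepts both the nil and the empty case. A minimal standalone sketch of that distinction (package, suite, and test names are invented for illustration):

```go
package demo

import (
	"testing"

	. "github.com/pingcap/check"
)

func Test(t *testing.T) { TestingT(t) }

type demoSuite struct{}

var _ = Suite(&demoSuite{})

func (s *demoSuite) TestEmptySlice(c *C) {
	var nilOps []int           // what an idle scheduler used to return
	emptyOps := make([]int, 0) // what a batched scheduler may return now

	c.Assert(nilOps, HasLen, 0)   // passes: a nil slice has length 0
	c.Assert(emptyOps, HasLen, 0) // passes, while IsNil would fail here
}
```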
diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go
index 9b574b24171..fe0df1214a3 100644
--- a/server/schedule/operator_controller.go
+++ b/server/schedule/operator_controller.go
@@ -795,10 +795,7 @@ func (oc *OperatorController) GetOpInfluence(cluster Cluster) operator.OpInfluen
 	defer oc.RUnlock()
 	for _, op := range oc.operators {
 		if !op.CheckTimeout() && !op.CheckSuccess() {
-			region := cluster.GetRegion(op.RegionID())
-			if region != nil {
-				op.UnfinishedInfluence(influence, region)
-			}
+			AddOpInfluence(op, influence, cluster)
 		}
 	}
 	return influence
@@ -823,6 +820,14 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper
 	}
 }
 
+// AddOpInfluence adds the given operator's influence to the OpInfluence for the cluster.
+func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster Cluster) {
+	region := cluster.GetRegion(op.RegionID())
+	if region != nil {
+		op.TotalInfluence(influence, region)
+	}
+}
+
 // NewTotalOpInfluence creates a OpInfluence.
 func NewTotalOpInfluence(operators []*operator.Operator, cluster Cluster) operator.OpInfluence {
 	influence := operator.OpInfluence{
@@ -830,10 +835,7 @@ func NewTotalOpInfluence(operators []*operator.Operator, cluster Cluster) operat
 	}
 
 	for _, op := range operators {
-		region := cluster.GetRegion(op.RegionID())
-		if region != nil {
-			op.TotalInfluence(influence, region)
-		}
+		AddOpInfluence(op, influence, cluster)
 	}
 
 	return influence
diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go
index 0805d73ab00..e7a93ec2765 100644
--- a/server/schedulers/balance_leader.go
+++ b/server/schedulers/balance_leader.go
@@ -36,6 +36,11 @@ const (
 	BalanceLeaderType = "balance-leader"
 	// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
 	balanceLeaderRetryLimit = 10
	// BalanceLeaderBatchSize is the default maximum number of leader-transfer operators created in one scheduling round.
+	BalanceLeaderBatchSize = 5
+
+	transferIn  = "transfer-in"
+	transferOut = "transfer-out"
 )
 
 func init() {
@@ -51,6 +56,7 @@ func init() {
 			}
 			conf.Ranges = ranges
 			conf.Name = BalanceLeaderName
+			conf.Batch = BalanceLeaderBatchSize
 			return nil
 		}
 	})
@@ -67,6 +73,7 @@ func init() {
 type balanceLeaderSchedulerConfig struct {
 	Name   string          `json:"name"`
 	Ranges []core.KeyRange `json:"ranges"`
+	Batch  int             `json:"batch"`
 }
 
 type balanceLeaderScheduler struct {
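The new `Batch` field rides through the scheduler's persisted JSON config alongside the existing fields. A hedged round-trip sketch; the struct is trimmed to its plain fields (`Ranges` is omitted so the snippet stays self-contained) and the payload values are invented:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trimmed copy of balanceLeaderSchedulerConfig, for illustration only
type config struct {
	Name  string `json:"name"`
	Batch int    `json:"batch"`
}

func main() {
	raw := []byte(`{"name":"balance-leader-scheduler","batch":5}`)
	var conf config
	if err := json.Unmarshal(raw, &conf); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", conf) // {Name:balance-leader-scheduler Batch:5}
}
```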
@@ -137,6 +144,53 @@ func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) boo
 	return allowed
 }
 
+type candidateStores struct {
+	stores        []*core.StoreInfo
+	storeIndexMap map[uint64]int
+	index         int
+	compareOption func([]*core.StoreInfo) func(int, int) bool
+}
+
+func newCandidateStores(stores []*core.StoreInfo, compareOption func([]*core.StoreInfo) func(int, int) bool) *candidateStores {
+	cs := &candidateStores{stores: stores, compareOption: compareOption}
+	cs.storeIndexMap = map[uint64]int{}
+	cs.initSort()
+	return cs
+}
+
+// hasStore returns true when there are leftover stores.
+func (cs *candidateStores) hasStore() bool {
+	return cs.index < len(cs.stores)
+}
+
+func (cs *candidateStores) getStore() *core.StoreInfo {
+	return cs.stores[cs.index]
+}
+
+func (cs *candidateStores) next() {
+	cs.index++
+}
+
+func (cs *candidateStores) initSort() {
+	sort.Slice(cs.stores, cs.compareOption(cs.stores))
+	for i := 0; i < len(cs.stores); i++ {
+		cs.storeIndexMap[cs.stores[i].GetID()] = i
+	}
+}
+
+func (cs *candidateStores) reSort(stores ...*core.StoreInfo) {
+	if !cs.hasStore() {
+		return
+	}
+	for _, store := range stores {
+		index, ok := cs.storeIndexMap[store.GetID()]
+		if !ok {
+			continue
+		}
+		resortStores(cs.stores, cs.storeIndexMap, index, cs.compareOption(cs.stores))
+	}
+}
+
 func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
 	schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()
@@ -146,63 +200,119 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
 	plan := newBalancePlan(kind, cluster, opInfluence)
 
 	stores := cluster.GetStores()
-	sources := filter.SelectSourceStores(stores, l.filters, cluster.GetOpts())
-	targets := filter.SelectTargetStores(stores, l.filters, cluster.GetOpts())
-	sort.Slice(sources, func(i, j int) bool {
-		iOp := plan.GetOpInfluence(sources[i].GetID())
-		jOp := plan.GetOpInfluence(sources[j].GetID())
-		return sources[i].LeaderScore(leaderSchedulePolicy, iOp) >
-			sources[j].LeaderScore(leaderSchedulePolicy, jOp)
-	})
-	sort.Slice(targets, func(i, j int) bool {
-		iOp := plan.GetOpInfluence(targets[i].GetID())
-		jOp := plan.GetOpInfluence(targets[j].GetID())
-		return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
-			targets[j].LeaderScore(leaderSchedulePolicy, jOp)
-	})
+	greaterOption := func(stores []*core.StoreInfo) func(int, int) bool {
+		return func(i, j int) bool {
+			iOp := plan.GetOpInfluence(stores[i].GetID())
+			jOp := plan.GetOpInfluence(stores[j].GetID())
+			return stores[i].LeaderScore(plan.kind.Policy, iOp) >
+				stores[j].LeaderScore(plan.kind.Policy, jOp)
+		}
+	}
+	lessOption := func(stores []*core.StoreInfo) func(int, int) bool {
+		return func(i, j int) bool {
+			iOp := plan.GetOpInfluence(stores[i].GetID())
+			jOp := plan.GetOpInfluence(stores[j].GetID())
+			return stores[i].LeaderScore(plan.kind.Policy, iOp) <
+				stores[j].LeaderScore(plan.kind.Policy, jOp)
+		}
+	}
+	sourceCandidate := newCandidateStores(filter.SelectSourceStores(stores, l.filters, cluster.GetOpts()), greaterOption)
+	targetCandidate := newCandidateStores(filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()), lessOption)
+	usedRegions := make(map[uint64]struct{})
 
-	for i := 0; i < len(sources) || i < len(targets); i++ {
-		if i < len(sources) {
-			plan.source, plan.target = sources[i], nil
-			retryLimit := l.retryQuota.GetLimit(plan.source)
-			log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
-			l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
-			for j := 0; j < retryLimit; j++ {
-				schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
-				if ops := l.transferLeaderOut(plan); len(ops) > 0 {
-					l.retryQuota.ResetLimit(plan.source)
-					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
-					return ops
+	result := make([]*operator.Operator, 0, l.conf.Batch)
+	for sourceCandidate.hasStore() || targetCandidate.hasStore() {
+		// first choose source
+		if sourceCandidate.hasStore() {
+			op := createTransferLeaderOperator(sourceCandidate, transferOut, l, plan, usedRegions)
+			if op != nil {
+				result = append(result, op)
+				if len(result) >= l.conf.Batch {
+					return result
 				}
+				makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
 			}
-			l.Attenuate(plan.source)
-			log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
 		}
-		if i < len(targets) {
-			plan.source, plan.target = nil, targets[i]
-			retryLimit := l.retryQuota.GetLimit(plan.target)
-			log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
-			l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
-			for j := 0; j < retryLimit; j++ {
-				schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
-				if ops := l.transferLeaderIn(plan); len(ops) > 0 {
-					l.retryQuota.ResetLimit(plan.target)
-					ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
-					return ops
+		// next choose target
+		if targetCandidate.hasStore() {
+			op := createTransferLeaderOperator(targetCandidate, transferIn, l, plan, usedRegions)
+			if op != nil {
+				result = append(result, op)
+				if len(result) >= l.conf.Batch {
+					return result
 				}
+				makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
 			}
-			l.Attenuate(plan.target)
-			log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
 		}
 	}
-	l.retryQuota.GC(append(sources, targets...))
-	return nil
+	l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...))
+	return result
+}
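The loop above alternates between the best source (highest leader score) and the best target (lowest leader score) until the batch fills or both candidate lists run out. A toy rendering of that control flow with simplified types and invented store names; in the real code a candidate only advances when it fails to yield an operator, and scores are re-sorted after every pick:

```go
package main

import "fmt"

func main() {
	sources := []string{"store-1", "store-4"} // sorted: highest score first
	targets := []string{"store-2", "store-3"} // sorted: lowest score first
	batch := 3

	var ops []string
	si, ti := 0, 0
	for (si < len(sources) || ti < len(targets)) && len(ops) < batch {
		if si < len(sources) {
			ops = append(ops, "transfer-out from "+sources[si])
			si++
		}
		if ti < len(targets) && len(ops) < batch {
			ops = append(ops, "transfer-in to "+targets[ti])
			ti++
		}
	}
	fmt.Println(ops) // three operators from a single Schedule call
}
```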
+
+func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler,
+	plan *balancePlan, usedRegions map[uint64]struct{}) *operator.Operator {
+	store := cs.getStore()
+	retryLimit := l.retryQuota.GetLimit(store)
+	var creator func(*balancePlan) *operator.Operator
+	switch dir {
+	case transferOut:
+		plan.source, plan.target = store, nil
+		l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
+		creator = l.transferLeaderOut
+	case transferIn:
+		plan.source, plan.target = nil, store
+		l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
+		creator = l.transferLeaderIn
+	}
+	var op *operator.Operator
+	for i := 0; i < retryLimit; i++ {
+		schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
+		if op = creator(plan); op != nil {
+			if _, ok := usedRegions[op.RegionID()]; !ok {
+				break
+			}
+			op = nil
+		}
+	}
+	if op != nil {
+		l.retryQuota.ResetLimit(store)
+		op.Counters = append(op.Counters, l.counter.WithLabelValues(dir, strconv.FormatUint(store.GetID(), 10)))
+	} else {
+		l.Attenuate(store)
+		log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID()))
+		cs.next()
+	}
+	return op
+}
+
+func makeInfluence(op *operator.Operator, plan *balancePlan, usedRegions map[uint64]struct{}, candidates ...*candidateStores) {
+	usedRegions[op.RegionID()] = struct{}{}
+	schedule.AddOpInfluence(op, plan.opInfluence, plan.Cluster)
+	for _, candidate := range candidates {
+		candidate.reSort(plan.source, plan.target)
+	}
+}
+
+// resortStores is used to sort stores again after creating an operator.
+// It repeatedly swaps the specific store with its neighbor while they are in the wrong order.
+// In general, it needs very few swaps. In the worst case, the time complexity is O(n).
+func resortStores(stores []*core.StoreInfo, index map[uint64]int, pos int, less func(i, j int) bool) {
+	swapper := func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }
+	for ; pos+1 < len(stores) && !less(pos, pos+1); pos++ {
+		swapper(pos, pos+1)
+		index[stores[pos].GetID()] = pos
+	}
+	for ; pos > 0 && less(pos, pos-1); pos-- {
+		swapper(pos, pos-1)
+		index[stores[pos].GetID()] = pos
+	}
+	index[stores[pos].GetID()] = pos
 }
 
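`resortStores` is a single-element bubble pass: after one store's score changes, it walks only that store down or up to its new slot instead of re-sorting the whole slice. A standalone illustration with ints standing in for `*core.StoreInfo` (the ID-to-index map is dropped to keep the sketch short):

```go
package main

import "fmt"

func resort(xs []int, pos int, less func(i, j int) bool) {
	for ; pos+1 < len(xs) && !less(pos, pos+1); pos++ {
		xs[pos], xs[pos+1] = xs[pos+1], xs[pos]
	}
	for ; pos > 0 && less(pos, pos-1); pos-- {
		xs[pos], xs[pos-1] = xs[pos-1], xs[pos]
	}
}

func main() {
	xs := []int{9, 7, 5, 3, 1} // kept sorted descending, like a source list
	less := func(i, j int) bool { return xs[i] > xs[j] }

	xs[0] = 4 // the front store just lost most of its lead
	resort(xs, 0, less)
	fmt.Println(xs) // [7 5 4 3 1]: two swaps instead of a full sort
}
```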
 // transferLeaderOut transfers leader from the source store.
 // It randomly selects a health region from the source store, then picks
 // the best follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operator.Operator {
+func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) *operator.Operator {
 	plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
 	if plan.region == nil {
 		log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID()))
@@ -223,7 +333,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
 		return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
 			targets[j].LeaderScore(leaderSchedulePolicy, jOp)
 	})
 	for _, plan.target = range targets {
-		if op := l.createOperator(plan); len(op) > 0 {
+		if op := l.createOperator(plan); op != nil {
 			return op
 		}
 	}
@@ -235,7 +345,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
 // transferLeaderIn transfers leader to the target store.
 // It randomly selects a health region from the target store, then picks
 // the worst follower peer and transfers the leader.
-func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator.Operator {
+func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) *operator.Operator {
 	plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
 	if plan.region == nil {
 		log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID()))
@@ -273,7 +383,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator
 // If the region is hot or the difference between the two stores is tolerable, then
 // no new operator need to be created, otherwise create an operator that transfers
 // the leader from the source store to the target store for the region.
-func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.Operator {
+func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) *operator.Operator {
 	if plan.IsRegionHot(plan.region) {
 		log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", plan.region.GetID()))
 		schedulerCounter.WithLabelValues(l.GetName(), "region-hot").Inc()
@@ -300,5 +410,5 @@ func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.O
 	)
 	op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(plan.sourceScore, 'f', 2, 64)
 	op.AdditionalInfos["targetScore"] = strconv.FormatFloat(plan.targetScore, 'f', 2, 64)
-	return []*operator.Operator{op}
+	return op
 }
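Why the `makeInfluence` feedback earlier matters for batching: each chosen operator is folded back into the plan's op influence, so the next pick in the same round scores stores as if the pending transfer had already landed. A toy model of that bookkeeping, with assumed semantics deliberately simplified to leader counts:

```go
package main

import "fmt"

func main() {
	leaders := map[string]int{"source": 16, "target": 0}
	pending := map[string]int{} // influence of not-yet-applied operators

	// choosing one transfer-leader operator records its projected effect
	pending["source"]--
	pending["target"]++

	score := func(store string) int { return leaders[store] + pending[store] }
	fmt.Println(score("source"), score("target")) // 15 1: the next pick sees the shift
}
```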
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" @@ -230,13 +231,13 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLimit(c *C) { s.tc.AddLeaderStore(3, 0) s.tc.AddLeaderStore(4, 0) s.tc.AddLeaderRegion(1, 1, 2, 3, 4) - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) // Stores: 1 2 3 4 // Leaders: 16 0 0 0 // Region1: L F F F s.tc.UpdateLeaderCount(1, 16) - c.Check(s.schedule(), NotNil) + c.Assert(len(s.schedule()), Greater, 0) // Stores: 1 2 3 4 // Leaders: 7 8 9 10 @@ -246,7 +247,7 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLimit(c *C) { s.tc.UpdateLeaderCount(3, 9) s.tc.UpdateLeaderCount(4, 10) s.tc.AddLeaderRegion(1, 4, 1, 2, 3) - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) // Stores: 1 2 3 4 // Leaders: 7 8 9 16 @@ -266,9 +267,9 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLeaderSchedulePolicy(c *C) s.tc.AddLeaderStore(4, 10, 100*MB) s.tc.AddLeaderRegion(1, 1, 2, 3, 4) c.Assert(s.tc.GetScheduleConfig().LeaderSchedulePolicy, Equals, core.ByCount.String()) // default by count - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) s.tc.SetLeaderSchedulePolicy(core.BySize.String()) - c.Check(s.schedule(), NotNil) + c.Assert(len(s.schedule()), Greater, 0) } func (s *testBalanceLeaderSchedulerSuite) TestBalanceLeaderTolerantRatio(c *C) { @@ -284,13 +285,13 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLeaderTolerantRatio(c *C) { s.tc.AddLeaderStore(4, 10, 100) s.tc.AddLeaderRegion(1, 1, 2, 3, 4) c.Assert(s.tc.GetScheduleConfig().LeaderSchedulePolicy, Equals, core.ByCount.String()) // default by count - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) c.Assert(s.tc.GetStore(1).GetLeaderCount(), Equals, 14) s.tc.AddLeaderStore(1, 15, 100) c.Assert(s.tc.GetStore(1).GetLeaderCount(), Equals, 15) c.Check(s.schedule(), NotNil) s.tc.SetTolerantSizeRatio(6) // (15-10)<6 - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) } func (s *testBalanceLeaderSchedulerSuite) TestScheduleWithOpInfluence(c *C) { @@ -312,7 +313,7 @@ func (s *testBalanceLeaderSchedulerSuite) TestScheduleWithOpInfluence(c *C) { c.Assert(s.tc.GetScheduleConfig().LeaderSchedulePolicy, Equals, core.ByCount.String()) // default by count c.Check(s.schedule(), NotNil) s.tc.SetLeaderSchedulePolicy(core.BySize.String()) - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) // Stores: 1 2 3 4 // Leaders: 8 8 9 13 @@ -322,7 +323,7 @@ func (s *testBalanceLeaderSchedulerSuite) TestScheduleWithOpInfluence(c *C) { s.tc.UpdateLeaderCount(3, 9) s.tc.UpdateLeaderCount(4, 13) s.tc.AddLeaderRegion(1, 4, 1, 2, 3) - c.Check(s.schedule(), IsNil) + c.Assert(s.schedule(), HasLen, 0) } func (s *testBalanceLeaderSchedulerSuite) TestTransferLeaderOut(c *C) { @@ -475,8 +476,8 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { s.tc.AddLeaderRegion(1, 4, 2, 3) s.tc.AddLeaderRegion(2, 1, 2, 3) // The cluster is balanced. 
@@ -475,8 +476,8 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) {
 	s.tc.AddLeaderRegion(1, 4, 2, 3)
 	s.tc.AddLeaderRegion(2, 1, 2, 3)
 	// The cluster is balanced.
-	c.Assert(s.schedule(), IsNil)
-	c.Assert(s.schedule(), IsNil)
+	c.Assert(s.schedule(), HasLen, 0)
+	c.Assert(s.schedule(), HasLen, 0)
 
 	// store3's leader drops:
 	// Stores:  1    2    3    4
@@ -533,22 +534,22 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestSingleRangeBalance(c *C) {
 	c.Assert(ops[0].FinishedCounters, HasLen, 3)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"h", "n"}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", "f"}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "a"}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"g", ""}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", "f"}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	lb, err = schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"b", ""}))
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 }
 
 func (s *testBalanceLeaderRangeSchedulerSuite) TestMultiRangeBalance(c *C) {
@@ -575,11 +576,87 @@ func (s *testBalanceLeaderRangeSchedulerSuite) TestMultiRangeBalance(c *C) {
 	s.tc.RemoveRegion(s.tc.GetRegion(2))
 	s.tc.AddLeaderRegionWithRange(3, "u", "w", 1, 2, 3, 4)
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
 	s.tc.RemoveRegion(s.tc.GetRegion(3))
 	s.tc.AddLeaderRegionWithRange(4, "", "", 1, 2, 3, 4)
 	c.Assert(err, IsNil)
-	c.Assert(lb.Schedule(s.tc), IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 0)
+}
+
+func (s *testBalanceLeaderRangeSchedulerSuite) TestBatchBalance(c *C) {
+	s.tc.AddLeaderStore(1, 100)
+	s.tc.AddLeaderStore(2, 0)
+	s.tc.AddLeaderStore(3, 0)
+	s.tc.AddLeaderStore(4, 100)
+	s.tc.AddLeaderStore(5, 100)
+	s.tc.AddLeaderStore(6, 0)
+
+	s.tc.AddLeaderRegionWithRange(uint64(102), "102a", "102z", 1, 2, 3)
+	s.tc.AddLeaderRegionWithRange(uint64(103), "103a", "103z", 4, 5, 6)
+	lb, err := schedule.CreateScheduler(BalanceLeaderType, s.oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
+	c.Assert(err, IsNil)
+	c.Assert(lb.Schedule(s.tc), HasLen, 2)
+	for i := 1; i <= 50; i++ {
+		s.tc.AddLeaderRegionWithRange(uint64(i), fmt.Sprintf("%da", i), fmt.Sprintf("%dz", i), 1, 2, 3)
+	}
+	for i := 51; i <= 100; i++ {
+		s.tc.AddLeaderRegionWithRange(uint64(i), fmt.Sprintf("%da", i), fmt.Sprintf("%dz", i), 4, 5, 6)
+	}
+	s.tc.AddLeaderRegionWithRange(uint64(101), "101a", "101z", 5, 4, 3)
+	ops := lb.Schedule(s.tc)
+	c.Assert(ops, HasLen, 5)
+	regions := make(map[uint64]struct{})
+	for _, op := range ops {
+		regions[op.RegionID()] = struct{}{}
+	}
+	c.Assert(regions, HasLen, 5)
+}
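TestBatchBalance checks that a single call returns at most the default batch of five operators and that they touch five distinct regions. The distinctness comes from the `usedRegions` set threaded through `createTransferLeaderOperator`; the same guard in miniature:

```go
package main

import "fmt"

func main() {
	used := map[uint64]struct{}{}
	for _, region := range []uint64{101, 102, 101, 103} {
		if _, ok := used[region]; ok {
			continue // region already claimed by this batch: retry elsewhere
		}
		used[region] = struct{}{}
		fmt.Println("operator for region", region) // 101, 102, 103 once each
	}
}
```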
+
+func (s *testBalanceLeaderRangeSchedulerSuite) TestReSortStores(c *C) {
+	s.tc.AddLeaderStore(1, 104)
+	s.tc.AddLeaderStore(2, 0)
+	s.tc.AddLeaderStore(3, 0)
+	s.tc.AddLeaderStore(4, 100)
+	s.tc.AddLeaderStore(5, 100)
+	s.tc.AddLeaderStore(6, 0)
+	stores := s.tc.Stores.GetStores()
+
+	deltaMap := make(map[uint64]int64)
+	less := func(i, j int) bool {
+		iOp := deltaMap[stores[i].GetID()]
+		jOp := deltaMap[stores[j].GetID()]
+		return stores[i].LeaderScore(0, iOp) > stores[j].LeaderScore(0, jOp)
+	}
+
+	sort.Slice(stores, less)
+	storeIndexMap := map[uint64]int{}
+	for i := 0; i < len(stores); i++ {
+		storeIndexMap[stores[i].GetID()] = i
+	}
+	c.Assert(stores[0].GetID(), Equals, uint64(1))
+	c.Assert(storeIndexMap[uint64(1)], Equals, 0)
+	deltaMap[1] = -1
+
+	resortStores(stores, storeIndexMap, storeIndexMap[uint64(1)], less)
+	c.Assert(stores[0].GetID(), Equals, uint64(1))
+	c.Assert(storeIndexMap[uint64(1)], Equals, 0)
+	deltaMap[1] = -4
+	resortStores(stores, storeIndexMap, storeIndexMap[uint64(1)], less)
+	c.Assert(stores[2].GetID(), Equals, uint64(1))
+	c.Assert(storeIndexMap[uint64(1)], Equals, 2)
+	topID := stores[0].GetID()
+	deltaMap[topID] = -1
+	resortStores(stores, storeIndexMap, storeIndexMap[topID], less)
+	c.Assert(stores[1].GetID(), Equals, uint64(1))
+	c.Assert(storeIndexMap[uint64(1)], Equals, 1)
+	c.Assert(stores[2].GetID(), Equals, topID)
+	c.Assert(storeIndexMap[topID], Equals, 2)
+
+	bottomID := stores[5].GetID()
+	deltaMap[bottomID] = 4
+	resortStores(stores, storeIndexMap, storeIndexMap[bottomID], less)
+	c.Assert(stores[3].GetID(), Equals, bottomID)
+	c.Assert(storeIndexMap[bottomID], Equals, 3)
 }
 
 var _ = Suite(&testBalanceRegionSchedulerSuite{})
@@ -627,10 +704,10 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) {
 	// store 2 becomes the store with least regions.
 	testutil.CheckTransferPeerWithLeaderTransfer(c, sb.Schedule(tc)[0], operator.OpKind(0), 4, 2)
 	opt.SetMaxReplicas(3)
-	c.Assert(sb.Schedule(tc), IsNil)
+	c.Assert(sb.Schedule(tc), HasLen, 0)
 
 	opt.SetMaxReplicas(1)
-	c.Assert(sb.Schedule(tc), NotNil)
+	c.Assert(len(sb.Schedule(tc)), Greater, 0)
 }
 
 func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) {
@@ -659,7 +736,7 @@ func (s *testBalanceRegionSchedulerSuite) checkReplica3(c *C, tc *mockcluster.Cl
 	tc.AddLeaderRegion(1, 1, 2, 3)
 
 	// This schedule try to replace peer in store 1, but we have no other stores.
-	c.Assert(sb.Schedule(tc), IsNil)
+	c.Assert(sb.Schedule(tc), HasLen, 0)
 
 	// Store 4 has smaller region score than store 2.
 	tc.AddLabelsStore(4, 2, map[string]string{"zone": "z1", "rack": "r2", "host": "h1"})
@@ -694,7 +771,7 @@ func (s *testBalanceRegionSchedulerSuite) checkReplica3(c *C, tc *mockcluster.Cl
 
 	// Store 9 has different zone with other stores but larger region score than store 1.
 	tc.AddLabelsStore(9, 20, map[string]string{"zone": "z2", "rack": "r1", "host": "h1"})
-	c.Assert(sb.Schedule(tc), IsNil)
+	c.Assert(sb.Schedule(tc), HasLen, 0)
 }
 
 func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) {
@@ -986,7 +1063,7 @@ func (s *testBalanceRegionSchedulerSuite) TestEmptyRegion(c *C) {
 		tc.PutRegionStores(i, 1, 3, 4)
 	}
 	operators = sb.Schedule(tc)
-	c.Assert(operators, IsNil)
+	c.Assert(operators, HasLen, 0)
 }
 
 var _ = Suite(&testRandomMergeSchedulerSuite{})
diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go
index e17729172cf..63c38e32cdf 100644
--- a/server/schedulers/scheduler_test.go
+++ b/server/schedulers/scheduler_test.go
@@ -113,7 +113,7 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) {
 	bs, err := schedule.CreateScheduler(BalanceLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(BalanceLeaderType, []string{"", ""}))
 	c.Assert(err, IsNil)
 	op = bs.Schedule(tc)
-	c.Assert(op, IsNil)
+	c.Assert(op, HasLen, 0)
 
 	// Can't evict leader from store2, neither.
 	el, err := schedule.CreateScheduler(EvictLeaderType, oc, storage.NewStorageWithMemoryBackend(), schedule.ConfigSliceDecoder(EvictLeaderType, []string{"2"}))
@@ -603,7 +603,7 @@ func (s *testBalanceLeaderSchedulerWithRuleEnabledSuite) TestBalanceLeaderWithCo
 		if testcase.schedule {
 			c.Check(len(s.schedule()), Equals, 1)
 		} else {
-			c.Check(s.schedule(), IsNil)
+			c.Assert(s.schedule(), HasLen, 0)
 		}
 	}
 }
@@ -645,7 +645,7 @@ func (s *testEvictSlowStoreSuite) TestEvictSlowStore(c *C) {
 	c.Assert(op[0].Desc(), Equals, EvictSlowStoreType)
 	// Cannot balance leaders to store 1
 	op = bs.Schedule(tc)
-	c.Check(op, IsNil)
+	c.Assert(op, HasLen, 0)
 	newStoreInfo = storeInfo.Clone(func(store *core.StoreInfo) {
 		store.GetStoreStats().SlowScore = 0
 	})