Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: allow balance-leader-scheduler generate multiple operators #4652

Merged
merged 19 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,9 @@ func (s *scheduleController) Schedule() []*operator.Operator {
}
cacheCluster := newCacheCluster(s.cluster)
// If we have schedule, reset interval to the minimal interval.
if op := s.Scheduler.Schedule(cacheCluster); op != nil {
if ops := s.Scheduler.Schedule(cacheCluster); len(ops) > 0 {
s.nextInterval = s.Scheduler.GetMinInterval()
return op
return ops
}
}
s.nextInterval = s.Scheduler.GetNextInterval(s.nextInterval)
Expand Down
12 changes: 6 additions & 6 deletions server/cluster/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
if time.Since(start) > time.Second {
break
}
c.Assert(ops, IsNil)
c.Assert(ops, HasLen, 0)
}

// reset all stores' limit
Expand All @@ -1024,7 +1024,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) {
// sleep 1 seconds to make sure that the token is filled up
time.Sleep(time.Second)
for i := 0; i < 100; i++ {
c.Assert(lb.Schedule(tc), NotNil)
c.Assert(len(lb.Schedule(tc)), Greater, 0)
}
}

Expand Down Expand Up @@ -1052,10 +1052,10 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) {
c.Assert(oc.AddOperator(op2), IsTrue)
op3 := newTestOperator(1, tc.GetRegion(2).GetRegionEpoch(), operator.OpRegion, operator.AddPeer{ToStore: 1, PeerID: 3})
c.Assert(oc.AddOperator(op3), IsFalse)
c.Assert(lb.Schedule(tc), IsNil)
c.Assert(lb.Schedule(tc), HasLen, 0)
// sleep 2 seconds to make sure that token is filled up
time.Sleep(2 * time.Second)
c.Assert(lb.Schedule(tc), NotNil)
c.Assert(len(lb.Schedule(tc)), Greater, 0)
}

func (s *testOperatorControllerSuite) TestDownStoreLimit(c *C) {
Expand Down Expand Up @@ -1146,7 +1146,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) {

for i := schedulers.MinScheduleInterval; sc.GetInterval() != schedulers.MaxScheduleInterval; i = sc.GetNextInterval(i) {
c.Assert(sc.GetInterval(), Equals, i)
c.Assert(sc.Schedule(), IsNil)
c.Assert(sc.Schedule(), HasLen, 0)
}
// limit = 2
lb.limit = 2
Expand Down Expand Up @@ -1227,7 +1227,7 @@ func (s *testScheduleControllerSuite) TestInterval(c *C) {
for _, n := range idleSeconds {
sc.nextInterval = schedulers.MinScheduleInterval
for totalSleep := time.Duration(0); totalSleep <= time.Second*time.Duration(n); totalSleep += sc.GetInterval() {
c.Assert(sc.Schedule(), IsNil)
c.Assert(sc.Schedule(), HasLen, 0)
}
c.Assert(sc.GetInterval(), Less, time.Second*time.Duration(n/2))
}
Expand Down
18 changes: 10 additions & 8 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,10 +795,7 @@ func (oc *OperatorController) GetOpInfluence(cluster Cluster) operator.OpInfluen
defer oc.RUnlock()
for _, op := range oc.operators {
if !op.CheckTimeout() && !op.CheckSuccess() {
region := cluster.GetRegion(op.RegionID())
if region != nil {
op.UnfinishedInfluence(influence, region)
}
AddOpInfluence(op, influence, cluster)
}
}
return influence
Expand All @@ -823,17 +820,22 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper
}
}

// AddOpInfluence add operator influence for cluster
func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster Cluster) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is it used for?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change store leader score to sort again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use this function on L841-845 in the function NewTotalOpInfluence?

region := cluster.GetRegion(op.RegionID())
if region != nil {
op.TotalInfluence(influence, region)
}
}

// NewTotalOpInfluence creates an OpInfluence accumulating the total
// influence of every given operator on the cluster's stores.
func NewTotalOpInfluence(operators []*operator.Operator, cluster Cluster) operator.OpInfluence {
	influence := operator.OpInfluence{
		StoresInfluence: make(map[uint64]*operator.StoreInfluence),
	}

	// Delegate the per-operator bookkeeping to AddOpInfluence so the region
	// lookup logic lives in exactly one place.
	for _, op := range operators {
		AddOpInfluence(op, influence, cluster)
	}

	return influence
Expand Down
209 changes: 161 additions & 48 deletions server/schedulers/balance_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
BalanceLeaderType = "balance-leader"
// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
balanceLeaderRetryLimit = 10
// BalanceLeaderBatchSize is the default number of operators to transfer leaders by one scheduling
BalanceLeaderBatchSize = 5

transferIn = "transfer-in"
transferOut = "transfer-out"
)

func init() {
Expand All @@ -51,6 +56,7 @@ func init() {
}
conf.Ranges = ranges
conf.Name = BalanceLeaderName
conf.Batch = BalanceLeaderBatchSize
return nil
}
})
Expand All @@ -67,6 +73,7 @@ func init() {
type balanceLeaderSchedulerConfig struct {
Name string `json:"name"`
Ranges []core.KeyRange `json:"ranges"`
Batch int `json:"batch"`
}

type balanceLeaderScheduler struct {
Expand Down Expand Up @@ -137,6 +144,47 @@ func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) boo
return allowed
}

// candidateStores holds an ordered list of stores that the scheduler walks
// through while looking for balance candidates, together with an index map
// that lets a store be re-positioned quickly after its score changes.
type candidateStores struct {
	stores []*core.StoreInfo
	// storeIndexMap maps a store ID to the store's current position in stores.
	storeIndexMap map[uint64]int
	// index points at the store currently being considered.
	index int
	// compareOption builds the less-function used to order stores.
	compareOption func([]*core.StoreInfo) func(int, int) bool
}

// hasStore returns true when there are leftover stores.
func (cs *candidateStores) hasStore() bool {
	return cs.index < len(cs.stores)
}

// getStore returns the store currently pointed at by index.
// Callers must check hasStore first; getStore does not bounds-check.
func (cs *candidateStores) getStore() *core.StoreInfo {
	return cs.stores[cs.index]
}

// next advances to the following candidate store.
func (cs *candidateStores) next() {
	cs.index++
}

// initSort orders the stores with compareOption and rebuilds storeIndexMap
// so that each store ID maps to its sorted position.
func (cs *candidateStores) initSort() {
	sort.Slice(cs.stores, cs.compareOption(cs.stores))
	cs.storeIndexMap = map[uint64]int{}
	for i := 0; i < len(cs.stores); i++ {
		cs.storeIndexMap[cs.stores[i].GetID()] = i
	}
}

// reSort moves each given store back to its correct sorted position after
// its score changed (e.g. a newly created operator influences it). Stores
// not present in storeIndexMap are ignored.
func (cs *candidateStores) reSort(stores ...*core.StoreInfo) {
	if !cs.hasStore() {
		return
	}
	for _, store := range stores {
		index, ok := cs.storeIndexMap[store.GetID()]
		if !ok {
			continue
		}
		resortStores(cs.stores, cs.storeIndexMap, index, cs.compareOption(cs.stores))
	}
}

func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(l.GetName(), "schedule").Inc()

Expand All @@ -146,63 +194,128 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.
plan := newBalancePlan(kind, cluster, opInfluence)

stores := cluster.GetStores()
sources := filter.SelectSourceStores(stores, l.filters, cluster.GetOpts())
targets := filter.SelectTargetStores(stores, l.filters, cluster.GetOpts())
sort.Slice(sources, func(i, j int) bool {
iOp := plan.GetOpInfluence(sources[i].GetID())
jOp := plan.GetOpInfluence(sources[j].GetID())
return sources[i].LeaderScore(leaderSchedulePolicy, iOp) >
sources[j].LeaderScore(leaderSchedulePolicy, jOp)
})
sort.Slice(targets, func(i, j int) bool {
iOp := plan.GetOpInfluence(targets[i].GetID())
jOp := plan.GetOpInfluence(targets[j].GetID())
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) <
targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})
greaterOption := func(stores []*core.StoreInfo) func(int, int) bool {
return func(i, j int) bool {
iOp := plan.GetOpInfluence(stores[i].GetID())
jOp := plan.GetOpInfluence(stores[j].GetID())
return stores[i].LeaderScore(plan.kind.Policy, iOp) >
stores[j].LeaderScore(plan.kind.Policy, jOp)
}
}
lessOption := func(stores []*core.StoreInfo) func(int, int) bool {
return func(i, j int) bool {
iOp := plan.GetOpInfluence(stores[i].GetID())
jOp := plan.GetOpInfluence(stores[j].GetID())
return stores[i].LeaderScore(plan.kind.Policy, iOp) <
stores[j].LeaderScore(plan.kind.Policy, jOp)
}
}
sourceCandidate := &candidateStores{
stores: filter.SelectSourceStores(stores, l.filters, cluster.GetOpts()),
compareOption: greaterOption,
}
sourceCandidate.initSort()
targetCandidate := &candidateStores{
stores: filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()),
compareOption: lessOption,
}
targetCandidate.initSort()
usedRegions := make(map[uint64]struct{})

for i := 0; i < len(sources) || i < len(targets); i++ {
if i < len(sources) {
plan.source, plan.target = sources[i], nil
retryLimit := l.retryQuota.GetLimit(plan.source)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID()))
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderOut(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.source)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel()))
return ops
result := make([]*operator.Operator, 0, l.conf.Batch)
for sourceCandidate.hasStore() || targetCandidate.hasStore() {
// first choose source
if sourceCandidate.hasStore() {
op := createTransferLeaderOperator(sourceCandidate, transferOut, l, plan, usedRegions)
if op != nil {
result = append(result, op)
if len(result) >= l.conf.Batch {
return result
}
makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
}
l.Attenuate(plan.source)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID()))
}
if i < len(targets) {
plan.source, plan.target = nil, targets[i]
retryLimit := l.retryQuota.GetLimit(plan.target)
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID()))
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
for j := 0; j < retryLimit; j++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if ops := l.transferLeaderIn(plan); len(ops) > 0 {
l.retryQuota.ResetLimit(plan.target)
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel()))
return ops
// next choose target
if targetCandidate.hasStore() {
op := createTransferLeaderOperator(targetCandidate, transferIn, l, plan, usedRegions)
if op != nil {
result = append(result, op)
if len(result) >= l.conf.Batch {
return result
}
makeInfluence(op, plan, usedRegions, sourceCandidate, targetCandidate)
}
l.Attenuate(plan.target)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID()))
}
}
l.retryQuota.GC(append(sources, targets...))
return nil
l.retryQuota.GC(append(sourceCandidate.stores, targetCandidate.stores...))
return result
}

func createTransferLeaderOperator(cs *candidateStores, dir string, l *balanceLeaderScheduler,
plan *balancePlan, usedRegions map[uint64]struct{}) *operator.Operator {
store := cs.getStore()
retryLimit := l.retryQuota.GetLimit(store)
var creator func(*balancePlan) *operator.Operator
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID()))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no score field?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This statement didn't have it before, but it's better to add it.

switch dir {
case transferOut:
plan.source, plan.target = store, nil
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc()
creator = l.transferLeaderOut
case transferIn:
plan.source, plan.target = nil, store
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc()
creator = l.transferLeaderIn
}
var op *operator.Operator = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var op *operator.Operator = nil
var op *operator.Operator

for i := 0; i < retryLimit; i++ {
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc()
if op = creator(plan); op != nil {
if _, ok := usedRegions[op.RegionID()]; !ok {
break
}
op = nil
}
}
if op != nil {
l.retryQuota.ResetLimit(store)
op.Counters = append(op.Counters, l.counter.WithLabelValues(dir, plan.SourceMetricLabel()))
} else {
l.Attenuate(store)
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64(dir, store.GetID()))
cs.next()
}
return op
}

// makeInfluence marks the operator's region as used in this scheduling
// round, folds the operator's influence into the plan, and re-sorts every
// candidate list whose source/target store may have changed score.
func makeInfluence(op *operator.Operator, plan *balancePlan, usedRegions map[uint64]struct{}, candidates ...*candidateStores) {
	usedRegions[op.RegionID()] = struct{}{}
	schedule.AddOpInfluence(op, plan.opInfluence, plan.Cluster)
	for i := range candidates {
		candidates[i].reSort(plan.source, plan.target)
	}
}

// resortStores restores sorted order after the store at pos changed score.
// It repeatedly swaps that store with its neighbor while they are in the
// wrong order — first bubbling it towards the tail, then towards the head —
// and keeps the id->position index map in sync along the way.
// In general it performs very few swaps; the worst case is O(n).
func resortStores(stores []*core.StoreInfo, index map[uint64]int, pos int, less func(i, j int) bool) {
	swapper := func(i, j int) { stores[i], stores[j] = stores[j], stores[i] }
	// Bubble towards the tail while the next store should come first.
	for ; pos+1 < len(stores) && !less(pos, pos+1); pos++ {
		swapper(pos, pos+1)
		index[stores[pos].GetID()] = pos
	}
	// Bubble towards the head while the previous store should come later.
	// The bound must be pos > 0 (not pos > 1): otherwise a store whose new
	// score makes it the best candidate could never reach position 0 and
	// the slice would be left unsorted.
	for ; pos > 0 && less(pos, pos-1); pos-- {
		swapper(pos, pos-1)
		index[stores[pos].GetID()] = pos
	}
	index[stores[pos].GetID()] = pos
}

// transferLeaderOut transfers leader from the source store.
// It randomly selects a health region from the source store, then picks
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operator.Operator {
func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) *operator.Operator {
plan.region = plan.RandLeaderRegion(plan.SourceStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
if plan.region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.SourceStoreID()))
Expand All @@ -223,7 +336,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) < targets[j].LeaderScore(leaderSchedulePolicy, jOp)
})
for _, plan.target = range targets {
if op := l.createOperator(plan); len(op) > 0 {
if op := l.createOperator(plan); op != nil {
return op
}
}
Expand All @@ -235,7 +348,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(plan *balancePlan) []*operato
// transferLeaderIn transfers leader to the target store.
// It randomly selects a health region from the target store, then picks
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator.Operator {
func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) *operator.Operator {
plan.region = plan.RandFollowerRegion(plan.TargetStoreID(), l.conf.Ranges, schedule.IsRegionHealthy)
if plan.region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", plan.TargetStoreID()))
Expand Down Expand Up @@ -273,7 +386,7 @@ func (l *balanceLeaderScheduler) transferLeaderIn(plan *balancePlan) []*operator
// If the region is hot or the difference between the two stores is tolerable, then
// no new operator need to be created, otherwise create an operator that transfers
// the leader from the source store to the target store for the region.
func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.Operator {
func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) *operator.Operator {
if plan.IsRegionHot(plan.region) {
log.Debug("region is hot region, ignore it", zap.String("scheduler", l.GetName()), zap.Uint64("region-id", plan.region.GetID()))
schedulerCounter.WithLabelValues(l.GetName(), "region-hot").Inc()
Expand All @@ -300,5 +413,5 @@ func (l *balanceLeaderScheduler) createOperator(plan *balancePlan) []*operator.O
)
op.AdditionalInfos["sourceScore"] = strconv.FormatFloat(plan.sourceScore, 'f', 2, 64)
op.AdditionalInfos["targetScore"] = strconv.FormatFloat(plan.targetScore, 'f', 2, 64)
return []*operator.Operator{op}
return op
}
Loading