-
Notifications
You must be signed in to change notification settings - Fork 725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
scheduler: allow balance-leader-scheduler
generate multiple operators
#4652
Changes from 5 commits
ab6f582
f5be6dd
263cc1c
0976e10
7798e88
0528f86
3f139b8
065f012
15925f9
f55e76b
6f9232b
44b23ae
594ecec
2cd7b9c
30fd878
efbb655
b165436
9a32d17
785ade2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -51,6 +51,7 @@ func init() { | |
} | ||
conf.Ranges = ranges | ||
conf.Name = BalanceLeaderName | ||
conf.Batch = 5 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about using a constant? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||
return nil | ||
} | ||
}) | ||
|
@@ -67,6 +68,7 @@ func init() { | |
type balanceLeaderSchedulerConfig struct { | ||
Name string `json:"name"` | ||
Ranges []core.KeyRange `json:"ranges"` | ||
Batch int `json:"batch"` | ||
} | ||
|
||
type balanceLeaderScheduler struct { | ||
|
@@ -148,6 +150,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator. | |
stores := cluster.GetStores() | ||
sources := filter.SelectSourceStores(stores, l.filters, cluster.GetOpts()) | ||
targets := filter.SelectTargetStores(stores, l.filters, cluster.GetOpts()) | ||
result := make([]*operator.Operator, 0, l.conf.Batch) | ||
sort.Slice(sources, func(i, j int) bool { | ||
iOp := plan.GetOpInfluence(sources[i].GetID()) | ||
jOp := plan.GetOpInfluence(sources[j].GetID()) | ||
|
@@ -161,42 +164,92 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator. | |
targets[j].LeaderScore(leaderSchedulePolicy, jOp) | ||
}) | ||
|
||
for i := 0; i < len(sources) || i < len(targets); i++ { | ||
if i < len(sources) { | ||
plan.source, plan.target = sources[i], nil | ||
usedRegions := make(map[uint64]struct{}) | ||
sourcePoint := 0 | ||
targetPoint := 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think at least some comments are needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. rafactor it |
||
for sourcePoint < len(sources) || targetPoint < len(targets) { | ||
if sourcePoint < len(sources) { | ||
used := false | ||
plan.source, plan.target = sources[sourcePoint], nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe index? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep, updated |
||
retryLimit := l.retryQuota.GetLimit(plan.source) | ||
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("source-store", plan.SourceStoreID())) | ||
l.counter.WithLabelValues("high-score", plan.SourceMetricLabel()).Inc() | ||
for j := 0; j < retryLimit; j++ { | ||
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc() | ||
if ops := l.transferLeaderOut(plan); len(ops) > 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about changing it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||
if _, ok := usedRegions[ops[0].RegionID()]; ok { | ||
continue | ||
} | ||
l.retryQuota.ResetLimit(plan.source) | ||
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-out", plan.SourceMetricLabel())) | ||
return ops | ||
result = append(result, ops...) | ||
if len(result) >= l.conf.Batch { | ||
return result | ||
} | ||
used = true | ||
usedRegions[ops[0].RegionID()] = struct{}{} | ||
schedule.AddOpInfluence(ops[0], plan.opInfluence, cluster) | ||
sortStores(sources, sourcePoint, func(i, j int) bool { | ||
iOp := plan.GetOpInfluence(sources[i].GetID()) | ||
nolouch marked this conversation as resolved.
Show resolved
Hide resolved
|
||
jOp := plan.GetOpInfluence(sources[j].GetID()) | ||
return sources[i].LeaderScore(leaderSchedulePolicy, iOp) <= | ||
sources[j].LeaderScore(leaderSchedulePolicy, jOp) | ||
}) | ||
break | ||
} | ||
} | ||
l.Attenuate(plan.source) | ||
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID())) | ||
if !used { | ||
sourcePoint++ | ||
l.Attenuate(plan.source) | ||
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("source", plan.SourceStoreID())) | ||
} | ||
} | ||
if i < len(targets) { | ||
plan.source, plan.target = nil, targets[i] | ||
if targetPoint < len(targets) { | ||
used := false | ||
plan.source, plan.target = nil, targets[targetPoint] | ||
retryLimit := l.retryQuota.GetLimit(plan.target) | ||
log.Debug("store leader score", zap.String("scheduler", l.GetName()), zap.Uint64("target-store", plan.TargetStoreID())) | ||
l.counter.WithLabelValues("low-score", plan.TargetMetricLabel()).Inc() | ||
for j := 0; j < retryLimit; j++ { | ||
schedulerCounter.WithLabelValues(l.GetName(), "total").Inc() | ||
if ops := l.transferLeaderIn(plan); len(ops) > 0 { | ||
if _, ok := usedRegions[ops[0].RegionID()]; ok { | ||
continue | ||
} | ||
l.retryQuota.ResetLimit(plan.target) | ||
ops[0].Counters = append(ops[0].Counters, l.counter.WithLabelValues("transfer-in", plan.TargetMetricLabel())) | ||
return ops | ||
result = append(result, ops...) | ||
if len(result) >= l.conf.Batch { | ||
return result | ||
} | ||
used = true | ||
usedRegions[ops[0].RegionID()] = struct{}{} | ||
schedule.AddOpInfluence(ops[0], plan.opInfluence, cluster) | ||
sortStores(targets, targetPoint, func(i, j int) bool { | ||
iOp := plan.GetOpInfluence(targets[i].GetID()) | ||
jOp := plan.GetOpInfluence(targets[j].GetID()) | ||
return targets[i].LeaderScore(leaderSchedulePolicy, iOp) >= | ||
targets[j].LeaderScore(leaderSchedulePolicy, jOp) | ||
}) | ||
break | ||
} | ||
} | ||
l.Attenuate(plan.target) | ||
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID())) | ||
if !used { | ||
targetPoint++ | ||
l.Attenuate(plan.target) | ||
log.Debug("no operator created for selected stores", zap.String("scheduler", l.GetName()), zap.Uint64("target", plan.TargetStoreID())) | ||
} | ||
} | ||
} | ||
l.retryQuota.GC(append(sources, targets...)) | ||
return nil | ||
return result | ||
} | ||
|
||
func sortStores(stores []*core.StoreInfo, pos int, less func(i, j int) bool) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix |
||
swapper := func(i, j int) { stores[i], stores[j] = stores[j], stores[i] } | ||
for ; pos+1 < len(stores) && less(pos, pos+1); pos++ { | ||
swapper(pos, pos+1) | ||
} | ||
} | ||
|
||
// transferLeaderOut transfers leader from the source store. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is it used for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change store leader score to sort again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we use this function on L841-845 in the function
NewTotalOpInfluence
?