scheduler: refactor hot region scheduler #4488

Merged: 9 commits, Feb 23, 2022
10 changes: 5 additions & 5 deletions server/schedulers/grant_hot_region.go
@@ -307,12 +307,12 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedu
return s.randomSchedule(cluster, infos)
}

func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos []*statistics.StoreLoadDetail) (ops []*operator.Operator) {
func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, srcStores []*statistics.StoreLoadDetail) (ops []*operator.Operator) {
isleader := s.r.Int()%2 == 1
for _, detail := range infos {
srcStoreID := detail.Info.Store.GetID()
for _, srcStore := range srcStores {
srcStoreID := srcStore.GetID()
if isleader {
if s.conf.has(srcStoreID) || len(detail.HotPeers) < 1 {
if s.conf.has(srcStoreID) || len(srcStore.HotPeers) < 1 {
continue
}
} else {
@@ -321,7 +321,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos
}
}

for _, peer := range detail.HotPeers {
for _, peer := range srcStore.HotPeers {
if s.OpController.GetOperator(peer.RegionID) != nil {
continue
}
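A note on the accessor change above: `detail.Info.Store.GetID()` becoming `srcStore.GetID()` (and the `IsTiFlash()` calls in the hot_region.go hunks below) suggests that `statistics.StoreLoadDetail` now embeds the store summary, so those methods are promoted. A minimal sketch of that shape, with the embedded type and field names assumed for illustration:

type StoreLoadDetail struct {
	*StoreSummaryInfo // assumed embedding; promotes GetID() and IsTiFlash(), and exposes the StoreInfo field
	LoadPred *StoreLoadPred
	HotPeers []*HotPeerStat
}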
142 changes: 51 additions & 91 deletions server/schedulers/hot_region.go
@@ -344,16 +344,13 @@ type balanceSolver struct
// they may be byte(0), key(1), query(2), and always less than dimLen
firstPriority int
secondPriority int

firstPriorityIsBetter bool
secondPriorityIsBetter bool
}

type solution struct {
srcDetail *statistics.StoreLoadDetail
srcStore *statistics.StoreLoadDetail
srcPeerStat *statistics.HotPeerStat
region *core.RegionInfo
dstDetail *statistics.StoreLoadDetail
dstStore *statistics.StoreLoadDetail

// progressiveRank measures the contribution for balance.
// The smaller the rank, the better this solution is.
@@ -362,17 +359,8 @@ }
}

func (bs *balanceSolver) init() {
switch toResourceType(bs.rwTy, bs.opTy) {
case writePeer:
bs.stLoadDetail = bs.sche.stLoadInfos[writePeer]
case writeLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[writeLeader]
case readLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
case readPeer:
bs.stLoadDetail = bs.sche.stLoadInfos[readPeer]
}
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[toResourceType(bs.rwTy, bs.opTy)]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
bs.minDst = &statistics.StoreLoad{
@@ -414,16 +402,13 @@ func (bs *balanceSolver) getPriorities() []string {
querySupport := bs.sche.conf.checkQuerySupport(bs.Cluster)
// For read, transfer-leader and move-peer have the same priority config
// For write, they are different
switch bs.rwTy {
case statistics.Read:
switch toResourceType(bs.rwTy, bs.opTy) {
case readLeader, readPeer:
return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities)
case statistics.Write:
switch bs.opTy {
case transferLeader:
return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
case movePeer:
return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
}
case writeLeader:
return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
case writePeer:
return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
}
log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String()))
return []string{}
@@ -444,16 +429,8 @@ func (bs *balanceSolver) isValid() bool {
if bs.Cluster == nil || bs.sche == nil || bs.stLoadDetail == nil {
return false
}
switch bs.rwTy {
case statistics.Write, statistics.Read:
default:
return false
}
switch bs.opTy {
case movePeer, transferLeader:
default:
return false
}
// The return value is ignored: toResourceType panics if the rwTy/opTy combination is illegal, so calling it is the validity check.
_ = toResourceType(bs.rwTy, bs.opTy)
return true
}
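For reference, `toResourceType` is the helper this refactor leans on throughout. A minimal sketch consistent with the call sites in this diff, including the panic on an illegal combination that the comment in `isValid` relies on (the actual implementation in hot_region.go may differ in detail):

func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
	switch rwTy {
	case statistics.Write:
		switch opTy {
		case movePeer:
			return writePeer
		case transferLeader:
			return writeLeader
		}
	case statistics.Read:
		switch opTy {
		case movePeer:
			return readPeer
		case transferLeader:
			return readLeader
		}
	}
	panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy))
}

With this single mapping in place, the `isForWriteLeader`/`isForWritePeer` helpers removed later in the diff collapse into direct comparisons such as `toResourceType(bs.rwTy, bs.opTy) == writeLeader`.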

@@ -465,17 +442,17 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
bs.cur = &solution{}

for _, srcDetail := range bs.filterSrcStores() {
bs.cur.srcDetail = srcDetail
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore

for _, srcPeerStat := range bs.filterHotPeers() {
bs.cur.srcPeerStat = srcPeerStat
bs.cur.region = bs.getRegion()
if bs.cur.region == nil {
continue
}
for _, dstDetail := range bs.filterDstStores() {
bs.cur.dstDetail = dstDetail
for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOp, newInfl := bs.buildOperator(); newOp != nil {
@@ -495,34 +472,26 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}
if bs.best.srcDetail.Info.IsTiFlash != bs.best.dstDetail.Info.IsTiFlash {
if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "not-same-engine").Inc()
return false
}
// A different ZombieDuration is used depending on the source of the statistics.
// If the statistics come from the sum of Regions, a longer ZombieDuration is used.
var maxZombieDur time.Duration
switch {
case bs.isForWriteLeader():
switch toResourceType(bs.rwTy, bs.opTy) {
case writeLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
case bs.isForWritePeer():
if bs.best.srcDetail.Info.IsTiFlash {
case writePeer:
if bs.best.srcStore.IsTiFlash() {
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
} else {
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.GetID(), bs.best.dstDetail.GetID(), bs.infl, maxZombieDur)
}

func (bs *balanceSolver) isForWriteLeader() bool {
return bs.rwTy == statistics.Write && bs.opTy == transferLeader
}

func (bs *balanceSolver) isForWritePeer() bool {
return bs.rwTy == statistics.Write && bs.opTy == movePeer
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStore.GetID(), bs.best.dstStore.GetID(), bs.infl, maxZombieDur)
}
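The zombie-duration selection above, restated as a standalone helper. This is a sketch mirroring the switch in `tryAddPendingInfluence`, not additional behavior; the config getters are the ones used in the diff, while the `hotRegionSchedulerConfig` type name and the helper itself are assumptions for illustration:

func maxZombieDurFor(rt resourceType, srcIsTiFlash bool, conf *hotRegionSchedulerConfig) time.Duration {
	switch rt {
	case writeLeader:
		// Statistics come from the sum of Regions, so use the longer duration.
		return conf.GetRegionsStatZombieDuration()
	case writePeer:
		if srcIsTiFlash {
			return conf.GetRegionsStatZombieDuration()
		}
		return conf.GetStoreStatZombieDuration()
	default:
		return conf.GetStoreStatZombieDuration()
	}
}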

// filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than
@@ -533,7 +502,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for id, detail := range bs.stLoadDetail {
srcToleranceRatio := confSrcToleranceRatio
if detail.Info.IsTiFlash {
if detail.IsTiFlash() {
if !confEnableForTiFlash {
continue
}
@@ -571,7 +540,7 @@ func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *
// filterHotPeers filters hot peers from statistics.HotPeerStat and drops a peer if its region is in a pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
ret := bs.cur.srcDetail.HotPeers
ret := bs.cur.srcStore.HotPeers
// Return at most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
maxPeerNum := bs.sche.conf.GetMaxPeerNumber()

@@ -675,13 +644,13 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {

switch bs.opTy {
case movePeer:
srcPeer := region.GetStorePeer(bs.cur.srcDetail.GetID())
srcPeer := region.GetStorePeer(bs.cur.srcStore.GetID())
if srcPeer == nil {
log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
return nil
}
case transferLeader:
if region.GetLeader().GetStoreId() != bs.cur.srcDetail.GetID() {
if region.GetLeader().GetStoreId() != bs.cur.srcStore.GetID() {
log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
return nil
}
@@ -698,7 +667,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*statistics.StoreLoadDetai
filters []filter.Filter
candidates []*statistics.StoreLoadDetail
)
srcStore := bs.cur.srcDetail.Info.Store
srcStore := bs.cur.srcStore.StoreInfo
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
@@ -738,9 +707,9 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for _, detail := range candidates {
store := detail.Info.Store
store := detail.StoreInfo
dstToleranceRatio := confDstToleranceRatio
if detail.Info.IsTiFlash {
if detail.IsTiFlash() {
if !confEnableForTiFlash {
continue
}
@@ -777,14 +746,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
// calcProgressiveRank calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
func (bs *balanceSolver) calcProgressiveRank() {
src := bs.cur.srcDetail
dst := bs.cur.dstDetail
src := bs.cur.srcStore
dst := bs.cur.dstStore
srcLd := src.LoadPred.Min()
dstLd := dst.LoadPred.Max()
bs.cur.progressiveRank = 0
peer := bs.cur.srcPeerStat

if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
@@ -804,22 +773,18 @@ func (bs *balanceSolver) calcProgressiveRank() {
return
}
bs.cur.progressiveRank = -3
bs.firstPriorityIsBetter = true
bs.secondPriorityIsBetter = true
case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// In this case, the first priority dim will not be worsened and the second priority dim will become more balanced.
if !bs.isTolerance(src, dst, bs.secondPriority) {
return
}
bs.cur.progressiveRank = -2
bs.secondPriorityIsBetter = true
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
// In this case, the first priority dim will become more balanced; the second priority dim is ignored.
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
bs.cur.progressiveRank = -1
bs.firstPriorityIsBetter = true
}
}
}
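Read together, the cases above define the `progressiveRank` encoding that replaces the removed `firstPriorityIsBetter`/`secondPriorityIsBetter` flags:

// -3: both the first and the second priority dim become more balanced
// -2: the first priority dim is not worsened and the second becomes more balanced
// -1: the first priority dim becomes more balanced; the second is ignored
//  0: no improvement, so the candidate solution is skipped

The `dim` label switch in `buildOperator` below decodes the same values back into "all", the second-priority dim, or the first-priority dim.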
@@ -886,13 +851,13 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
return false
}

if r := bs.compareSrcStore(bs.cur.srcDetail, old.srcDetail); r < 0 {
if r := bs.compareSrcStore(bs.cur.srcStore, old.srcStore); r < 0 {
return true
} else if r > 0 {
return false
}

if r := bs.compareDstStore(bs.cur.dstDetail, old.dstDetail); r < 0 {
if r := bs.compareDstStore(bs.cur.dstStore, old.dstStore); r < 0 {
return true
} else if r > 0 {
return false
@@ -901,7 +866,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
if bs.cur.srcPeerStat != old.srcPeerStat {
// compare region

if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
kind := statistics.GetRegionStatKind(statistics.Write, bs.firstPriority)
switch {
case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind):
@@ -960,7 +925,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD
if detail1 != detail2 {
// compare source store
var lpCmp storeLPCmp
if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -993,7 +958,7 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD
if detail1 != detail2 {
// compare destination store
var lpCmp storeLPCmp
if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
lpCmp = sliceLPCmp(
maxLPCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -1026,16 +991,14 @@ func stepRank(rk0 float64, step float64) func(float64) int64 {
}
}

// Once we are ready to build the operator, we must ensure the following things:
// 1. the source store and destination store in the current solution are not nil
// 2. the peer we choose as a source in the current solution is not nil and it belongs to the source store
// 3. the region that owns the peer in the current solution is not nil and its ID equals the peer's region ID
func (bs *balanceSolver) isReadyToBuild() bool {
if bs.cur.srcDetail == nil || bs.cur.dstDetail == nil ||
bs.cur.srcPeerStat == nil || bs.cur.region == nil {
return false
}
if bs.cur.srcDetail.GetID() != bs.cur.srcPeerStat.StoreID ||
bs.cur.region.GetID() != bs.cur.srcPeerStat.ID() {
return false
}
return true
return bs.cur.srcStore != nil && bs.cur.dstStore != nil &&
bs.cur.srcPeerStat != nil && bs.cur.srcPeerStat.StoreID == bs.cur.srcStore.GetID() &&
bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.srcPeerStat.ID()
}

func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) {
Expand All @@ -1049,8 +1012,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic
targetLabel string
)

srcStoreID := bs.cur.srcDetail.GetID()
dstStoreID := bs.cur.dstDetail.GetID()
srcStoreID := bs.cur.srcStore.GetID()
dstStoreID := bs.cur.dstStore.GetID()
switch bs.opTy {
case movePeer:
srcPeer := bs.cur.region.GetStorePeer(srcStoreID) // checked in getRegionAndSrcPeer
@@ -1103,12 +1066,13 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic
}

dim := ""
if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter {
switch bs.cur.progressiveRank {
case -3:
dim = "all"
} else if bs.firstPriorityIsBetter {
dim = dimToString(bs.firstPriority)
} else if bs.secondPriorityIsBetter {
case -2:
dim = dimToString(bs.secondPriority)
case -1:
dim = dimToString(bs.firstPriority)
}

op.SetPriorityLevel(core.HighPriority)
@@ -1151,10 +1115,6 @@ func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur
return
}

func (h *hotScheduler) clearPendingInfluence() {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

type opType int

const (