scheduler: refactor hot region scheduler (#4488)
ref #4477

Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx authored Feb 23, 2022
1 parent 8328528 commit 3c5e7a7
Showing 6 changed files with 110 additions and 146 deletions.
10 changes: 5 additions & 5 deletions server/schedulers/grant_hot_region.go
@@ -307,12 +307,12 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedu
return s.randomSchedule(cluster, infos)
}

func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos []*statistics.StoreLoadDetail) (ops []*operator.Operator) {
func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, srcStores []*statistics.StoreLoadDetail) (ops []*operator.Operator) {
isleader := s.r.Int()%2 == 1
for _, detail := range infos {
srcStoreID := detail.Info.Store.GetID()
for _, srcStore := range srcStores {
srcStoreID := srcStore.GetID()
if isleader {
if s.conf.has(srcStoreID) || len(detail.HotPeers) < 1 {
if s.conf.has(srcStoreID) || len(srcStore.HotPeers) < 1 {
continue
}
} else {
@@ -321,7 +321,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos
}
}

for _, peer := range detail.HotPeers {
for _, peer := range srcStore.HotPeers {
if s.OpController.GetOperator(peer.RegionID) != nil {
continue
}
142 changes: 51 additions & 91 deletions server/schedulers/hot_region.go
@@ -344,16 +344,13 @@ type balanceSolver struct {
// they may be byte(0), key(1), query(2), and always less than dimLen
firstPriority int
secondPriority int

firstPriorityIsBetter bool
secondPriorityIsBetter bool
}

type solution struct {
srcDetail *statistics.StoreLoadDetail
srcStore *statistics.StoreLoadDetail
srcPeerStat *statistics.HotPeerStat
region *core.RegionInfo
dstDetail *statistics.StoreLoadDetail
dstStore *statistics.StoreLoadDetail

// progressiveRank measures the contribution for balance.
// The smaller the rank, the better this solution is.
@@ -362,17 +359,8 @@ type solution struct {
}

func (bs *balanceSolver) init() {
switch toResourceType(bs.rwTy, bs.opTy) {
case writePeer:
bs.stLoadDetail = bs.sche.stLoadInfos[writePeer]
case writeLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[writeLeader]
case readLeader:
bs.stLoadDetail = bs.sche.stLoadInfos[readLeader]
case readPeer:
bs.stLoadDetail = bs.sche.stLoadInfos[readPeer]
}
// And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat
// Init store load detail according to the type.
bs.stLoadDetail = bs.sche.stLoadInfos[toResourceType(bs.rwTy, bs.opTy)]

bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)}
bs.minDst = &statistics.StoreLoad{
@@ -414,16 +402,13 @@ func (bs *balanceSolver) getPriorities() []string {
querySupport := bs.sche.conf.checkQuerySupport(bs.Cluster)
// For read, transfer-leader and move-peer have the same priority config
// For write, they are different
switch bs.rwTy {
case statistics.Read:
switch toResourceType(bs.rwTy, bs.opTy) {
case readLeader, readPeer:
return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities)
case statistics.Write:
switch bs.opTy {
case transferLeader:
return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
case movePeer:
return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
}
case writeLeader:
return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities)
case writePeer:
return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities)
}
log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String()))
return []string{}
@@ -444,16 +429,8 @@ func (bs *balanceSolver) isValid() bool {
if bs.Cluster == nil || bs.sche == nil || bs.stLoadDetail == nil {
return false
}
switch bs.rwTy {
case statistics.Write, statistics.Read:
default:
return false
}
switch bs.opTy {
case movePeer, transferLeader:
default:
return false
}
// ignore the return value because it will panic if the type is not correct.
_ = toResourceType(bs.rwTy, bs.opTy)
return true
}

@@ -465,17 +442,17 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
bs.cur = &solution{}

for _, srcDetail := range bs.filterSrcStores() {
bs.cur.srcDetail = srcDetail
for _, srcStore := range bs.filterSrcStores() {
bs.cur.srcStore = srcStore

for _, srcPeerStat := range bs.filterHotPeers() {
bs.cur.srcPeerStat = srcPeerStat
bs.cur.region = bs.getRegion()
if bs.cur.region == nil {
continue
}
for _, dstDetail := range bs.filterDstStores() {
bs.cur.dstDetail = dstDetail
for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
bs.calcProgressiveRank()
if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) {
if newOp, newInfl := bs.buildOperator(); newOp != nil {
@@ -495,34 +472,26 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}
if bs.best.srcDetail.Info.IsTiFlash != bs.best.dstDetail.Info.IsTiFlash {
if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
schedulerCounter.WithLabelValues(bs.sche.GetName(), "not-same-engine").Inc()
return false
}
// Depending on the source of the statistics used, a different ZombieDuration will be used.
// If the statistics are from the sum of Regions, there will be a longer ZombieDuration.
var maxZombieDur time.Duration
switch {
case bs.isForWriteLeader():
switch toResourceType(bs.rwTy, bs.opTy) {
case writeLeader:
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
case bs.isForWritePeer():
if bs.best.srcDetail.Info.IsTiFlash {
case writePeer:
if bs.best.srcStore.IsTiFlash() {
maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration()
} else {
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}
default:
maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration()
}
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.GetID(), bs.best.dstDetail.GetID(), bs.infl, maxZombieDur)
}

func (bs *balanceSolver) isForWriteLeader() bool {
return bs.rwTy == statistics.Write && bs.opTy == transferLeader
}

func (bs *balanceSolver) isForWritePeer() bool {
return bs.rwTy == statistics.Write && bs.opTy == movePeer
return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStore.GetID(), bs.best.dstStore.GetID(), bs.infl, maxZombieDur)
}

// filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than
@@ -533,7 +502,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for id, detail := range bs.stLoadDetail {
srcToleranceRatio := confSrcToleranceRatio
if detail.Info.IsTiFlash {
if detail.IsTiFlash() {
if !confEnableForTiFlash {
continue
}
@@ -571,7 +540,7 @@ func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad *
// filterHotPeers filters hot peers from statistics.HotPeerStat and deletes the peer if its region is in pending status.
// The returned hotPeer count is controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat {
ret := bs.cur.srcDetail.HotPeers
ret := bs.cur.srcStore.HotPeers
// Return at most MaxPeerNum peers, to prevent balanceSolver.solve() too slow.
maxPeerNum := bs.sche.conf.GetMaxPeerNumber()

@@ -675,13 +644,13 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo {

switch bs.opTy {
case movePeer:
srcPeer := region.GetStorePeer(bs.cur.srcDetail.GetID())
srcPeer := region.GetStorePeer(bs.cur.srcStore.GetID())
if srcPeer == nil {
log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
return nil
}
case transferLeader:
if region.GetLeader().GetStoreId() != bs.cur.srcDetail.GetID() {
if region.GetLeader().GetStoreId() != bs.cur.srcStore.GetID() {
log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID()))
return nil
}
@@ -698,7 +667,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*statistics.StoreLoadDetai
filters []filter.Filter
candidates []*statistics.StoreLoadDetail
)
srcStore := bs.cur.srcDetail.Info.Store
srcStore := bs.cur.srcStore.StoreInfo
switch bs.opTy {
case movePeer:
filters = []filter.Filter{
@@ -738,9 +707,9 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio()
confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash()
for _, detail := range candidates {
store := detail.Info.Store
store := detail.StoreInfo
dstToleranceRatio := confDstToleranceRatio
if detail.Info.IsTiFlash {
if detail.IsTiFlash() {
if !confEnableForTiFlash {
continue
}
@@ -777,14 +746,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
// calcProgressiveRank calculates `bs.cur.progressiveRank`.
// See the comments of `solution.progressiveRank` for more about progressive rank.
func (bs *balanceSolver) calcProgressiveRank() {
src := bs.cur.srcDetail
dst := bs.cur.dstDetail
src := bs.cur.srcStore
dst := bs.cur.dstStore
srcLd := src.LoadPred.Min()
dstLd := dst.LoadPred.Max()
bs.cur.progressiveRank = 0
peer := bs.cur.srcPeerStat

if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
@@ -804,22 +773,18 @@ func (bs *balanceSolver) calcProgressiveRank() {
return
}
bs.cur.progressiveRank = -3
bs.firstPriorityIsBetter = true
bs.secondPriorityIsBetter = true
case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio:
// If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced.
if !bs.isTolerance(src, dst, bs.secondPriority) {
return
}
bs.cur.progressiveRank = -2
bs.secondPriorityIsBetter = true
case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio:
// If belong to the case, first priority dim will be more balanced, ignore the second priority dim.
if !bs.isTolerance(src, dst, bs.firstPriority) {
return
}
bs.cur.progressiveRank = -1
bs.firstPriorityIsBetter = true
}
}
}
@@ -886,13 +851,13 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
return false
}

if r := bs.compareSrcStore(bs.cur.srcDetail, old.srcDetail); r < 0 {
if r := bs.compareSrcStore(bs.cur.srcStore, old.srcStore); r < 0 {
return true
} else if r > 0 {
return false
}

if r := bs.compareDstStore(bs.cur.dstDetail, old.dstDetail); r < 0 {
if r := bs.compareDstStore(bs.cur.dstStore, old.dstStore); r < 0 {
return true
} else if r > 0 {
return false
@@ -901,7 +866,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool {
if bs.cur.srcPeerStat != old.srcPeerStat {
// compare region

if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
kind := statistics.GetRegionStatKind(statistics.Write, bs.firstPriority)
switch {
case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind):
@@ -960,7 +925,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD
if detail1 != detail2 {
// compare source store
var lpCmp storeLPCmp
if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
lpCmp = sliceLPCmp(
minLPCmp(negLoadCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -993,7 +958,7 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD
if detail1 != detail2 {
// compare destination store
var lpCmp storeLPCmp
if bs.isForWriteLeader() {
if toResourceType(bs.rwTy, bs.opTy) == writeLeader {
lpCmp = sliceLPCmp(
maxLPCmp(sliceLoadCmp(
stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])),
@@ -1026,16 +991,14 @@ func stepRank(rk0 float64, step float64) func(float64) int64 {
}
}

// Once we are ready to build the operator, we must ensure the following things:
// 1. the source store and destination store in the current solution are not nil
// 2. the peer we choose as a source in the current solution is not nil and it belongs to the source store
// 3. the region which owns the peer in the current solution is not nil and its ID should equal to the peer's region ID
func (bs *balanceSolver) isReadyToBuild() bool {
if bs.cur.srcDetail == nil || bs.cur.dstDetail == nil ||
bs.cur.srcPeerStat == nil || bs.cur.region == nil {
return false
}
if bs.cur.srcDetail.GetID() != bs.cur.srcPeerStat.StoreID ||
bs.cur.region.GetID() != bs.cur.srcPeerStat.ID() {
return false
}
return true
return bs.cur.srcStore != nil && bs.cur.dstStore != nil &&
bs.cur.srcPeerStat != nil && bs.cur.srcPeerStat.StoreID == bs.cur.srcStore.GetID() &&
bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.srcPeerStat.ID()
}

func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) {
@@ -1049,8 +1012,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic
targetLabel string
)

srcStoreID := bs.cur.srcDetail.GetID()
dstStoreID := bs.cur.dstDetail.GetID()
srcStoreID := bs.cur.srcStore.GetID()
dstStoreID := bs.cur.dstStore.GetID()
switch bs.opTy {
case movePeer:
srcPeer := bs.cur.region.GetStorePeer(srcStoreID) // checked in getRegionAndSrcPeer
@@ -1103,12 +1066,13 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic
}

dim := ""
if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter {
switch bs.cur.progressiveRank {
case -3:
dim = "all"
} else if bs.firstPriorityIsBetter {
dim = dimToString(bs.firstPriority)
} else if bs.secondPriorityIsBetter {
case -2:
dim = dimToString(bs.secondPriority)
case -1:
dim = dimToString(bs.firstPriority)
}

op.SetPriorityLevel(core.HighPriority)
@@ -1151,10 +1115,6 @@ func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur
return
}

func (h *hotScheduler) clearPendingInfluence() {
h.regionPendings = make(map[uint64]*pendingInfluence)
}

type opType int

const (
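Note on the refactor shown above: the old `isForWriteLeader`/`isForWritePeer` helpers and the per-type switches are replaced with calls to `toResourceType(bs.rwTy, bs.opTy)`. That helper itself is not visible in this diff; the sketch below is only a plausible reconstruction of its mapping, inferred from how the new code uses it (indexing `stLoadInfos`, selecting priority configs, and the panic-on-invalid behavior that the new `isValid` comment relies on).

```go
// Sketch only — not part of this commit's diff. Inferred from usage above:
// toResourceType maps (read/write type, operator type) onto the four
// resource types used to index stLoadInfos and the priority configs.
func toResourceType(rwTy statistics.RWType, opTy opType) resourceType {
	switch rwTy {
	case statistics.Write:
		switch opTy {
		case movePeer:
			return writePeer
		case transferLeader:
			return writeLeader
		}
	case statistics.Read:
		switch opTy {
		case movePeer:
			return readPeer
		case transferLeader:
			return readLeader
		}
	}
	// isValid calls this for its side effect only: an invalid combination
	// is treated as a programming error and panics.
	panic(fmt.Sprintf("invalid arguments for toResourceType: rwTy = %v, opTy = %v", rwTy, opTy))
}
```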
