From 3c5e7a787c8afd307983e67c60c7d263996b8421 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 23 Feb 2022 18:07:43 +0800 Subject: [PATCH] scheduler: refactor hot region scheduler (#4488) ref tikv/pd#4477 Signed-off-by: Ryan Leung --- server/schedulers/grant_hot_region.go | 10 +- server/schedulers/hot_region.go | 142 ++++++++------------- server/schedulers/hot_region_test.go | 70 +++++----- server/statistics/collector.go | 6 +- server/statistics/store_hot_peers_infos.go | 8 +- server/statistics/store_load.go | 20 +-- 6 files changed, 110 insertions(+), 146 deletions(-) diff --git a/server/schedulers/grant_hot_region.go b/server/schedulers/grant_hot_region.go index e45f2adf049..a75881fbc71 100644 --- a/server/schedulers/grant_hot_region.go +++ b/server/schedulers/grant_hot_region.go @@ -307,12 +307,12 @@ func (s *grantHotRegionScheduler) dispatch(typ statistics.RWType, cluster schedu return s.randomSchedule(cluster, infos) } -func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos []*statistics.StoreLoadDetail) (ops []*operator.Operator) { +func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, srcStores []*statistics.StoreLoadDetail) (ops []*operator.Operator) { isleader := s.r.Int()%2 == 1 - for _, detail := range infos { - srcStoreID := detail.Info.Store.GetID() + for _, srcStore := range srcStores { + srcStoreID := srcStore.GetID() if isleader { - if s.conf.has(srcStoreID) || len(detail.HotPeers) < 1 { + if s.conf.has(srcStoreID) || len(srcStore.HotPeers) < 1 { continue } } else { @@ -321,7 +321,7 @@ func (s *grantHotRegionScheduler) randomSchedule(cluster schedule.Cluster, infos } } - for _, peer := range detail.HotPeers { + for _, peer := range srcStore.HotPeers { if s.OpController.GetOperator(peer.RegionID) != nil { continue } diff --git a/server/schedulers/hot_region.go b/server/schedulers/hot_region.go index e8ece7b54c8..1547af027f0 100644 --- a/server/schedulers/hot_region.go +++ b/server/schedulers/hot_region.go @@ -344,16 +344,13 @@ type balanceSolver struct { // they may be byte(0), key(1), query(2), and always less than dimLen firstPriority int secondPriority int - - firstPriorityIsBetter bool - secondPriorityIsBetter bool } type solution struct { - srcDetail *statistics.StoreLoadDetail + srcStore *statistics.StoreLoadDetail srcPeerStat *statistics.HotPeerStat region *core.RegionInfo - dstDetail *statistics.StoreLoadDetail + dstStore *statistics.StoreLoadDetail // progressiveRank measures the contribution for balance. // The smaller the rank, the better this solution is. @@ -362,17 +359,8 @@ type solution struct { } func (bs *balanceSolver) init() { - switch toResourceType(bs.rwTy, bs.opTy) { - case writePeer: - bs.stLoadDetail = bs.sche.stLoadInfos[writePeer] - case writeLeader: - bs.stLoadDetail = bs.sche.stLoadInfos[writeLeader] - case readLeader: - bs.stLoadDetail = bs.sche.stLoadInfos[readLeader] - case readPeer: - bs.stLoadDetail = bs.sche.stLoadInfos[readPeer] - } - // And it will be unnecessary to filter unhealthy store, because it has been solved in process heartbeat + // Init store load detail according to the type. 
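+	// toResourceType (unchanged by this patch) collapses the (rwTy, opTy)
+	// pair into one of the four resource types; its shape is roughly the
+	// following sketch, panicking on an invalid pair:
+	//
+	//	switch rwTy {
+	//	case statistics.Write:
+	//		switch opTy {
+	//		case movePeer:
+	//			return writePeer
+	//		case transferLeader:
+	//			return writeLeader
+	//		}
+	//	case statistics.Read:
+	//		switch opTy {
+	//		case movePeer:
+	//			return readPeer
+	//		case transferLeader:
+	//			return readLeader
+	//		}
+	//	}
+	//	panic("invalid combination of rwTy and opTy")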
+ bs.stLoadDetail = bs.sche.stLoadInfos[toResourceType(bs.rwTy, bs.opTy)] bs.maxSrc = &statistics.StoreLoad{Loads: make([]float64, statistics.DimLen)} bs.minDst = &statistics.StoreLoad{ @@ -414,16 +402,13 @@ func (bs *balanceSolver) getPriorities() []string { querySupport := bs.sche.conf.checkQuerySupport(bs.Cluster) // For read, transfer-leader and move-peer have the same priority config // For write, they are different - switch bs.rwTy { - case statistics.Read: + switch toResourceType(bs.rwTy, bs.opTy) { + case readLeader, readPeer: return adjustConfig(querySupport, bs.sche.conf.GetReadPriorities(), getReadPriorities) - case statistics.Write: - switch bs.opTy { - case transferLeader: - return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) - case movePeer: - return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) - } + case writeLeader: + return adjustConfig(querySupport, bs.sche.conf.GetWriteLeaderPriorities(), getWriteLeaderPriorities) + case writePeer: + return adjustConfig(querySupport, bs.sche.conf.GetWritePeerPriorities(), getWritePeerPriorities) } log.Error("illegal type or illegal operator while getting the priority", zap.String("type", bs.rwTy.String()), zap.String("operator", bs.opTy.String())) return []string{} @@ -444,16 +429,8 @@ func (bs *balanceSolver) isValid() bool { if bs.Cluster == nil || bs.sche == nil || bs.stLoadDetail == nil { return false } - switch bs.rwTy { - case statistics.Write, statistics.Read: - default: - return false - } - switch bs.opTy { - case movePeer, transferLeader: - default: - return false - } + // ignore the return value because it will panic if the type is not correct. + _ = toResourceType(bs.rwTy, bs.opTy) return true } @@ -465,8 +442,8 @@ func (bs *balanceSolver) solve() []*operator.Operator { } bs.cur = &solution{} - for _, srcDetail := range bs.filterSrcStores() { - bs.cur.srcDetail = srcDetail + for _, srcStore := range bs.filterSrcStores() { + bs.cur.srcStore = srcStore for _, srcPeerStat := range bs.filterHotPeers() { bs.cur.srcPeerStat = srcPeerStat @@ -474,8 +451,8 @@ func (bs *balanceSolver) solve() []*operator.Operator { if bs.cur.region == nil { continue } - for _, dstDetail := range bs.filterDstStores() { - bs.cur.dstDetail = dstDetail + for _, dstStore := range bs.filterDstStores() { + bs.cur.dstStore = dstStore bs.calcProgressiveRank() if bs.cur.progressiveRank < 0 && bs.betterThan(bs.best) { if newOp, newInfl := bs.buildOperator(); newOp != nil { @@ -495,18 +472,18 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { if bs.best == nil || len(bs.ops) == 0 { return false } - if bs.best.srcDetail.Info.IsTiFlash != bs.best.dstDetail.Info.IsTiFlash { + if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() { schedulerCounter.WithLabelValues(bs.sche.GetName(), "not-same-engine").Inc() return false } // Depending on the source of the statistics used, a different ZombieDuration will be used. // If the statistics are from the sum of Regions, there will be a longer ZombieDuration. 
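	// For intuition (not part of this patch): a pending influence is treated
	// as zombie, and stops being counted against store loads, once its
	// operator has finished and maxZombieDur has elapsed, i.e. roughly
	//
	//	isZombie := opDone && time.Since(doneTime) > maxZombieDur
	//
	// Region-sum statistics come from region heartbeats, which are reported
	// far less often than store heartbeats, hence the longer zombie duration
	// for them.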
var maxZombieDur time.Duration - switch { - case bs.isForWriteLeader(): + switch toResourceType(bs.rwTy, bs.opTy) { + case writeLeader: maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration() - case bs.isForWritePeer(): - if bs.best.srcDetail.Info.IsTiFlash { + case writePeer: + if bs.best.srcStore.IsTiFlash() { maxZombieDur = bs.sche.conf.GetRegionsStatZombieDuration() } else { maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration() @@ -514,15 +491,7 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool { default: maxZombieDur = bs.sche.conf.GetStoreStatZombieDuration() } - return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcDetail.GetID(), bs.best.dstDetail.GetID(), bs.infl, maxZombieDur) -} - -func (bs *balanceSolver) isForWriteLeader() bool { - return bs.rwTy == statistics.Write && bs.opTy == transferLeader -} - -func (bs *balanceSolver) isForWritePeer() bool { - return bs.rwTy == statistics.Write && bs.opTy == movePeer + return bs.sche.tryAddPendingInfluence(bs.ops[0], bs.best.srcStore.GetID(), bs.best.dstStore.GetID(), bs.infl, maxZombieDur) } // filterSrcStores compare the min rate and the ratio * expectation rate, if two dim rate is greater than @@ -533,7 +502,7 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for id, detail := range bs.stLoadDetail { srcToleranceRatio := confSrcToleranceRatio - if detail.Info.IsTiFlash { + if detail.IsTiFlash() { if !confEnableForTiFlash { continue } @@ -571,7 +540,7 @@ func (bs *balanceSolver) checkSrcByDimPriorityAndTolerance(minLoad, expectLoad * // filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status. // The returned hotPeer count in controlled by `max-peer-number`. func (bs *balanceSolver) filterHotPeers() []*statistics.HotPeerStat { - ret := bs.cur.srcDetail.HotPeers + ret := bs.cur.srcStore.HotPeers // Return at most MaxPeerNum peers, to prevent balanceSolver.solve() too slow. 
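	// Assumed shape of the truncation below (the exact selection is outside
	// this diff): when there are more than maxPeerNum hot peers, keep the
	// hottest ones on the priority dimensions rather than an arbitrary
	// prefix, e.g. for the first priority:
	//
	//	kind := statistics.GetRegionStatKind(bs.rwTy, bs.firstPriority)
	//	sort.Slice(ret, func(i, j int) bool {
	//		return ret[i].GetLoad(kind) > ret[j].GetLoad(kind)
	//	})
	//	ret = ret[:maxPeerNum]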
maxPeerNum := bs.sche.conf.GetMaxPeerNumber() @@ -675,13 +644,13 @@ func (bs *balanceSolver) getRegion() *core.RegionInfo { switch bs.opTy { case movePeer: - srcPeer := region.GetStorePeer(bs.cur.srcDetail.GetID()) + srcPeer := region.GetStorePeer(bs.cur.srcStore.GetID()) if srcPeer == nil { log.Debug("region does not have a peer on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID())) return nil } case transferLeader: - if region.GetLeader().GetStoreId() != bs.cur.srcDetail.GetID() { + if region.GetLeader().GetStoreId() != bs.cur.srcStore.GetID() { log.Debug("region leader is not on source store, maybe stat out of date", zap.Uint64("region-id", bs.cur.srcPeerStat.ID())) return nil } @@ -698,7 +667,7 @@ func (bs *balanceSolver) filterDstStores() map[uint64]*statistics.StoreLoadDetai filters []filter.Filter candidates []*statistics.StoreLoadDetail ) - srcStore := bs.cur.srcDetail.Info.Store + srcStore := bs.cur.srcStore.StoreInfo switch bs.opTy { case movePeer: filters = []filter.Filter{ @@ -738,9 +707,9 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st confDstToleranceRatio := bs.sche.conf.GetDstToleranceRatio() confEnableForTiFlash := bs.sche.conf.GetEnableForTiFlash() for _, detail := range candidates { - store := detail.Info.Store + store := detail.StoreInfo dstToleranceRatio := confDstToleranceRatio - if detail.Info.IsTiFlash { + if detail.IsTiFlash() { if !confEnableForTiFlash { continue } @@ -777,14 +746,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist // calcProgressiveRank calculates `bs.cur.progressiveRank`. // See the comments of `solution.progressiveRank` for more about progressive rank. func (bs *balanceSolver) calcProgressiveRank() { - src := bs.cur.srcDetail - dst := bs.cur.dstDetail + src := bs.cur.srcStore + dst := bs.cur.dstStore srcLd := src.LoadPred.Min() dstLd := dst.LoadPred.Max() bs.cur.progressiveRank = 0 peer := bs.cur.srcPeerStat - if bs.isForWriteLeader() { + if toResourceType(bs.rwTy, bs.opTy) == writeLeader { if !bs.isTolerance(src, dst, bs.firstPriority) { return } @@ -804,22 +773,18 @@ func (bs *balanceSolver) calcProgressiveRank() { return } bs.cur.progressiveRank = -3 - bs.firstPriorityIsBetter = true - bs.secondPriorityIsBetter = true case firstPriorityDecRatio <= minorDecRatio && secondPriorityDimHot && secondPriorityDecRatio <= greatDecRatio: // If belong to the case, first priority dim will be not worsened, second priority dim will be more balanced. if !bs.isTolerance(src, dst, bs.secondPriority) { return } bs.cur.progressiveRank = -2 - bs.secondPriorityIsBetter = true case firstPriorityDimHot && firstPriorityDecRatio <= greatDecRatio: // If belong to the case, first priority dim will be more balanced, ignore the second priority dim. 
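	// Taken together, the ranks assigned by these cases mean:
	//	-3: both priority dims become notably more balanced
	//	-2: the first dim is not worsened, the second dim improves greatly
	//	-1: the first dim improves greatly, the second dim is ignored
	//	 0: no improvement, so the candidate solution is discarded
	// buildOperator later derives its "dim" label from these same values.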
if !bs.isTolerance(src, dst, bs.firstPriority) { return } bs.cur.progressiveRank = -1 - bs.firstPriorityIsBetter = true } } } @@ -886,13 +851,13 @@ func (bs *balanceSolver) betterThan(old *solution) bool { return false } - if r := bs.compareSrcStore(bs.cur.srcDetail, old.srcDetail); r < 0 { + if r := bs.compareSrcStore(bs.cur.srcStore, old.srcStore); r < 0 { return true } else if r > 0 { return false } - if r := bs.compareDstStore(bs.cur.dstDetail, old.dstDetail); r < 0 { + if r := bs.compareDstStore(bs.cur.dstStore, old.dstStore); r < 0 { return true } else if r > 0 { return false @@ -901,7 +866,7 @@ func (bs *balanceSolver) betterThan(old *solution) bool { if bs.cur.srcPeerStat != old.srcPeerStat { // compare region - if bs.isForWriteLeader() { + if toResourceType(bs.rwTy, bs.opTy) == writeLeader { kind := statistics.GetRegionStatKind(statistics.Write, bs.firstPriority) switch { case bs.cur.srcPeerStat.GetLoad(kind) > old.srcPeerStat.GetLoad(kind): @@ -960,7 +925,7 @@ func (bs *balanceSolver) compareSrcStore(detail1, detail2 *statistics.StoreLoadD if detail1 != detail2 { // compare source store var lpCmp storeLPCmp - if bs.isForWriteLeader() { + if toResourceType(bs.rwTy, bs.opTy) == writeLeader { lpCmp = sliceLPCmp( minLPCmp(negLoadCmp(sliceLoadCmp( stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.maxSrc.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), @@ -993,7 +958,7 @@ func (bs *balanceSolver) compareDstStore(detail1, detail2 *statistics.StoreLoadD if detail1 != detail2 { // compare destination store var lpCmp storeLPCmp - if bs.isForWriteLeader() { + if toResourceType(bs.rwTy, bs.opTy) == writeLeader { lpCmp = sliceLPCmp( maxLPCmp(sliceLoadCmp( stLdRankCmp(stLdRate(bs.firstPriority), stepRank(bs.minDst.Loads[bs.firstPriority], bs.rankStep.Loads[bs.firstPriority])), @@ -1026,16 +991,14 @@ func stepRank(rk0 float64, step float64) func(float64) int64 { } } +// Once we are ready to build the operator, we must ensure the following things: +// 1. the source store and destination store in the current solution are not nil +// 2. the peer we choose as a source in the current solution is not nil and it belongs to the source store +// 3. 
the region which owns the peer in the current solution is not nil and its ID should equal to the peer's region ID func (bs *balanceSolver) isReadyToBuild() bool { - if bs.cur.srcDetail == nil || bs.cur.dstDetail == nil || - bs.cur.srcPeerStat == nil || bs.cur.region == nil { - return false - } - if bs.cur.srcDetail.GetID() != bs.cur.srcPeerStat.StoreID || - bs.cur.region.GetID() != bs.cur.srcPeerStat.ID() { - return false - } - return true + return bs.cur.srcStore != nil && bs.cur.dstStore != nil && + bs.cur.srcPeerStat != nil && bs.cur.srcPeerStat.StoreID == bs.cur.srcStore.GetID() && + bs.cur.region != nil && bs.cur.region.GetID() == bs.cur.srcPeerStat.ID() } func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistics.Influence) { @@ -1049,8 +1012,8 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic targetLabel string ) - srcStoreID := bs.cur.srcDetail.GetID() - dstStoreID := bs.cur.dstDetail.GetID() + srcStoreID := bs.cur.srcStore.GetID() + dstStoreID := bs.cur.dstStore.GetID() switch bs.opTy { case movePeer: srcPeer := bs.cur.region.GetStorePeer(srcStoreID) // checked in getRegionAndSrcPeer @@ -1103,12 +1066,13 @@ func (bs *balanceSolver) buildOperator() (op *operator.Operator, infl *statistic } dim := "" - if bs.firstPriorityIsBetter && bs.secondPriorityIsBetter { + switch bs.cur.progressiveRank { + case -3: dim = "all" - } else if bs.firstPriorityIsBetter { - dim = dimToString(bs.firstPriority) - } else if bs.secondPriorityIsBetter { + case -2: dim = dimToString(bs.secondPriority) + case -1: + dim = dimToString(bs.firstPriority) } op.SetPriorityLevel(core.HighPriority) @@ -1151,10 +1115,6 @@ func (h *hotScheduler) calcPendingInfluence(op *operator.Operator, maxZombieDur return } -func (h *hotScheduler) clearPendingInfluence() { - h.regionPendings = make(map[uint64]*pendingInfluence) -} - type opType int const ( diff --git a/server/schedulers/hot_region_test.go b/server/schedulers/hot_region_test.go index c41c370a7e6..07e3f8dd780 100644 --- a/server/schedulers/hot_region_test.go +++ b/server/schedulers/hot_region_test.go @@ -60,6 +60,10 @@ func newHotWriteScheduler(opController *schedule.OperatorController, conf *hotRe return ret } +func clearPendingInfluence(h *hotScheduler) { + h.regionPendings = make(map[uint64]*pendingInfluence) +} + type testHotSchedulerSuite struct{} type testHotReadRegionSchedulerSuite struct{} type testHotWriteRegionSchedulerSuite struct{} @@ -212,13 +216,13 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust {3, []uint64{1, 2, 4}, 512 * KB, 0, 0}, }) c.Assert(hb.Schedule(tc), Not(HasLen), 0) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // Will transfer a hot region from store 1, because the total count of peers // which is hot for store 1 is larger than other stores. for i := 0; i < 20; i++ { op := hb.Schedule(tc)[0] - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) switch op.Len() { case 1: // balance by leader selected @@ -240,12 +244,12 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust // hot region scheduler is restricted by `hot-region-schedule-limit`. 
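	// (Illustrative, outside this diff) IsScheduleAllowed is expected to
	// compare the number of in-flight hot-region operators against this
	// limit, along the lines of
	//
	//	allowed := opController.OperatorCount(operator.OpHotRegion) <
	//		cluster.GetOpts().GetHotRegionScheduleLimit()
	//
	// so a limit of 0 must yield false here.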
tc.SetHotRegionScheduleLimit(0) c.Assert(hb.IsScheduleAllowed(tc), IsFalse) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) tc.SetHotRegionScheduleLimit(int(config.NewTestOptions().GetScheduleConfig().HotRegionScheduleLimit)) for i := 0; i < 20; i++ { op := hb.Schedule(tc)[0] - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) c.Assert(op.Len(), Equals, 4) if op.RegionID() == 2 { // peer in store 1 of the region 2 can transfer to store 5 or store 6 because of the label @@ -259,12 +263,12 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust // hot region scheduler is not affect by `balance-region-schedule-limit`. tc.SetRegionScheduleLimit(0) c.Assert(hb.Schedule(tc), HasLen, 1) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // Always produce operator c.Assert(hb.Schedule(tc), HasLen, 1) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) c.Assert(hb.Schedule(tc), HasLen, 1) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // | store_id | write_bytes_rate | // |----------|------------------| @@ -307,7 +311,7 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust // Region 5 can only move peer to store 6. for i := 0; i < 30; i++ { op := hb.Schedule(tc)[0] - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) switch op.RegionID() { case 1, 2: if op.Len() == 3 { @@ -331,7 +335,7 @@ func (s *testHotWriteRegionSchedulerSuite) checkByteRateOnly(c *C, tc *mockclust tc.Regions.RemoveRegion(tc.GetRegion(i)) } hb.Schedule(tc) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) } func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { @@ -427,7 +431,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { // Will transfer a hot learner from store 8, because the total count of peers // which is hot for store 8 is larger than other TiFlash stores. for i := 0; i < 20; i++ { - hb.clearPendingInfluence() + clearPendingInfluence(hb) op := hb.Schedule(tc)[0] switch op.Len() { case 1: @@ -443,7 +447,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { // Disable for TiFlash hb.conf.SetEnableForTiFlash(false) for i := 0; i < 20; i++ { - hb.clearPendingInfluence() + clearPendingInfluence(hb) op := hb.Schedule(tc)[0] testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 1) } @@ -504,7 +508,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { pdServerCfg := tc.GetOpts().GetPDServerConfig() pdServerCfg.FlowRoundByDigit = 8 tc.GetOpts().SetPDServerConfig(pdServerCfg) - hb.clearPendingInfluence() + clearPendingInfluence(hb) c.Assert(hb.Schedule(tc), Not(HasLen), 0) c.Assert( loadsEqual( @@ -518,7 +522,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestByteRateOnlyWithTiFlash(c *C) { // Will transfer a hot region from store 1, because the total count of peers // which is hot for store 1 is larger than other stores. 
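	// The operator length distinguishes the two possible outcomes below: a
	// plain transfer-leader is a single step, while moving the peer away from
	// the leader store typically takes four here (add a learner, promote it,
	// transfer the leadership, remove the old peer).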
for i := 0; i < 20; i++ { - hb.clearPendingInfluence() + clearPendingInfluence(hb) op := hb.Schedule(tc)[0] switch op.Len() { case 1: @@ -571,7 +575,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithQuery(c *C) { }) schedulePeerPr = 0.0 for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 3) } @@ -609,7 +613,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithKeyRate(c *C) { }) for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] // byteDecRatio <= 0.95 && keyDecRatio <= 0.95 testutil.CheckTransferPeer(c, op, operator.OpHotRegion, 1, 4) @@ -672,7 +676,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestUnhealthyStore(c *C) { // test dst for _, interval := range intervals { tc.SetStoreLastHeartbeatInterval(4, interval) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) hb.Schedule(tc) // no panic } @@ -748,7 +752,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { }) for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) c.Assert(hb.Schedule(tc), HasLen, 0) } @@ -761,7 +765,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestLeader(c *C) { // store3 has 2 peer as leader // We expect to transfer leader from store2 to store1 or store3 for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] testutil.CheckTransferLeaderFrom(c, op, operator.OpHotRegion, 2) c.Assert(hb.Schedule(tc), HasLen, 0) @@ -821,7 +825,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithPendingInfluence(c *C) { } for i := 0; i < 20; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) cnt := 0 testLoop: for j := 0; j < 1000; j++ { @@ -922,7 +926,7 @@ func (s *testHotWriteRegionSchedulerSuite) TestWithRuleEnabled(c *C) { }) for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] // The targetID should always be 1 as leader is only allowed to be placed in store1 or store2 by placement rule testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 2, 1) @@ -995,7 +999,7 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { // move leader from store 1 to store 5 // it is better than transfer leader from store 1 to store 3 testutil.CheckTransferPeerWithLeaderTransfer(c, op, operator.OpHotRegion, 1, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // assume handle the transfer leader operator rather than move leader tc.AddRegionWithReadInfo(3, 3, 512*KB*statistics.ReadReportInterval, 0, 0, statistics.ReadReportInterval, []uint64{1, 2}) @@ -1030,14 +1034,14 @@ func (s *testHotReadRegionSchedulerSuite) TestByteRateOnly(c *C) { // We will move leader peer of region 1 from 1 to 5 testutil.CheckTransferPeerWithLeaderTransfer(c, hb.Schedule(tc)[0], operator.OpHotRegion|operator.OpLeader, 1, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // Should not panic if region not found. 
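	// The hot-peer statistics can lag behind region changes, so Schedule has
	// to tolerate GetRegion returning nil for a peer that is still in the
	// cache; the removals below exercise exactly that path.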
for i := uint64(1); i <= 3; i++ { tc.Regions.RemoveRegion(tc.GetRegion(i)) } hb.Schedule(tc) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) } func (s *testHotReadRegionSchedulerSuite) TestWithQuery(c *C) { @@ -1066,7 +1070,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithQuery(c *C) { }) for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 3) } @@ -1104,7 +1108,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithKeyRate(c *C) { }) for i := 0; i < 100; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op := hb.Schedule(tc)[0] // byteDecRatio <= 0.95 && keyDecRatio <= 0.95 testutil.CheckTransferLeader(c, op, operator.OpHotRegion, 1, 4) @@ -1189,7 +1193,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { // Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5 // Min and max from storeLoadPred. They will be generated in the comparison of current and future. for i := 0; i < 20; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op1 := hb.Schedule(tc)[0] testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4) @@ -1211,7 +1215,7 @@ func (s *testHotReadRegionSchedulerSuite) TestWithPendingInfluence(c *C) { // Before schedule, store byte/key rate: 7.1 | 6.1 | 6 | 5 for i := 0; i < 20; i++ { - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) op1 := hb.Schedule(tc)[0] testutil.CheckTransferPeer(c, op1, operator.OpHotRegion, 1, 4) @@ -1535,7 +1539,7 @@ func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) { // try schedule hb.prepareForBalance(testcase.kind, tc) leaderSolver := newBalanceSolver(hb, tc, testcase.kind, transferLeader) - leaderSolver.cur = &solution{srcDetail: hb.stLoadInfos[toResourceType(testcase.kind, transferLeader)][2]} + leaderSolver.cur = &solution{srcStore: hb.stLoadInfos[toResourceType(testcase.kind, transferLeader)][2]} c.Check(leaderSolver.filterHotPeers(), HasLen, 0) // skip schedule threshold := tc.GetHotRegionCacheHitsThreshold() tc.SetHotRegionCacheHitsThreshold(0) @@ -1795,12 +1799,12 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { ops := hb.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) hb.(*hotScheduler).conf.WritePeerPriorities = []string{KeyPriority, BytePriority} ops = hb.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) // assert read priority schedule hb, err = schedule.CreateScheduler(statistics.Read.String(), schedule.NewOperatorController(ctx, nil, nil), storage.NewStorageWithMemoryBackend(), nil) @@ -1817,7 +1821,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { ops = hb.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferLeader(c, ops[0], operator.OpHotRegion, 1, 2) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) hb.(*hotScheduler).conf.ReadPriorities = []string{KeyPriority, BytePriority} ops = hb.Schedule(tc) c.Assert(ops, HasLen, 1) @@ -1836,7 +1840,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { 
ops = hb.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 1, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) tc.UpdateStorageWrittenStats(1, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval) tc.UpdateStorageWrittenStats(2, 6*MB*statistics.StoreHeartBeatReportInterval, 6*MB*statistics.StoreHeartBeatReportInterval) @@ -1847,7 +1851,7 @@ func (s *testHotSchedulerSuite) TestHotScheduleWithPriority(c *C) { ops = hb.Schedule(tc) c.Assert(ops, HasLen, 1) testutil.CheckTransferPeer(c, ops[0], operator.OpHotRegion, 4, 5) - hb.(*hotScheduler).clearPendingInfluence() + clearPendingInfluence(hb.(*hotScheduler)) } func (s *testHotWriteRegionSchedulerSuite) TestHotWriteLeaderScheduleWithPriority(c *C) { diff --git a/server/statistics/collector.go b/server/statistics/collector.go index a5f8e9a6f9c..e904be43bed 100644 --- a/server/statistics/collector.go +++ b/server/statistics/collector.go @@ -37,12 +37,12 @@ func (c tikvCollector) Engine() string { } func (c tikvCollector) Filter(info *StoreSummaryInfo, kind core.ResourceKind) bool { - if info.IsTiFlash { + if info.IsTiFlash() { return false } switch kind { case core.LeaderKind: - return info.Store.AllowLeaderTransfer() + return info.AllowLeaderTransfer() case core.RegionKind: return true } @@ -92,7 +92,7 @@ func (c tiflashCollector) Filter(info *StoreSummaryInfo, kind core.ResourceKind) case core.LeaderKind: return false case core.RegionKind: - return info.IsTiFlash + return info.IsTiFlash() } return false } diff --git a/server/statistics/store_hot_peers_infos.go b/server/statistics/store_hot_peers_infos.go index 2613669d6c2..e0cc0afaa98 100644 --- a/server/statistics/store_hot_peers_infos.go +++ b/server/statistics/store_hot_peers_infos.go @@ -128,7 +128,7 @@ func summaryStoresLoadByEngine( allHotPeersCount := 0 for _, info := range storeInfos { - store := info.Store + store := info.StoreInfo id := store.GetID() storeLoads, ok := storesLoads[id] if !ok || !collector.Filter(info, kind) { @@ -171,9 +171,9 @@ func summaryStoresLoadByEngine( // Construct store load info. loadDetail = append(loadDetail, &StoreLoadDetail{ - Info: info, - LoadPred: stLoadPred, - HotPeers: hotPeers, + StoreSummaryInfo: info, + LoadPred: stLoadPred, + HotPeers: hotPeers, }) } diff --git a/server/statistics/store_load.go b/server/statistics/store_load.go index 1d2039a912c..828943ac178 100644 --- a/server/statistics/store_load.go +++ b/server/statistics/store_load.go @@ -22,16 +22,11 @@ import ( // StoreLoadDetail records store load information. type StoreLoadDetail struct { - Info *StoreSummaryInfo + *StoreSummaryInfo LoadPred *StoreLoadPred HotPeers []*HotPeerStat } -// GetID return the ID of store. -func (li *StoreLoadDetail) GetID() uint64 { - return li.Info.Store.GetID() -} - // ToHotPeersStat abstracts load information to HotPeersStat. func (li *StoreLoadDetail) ToHotPeersStat() *HotPeersStat { totalLoads := make([]float64, RegionStatCount) @@ -116,8 +111,8 @@ func GetRegionStatKind(rwTy RWType, dim int) RegionStatKind { // StoreSummaryInfo records the summary information of store. 
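// By embedding *core.StoreInfo (below) rather than holding a named Store
// field, methods such as GetID and AllowLeaderTransfer are promoted onto
// StoreSummaryInfo, and in turn onto StoreLoadDetail, which embeds
// *StoreSummaryInfo; this is what lets the callers above shorten
// detail.Info.Store.GetID() to detail.GetID(). The isTiFlash flag is
// unexported with an IsTiFlash() accessor so it stays read-only outside the
// statistics package.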
type StoreSummaryInfo struct { - Store *core.StoreInfo - IsTiFlash bool + *core.StoreInfo + isTiFlash bool PendingSum *Influence } @@ -132,8 +127,8 @@ func SummaryStoreInfos(stores []*core.StoreInfo) map[uint64]*StoreSummaryInfo { infos := make(map[uint64]*StoreSummaryInfo, len(stores)) for _, store := range stores { info := &StoreSummaryInfo{ - Store: store, - IsTiFlash: core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash), + StoreInfo: store, + isTiFlash: core.IsStoreContainLabel(store.GetMeta(), core.EngineKey, core.EngineTiFlash), PendingSum: nil, } infos[store.GetID()] = info @@ -158,6 +153,11 @@ func (s *StoreSummaryInfo) AddInfluence(infl *Influence, w float64) { s.PendingSum.Count += infl.Count * w } +// IsTiFlash returns true if the store is TiFlash. +func (s *StoreSummaryInfo) IsTiFlash() bool { + return s.isTiFlash +} + // GetPendingInfluence returns the current pending influence. func GetPendingInfluence(stores []*core.StoreInfo) map[uint64]*Influence { stInfos := SummaryStoreInfos(stores)