diff --git a/pkg/movingaverage/avg_over_time.go b/pkg/movingaverage/avg_over_time.go index b64358baf8e..35f85ab8253 100644 --- a/pkg/movingaverage/avg_over_time.go +++ b/pkg/movingaverage/avg_over_time.go @@ -117,3 +117,8 @@ func (aot *AvgOverTime) Clone() *AvgOverTime { avgInterval: aot.avgInterval, } } + +// GetIntervalSum returns the sum of the intervals +func (aot *AvgOverTime) GetIntervalSum() time.Duration { + return aot.intervalSum +} diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go index 78f4ae0f9a8..1c50f73085b 100644 --- a/server/statistics/hot_peer.go +++ b/server/statistics/hot_peer.go @@ -109,6 +109,11 @@ type HotPeerStat struct { // If the peer didn't been send by store heartbeat when it is already stored as hot peer stat, // we will handle it as cold peer and mark the inCold flag inCold bool + // source represents the statistics item source, such as direct or inherit. + source sourceKind + // If the item in storeA is just inherited from storeB, + // then other stores, such as storeC, will be forbidden to inherit from storeA until the item in storeA becomes hot. + allowInherited bool } // ID returns region ID. Implementing TopNItem. @@ -134,6 +139,8 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi zap.Int("hot-anti-count", stat.AntiCount), zap.Bool("just-transfer-leader", stat.justTransferLeader), zap.Bool("is-leader", stat.isLeader), + zap.String("source", stat.source.String()), + zap.Bool("allow-inherited", stat.allowInherited), zap.Bool("need-delete", stat.IsNeedDelete()), zap.String("type", stat.Kind.String()), zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime)) @@ -211,3 +218,10 @@ func (stat *HotPeerStat) hotStatReportInterval() int { } return WriteReportInterval } + +func (stat *HotPeerStat) getIntervalSum() time.Duration { + if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil { + return 0 + } + return stat.rollingLoads[0].LastAverage.GetIntervalSum() +} diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index d5cdda190c2..f78f8ddc9a8 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -58,7 +58,6 @@ type hotPeerCache struct { peersOfStore map[uint64]*TopN // storeID -> hot peers storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs - inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat topNTTL time.Duration reportIntervalSecs int } @@ -70,7 +69,6 @@ func NewHotPeerCache(kind FlowKind) *hotPeerCache { peersOfStore: make(map[uint64]*TopN), storesOfRegion: make(map[uint64]map[uint64]struct{}), regionsOfStore: make(map[uint64]map[uint64]struct{}), - inheritItem: make(map[uint64]*HotPeerStat), } if kind == WriteFlow { c.reportIntervalSecs = WriteReportInterval @@ -101,7 +99,6 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat { // Update updates the items in statistics.
func (f *hotPeerCache) Update(item *HotPeerStat) { if item.IsNeedDelete() { - f.putInheritItem(item) f.removeItem(item) item.Log("region heartbeat delete from cache", log.Debug) } else { @@ -185,25 +182,19 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf interval: interval, peers: peers, thresholds: thresholds, + source: direct, } - source := direct if oldItem == nil { - inheritItem := f.takeInheritItem(region.GetID()) - if inheritItem != nil { - oldItem = inheritItem - source = inherit - } else { - for _, storeID := range f.getAllStoreIDs(region) { - oldItem = f.getOldHotPeerStat(region.GetID(), storeID) - if oldItem != nil { - source = adopt - break - } + for _, storeID := range f.getAllStoreIDs(region) { + oldItem = f.getOldHotPeerStat(region.GetID(), storeID) + if oldItem != nil && oldItem.allowInherited { + newItem.source = inherit + break } } } - return f.updateHotPeerStat(newItem, oldItem, source, deltaLoads, time.Duration(interval)*time.Second) + return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) } // CheckColdPeer checks the collect the un-heartbeat peer and maintain it. @@ -240,7 +231,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st for i, loads := range oldItem.thresholds { deltaLoads[i] = loads * float64(interval) } - stat := f.updateHotPeerStat(newItem, oldItem, direct, deltaLoads, time.Duration(interval)*time.Second) + stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second) if stat != nil { ret = append(ret, stat) } @@ -377,18 +368,20 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian { return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second) } -func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, source sourceKind, deltaLoads []float64, interval time.Duration) *HotPeerStat { +func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat { regionStats := f.kind.RegionStats() if oldItem == nil { return f.updateNewHotPeerStat(newItem, deltaLoads, interval) } - if source == adopt { + if newItem.source == inherit { for _, dim := range oldItem.rollingLoads { newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone()) } + newItem.allowInherited = false } else { newItem.rollingLoads = oldItem.rollingLoads + newItem.allowInherited = oldItem.allowInherited } if newItem.justTransferLeader { @@ -465,22 +458,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f return newItem } -func (f *hotPeerCache) putInheritItem(item *HotPeerStat) { - f.inheritItem[item.RegionID] = item -} - -func (f *hotPeerCache) takeInheritItem(regionID uint64) *HotPeerStat { - item, ok := f.inheritItem[regionID] - if !ok { - return nil - } - if item != nil { - delete(f.inheritItem, regionID) - return item - } - return nil -} - func (f *hotPeerCache) putItem(item *HotPeerStat) { peers, ok := f.peersOfStore[item.StoreID] if !ok { @@ -519,6 +496,8 @@ func coldItem(newItem, oldItem *HotPeerStat) { newItem.AntiCount = oldItem.AntiCount - 1 if newItem.AntiCount <= 0 { newItem.needDelete = true + } else { + newItem.allowInherited = true } } @@ -528,6 +507,7 @@ func hotItem(newItem, oldItem *HotPeerStat) { if newItem.Kind == ReadFlow { newItem.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) } + 
newItem.allowInherited = true } func initItemDegree(item *HotPeerStat) { @@ -536,6 +516,7 @@ func initItemDegree(item *HotPeerStat) { if item.Kind == ReadFlow { item.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval) } + item.allowInherited = true } func inheritItemDegree(newItem, oldItem *HotPeerStat) { diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index 09ecd532375..5082192130c 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -16,12 +16,12 @@ package statistics import ( "math/rand" + "sort" "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" - "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" ) @@ -31,23 +31,9 @@ type testHotPeerCache struct{} func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { cache := NewHotPeerCache(WriteFlow) - peers := newPeers(3, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) - meta := &metapb.Region{ - Id: 1000, - Peers: peers, - StartKey: []byte(""), - EndKey: []byte(""), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, - } intervals := []uint64{120, 60} for _, interval := range intervals { - region := core.NewRegionInfo(meta, peers[0], - // interval is [0, interval] - core.SetReportInterval(interval), - core.SetWrittenBytes(interval*100*1024)) - + region := buildRegion(WriteFlow, 3, interval) checkAndUpdate(c, cache, region, 3) { stats := cache.RegionStats(0) @@ -65,6 +51,7 @@ const ( transferLeader operator = iota movePeer addReplica + removeReplica ) type testCacheCase struct { @@ -94,11 +81,11 @@ func testCache(c *C, t *testCacheCase) { WriteFlow: 3, // all peers } cache := NewHotPeerCache(t.kind) - region := buildRegion(nil, nil, t.kind) + region := buildRegion(t.kind, 3, 60) checkAndUpdate(c, cache, region, defaultSize[t.kind]) checkHit(c, cache, region, t.kind, false) // all peers are new - srcStore, region := schedule(t.operator, region, t.kind) + srcStore, region := schedule(c, t.operator, region, 10) res := checkAndUpdate(c, cache, region, t.expect) checkHit(c, cache, region, t.kind, true) // hit cache if t.expect != defaultSize[t.kind] { @@ -106,37 +93,65 @@ func testCache(c *C, t *testCacheCase) { } } -func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect int) (res []*HotPeerStat) { +func orderingPeers(cache *hotPeerCache, region *core.RegionInfo) []*metapb.Peer { + var peers []*metapb.Peer + for _, peer := range region.GetPeers() { + if cache.getOldHotPeerStat(region.GetID(), peer.StoreId) != nil { + peers = append([]*metapb.Peer{peer}, peers...) + } else { + peers = append(peers, peer) + } + } + return peers +} + +func checkFlow(cache *hotPeerCache, region *core.RegionInfo, peers []*metapb.Peer) (res []*HotPeerStat) { reportInterval := region.GetInterval() interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() res = append(res, cache.CollectExpiredItems(region)...) 
- for _, peer := range region.GetPeers() { + for _, peer := range peers { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) item := cache.CheckPeerFlow(peerInfo, region) if item != nil { res = append(res, item) } } - c.Assert(res, HasLen, expect) + return res +} + +func updateFlow(cache *hotPeerCache, res []*HotPeerStat) []*HotPeerStat { for _, p := range res { cache.Update(p) } return res } -func checkAndUpdateSync(cache *hotPeerCache, region *core.RegionInfo) (res []*HotPeerStat) { - reportInterval := region.GetInterval() - interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp() - res = append(res, cache.CollectExpiredItems(region)...) - for _, peer := range region.GetPeers() { - peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval) - item := cache.CheckPeerFlow(peerInfo, region) - if item != nil { - res = append(res, item) - cache.Update(item) - } +type check func(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) + +func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, region.GetPeers()) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) } - return res + return updateFlow(cache, res) +} + +// Check and update peers in a specified order: the peers whose items have not expired come first, and the peers whose items have expired come second. +// This is similar to the order in the previous version, while the order in the current version is random. +func checkAndUpdateWithOrdering(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, orderingPeers(cache, region)) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) + } + return updateFlow(cache, res) +} + +func checkAndUpdateSkipOne(c *C, cache *hotPeerCache, region *core.RegionInfo, expect ...int) (res []*HotPeerStat) { + res = checkFlow(cache, region, region.GetPeers()[1:]) + if len(expect) != 0 { + c.Assert(res, HasLen, expect[0]) + } + return updateFlow(cache, res) } func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) { @@ -162,21 +177,62 @@ func checkNeedDelete(c *C, ret []*HotPeerStat, storeID uint64, needDelete bool) } } -func schedule(operator operator, region *core.RegionInfo, kind FlowKind) (srcStore uint64, _ *core.RegionInfo) { +// checkIntervalSum checks whether the interval sums of the peers are different. +func checkIntervalSum(cache *hotPeerCache, region *core.RegionInfo) bool { + var intervalSums []int + for _, peer := range region.GetPeers() { + oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) + if oldItem != nil { + intervalSums = append(intervalSums, int(oldItem.getIntervalSum())) + } + } + sort.Ints(intervalSums) + return intervalSums[0] != intervalSums[len(intervalSums)-1] +} + +// checkIntervalSumContinuous checks whether the interval sum of each peer is continuous.
+func checkIntervalSumContinuous(c *C, intervalSums map[uint64]int, rets []*HotPeerStat, interval uint64) { + for _, ret := range rets { + if ret.needDelete == true { + delete(intervalSums, ret.StoreID) + continue + } + new := int(ret.getIntervalSum() / 1000000000) + if ret.source == direct { + if old, ok := intervalSums[ret.StoreID]; ok { + c.Assert(new, Equals, (old+int(interval))%RegionHeartBeatReportInterval) + } + } + intervalSums[ret.StoreID] = new + } +} + +func schedule(c *C, operator operator, region *core.RegionInfo, targets ...uint64) (srcStore uint64, _ *core.RegionInfo) { switch operator { case transferLeader: _, newLeader := pickFollower(region) - return region.GetLeader().StoreId, buildRegion(region.GetMeta(), newLeader, kind) + return region.GetLeader().StoreId, region.Clone(core.WithLeader(newLeader)) case movePeer: + c.Assert(targets, HasLen, 1) index, _ := pickFollower(region) - meta := region.GetMeta() - srcStore := meta.Peers[index].StoreId - meta.Peers[index] = &metapb.Peer{Id: 4, StoreId: 4} - return srcStore, buildRegion(meta, region.GetLeader(), kind) + srcStore := region.GetPeers()[index].StoreId + region := region.Clone(core.WithAddPeer(&metapb.Peer{Id: targets[0]*10 + 1, StoreId: targets[0]})) + region = region.Clone(core.WithRemoveStorePeer(srcStore)) + return srcStore, region case addReplica: - meta := region.GetMeta() - meta.Peers = append(meta.Peers, &metapb.Peer{Id: 4, StoreId: 4}) - return 0, buildRegion(meta, region.GetLeader(), kind) + c.Assert(targets, HasLen, 1) + region := region.Clone(core.WithAddPeer(&metapb.Peer{Id: targets[0]*10 + 1, StoreId: targets[0]})) + return 0, region + case removeReplica: + if len(targets) == 0 { + index, _ := pickFollower(region) + srcStore = region.GetPeers()[index].StoreId + } else { + srcStore = targets[0] + } + region = region.Clone(core.WithRemoveStorePeer(srcStore)) + return srcStore, region + default: return 0, nil } @@ -198,30 +254,39 @@ func pickFollower(region *core.RegionInfo) (index int, peer *metapb.Peer) { return dst, meta.Peers[dst] } -func buildRegion(meta *metapb.Region, leader *metapb.Peer, kind FlowKind) *core.RegionInfo { - const interval = uint64(60) - if meta == nil { - peer1 := &metapb.Peer{Id: 1, StoreId: 1} - peer2 := &metapb.Peer{Id: 2, StoreId: 2} - peer3 := &metapb.Peer{Id: 3, StoreId: 3} - - meta = &metapb.Region{ - Id: 1000, - Peers: []*metapb.Peer{peer1, peer2, peer3}, - StartKey: []byte(""), - EndKey: []byte(""), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, - } - leader = meta.Peers[rand.Intn(3)] +func buildRegion(kind FlowKind, peerCount int, interval uint64) *core.RegionInfo { + peers := newPeers(peerCount, + func(i int) uint64 { return uint64(10000 + i) }, + func(i int) uint64 { return uint64(i) }) + meta := &metapb.Region{ + Id: 1000, + Peers: peers, + StartKey: []byte(""), + EndKey: []byte(""), + RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, } + leader := meta.Peers[rand.Intn(3)] + switch kind { case ReadFlow: - return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), - core.SetReadBytes(interval*100*1024)) + return core.NewRegionInfo( + meta, + leader, + core.SetReportInterval(interval), + core.SetReadBytes(10*1024*1024*interval), + core.SetReadKeys(10*1024*1024*interval), + core.SetReadQuery(1024*interval), + ) case WriteFlow: - return core.NewRegionInfo(meta, leader, core.SetReportInterval(interval), - core.SetWrittenBytes(interval*100*1024)) + return core.NewRegionInfo( + meta, + leader, + core.SetReportInterval(interval), + 
core.SetWrittenBytes(10*1024*1024*interval), + core.SetWrittenKeys(10*1024*1024*interval), + core.SetWrittenQuery(1024*interval), + ) default: return nil } @@ -248,50 +313,50 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) { // skip interval=0 newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: ReadFlow} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 0) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0) c.Check(newItem, IsNil) // new peer, interval is larger than report interval, but no hot newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: ReadFlow} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second) c.Check(newItem, IsNil) // new peer, interval is less than report interval newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: ReadFlow} - newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem, NotNil) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is less than report interval oldItem := newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 0) c.Check(newItem.AntiCount, Equals, 0) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is less than report interval oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and hot oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 2) c.Check(newItem.AntiCount, Equals, 2*m) // sum of interval is larger than report interval, and cold oldItem = newItem newItem.thresholds = []float64{10.0, 10.0, 10.0} - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) c.Check(newItem.HotDegree, Equals, 1) c.Check(newItem.AntiCount, Equals, 2*m-1) // sum of interval is larger than report interval, and cold for i := 0; i < 2*m-1; i++ { oldItem = newItem - newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second) + newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second) } c.Check(newItem.HotDegree, Less, 0) 
c.Check(newItem.AntiCount, Equals, 0) @@ -330,7 +395,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true { break } - item := cache.updateHotPeerStat(newItem, oldItem, direct, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) + item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second) cache.Update(item) } thresholds := cache.calcHotThresholds(storeID) @@ -343,65 +408,184 @@ } func (t *testHotPeerCache) TestRemoveFromCache(c *C) { + peerCount := 3 + interval := uint64(5) + checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} + for _, checker := range checkers { + cache := NewHotPeerCache(WriteFlow) + region := buildRegion(WriteFlow, peerCount, interval) + // prepare + intervalSums := make(map[uint64]int) + for i := 1; i <= 200; i++ { + rets := checker(c, cache, region) + checkIntervalSumContinuous(c, intervalSums, rets, interval) + } + // make the interval sums of the peers different + checkAndUpdateSkipOne(c, cache, region) + checkIntervalSum(cache, region) + // check whether cold cache is cleared + var isClear bool + intervalSums = make(map[uint64]int) + region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) + for i := 1; i <= 200; i++ { + rets := checker(c, cache, region) + checkIntervalSumContinuous(c, intervalSums, rets, interval) + if len(cache.storesOfRegion[region.GetID()]) == 0 { + isClear = true + break + } + } + c.Assert(isClear, IsTrue) + } +} + +func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) { peerCounts := []int{3, 5} - intervals := []uint64{120, 60, 10} + intervals := []uint64{120, 60, 10, 5} + checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} for _, peerCount := range peerCounts { for _, interval := range intervals { - cache := NewHotPeerCache(WriteFlow) - peers := newPeers(peerCount, - func(i int) uint64 { return uint64(10000 + i) }, - func(i int) uint64 { return uint64(i) }) - meta := &metapb.Region{ - Id: 1000, - Peers: peers, - StartKey: []byte(""), - EndKey: []byte(""), - RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, - } - region := core.NewRegionInfo( - meta, - peers[0], - core.SetReportInterval(interval), - core.SetWrittenBytes(10*1024*1024*interval), - core.SetWrittenKeys(10*1024*1024*interval), - core.SetWrittenQuery(1024*interval), - ) - for i := 1; i <= 200; i++ { - checkAndUpdate(c, cache, region, peerCount) - } - c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount) - var isClear bool - region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) - for i := 1; i <= 200; i++ { - checkAndUpdateSync(cache, region) - if len(cache.storesOfRegion[region.GetID()]) == 0 { - isClear = true - break + for _, checker := range checkers { + cache := NewHotPeerCache(WriteFlow) + region := buildRegion(WriteFlow, peerCount, interval) + + target := uint64(10) + intervalSums := make(map[uint64]int) + step := func(i int) { + tmp := uint64(0) + if i%5 == 0 { + tmp, region = schedule(c, removeReplica, region) + } + rets := checker(c, cache, region) + checkIntervalSumContinuous(c, intervalSums, rets, interval) + if i%5 == 0 { + _, region = schedule(c, addReplica, region, target) + target = tmp + } } + + // prepare with random peer movement to
make the interval sums of the peers different + for i := 1; i < 150; i++ { + step(i) + if i > 150 && checkIntervalSum(cache, region) { + break + } + } + if interval < RegionHeartBeatReportInterval { + c.Assert(checkIntervalSum(cache, region), IsTrue) + } + c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount) + + // check whether cold cache is cleared + var isClear bool + intervalSums = make(map[uint64]int) + region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) + for i := 1; i < 200; i++ { + step(i) + if len(cache.storesOfRegion[region.GetID()]) == 0 { + isClear = true + break + } + } + c.Assert(isClear, IsTrue) } - c.Assert(isClear, IsTrue) + } } } +func checkCoolDown(c *C, cache *hotPeerCache, region *core.RegionInfo, expect bool) { + item := cache.getOldHotPeerStat(region.GetID(), region.GetLeader().GetStoreId()) + c.Assert(item.IsNeedCoolDownTransferLeader(3), Equals, expect) +} + +func (t *testHotPeerCache) TestCoolDownTransferLeader(c *C) { + cache := NewHotPeerCache(ReadFlow) + region := buildRegion(ReadFlow, 3, 60) + + moveLeader := func() { + _, region = schedule(c, movePeer, region, 10) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, false) + _, region = schedule(c, transferLeader, region, 10) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, true) + } + transferLeader := func() { + _, region = schedule(c, transferLeader, region) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, true) + } + movePeer := func() { + _, region = schedule(c, movePeer, region, 10) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, false) + } + addReplica := func() { + _, region = schedule(c, addReplica, region, 10) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, false) + } + removeReplica := func() { + _, region = schedule(c, removeReplica, region, 10) + checkAndUpdate(c, cache, region) + checkCoolDown(c, cache, region, false) + } + cases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} + for _, runCase := range cases { + cache = NewHotPeerCache(ReadFlow) + region = buildRegion(ReadFlow, 3, 60) + for i := 1; i <= 200; i++ { + checkAndUpdate(c, cache, region) + } + checkCoolDown(c, cache, region, false) + runCase() + } +} + +// See issue #4510 +func (t *testHotPeerCache) TestCacheInherit(c *C) { + cache := NewHotPeerCache(ReadFlow) + region := buildRegion(ReadFlow, 3, 10) + // prepare + for i := 1; i <= 200; i++ { + checkAndUpdate(c, cache, region) + } + // move peer + newStoreID := uint64(10) + _, region = schedule(c, addReplica, region, newStoreID) + checkAndUpdate(c, cache, region) + newStoreID, region = schedule(c, removeReplica, region) + rets := checkAndUpdate(c, cache, region) + for _, ret := range rets { + if ret.needDelete == false { + flow := ret.GetLoads()[RegionReadBytes] + c.Assert(flow, Equals, float64(region.GetBytesRead()/ReadReportInterval)) + } + } + // new flow + newFlow := region.GetBytesRead() * 10 + region = region.Clone(core.SetReadBytes(newFlow)) + for i := 1; i <= 200; i++ { + checkAndUpdate(c, cache, region) + } + // move peer + _, region = schedule(c, addReplica, region, newStoreID) + checkAndUpdate(c, cache, region) + _, region = schedule(c, removeReplica, region) + rets = checkAndUpdate(c, cache, region) + for _, ret := range rets { + if ret.needDelete == false { + flow := ret.GetLoads()[RegionReadBytes] + c.Assert(flow, Equals, float64(newFlow/ReadReportInterval)) } } } func
BenchmarkCheckRegionFlow(b *testing.B) { cache := NewHotPeerCache(ReadFlow) - region := core.NewRegionInfo(&metapb.Region{ - Id: 1, - Peers: []*metapb.Peer{ - {Id: 101, StoreId: 1}, - {Id: 102, StoreId: 2}, - {Id: 103, StoreId: 3}, - }, - }, - &metapb.Peer{Id: 101, StoreId: 1}, - ) - newRegion := region.Clone( - core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}), - core.SetReadBytes(30000*10), - core.SetReadKeys(300000*10)) + region := buildRegion(ReadFlow, 3, 10) peerInfos := make([]*core.PeerInfo, 0) - for _, peer := range newRegion.GetPeers() { + for _, peer := range region.GetPeers() { peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) peerInfos = append(peerInfos, peerInfo) } diff --git a/server/statistics/kind.go b/server/statistics/kind.go index 0d1adf0eb0d..b89010af914 100644 --- a/server/statistics/kind.go +++ b/server/statistics/kind.go @@ -132,8 +132,7 @@ type sourceKind int const ( direct sourceKind = iota // there is a corresponding peer in this store. - inherit // there is no a corresponding peer in this store and there is a peer just deleted. - adopt // there is no corresponding peer in this store and there is no peer just deleted, we need to copy from other stores. + inherit // there is no corresponding peer in this store and we need to copy from other stores. ) func (k sourceKind) String() string { @@ -142,8 +141,6 @@ func (k sourceKind) String() string { return "direct" case inherit: return "inherit" - case adopt: - return "adopt" } return "unknown" }
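Reviewer note: below is a minimal, self-contained sketch of the inherit rule that the new source and allowInherited fields implement; the types item, inheritFrom and markHot are illustrative stand-ins (not the actual HotPeerStat/hotPeerCache code in this patch). A store without its own item may copy another store's item only while that item still allows it, inheriting clears the flag, and the flag is only set again once the item is hot (or stays warm in coldItem), so chained inheritance such as storeB -> storeA -> storeC is blocked until the inherited item becomes hot.

package main

import "fmt"

// item is a simplified stand-in for HotPeerStat, keeping only the fields
// relevant to the inherit logic sketched here.
type item struct {
	store          uint64
	hotDegree      int
	allowInherited bool
}

// inheritFrom mirrors the CheckPeerFlow/updateHotPeerStat path: a store with
// no item of its own may inherit from another store only while that item
// still allows it; the inherited copy starts with allowInherited = false.
func inheritFrom(old *item, store uint64) *item {
	if old == nil || !old.allowInherited {
		return nil // falls back to building a direct item from scratch
	}
	return &item{store: store, hotDegree: old.hotDegree, allowInherited: false}
}

// markHot mirrors hotItem/initItemDegree: once an item is hot it may be
// inherited by other stores again.
func markHot(it *item) {
	it.hotDegree++
	it.allowInherited = true
}

func main() {
	a := &item{store: 1, hotDegree: 3, allowInherited: true}
	b := inheritFrom(a, 2)                // storeB inherits from storeA
	fmt.Println(b != nil)                 // true
	fmt.Println(inheritFrom(b, 3) != nil) // false: storeC cannot inherit from storeB yet
	markHot(b)
	fmt.Println(inheritFrom(b, 3) != nil) // true: allowed again once the item is hot
}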