diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go
index f0e36a7b008..4698cc4769a 100644
--- a/server/statistics/hot_peer_cache_test.go
+++ b/server/statistics/hot_peer_cache_test.go
@@ -16,12 +16,12 @@ package statistics
 
 import (
 	"math/rand"
+	"sort"
 	"testing"
 	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
-	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/server/core"
 )
@@ -31,23 +31,9 @@ type testHotPeerCache struct{}
 
 func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) {
 	cache := NewHotPeerCache(Write)
-	peers := newPeers(3,
-		func(i int) uint64 { return uint64(10000 + i) },
-		func(i int) uint64 { return uint64(i) })
-	meta := &metapb.Region{
-		Id:          1000,
-		Peers:       peers,
-		StartKey:    []byte(""),
-		EndKey:      []byte(""),
-		RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6},
-	}
 	intervals := []uint64{120, 60}
 	for _, interval := range intervals {
-		region := core.NewRegionInfo(meta, peers[0],
-			// interval is [0, interval]
-			core.SetReportInterval(interval),
-			core.SetWrittenBytes(interval*100*1024))
-
+		region := buildRegion(Write, 3, interval)
 		checkAndUpdate(c, cache, region, 3)
 		{
 			stats := cache.RegionStats(0)
@@ -191,6 +177,36 @@ func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) {
 	}
 }
 
+// checkIntervalSum checks whether the interval sums of the peers are different.
+func checkIntervalSum(cache *hotPeerCache, region *core.RegionInfo) bool {
+	var intervalSums []int
+	for _, peer := range region.GetPeers() {
+		oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId)
+		if oldItem != nil {
+			intervalSums = append(intervalSums, int(oldItem.getIntervalSum()))
+		}
+	}
+	sort.Ints(intervalSums)
+	return len(intervalSums) > 1 && intervalSums[0] != intervalSums[len(intervalSums)-1]
+}
+
+// checkIntervalSumContinuous checks whether the interval sum of each peer is continuous.
+func checkIntervalSumContinuous(c *C, intervalSums map[uint64]int, rets []*HotPeerStat, interval uint64) {
+	for _, ret := range rets {
+		if ret.actionType == Remove {
+			delete(intervalSums, ret.StoreID)
+			continue
+		}
+		new := int(ret.getIntervalSum() / 1000000000) // getIntervalSum is in nanoseconds; convert to seconds
+		if ret.source == direct {
+			if old, ok := intervalSums[ret.StoreID]; ok {
+				c.Assert(new, Equals, (old+int(interval))%RegionHeartBeatReportInterval)
+			}
+		}
+		intervalSums[ret.StoreID] = new
+	}
+}
+
 func schedule(c *C, operator operator, region *core.RegionInfo, targets ...uint64) (srcStore uint64, _ *core.RegionInfo) {
 	switch operator {
 	case transferLeader:
@@ -392,29 +408,28 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
 }
 
 func (t *testHotPeerCache) TestRemoveFromCache(c *C) {
+	peerCount := 3
+	interval := uint64(5)
 	checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering}
 	for _, checker := range checkers {
 		cache := NewHotPeerCache(Write)
-		region := buildRegion(Write, 3, 5)
+		region := buildRegion(Write, peerCount, interval)
 		// prepare
+		intervalSums := make(map[uint64]int)
 		for i := 1; i <= 200; i++ {
-			checker(c, cache, region)
+			rets := checker(c, cache, region)
+			checkIntervalSumContinuous(c, intervalSums, rets, interval)
 		}
 		// make the interval sum of peers are different
 		checkAndUpdateSkipOne(c, cache, region)
-		var intervalSums []int
-		for _, peer := range region.GetPeers() {
-			oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId)
-			intervalSums = append(intervalSums, int(oldItem.getIntervalSum()))
-		}
-		c.Assert(intervalSums, HasLen, 3)
-		c.Assert(intervalSums[0], Not(Equals), intervalSums[1])
-		c.Assert(intervalSums[0], Not(Equals), intervalSums[2])
+		c.Assert(checkIntervalSum(cache, region), IsTrue)
 		// check whether cold cache is cleared
 		var isClear bool
+		intervalSums = make(map[uint64]int)
 		region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
 		for i := 1; i <= 200; i++ {
-			checker(c, cache, region)
+			rets := checker(c, cache, region)
+			checkIntervalSumContinuous(c, intervalSums, rets, interval)
 			if len(cache.storesOfRegion[region.GetID()]) == 0 {
 				isClear = true
 				break
@@ -435,28 +450,38 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) {
 		region := buildRegion(Write, peerCount, interval)
 
 		target := uint64(10)
-		movePeer := func() {
+		intervalSums := make(map[uint64]int)
+		step := func(i int) {
 			tmp := uint64(0)
-			tmp, region = schedule(c, removeReplica, region)
-			_, region = schedule(c, addReplica, region, target)
-			target = tmp
+			if i%5 == 0 {
+				tmp, region = schedule(c, removeReplica, region)
+			}
+			rets := checker(c, cache, region)
+			checkIntervalSumContinuous(c, intervalSums, rets, interval)
+			if i%5 == 0 {
+				_, region = schedule(c, addReplica, region, target)
+				target = tmp
+			}
 		}
+
 		// prepare with random move peer to make the interval sum of peers are different
-		for i := 1; i <= 200; i++ {
-			if i%5 == 0 {
-				movePeer()
+		for i := 1; i <= 200; i++ {
+			step(i)
+			if i > 150 && checkIntervalSum(cache, region) {
+				break
 			}
-			checker(c, cache, region)
+		}
+		if interval < RegionHeartBeatReportInterval {
+			c.Assert(checkIntervalSum(cache, region), IsTrue)
 		}
 		c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount)
+
 		// check whether cold cache is cleared
 		var isClear bool
+		intervalSums = make(map[uint64]int)
 		region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
-		for i := 1; i <= 200; i++ {
-			if i%5 == 0 {
-				movePeer()
-			}
-			checker(c, cache, region)
+		for i := 1; i <= 200; i++ {
+			step(i)
 			if len(cache.storesOfRegion[region.GetID()]) == 0 {
 				isClear = true
 				break
@@ -468,6 +493,55 @@
 	}
 }
 
+func checkCoolDown(c *C, cache *hotPeerCache, region *core.RegionInfo, expect bool) {
+	item := cache.getOldHotPeerStat(region.GetID(), region.GetLeader().GetStoreId())
+	c.Assert(item.IsNeedCoolDownTransferLeader(3), Equals, expect)
+}
+
+func (t *testHotPeerCache) TestCoolDownTransferLeader(c *C) {
+	cache := NewHotPeerCache(Read)
+	region := buildRegion(Read, 3, 60)
+
+	moveLeader := func() {
+		_, region = schedule(c, movePeer, region, 10)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, false)
+		_, region = schedule(c, transferLeader, region, 10)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, true)
+	}
+	transferLeader := func() {
+		_, region = schedule(c, transferLeader, region)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, true)
+	}
+	movePeer := func() {
+		_, region = schedule(c, movePeer, region, 10)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, false)
+	}
+	addReplica := func() {
+		_, region = schedule(c, addReplica, region, 10)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, false)
+	}
+	removeReplica := func() {
+		_, region = schedule(c, removeReplica, region, 10)
+		checkAndUpdate(c, cache, region)
+		checkCoolDown(c, cache, region, false)
+	}
+	cases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica}
+	for _, runCase := range cases {
+		cache = NewHotPeerCache(Read)
+		region = buildRegion(Read, 3, 60)
+		for i := 1; i <= 200; i++ {
+			checkAndUpdate(c, cache, region)
+		}
+		checkCoolDown(c, cache, region, false)
+		runCase()
+	}
+}
+
 // See issue #4510
 func (t *testHotPeerCache) TestCacheInherit(c *C) {
 	cache := NewHotPeerCache(Read)
@@ -509,22 +583,9 @@ func (t *testHotPeerCache) TestCacheInherit(c *C) {
 
 func BenchmarkCheckRegionFlow(b *testing.B) {
 	cache := NewHotPeerCache(Read)
-	region := core.NewRegionInfo(&metapb.Region{
-		Id: 1,
-		Peers: []*metapb.Peer{
-			{Id: 101, StoreId: 1},
-			{Id: 102, StoreId: 2},
-			{Id: 103, StoreId: 3},
-		},
-	},
-		&metapb.Peer{Id: 101, StoreId: 1},
-	)
-	newRegion := region.Clone(
-		core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}),
-		core.SetReadBytes(30000*10),
-		core.SetReadKeys(300000*10))
+	region := buildRegion(Read, 3, 10)
 	peerInfos := make([]*core.PeerInfo, 0)
-	for _, peer := range newRegion.GetPeers() {
+	for _, peer := range region.GetPeers() {
 		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
 		peerInfos = append(peerInfos, peerInfo)
 	}
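
Note on the continuity check (not part of the patch): checkIntervalSumContinuous asserts that, for a peer whose stat comes straight from its own heartbeat (ret.source == direct), each reported interval sum equals the previous one advanced by the report interval, wrapping at RegionHeartBeatReportInterval. The standalone sketch below illustrates only that arithmetic; the names regionHeartBeatReportInterval and continuous are invented for the example and are not PD APIs.

package main

import "fmt"

// Mirrors PD's RegionHeartBeatReportInterval (60 seconds).
const regionHeartBeatReportInterval = 60

// continuous reports whether next equals old advanced by interval
// seconds, modulo the full heartbeat report interval - the same
// equality checkIntervalSumContinuous asserts for direct-source peers.
func continuous(old, next, interval int) bool {
	return next == (old+interval)%regionHeartBeatReportInterval
}

func main() {
	const interval = 5 // seconds between reports, as in TestRemoveFromCache
	old := 0
	for i := 0; i < 15; i++ {
		// The sum grows by 5s per report and wraps back to 0 at 60s.
		next := (old + interval) % regionHeartBeatReportInterval
		fmt.Printf("%2d -> %2d continuous=%v\n", old, next, continuous(old, next, interval))
		old = next
	}
}

A second reading note on TestCoolDownTransferLeader: inside transferLeader := func() { ... schedule(c, transferLeader, region) ... }, the inner transferLeader still resolves to the package-level operator constant rather than to the closure, because the scope of a short variable declaration begins only at the end of the declaration (Go spec, "Declarations and scope"). The closures therefore compile and invoke the operators they are named after, though renaming them (for example to doTransferLeader) would make that less surprising to readers.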