-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
statistics: add more test for hot statistics #4499
Changes from 4 commits
bf57cf7
51e793c
b8753de
0856012
bfac82d
50f8a37
7bc7ec7
e40dc69
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -16,12 +16,12 @@ package statistics | |||||
|
||||||
import ( | ||||||
"math/rand" | ||||||
"sort" | ||||||
"testing" | ||||||
"time" | ||||||
|
||||||
. "github.com/pingcap/check" | ||||||
"github.com/pingcap/kvproto/pkg/metapb" | ||||||
"github.com/pingcap/kvproto/pkg/pdpb" | ||||||
"github.com/tikv/pd/server/core" | ||||||
) | ||||||
|
||||||
|
@@ -31,23 +31,9 @@ type testHotPeerCache struct{} | |||||
|
||||||
func (t *testHotPeerCache) TestStoreTimeUnsync(c *C) { | ||||||
cache := NewHotPeerCache(Write) | ||||||
peers := newPeers(3, | ||||||
func(i int) uint64 { return uint64(10000 + i) }, | ||||||
func(i int) uint64 { return uint64(i) }) | ||||||
meta := &metapb.Region{ | ||||||
Id: 1000, | ||||||
Peers: peers, | ||||||
StartKey: []byte(""), | ||||||
EndKey: []byte(""), | ||||||
RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6}, | ||||||
} | ||||||
intervals := []uint64{120, 60} | ||||||
for _, interval := range intervals { | ||||||
region := core.NewRegionInfo(meta, peers[0], | ||||||
// interval is [0, interval] | ||||||
core.SetReportInterval(interval), | ||||||
core.SetWrittenBytes(interval*100*1024)) | ||||||
|
||||||
region := buildRegion(Write, 3, interval) | ||||||
checkAndUpdate(c, cache, region, 3) | ||||||
{ | ||||||
stats := cache.RegionStats(0) | ||||||
|
@@ -191,6 +177,36 @@ func checkOp(c *C, ret []*HotPeerStat, storeID uint64, actionType ActionType) { | |||||
} | ||||||
} | ||||||
|
||||||
// checkIntervalSum checks whether the interval sum of the peers are different. | ||||||
func checkIntervalSum(cache *hotPeerCache, region *core.RegionInfo) bool { | ||||||
var intervalSums []int | ||||||
for _, peer := range region.GetPeers() { | ||||||
oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) | ||||||
if oldItem != nil { | ||||||
intervalSums = append(intervalSums, int(oldItem.getIntervalSum())) | ||||||
} | ||||||
} | ||||||
sort.Ints(intervalSums) | ||||||
return intervalSums[0] != intervalSums[len(intervalSums)-1] | ||||||
} | ||||||
|
||||||
// checkIntervalSumContinuous checks whether the interval sum of the peer is continuous. | ||||||
func checkIntervalSumContinuous(c *C, intervalSums map[uint64]int, rets []*HotPeerStat, interval uint64) { | ||||||
for _, ret := range rets { | ||||||
if ret.actionType == Remove { | ||||||
delete(intervalSums, ret.StoreID) | ||||||
continue | ||||||
} | ||||||
new := int(ret.getIntervalSum() / 1000000000) | ||||||
if ret.source == direct { | ||||||
if old, ok := intervalSums[ret.StoreID]; ok { | ||||||
c.Assert(new, Equals, (old+int(interval))%RegionHeartBeatReportInterval) | ||||||
} | ||||||
} | ||||||
intervalSums[ret.StoreID] = new | ||||||
} | ||||||
} | ||||||
|
||||||
func schedule(c *C, operator operator, region *core.RegionInfo, targets ...uint64) (srcStore uint64, _ *core.RegionInfo) { | ||||||
switch operator { | ||||||
case transferLeader: | ||||||
|
@@ -392,29 +408,28 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold | |||||
} | ||||||
|
||||||
func (t *testHotPeerCache) TestRemoveFromCache(c *C) { | ||||||
peerCount := 3 | ||||||
interval := uint64(5) | ||||||
checkers := []check{checkAndUpdate, checkAndUpdateWithOrdering} | ||||||
for _, checker := range checkers { | ||||||
cache := NewHotPeerCache(Write) | ||||||
region := buildRegion(Write, 3, 5) | ||||||
region := buildRegion(Write, peerCount, interval) | ||||||
// prepare | ||||||
intervalSums := make(map[uint64]int) | ||||||
for i := 1; i <= 200; i++ { | ||||||
checker(c, cache, region) | ||||||
rets := checker(c, cache, region) | ||||||
checkIntervalSumContinuous(c, intervalSums, rets, interval) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it will be too long and difficult to understand. |
||||||
} | ||||||
// make the interval sum of peers are different | ||||||
checkAndUpdateSkipOne(c, cache, region) | ||||||
var intervalSums []int | ||||||
for _, peer := range region.GetPeers() { | ||||||
oldItem := cache.getOldHotPeerStat(region.GetID(), peer.StoreId) | ||||||
intervalSums = append(intervalSums, int(oldItem.getIntervalSum())) | ||||||
} | ||||||
c.Assert(intervalSums, HasLen, 3) | ||||||
c.Assert(intervalSums[0], Not(Equals), intervalSums[1]) | ||||||
c.Assert(intervalSums[0], Not(Equals), intervalSums[2]) | ||||||
checkIntervalSum(cache, region) | ||||||
// check whether cold cache is cleared | ||||||
var isClear bool | ||||||
intervalSums = make(map[uint64]int) | ||||||
region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) | ||||||
for i := 1; i <= 200; i++ { | ||||||
checker(c, cache, region) | ||||||
rets := checker(c, cache, region) | ||||||
checkIntervalSumContinuous(c, intervalSums, rets, interval) | ||||||
if len(cache.storesOfRegion[region.GetID()]) == 0 { | ||||||
isClear = true | ||||||
break | ||||||
|
@@ -435,28 +450,38 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) { | |||||
region := buildRegion(Write, peerCount, interval) | ||||||
|
||||||
target := uint64(10) | ||||||
movePeer := func() { | ||||||
intervalSums := make(map[uint64]int) | ||||||
step := func(i int) { | ||||||
tmp := uint64(0) | ||||||
tmp, region = schedule(c, removeReplica, region) | ||||||
_, region = schedule(c, addReplica, region, target) | ||||||
target = tmp | ||||||
if i%5 == 0 { | ||||||
tmp, region = schedule(c, removeReplica, region) | ||||||
} | ||||||
rets := checker(c, cache, region) | ||||||
checkIntervalSumContinuous(c, intervalSums, rets, interval) | ||||||
if i%5 == 0 { | ||||||
_, region = schedule(c, addReplica, region, target) | ||||||
target = tmp | ||||||
} | ||||||
} | ||||||
|
||||||
// prepare with random move peer to make the interval sum of peers are different | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the comment need to be changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to change it because the actual move peer is to add replica in the target store and then remove replica from the source store |
||||||
for i := 1; i <= 200; i++ { | ||||||
if i%5 == 0 { | ||||||
movePeer() | ||||||
for i := 1; i < 150; i++ { | ||||||
step(i) | ||||||
if i > 150 && checkIntervalSum(cache, region) { | ||||||
break | ||||||
} | ||||||
checker(c, cache, region) | ||||||
} | ||||||
if interval < RegionHeartBeatReportInterval { | ||||||
c.Assert(checkIntervalSum(cache, region), IsTrue) | ||||||
} | ||||||
c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount) | ||||||
|
||||||
// check whether cold cache is cleared | ||||||
var isClear bool | ||||||
intervalSums = make(map[uint64]int) | ||||||
region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0)) | ||||||
for i := 1; i <= 200; i++ { | ||||||
if i%5 == 0 { | ||||||
movePeer() | ||||||
} | ||||||
checker(c, cache, region) | ||||||
for i := 1; i < 200; i++ { | ||||||
step(i) | ||||||
if len(cache.storesOfRegion[region.GetID()]) == 0 { | ||||||
isClear = true | ||||||
break | ||||||
|
@@ -468,24 +493,60 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) { | |||||
} | ||||||
} | ||||||
|
||||||
func checkCoolDown(c *C, cache *hotPeerCache, region *core.RegionInfo, expect bool) { | ||||||
item := cache.getOldHotPeerStat(region.GetID(), region.GetLeader().GetStoreId()) | ||||||
c.Assert(item.IsNeedCoolDownTransferLeader(3), Equals, expect) | ||||||
} | ||||||
|
||||||
func (t *testHotPeerCache) TestCoolDownTransferLeader(c *C) { | ||||||
cache := NewHotPeerCache(Read) | ||||||
region := buildRegion(Read, 3, 60) | ||||||
|
||||||
moveLeader := func() { | ||||||
_, region = schedule(c, movePeer, region, 10) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, false) | ||||||
_, region = schedule(c, transferLeader, region, 10) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, true) | ||||||
} | ||||||
transferLeader := func() { | ||||||
_, region = schedule(c, transferLeader, region) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, true) | ||||||
} | ||||||
movePeer := func() { | ||||||
_, region = schedule(c, movePeer, region, 10) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, false) | ||||||
} | ||||||
addReplica := func() { | ||||||
_, region = schedule(c, addReplica, region, 10) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, false) | ||||||
} | ||||||
removeReplica := func() { | ||||||
_, region = schedule(c, removeReplica, region, 10) | ||||||
checkAndUpdate(c, cache, region) | ||||||
checkCoolDown(c, cache, region, false) | ||||||
} | ||||||
cases := []func(){moveLeader, transferLeader, movePeer, addReplica, removeReplica} | ||||||
for _, scheduleAndCheck := range cases { | ||||||
lhy1024 marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
cache = NewHotPeerCache(Read) | ||||||
region = buildRegion(Read, 3, 60) | ||||||
for i := 1; i <= 200; i++ { | ||||||
checkAndUpdate(c, cache, region) | ||||||
} | ||||||
checkCoolDown(c, cache, region, false) | ||||||
scheduleAndCheck() | ||||||
} | ||||||
} | ||||||
|
||||||
func BenchmarkCheckRegionFlow(b *testing.B) { | ||||||
cache := NewHotPeerCache(Read) | ||||||
region := core.NewRegionInfo(&metapb.Region{ | ||||||
Id: 1, | ||||||
Peers: []*metapb.Peer{ | ||||||
{Id: 101, StoreId: 1}, | ||||||
{Id: 102, StoreId: 2}, | ||||||
{Id: 103, StoreId: 3}, | ||||||
}, | ||||||
}, | ||||||
&metapb.Peer{Id: 101, StoreId: 1}, | ||||||
) | ||||||
newRegion := region.Clone( | ||||||
core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}), | ||||||
core.SetReadBytes(30000*10), | ||||||
core.SetReadKeys(300000*10)) | ||||||
region := buildRegion(Read, 3, 10) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It changes the original value. Is it expected? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't matter, just make sure it's hot |
||||||
peerInfos := make([]*core.PeerInfo, 0) | ||||||
for _, peer := range newRegion.GetPeers() { | ||||||
for _, peer := range region.GetPeers() { | ||||||
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10) | ||||||
peerInfos = append(peerInfos, peerInfo) | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto