
Commit

fix test
Signed-off-by: yisaer <disxiaofei@163.com>
Yisaer committed May 12, 2021
1 parent 3bf277e commit a025e39
Showing 4 changed files with 25 additions and 47 deletions.
13 changes: 6 additions & 7 deletions server/schedulers/hot_test.go
@@ -1123,20 +1123,19 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
if !item.IsNeedDelete() {
if kind == read {
// transfer leader won't skip the first heartbeat for read stat
-c.Check(item.HotDegree, Equals, 1)
+c.Check(item.HotDegree, Equals, 4)
} else {
c.Check(item.HotDegree, Equals, 3)
}
}
}

-// heartbeat twice for read stat in order to let hot degree become 3
-if kind == read {
-heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1)
+// heartbeat once for write stat in order to let hot degree become 4
+if kind == write {
+items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3}, 1)
for _, item := range items {
if !item.IsNeedDelete() {
-c.Check(item.HotDegree, Equals, 3)
+c.Check(item.HotDegree, Equals, 4)
}
}
}
@@ -1155,7 +1154,7 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 3, 4}, 1)
c.Check(len(items), Greater, 0)
for _, item := range items {
-c.Check(item.HotDegree, Equals, 4)
+c.Check(item.HotDegree, Equals, 5)
}
items = heartbeat(1, 2, 512*KB*reportInterval, 0, reportInterval, []uint64{1, 4}, 1)
c.Check(len(items), Greater, 0)
@@ -1164,7 +1163,7 @@ func (s *testHotCacheSuite) checkRegionFlowTest(c *C, tc *mockcluster.Cluster, h
c.Check(item.IsNeedDelete(), IsTrue)
continue
}
-c.Check(item.HotDegree, Equals, 5)
+c.Check(item.HotDegree, Equals, 6)
}
}

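The assertion changes above raise the expected hot degrees (1 → 4 for the read case; 3 → 4, 4 → 5 and 5 → 6 further down), reflecting that more of the reported heartbeats now count toward the degree. A minimal sketch of the counting model these checks rely on — simplified, not the PD implementation, which also handles leader transfers and cold intervals:

```go
package main

import "fmt"

// peerStat is a stand-in for HotPeerStat with only the counter this sketch needs.
type peerStat struct{ hotDegree int }

// onHeartbeat bumps the degree for a hot report and decays it otherwise.
func (p *peerStat) onHeartbeat(isHot bool) {
	if isHot {
		p.hotDegree++
	} else {
		p.hotDegree--
	}
}

func main() {
	p := &peerStat{}
	for i := 0; i < 4; i++ { // four hot heartbeats in a row
		p.onHeartbeat(true)
	}
	fmt.Println(p.hotDegree) // 4 — the value the updated asserts expect
	p.onHeartbeat(true)
	fmt.Println(p.hotDegree) // 5, and one more hot heartbeat would give 6
}
```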
4 changes: 2 additions & 2 deletions server/statistics/hot_cache.go
@@ -48,8 +48,8 @@ func NewHotCache(ctx context.Context) *HotCache {

// ExpiredItems returns the items which are already expired.:
func (w *HotCache) ExpiredItems(region *core.RegionInfo) (expiredItems []*HotPeerStat) {
-expiredItems = append(expiredItems, w.writeFlow.ExpiredItems(region)...)
-expiredItems = append(expiredItems, w.readFlow.ExpiredItems(region)...)
+expiredItems = append(expiredItems, w.writeFlow.CollectExpiredItems(region)...)
+expiredItems = append(expiredItems, w.readFlow.CollectExpiredItems(region)...)
return
}

48 changes: 13 additions & 35 deletions server/statistics/hot_peer_cache.go
@@ -22,7 +22,6 @@ import (
"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/server/core"
"go.uber.org/zap"
)

const (
@@ -143,12 +142,14 @@ func (f *hotPeerCache) collectPeerMetrics(loads []float64, interval uint64) {
}
}

-func (f *hotPeerCache) ExpiredItems(region *core.RegionInfo) []*HotPeerStat {
+// CollectExpiredItems collects expired items, marks them as needDelete and puts them into inherit items
+func (f *hotPeerCache) CollectExpiredItems(region *core.RegionInfo) []*HotPeerStat {
regionID := region.GetID()
items := make([]*HotPeerStat, 0)
-for _, storeID := range f.getAllStoreIDs(region) {
+for _, storeID := range f.getAllStoreIDs(region, true, true) {
if region.GetStorePeer(storeID) == nil {
item := f.getOldHotPeerStat(regionID, storeID)
+f.putInheritItem(item)
if item != nil {
item.needDelete = true
items = append(items, item)
@@ -163,8 +164,8 @@ func (f *hotPeerCache) ExpiredItems(region *core.RegionInfo) []*HotPeerStat {
func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
-regionID := region.GetID()
-storeIDs := f.getAllStoreIDs(region)
+includeFollowers := f.kind == WriteFlow
+storeIDs := f.getAllStoreIDs(region, includeFollowers, false)
for _, storeID := range storeIDs {
peer := region.GetStorePeer(storeID)
var item *HotPeerStat
@@ -176,23 +177,12 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
region.GetKeysRead(),
interval, region)
item = f.CheckPeerFlow(peerInfo, region, interval)
-} else {
-item = f.markExpiredItem(regionID, storeID)
}
if item != nil {
ret = append(ret, item)
}
}
-var peers []uint64
-for _, peer := range region.GetPeers() {
-peers = append(peers, peer.StoreId)
-}
-log.Debug("region heartbeat info",
-zap.String("type", f.kind.String()),
-zap.Uint64("region", region.GetID()),
-zap.Uint64("leader", region.GetLeader().GetStoreId()),
-zap.Uint64s("peers", peers),
-)
+ret = append(ret, f.CollectExpiredItems(region)...)
return ret
}

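With markExpiredItem gone, expiry and flow checking are linked through the "inherit item" slot instead: CollectExpiredItems parks the outgoing stat with putInheritItem, and the next CheckPeerFlow for the region prefers the parked stat as its oldItem (the inheritItem branch further down). A rough, self-contained sketch of that hand-off — the map layout and takeInheritItem are assumptions; the diff only shows putInheritItem and the inheritItem branch:

```go
package main

import "fmt"

// HotPeerStat stand-in with just the field this sketch needs.
type HotPeerStat struct{ RegionID uint64 }

// inheritCache models a per-region slot that lets a moved peer's history survive
// until the region is reported again from its new store.
type inheritCache struct {
	items map[uint64]*HotPeerStat
}

func (c *inheritCache) putInheritItem(item *HotPeerStat) {
	if item == nil {
		return
	}
	c.items[item.RegionID] = item
}

// takeInheritItem is the assumed counterpart used when building the new stat.
func (c *inheritCache) takeInheritItem(regionID uint64) *HotPeerStat {
	item, ok := c.items[regionID]
	if !ok {
		return nil
	}
	delete(c.items, regionID)
	return item
}

func main() {
	c := &inheritCache{items: make(map[uint64]*HotPeerStat)}
	c.putInheritItem(&HotPeerStat{RegionID: 2})
	fmt.Println(c.takeInheritItem(2) != nil) // true: the moved peer's history is inherited
	fmt.Println(c.takeInheritItem(2) != nil) // false: the slot is consumed once
}
```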
@@ -207,13 +197,8 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
loads[i] = deltaLoads[i] / float64(interval)
}
justTransferLeader := f.justTransferLeader(region)
-// remove peer
-isExpired := f.isPeerExpired(peer, region)
oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
-if isExpired && oldItem != nil {
-f.putInheritItem(oldItem)
-}
-if !isExpired && Denoising && interval < HotRegionReportMinInterval {
+if Denoising && interval < HotRegionReportMinInterval {
return nil
}
thresholds := f.calcHotThresholds(storeID)
@@ -227,7 +212,7 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
Kind: f.kind,
Loads: loads,
LastUpdateTime: time.Now(),
-needDelete: isExpired,
+needDelete: false,
isLeader: region.GetLeader().GetStoreId() == storeID,
justTransferLeader: justTransferLeader,
interval: interval,
@@ -239,7 +224,7 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
if inheritItem != nil {
oldItem = inheritItem
} else {
-for _, storeID := range f.getAllStoreIDs(region) {
+for _, storeID := range f.getAllStoreIDs(region, true, true) {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil {
break
@@ -304,12 +289,12 @@ func (f *hotPeerCache) calcHotThresholds(storeID uint64) []float64 {
}

// gets the storeIDs, including old region and new region
-func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
+func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo, includeFollower bool, includeOldStores bool) []uint64 {
storeIDs := make(map[uint64]struct{})
ret := make([]uint64, 0, len(region.GetPeers()))
// old stores
ids, ok := f.storesOfRegion[region.GetID()]
-if ok {
+if ok && includeOldStores {
for storeID := range ids {
storeIDs[storeID] = struct{}{}
ret = append(ret, storeID)
@@ -318,8 +303,7 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {

// new stores
for _, peer := range region.GetPeers() {
-// ReadFlow no need consider the followers.
-if f.kind == ReadFlow && peer.GetStoreId() != region.GetLeader().GetStoreId() {
+if region.GetLeader().StoreId != peer.StoreId && !includeFollower {
continue
}
if _, ok := storeIDs[peer.GetStoreId()]; !ok {
@@ -481,12 +465,6 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
return newItem
}

-func (f *hotPeerCache) markExpiredItem(regionID, storeID uint64) *HotPeerStat {
-item := f.getOldHotPeerStat(regionID, storeID)
-item.needDelete = true
-return item
-}

func (f *hotPeerCache) getFlowDeltaLoads(stat core.FlowStat) []float64 {
ret := make([]float64, RegionStatCount)
for k := RegionStatKind(0); k < RegionStatCount; k++ {
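The two new boolean parameters replace the ReadFlow-specific check that used to sit inside getAllStoreIDs: callers now state explicitly whether follower stores and previously seen stores should be included (CollectExpiredItems and the oldItem fallback pass true, true; CheckRegionFlow includes followers only for WriteFlow and skips old stores). A compact sketch of that selection logic, using minimal stand-in types rather than core.RegionInfo:

```go
package main

import "fmt"

// Stand-ins for metapb.Peer / core.RegionInfo, used only for this sketch.
type peer struct{ StoreID uint64 }
type region struct {
	leader peer
	peers  []peer
}

// storeIDsFor mirrors the shape of getAllStoreIDs(region, includeFollower, includeOldStores).
func storeIDsFor(r region, oldStores []uint64, includeFollower, includeOldStores bool) []uint64 {
	seen := make(map[uint64]struct{})
	ret := make([]uint64, 0, len(r.peers))
	if includeOldStores { // stores the region reported from before (possibly without a peer now)
		for _, id := range oldStores {
			if _, ok := seen[id]; !ok {
				seen[id] = struct{}{}
				ret = append(ret, id)
			}
		}
	}
	for _, p := range r.peers {
		if !includeFollower && p.StoreID != r.leader.StoreID {
			continue // read-flow heartbeats only care about the leader's store
		}
		if _, ok := seen[p.StoreID]; !ok {
			seen[p.StoreID] = struct{}{}
			ret = append(ret, p.StoreID)
		}
	}
	return ret
}

func main() {
	r := region{leader: peer{StoreID: 1}, peers: []peer{{1}, {2}, {3}}}
	fmt.Println(storeIDsFor(r, []uint64{4}, true, true)) // [4 1 2 3] — expiry collection
	fmt.Println(storeIDsFor(r, nil, false, false))       // [1]       — read-flow heartbeat
}
```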
7 changes: 4 additions & 3 deletions server/statistics/hot_peer_cache_test.go
@@ -75,8 +75,9 @@ type testCacheCase struct {

func (t *testHotPeerCache) TestCache(c *C) {
tests := []*testCacheCase{
-{ReadFlow, movePeer, 1, false},
-{ReadFlow, addReplica, 1, true},
+{ReadFlow, transferLeader, 3, false},
+{ReadFlow, movePeer, 4, true},
+{ReadFlow, addReplica, 4, false},
{WriteFlow, transferLeader, 3, true},
{WriteFlow, movePeer, 4, true},
{WriteFlow, addReplica, 4, true},
@@ -88,7 +89,7 @@ func (t *testHotPeerCache) TestCache(c *C) {

func testCache(c *C, t *testCacheCase) {
defaultSize := map[FlowKind]int{
-ReadFlow: 1, // only leader
+ReadFlow: 3, // all peers
WriteFlow: 3, // all peers
}
cache := NewHotStoresStats(t.kind)
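Throughout these tests the heartbeat helper reports 512*KB*reportInterval bytes over a reportInterval-second window, so after the per-second normalization in CheckPeerFlow (loads[i] = deltaLoads[i] / float64(interval)) the rate compared against the thresholds is a flat 512 KB/s. A tiny worked example, assuming KB = 1024 and an illustrative interval of 10 seconds:

```go
package main

import "fmt"

const (
	KB             = 1024
	reportInterval = 10 // seconds — illustrative value, not taken from the test file
)

func main() {
	deltaBytes := float64(512 * KB * reportInterval) // what one heartbeat reports
	rate := deltaBytes / float64(reportInterval)     // the per-second load CheckPeerFlow computes
	fmt.Printf("%.0f bytes/s (= 512 KB/s)\n", rate)  // 524288 bytes/s
}
```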
