Skip to content

Commit

Permalink
statistic: replace old inherit with adopt in hot statistic (#4512)
Browse files Browse the repository at this point in the history
* remove inherit

Signed-off-by: lhy1024 <admin@liudos.us>

* add test

Signed-off-by: lhy1024 <admin@liudos.us>

* add test

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Jan 10, 2022
1 parent 68c4ff3 commit 6f8edf2
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 47 deletions.
10 changes: 5 additions & 5 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@ type HotPeerStat struct {
// If the peer didn't been send by store heartbeat when it is already stored as hot peer stat,
// we will handle it as cold peer and mark the inCold flag
inCold bool
// source represents the statistics item source, such as directly, inherit, adopt.
// source represents the statistics item source, such as direct, inherit.
source sourceKind
// If the item in storeA is just adopted from storeB,
// then other store, such as storeC, will be forbidden to adopt from storeA until the item in storeA is hot.
allowAdopt bool
// If the item in storeA is just inherited from storeB,
// then other store, such as storeC, will be forbidden to inherit from storeA until the item in storeA is hot.
allowInherited bool
}

// ID returns region ID. Implementing TopNItem.
Expand Down Expand Up @@ -139,7 +139,7 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
zap.Int("hot-anti-count", stat.AntiCount),
zap.Duration("sum-interval", stat.getIntervalSum()),
zap.String("source", stat.source.String()),
zap.Bool("allow-adopt", stat.allowAdopt),
zap.Bool("allow-inherited", stat.allowInherited),
zap.String("action-type", stat.actionType.String()),
zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
}
Expand Down
49 changes: 11 additions & 38 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type hotPeerCache struct {
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
topNTTL time.Duration
reportIntervalSecs int
taskQueue chan flowItemTask
Expand All @@ -70,7 +69,6 @@ func NewHotPeerCache(kind RWType) *hotPeerCache {
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
inheritItem: make(map[uint64]*HotPeerStat),
taskQueue: make(chan flowItemTask, queueCap),
}
if kind == Write {
Expand Down Expand Up @@ -102,9 +100,6 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
func (f *hotPeerCache) updateStat(item *HotPeerStat) {
switch item.actionType {
case Remove:
if item.AntiCount > 0 { // means it's deleted because expired rather than cold
f.putInheritItem(item)
}
f.removeItem(item)
item.Log("region heartbeat remove from cache", log.Debug)
incMetrics("remove_item", item.StoreID, item.Kind)
Expand Down Expand Up @@ -194,17 +189,11 @@ func (f *hotPeerCache) checkPeerFlow(peer *core.PeerInfo, region *core.RegionInf
}

if oldItem == nil {
inheritItem := f.takeInheritItem(regionID)
if inheritItem != nil {
oldItem = inheritItem
newItem.source = inherit
} else {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowAdopt {
newItem.source = adopt
break
}
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(regionID, storeID)
if oldItem != nil && oldItem.allowInherited {
newItem.source = inherit
break
}
}
}
Expand Down Expand Up @@ -393,14 +382,14 @@ func (f *hotPeerCache) updateHotPeerStat(region *core.RegionInfo, newItem, oldIt
return f.updateNewHotPeerStat(regionStats, newItem, deltaLoads, interval)
}

if newItem.source == adopt {
if newItem.source == inherit {
for _, dim := range oldItem.rollingLoads {
newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone())
}
newItem.allowAdopt = false
newItem.allowInherited = false
} else {
newItem.rollingLoads = oldItem.rollingLoads
newItem.allowAdopt = oldItem.allowAdopt
newItem.allowInherited = oldItem.allowInherited
}

if f.justTransferLeader(region) {
Expand Down Expand Up @@ -474,22 +463,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(regionStats []RegionStatKind, newIte
return newItem
}

func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
f.inheritItem[item.RegionID] = item
}

func (f *hotPeerCache) takeInheritItem(regionID uint64) *HotPeerStat {
item, ok := f.inheritItem[regionID]
if !ok {
return nil
}
if item != nil {
delete(f.inheritItem, regionID)
return item
}
return nil
}

func (f *hotPeerCache) putItem(item *HotPeerStat) {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
Expand Down Expand Up @@ -529,14 +502,14 @@ func coldItem(newItem, oldItem *HotPeerStat) {
if newItem.AntiCount <= 0 {
newItem.actionType = Remove
} else {
newItem.allowAdopt = true
newItem.allowInherited = true
}
}

func hotItem(newItem, oldItem *HotPeerStat) {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowAdopt = true
newItem.allowInherited = true
if newItem.Kind == Read {
newItem.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval)
}
Expand All @@ -545,7 +518,7 @@ func hotItem(newItem, oldItem *HotPeerStat) {
func initItem(item *HotPeerStat) {
item.HotDegree = 1
item.AntiCount = hotRegionAntiCount
item.allowAdopt = true
item.allowInherited = true
if item.Kind == Read {
item.AntiCount = hotRegionAntiCount * (RegionHeartBeatReportInterval / StoreHeartBeatReportInterval)
}
Expand Down
39 changes: 39 additions & 0 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,45 @@ func (t *testHotPeerCache) TestRemoveFromCacheRandom(c *C) {
}
}

// See issue #4510
func (t *testHotPeerCache) TestCacheInherit(c *C) {
cache := NewHotPeerCache(Read)
region := buildRegion(Read, 3, 10)
// prepare
for i := 1; i <= 200; i++ {
checkAndUpdate(c, cache, region)
}
// move peer
newStoreID := uint64(10)
_, region = schedule(c, addReplica, region, newStoreID)
checkAndUpdate(c, cache, region)
newStoreID, region = schedule(c, removeReplica, region)
rets := checkAndUpdate(c, cache, region)
for _, ret := range rets {
if ret.actionType != Remove {
flow := ret.GetLoads()[RegionReadBytes]
c.Assert(flow, Equals, float64(region.GetBytesRead()/ReadReportInterval))
}
}
// new flow
newFlow := region.GetBytesRead() * 10
region = region.Clone(core.SetReadBytes(newFlow))
for i := 1; i <= 200; i++ {
checkAndUpdate(c, cache, region)
}
// move peer
_, region = schedule(c, addReplica, region, newStoreID)
checkAndUpdate(c, cache, region)
_, region = schedule(c, removeReplica, region)
rets = checkAndUpdate(c, cache, region)
for _, ret := range rets {
if ret.actionType != Remove {
flow := ret.GetLoads()[RegionReadBytes]
c.Assert(flow, Equals, float64(newFlow/ReadReportInterval))
}
}
}

func BenchmarkCheckRegionFlow(b *testing.B) {
cache := NewHotPeerCache(Read)
region := core.NewRegionInfo(&metapb.Region{
Expand Down
5 changes: 1 addition & 4 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ type sourceKind int

const (
direct sourceKind = iota // there is a corresponding peer in this store.
inherit // there is no a corresponding peer in this store and there is a peer just deleted.
adopt // there is no corresponding peer in this store and there is no peer just deleted, we need to copy from other stores.
inherit // there is no corresponding peer in this store and we need to copy from other stores.
)

func (k sourceKind) String() string {
Expand All @@ -112,8 +111,6 @@ func (k sourceKind) String() string {
return "direct"
case inherit:
return "inherit"
case adopt:
return "adopt"
}
return "unknown"
}
Expand Down

0 comments on commit 6f8edf2

Please sign in to comment.