Skip to content

Commit

Permalink
pick for tikv#4446, tikv#4512
Browse files Browse the repository at this point in the history
Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 committed Jan 21, 2022
1 parent cdcb172 commit 00a0290
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 156 deletions.
5 changes: 5 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,8 @@ func (aot *AvgOverTime) Clone() *AvgOverTime {
avgInterval: aot.avgInterval,
}
}

// GetIntervalSum returns the sum of interval
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
return aot.intervalSum
}
14 changes: 14 additions & 0 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ type HotPeerStat struct {
// If the peer didn't been send by store heartbeat when it is already stored as hot peer stat,
// we will handle it as cold peer and mark the inCold flag
inCold bool
// source represents the statistics item source, such as direct, inherit.
source sourceKind
// If the item in storeA is just inherited from storeB,
// then other store, such as storeC, will be forbidden to inherit from storeA until the item in storeA is hot.
allowInherited bool
}

// ID returns region ID. Implementing TopNItem.
Expand All @@ -132,6 +137,8 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
zap.Int("hot-anti-count", stat.AntiCount),
zap.Bool("just-transfer-leader", stat.justTransferLeader),
zap.Bool("is-leader", stat.isLeader),
zap.String("source", stat.source.String()),
zap.Bool("allow-inherited", stat.allowInherited),
zap.Bool("need-delete", stat.IsNeedDelete()),
zap.String("type", stat.Kind.String()),
zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
Expand Down Expand Up @@ -209,3 +216,10 @@ func (stat *HotPeerStat) hotStatReportInterval() int {
}
return WriteReportInterval
}

func (stat *HotPeerStat) getIntervalSum() time.Duration {
if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil {
return 0
}
return stat.rollingLoads[0].LastAverage.GetIntervalSum()
}
50 changes: 15 additions & 35 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ type hotPeerCache struct {
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
topNTTL time.Duration
reportIntervalSecs int
}
Expand All @@ -67,7 +66,6 @@ func NewHotStoresStats(kind FlowKind) *hotPeerCache {
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
inheritItem: make(map[uint64]*HotPeerStat),
}
if kind == WriteFlow {
c.reportIntervalSecs = WriteReportInterval
Expand Down Expand Up @@ -98,7 +96,6 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
// Update updates the items in statistics.
func (f *hotPeerCache) Update(item *HotPeerStat) {
if item.IsNeedDelete() {
f.putInheritItem(item)
f.removeItem(item)
item.Log("region heartbeat delete from cache", log.Debug)
} else {
Expand Down Expand Up @@ -180,23 +177,16 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
thresholds: thresholds,
}

source := direct
if oldItem == nil {
inheritItem := f.takeInheritItem(region.GetID())
if inheritItem != nil {
oldItem = inheritItem
source = inherit
} else {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil {
source = adopt
break
}
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil && oldItem.allowInherited {
newItem.source = inherit
break
}
}
}
return f.updateHotPeerStat(newItem, oldItem, source, deltaLoads, time.Duration(interval)*time.Second)
return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
}

// CheckColdPeer checks the collect the un-heartbeat peer and maintain it.
Expand Down Expand Up @@ -233,7 +223,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st
for i, loads := range oldItem.thresholds {
deltaLoads[i] = loads * float64(interval)
}
stat := f.updateHotPeerStat(newItem, oldItem, direct, deltaLoads, time.Duration(interval)*time.Second)
stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
if stat != nil {
ret = append(ret, stat)
}
Expand Down Expand Up @@ -370,18 +360,20 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, source sourceKind, deltaLoads []float64, interval time.Duration) *HotPeerStat {
func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
regionStats := f.kind.RegionStats()
if oldItem == nil {
return f.updateNewHotPeerStat(newItem, deltaLoads, interval)
}

if source == adopt {
if newItem.source == inherit {
for _, dim := range oldItem.rollingLoads {
newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone())
}
newItem.allowInherited = false
} else {
newItem.rollingLoads = oldItem.rollingLoads
newItem.allowInherited = oldItem.allowInherited
}

if newItem.justTransferLeader {
Expand Down Expand Up @@ -458,22 +450,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f
return newItem
}

func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
f.inheritItem[item.RegionID] = item
}

func (f *hotPeerCache) takeInheritItem(regionID uint64) *HotPeerStat {
item, ok := f.inheritItem[regionID]
if !ok {
return nil
}
if item != nil {
delete(f.inheritItem, regionID)
return item
}
return nil
}

func (f *hotPeerCache) putItem(item *HotPeerStat) {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
Expand Down Expand Up @@ -512,17 +488,21 @@ func coldItem(newItem, oldItem *HotPeerStat) {
newItem.AntiCount = oldItem.AntiCount - 1
if newItem.AntiCount <= 0 {
newItem.needDelete = true
} else {
newItem.allowInherited = true
}
}

func hotItem(newItem, oldItem *HotPeerStat) {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowInherited = true
}

func initItemDegree(item *HotPeerStat) {
item.HotDegree = 1
item.AntiCount = hotRegionAntiCount
item.allowInherited = true
}

func inheritItemDegree(newItem, oldItem *HotPeerStat) {
Expand Down
Loading

0 comments on commit 00a0290

Please sign in to comment.