Skip to content

Commit

Permalink
statistics: fix the problem that the hot cache cannot be emptied when…
Browse files Browse the repository at this point in the history
… the interval is less than 60 (#4396)

* fix cache in 5.2/5.3 ref #4390

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* fix ci

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* fix ci

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>
  • Loading branch information
lhy1024 authored Dec 7, 2021
1 parent 90a9689 commit 2246ef6
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 14 deletions.
16 changes: 16 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,19 @@ func (aot *AvgOverTime) Set(avg float64) {
func (aot *AvgOverTime) IsFull() bool {
return aot.intervalSum >= aot.avgInterval
}

// Clone returns a copy of AvgOverTime
func (aot *AvgOverTime) Clone() *AvgOverTime {
que := aot.que.Clone()
margin := deltaWithInterval{
delta: aot.margin.delta,
interval: aot.margin.interval,
}
return &AvgOverTime{
que: que,
margin: margin,
deltaSum: aot.deltaSum,
intervalSum: aot.intervalSum,
avgInterval: aot.avgInterval,
}
}
12 changes: 12 additions & 0 deletions pkg/movingaverage/median_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,15 @@ func (r *MedianFilter) Set(n float64) {
func (r *MedianFilter) GetInstantaneous() float64 {
return r.instantaneous
}

// Clone returns a copy of MedianFilter
func (r *MedianFilter) Clone() *MedianFilter {
records := make([]float64, len(r.records))
copy(records, r.records)
return &MedianFilter{
records: records,
size: r.size,
count: r.count,
instantaneous: r.instantaneous,
}
}
15 changes: 15 additions & 0 deletions pkg/movingaverage/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,18 @@ func (sq *SafeQueue) PopFront() interface{} {
defer sq.mu.Unlock()
return sq.que.PopFront()
}

// Clone returns a copy of SafeQueue
func (sq *SafeQueue) Clone() *SafeQueue {
sq.mu.Lock()
defer sq.mu.Unlock()
q := queue.New().Init()
for i := 0; i < sq.que.Len(); i++ {
v := sq.que.PopFront()
sq.que.PushBack(v)
q.PushBack(v)
}
return &SafeQueue{
que: q,
}
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,14 @@ func (t *testMovingAvg) TestQueue(c *C) {
c.Assert(1, Equals, v1.(int))
c.Assert(2, Equals, v2.(int))
}

func (t *testMovingAvg) TestClone(c *C) {
s1 := NewSafeQueue()
s1.PushBack(1)
s1.PushBack(2)
s2 := s1.Clone()
s2.PopFront()
s2.PopFront()
c.Assert(s1.que.Len(), Equals, 2)
c.Assert(s2.que.Len(), Equals, 0)
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/time_median.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,14 @@ func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
func (t *TimeMedian) GetInstantaneous() float64 {
return t.instantaneous
}

// Clone returns a copy of TimeMedian
func (t *TimeMedian) Clone() *TimeMedian {
return &TimeMedian{
aot: t.aot.Clone(),
mf: t.mf.Clone(),
aotSize: t.aotSize,
mfSize: t.mfSize,
instantaneous: t.instantaneous,
}
}
8 changes: 8 additions & 0 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ func (d *dimStat) Get() float64 {
return d.Rolling.Get()
}

func (d *dimStat) Clone() *dimStat {
return &dimStat{
typ: d.typ,
Rolling: d.Rolling.Clone(),
LastAverage: d.LastAverage.Clone(),
}
}

// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
Expand Down
18 changes: 14 additions & 4 deletions server/statistics/hot_peer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,20 +186,24 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
peers: peers,
thresholds: thresholds,
}

source := direct
if oldItem == nil {
inheritItem := f.takeInheritItem(region.GetID())
if inheritItem != nil {
oldItem = inheritItem
source = inherit
} else {
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil {
source = adopt
break
}
}
}
}
return f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
return f.updateHotPeerStat(newItem, oldItem, source, deltaLoads, time.Duration(interval)*time.Second)
}

// CheckColdPeer checks the collect the un-heartbeat peer and maintain it.
Expand Down Expand Up @@ -236,7 +240,7 @@ func (f *hotPeerCache) CheckColdPeer(storeID uint64, reportRegions map[uint64]st
for i, loads := range oldItem.thresholds {
deltaLoads[i] = loads * float64(interval)
}
stat := f.updateHotPeerStat(newItem, oldItem, deltaLoads, time.Duration(interval)*time.Second)
stat := f.updateHotPeerStat(newItem, oldItem, direct, deltaLoads, time.Duration(interval)*time.Second)
if stat != nil {
ret = append(ret, stat)
}
Expand Down Expand Up @@ -373,13 +377,19 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
}

func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoads []float64, interval time.Duration) *HotPeerStat {
func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, source sourceKind, deltaLoads []float64, interval time.Duration) *HotPeerStat {
regionStats := f.kind.RegionStats()
if oldItem == nil {
return f.updateNewHotPeerStat(newItem, deltaLoads, interval)
}

newItem.rollingLoads = oldItem.rollingLoads
if source == adopt {
for _, dim := range oldItem.rollingLoads {
newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone())
}
} else {
newItem.rollingLoads = oldItem.rollingLoads
}

if newItem.justTransferLeader {
newItem.lastTransferLeaderTime = time.Now()
Expand Down
77 changes: 67 additions & 10 deletions server/statistics/hot_peer_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, expect i
return res
}

func checkAndUpdateSync(cache *hotPeerCache, region *core.RegionInfo) (res []*HotPeerStat) {
reportInterval := region.GetInterval()
interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
res = append(res, cache.CollectExpiredItems(region)...)
for _, peer := range region.GetPeers() {
peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
item := cache.CheckPeerFlow(peerInfo, region)
if item != nil {
res = append(res, item)
cache.Update(item)
}
}
return res
}

func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) {
var peers []*metapb.Peer
if kind == Read {
Expand Down Expand Up @@ -233,50 +248,50 @@ func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {

// skip interval=0
newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read}
newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 0)
newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 0)
c.Check(newItem, IsNil)

// new peer, interval is larger than report interval, but no hot
newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read}
newItem = cache.updateHotPeerStat(newItem, nil, []float64{0.0, 0.0, 0.0}, 10*time.Second)
newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 10*time.Second)
c.Check(newItem, IsNil)

// new peer, interval is less than report interval
newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read}
newItem = cache.updateHotPeerStat(newItem, nil, []float64{60.0, 60.0, 60.0}, 4*time.Second)
newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
c.Check(newItem, NotNil)
c.Check(newItem.HotDegree, Equals, 0)
c.Check(newItem.AntiCount, Equals, 0)
// sum of interval is less than report interval
oldItem := newItem
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
c.Check(newItem.HotDegree, Equals, 0)
c.Check(newItem.AntiCount, Equals, 0)
// sum of interval is larger than report interval, and hot
oldItem = newItem
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
c.Check(newItem.HotDegree, Equals, 1)
c.Check(newItem.AntiCount, Equals, 2*m)
// sum of interval is less than report interval
oldItem = newItem
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 4*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
c.Check(newItem.HotDegree, Equals, 1)
c.Check(newItem.AntiCount, Equals, 2*m)
// sum of interval is larger than report interval, and hot
oldItem = newItem
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
c.Check(newItem.HotDegree, Equals, 2)
c.Check(newItem.AntiCount, Equals, 2*m)
// sum of interval is larger than report interval, and cold
oldItem = newItem
newItem.thresholds = []float64{10.0, 10.0, 10.0}
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
c.Check(newItem.HotDegree, Equals, 1)
c.Check(newItem.AntiCount, Equals, 2*m-1)
// sum of interval is larger than report interval, and cold
for i := 0; i < 2*m-1; i++ {
oldItem = newItem
newItem = cache.updateHotPeerStat(newItem, oldItem, []float64{60.0, 60.0, 60.0}, 10*time.Second)
newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
}
c.Check(newItem.HotDegree, Less, 0)
c.Check(newItem.AntiCount, Equals, 0)
Expand Down Expand Up @@ -315,7 +330,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) == true {
break
}
item := cache.updateHotPeerStat(newItem, oldItem, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second)
item := cache.updateHotPeerStat(newItem, oldItem, direct, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second)
cache.Update(item)
}
thresholds := cache.calcHotThresholds(storeID)
Expand All @@ -327,6 +342,48 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
}
}

func (t *testHotPeerCache) TestRemoveFromCache(c *C) {
peerCounts := []int{3, 5}
intervals := []uint64{120, 60, 10}
for _, peerCount := range peerCounts {
for _, interval := range intervals {
cache := NewHotPeerCache(Write)
peers := newPeers(peerCount,
func(i int) uint64 { return uint64(10000 + i) },
func(i int) uint64 { return uint64(i) })
meta := &metapb.Region{
Id: 1000,
Peers: peers,
StartKey: []byte(""),
EndKey: []byte(""),
RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6},
}
region := core.NewRegionInfo(
meta,
peers[0],
core.SetReportInterval(interval),
core.SetWrittenBytes(10*1024*1024*interval),
core.SetWrittenKeys(10*1024*1024*interval),
core.SetWrittenQuery(1024*interval),
)
for i := 1; i <= 200; i++ {
checkAndUpdate(c, cache, region, peerCount)
}
c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount)
var isClear bool
region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
for i := 1; i <= 200; i++ {
checkAndUpdateSync(cache, region)
if len(cache.storesOfRegion[region.GetID()]) == 0 {
isClear = true
break
}
}
c.Assert(isClear, IsTrue)
}
}
}

func BenchmarkCheckRegionFlow(b *testing.B) {
cache := NewHotPeerCache(Read)
region := core.NewRegionInfo(&metapb.Region{
Expand Down
21 changes: 21 additions & 0 deletions server/statistics/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,27 @@ func (k StoreStatKind) String() string {
return "unknown StoreStatKind"
}

// sourceKind represents the statistics item source.
type sourceKind int

const (
direct sourceKind = iota // there is a corresponding peer in this store.
inherit // there is no a corresponding peer in this store and there is a peer just deleted.
adopt // there is no corresponding peer in this store and there is no peer just deleted, we need to copy from other stores.
)

func (k sourceKind) String() string {
switch k {
case direct:
return "direct"
case inherit:
return "inherit"
case adopt:
return "adopt"
}
return "unknown"
}

// RWType is a identify hot region types.
type RWType uint32

Expand Down

0 comments on commit 2246ef6

Please sign in to comment.