statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396) (#4433)

* This is an automated cherry-pick of #4396

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* ref #4390

Signed-off-by: lhy1024 <admin@liudos.us>

* fix ci

Signed-off-by: lhy1024 <admin@liudos.us>

* pick for #4446, #4512

Signed-off-by: lhy1024 <admin@liudos.us>

* fix test

Signed-off-by: lhy1024 <admin@liudos.us>

* update

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: lhy1024 <admin@liudos.us>
ti-chi-bot and lhy1024 committed Jan 26, 2022
1 parent f4ed192 commit 6919bc7
Showing 12 changed files with 437 additions and 99 deletions.
3 changes: 1 addition & 2 deletions pkg/errs/errs_test.go
@@ -15,7 +15,6 @@ package errs

import (
"bytes"
"fmt"
"strconv"
"strings"
"testing"
@@ -38,7 +37,7 @@ func newTestingWriter() *testingWriter {
func (w *testingWriter) Write(p []byte) (n int, err error) {
n = len(p)
p = bytes.TrimRight(p, "\n")
- m := fmt.Sprintf("%s", p)
m := string(p)
w.messages = append(w.messages, m)
return n, nil
}
21 changes: 21 additions & 0 deletions pkg/movingaverage/avg_over_time.go
@@ -100,3 +100,24 @@ func (aot *AvgOverTime) Set(avg float64) {
func (aot *AvgOverTime) IsFull() bool {
return aot.intervalSum >= aot.avgInterval
}

// Clone returns a copy of AvgOverTime
func (aot *AvgOverTime) Clone() *AvgOverTime {
que := aot.que.Clone()
margin := deltaWithInterval{
delta: aot.margin.delta,
interval: aot.margin.interval,
}
return &AvgOverTime{
que: que,
margin: margin,
deltaSum: aot.deltaSum,
intervalSum: aot.intervalSum,
avgInterval: aot.avgInterval,
}
}

// GetIntervalSum returns the sum of interval
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
return aot.intervalSum
}
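
Clone returns an independent copy of the rolling window (queue, margin, and accumulated sums), so the copy advances without touching the original, and GetIntervalSum exposes how much observation time the window has accumulated. A minimal usage sketch, assuming the package's existing NewAvgOverTime and Add(delta, interval) helpers and the github.com/tikv/pd module path:

package main

import (
	"fmt"
	"time"

	"github.com/tikv/pd/pkg/movingaverage"
)

func main() {
	aot := movingaverage.NewAvgOverTime(60 * time.Second)
	aot.Add(300, 10*time.Second) // 300 units observed over a 10s interval

	copied := aot.Clone()
	copied.Add(300, 10*time.Second) // only the copy records this interval

	fmt.Println(aot.GetIntervalSum())    // 10s: the original is unchanged
	fmt.Println(copied.GetIntervalSum()) // 20s
}
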
11 changes: 11 additions & 0 deletions pkg/movingaverage/median_filter.go
@@ -61,3 +61,14 @@ func (r *MedianFilter) Set(n float64) {
r.records[0] = n
r.count = 1
}

// Clone returns a copy of MedianFilter
func (r *MedianFilter) Clone() *MedianFilter {
records := make([]float64, len(r.records))
copy(records, r.records)
return &MedianFilter{
records: records,
size: r.size,
count: r.count,
}
}
15 changes: 15 additions & 0 deletions pkg/movingaverage/queue.go
@@ -52,3 +52,18 @@ func (sq *SafeQueue) PopFront() interface{} {
defer sq.mu.Unlock()
return sq.que.PopFront()
}

// Clone returns a copy of SafeQueue
func (sq *SafeQueue) Clone() *SafeQueue {
sq.mu.Lock()
defer sq.mu.Unlock()
q := queue.New().Init()
for i := 0; i < sq.que.Len(); i++ {
v := sq.que.PopFront()
sq.que.PushBack(v)
q.PushBack(v)
}
return &SafeQueue{
que: q,
}
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/queue_test.go
@@ -26,3 +26,14 @@ func (t *testMovingAvg) TestQueue(c *C) {
c.Assert(1, Equals, v1.(int))
c.Assert(2, Equals, v2.(int))
}

func (t *testMovingAvg) TestClone(c *C) {
s1 := NewSafeQueue()
s1.PushBack(1)
s1.PushBack(2)
s2 := s1.Clone()
s2.PopFront()
s2.PopFront()
c.Assert(s1.que.Len(), Equals, 2)
c.Assert(s2.que.Len(), Equals, 0)
}
11 changes: 11 additions & 0 deletions pkg/movingaverage/time_median.go
@@ -65,3 +65,14 @@ func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
func (t *TimeMedian) GetInstantaneous() float64 {
return t.instantaneous
}

// Clone returns a copy of TimeMedian
func (t *TimeMedian) Clone() *TimeMedian {
return &TimeMedian{
aot: t.aot.Clone(),
mf: t.mf.Clone(),
aotSize: t.aotSize,
mfSize: t.mfSize,
instantaneous: t.instantaneous,
}
}
22 changes: 22 additions & 0 deletions server/statistics/hot_peer.go
@@ -68,6 +68,14 @@ func (d *dimStat) Get() float64 {
return d.Rolling.Get()
}

func (d *dimStat) Clone() *dimStat {
return &dimStat{
typ: d.typ,
Rolling: d.Rolling.Clone(),
LastAverage: d.LastAverage.Clone(),
}
}

// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
@@ -99,6 +107,11 @@ type HotPeerStat struct {
// If the peer didn't been send by store heartbeat when it is already stored as hot peer stat,
// we will handle it as cold peer and mark the inCold flag
inCold bool
// source represents the statistics item source, such as direct, inherit.
source sourceKind
// If the item in storeA is just inherited from storeB,
// then other store, such as storeC, will be forbidden to inherit from storeA until the item in storeA is hot.
allowInherited bool
}

// ID returns region ID. Implementing TopNItem.
@@ -124,6 +137,8 @@ func (stat *HotPeerStat) Log(str string, level func(msg string, fields ...zap.Fi
zap.Int("hot-anti-count", stat.AntiCount),
zap.Bool("just-transfer-leader", stat.justTransferLeader),
zap.Bool("is-leader", stat.isLeader),
zap.String("source", stat.source.String()),
zap.Bool("allow-inherited", stat.allowInherited),
zap.Bool("need-delete", stat.IsNeedDelete()),
zap.String("type", stat.Kind.String()),
zap.Time("last-transfer-leader-time", stat.lastTransferLeaderTime))
@@ -201,3 +216,10 @@ func (stat *HotPeerStat) hotStatReportInterval() int {
}
return WriteReportInterval
}

func (stat *HotPeerStat) getIntervalSum() time.Duration {
if len(stat.rollingLoads) == 0 || stat.rollingLoads[0] == nil {
return 0
}
return stat.rollingLoads[0].LastAverage.GetIntervalSum()
}
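
getIntervalSum exposes how much reporting interval the first rolling dimension has accumulated, which lets callers tell whether an item has collected a full report interval yet. A hypothetical check of the kind this getter enables (the helper below is an illustration only, not code from this commit):

// hasFullReportInterval is a hypothetical helper, not part of this commit:
// it compares the accumulated interval against the expected report interval
// before treating the item's statistics as complete.
func hasFullReportInterval(stat *HotPeerStat) bool {
	return stat.getIntervalSum() >= time.Duration(stat.hotStatReportInterval())*time.Second
}
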
49 changes: 20 additions & 29 deletions server/statistics/hot_peer_cache.go
@@ -55,7 +55,6 @@ type hotPeerCache struct {
peersOfStore map[uint64]*TopN // storeID -> hot peers
storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
regionsOfStore map[uint64]map[uint64]struct{} // storeID -> regionIDs
- inheritItem map[uint64]*HotPeerStat // regionID -> HotPeerStat
topNTTL time.Duration
reportIntervalSecs int
}
@@ -67,7 +66,6 @@ func NewHotStoresStats(kind FlowKind) *hotPeerCache {
peersOfStore: make(map[uint64]*TopN),
storesOfRegion: make(map[uint64]map[uint64]struct{}),
regionsOfStore: make(map[uint64]map[uint64]struct{}),
- inheritItem: make(map[uint64]*HotPeerStat),
}
if kind == WriteFlow {
c.reportIntervalSecs = WriteReportInterval
@@ -98,7 +96,6 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
// Update updates the items in statistics.
func (f *hotPeerCache) Update(item *HotPeerStat) {
if item.IsNeedDelete() {
- f.putInheritItem(item)
f.removeItem(item)
item.Log("region heartbeat delete from cache", log.Debug)
} else {
@@ -178,17 +175,15 @@ func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInf
interval: interval,
peers: peers,
thresholds: thresholds,
source: direct,
}

if oldItem == nil {
- inheritItem := f.takeInheritItem(region.GetID())
- if inheritItem != nil {
- oldItem = inheritItem
- } else {
- for _, storeID := range f.getAllStoreIDs(region) {
- oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
- if oldItem != nil {
- break
- }
for _, storeID := range f.getAllStoreIDs(region) {
oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
if oldItem != nil && oldItem.allowInherited {
newItem.source = inherit
break
}
}
}
@@ -372,7 +367,15 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, deltaLoa
return f.updateNewHotPeerStat(newItem, deltaLoads, interval)
}

- newItem.rollingLoads = oldItem.rollingLoads
if newItem.source == inherit {
for _, dim := range oldItem.rollingLoads {
newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone())
}
newItem.allowInherited = false
} else {
newItem.rollingLoads = oldItem.rollingLoads
newItem.allowInherited = oldItem.allowInherited
}

if newItem.justTransferLeader {
newItem.lastTransferLeaderTime = time.Now()
@@ -448,22 +451,6 @@ func (f *hotPeerCache) updateNewHotPeerStat(newItem *HotPeerStat, deltaLoads []f
return newItem
}

- func (f *hotPeerCache) putInheritItem(item *HotPeerStat) {
- f.inheritItem[item.RegionID] = item
- }
-
- func (f *hotPeerCache) takeInheritItem(regionID uint64) *HotPeerStat {
- item, ok := f.inheritItem[regionID]
- if !ok {
- return nil
- }
- if item != nil {
- delete(f.inheritItem, regionID)
- return item
- }
- return nil
- }

func (f *hotPeerCache) putItem(item *HotPeerStat) {
peers, ok := f.peersOfStore[item.StoreID]
if !ok {
@@ -502,17 +489,21 @@ func coldItem(newItem, oldItem *HotPeerStat) {
newItem.AntiCount = oldItem.AntiCount - 1
if newItem.AntiCount <= 0 {
newItem.needDelete = true
} else {
newItem.allowInherited = true
}
}

func hotItem(newItem, oldItem *HotPeerStat) {
newItem.HotDegree = oldItem.HotDegree + 1
newItem.AntiCount = hotRegionAntiCount
newItem.allowInherited = true
}

func initItemDegree(item *HotPeerStat) {
item.HotDegree = 1
item.AntiCount = hotRegionAntiCount
item.allowInherited = true
}

func inheritItemDegree(newItem, oldItem *HotPeerStat) {
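
Taken together, these changes drop the old inheritItem map and carry the inheritance permission on the stat itself: an item can only be inherited while allowInherited is true, and an inherited copy starts with the flag cleared, so another store cannot inherit from it again until the item is hot. A self-contained toy model of that gate (the type and function names below are illustrative, not the PD API):

package main

import "fmt"

// peerStat is a simplified stand-in for HotPeerStat; it only models the
// allow-inherited gate introduced by this commit.
type peerStat struct {
	hotDegree      int
	antiCount      int
	allowInherited bool
}

// becomeHot mirrors hotItem/initItemDegree: once an item is hot, it may be
// inherited by another store.
func becomeHot(s *peerStat) {
	s.hotDegree++
	s.antiCount = 2 // stands in for hotRegionAntiCount
	s.allowInherited = true
}

// inheritFrom mirrors the inherit branch of updateHotPeerStat: the copy is
// created with the flag cleared, so it cannot be inherited again until it
// becomes hot on its own.
func inheritFrom(old *peerStat) (*peerStat, bool) {
	if !old.allowInherited {
		return nil, false
	}
	return &peerStat{hotDegree: old.hotDegree, antiCount: old.antiCount}, true
}

func main() {
	a := &peerStat{}
	becomeHot(a)

	b, ok := inheritFrom(a) // storeB inherits the hot item from storeA
	fmt.Println(ok)         // true

	_, ok = inheritFrom(b) // storeC cannot inherit from storeB yet
	fmt.Println(ok)        // false
}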