From e46a4e5419bdf132e1f33c39ec197b630087ff98 Mon Sep 17 00:00:00 2001
From: lhy1024
Date: Tue, 7 Dec 2021 14:25:56 +0800
Subject: [PATCH] This is an automated cherry-pick of #4396

Signed-off-by: ti-chi-bot
---
 pkg/movingaverage/median_filter.go       |  85 ++++++++++
 pkg/movingaverage/time_median.go         |  79 +++++++++
 server/statistics/avg_over_time.go       |  16 ++
 server/statistics/hot_peer.go            |  50 ++++++
 server/statistics/hot_peer_cache.go      | 125 ++++++++++++++
 server/statistics/hot_peer_cache_test.go | 201 +++++++++++++++++++++++
 server/statistics/kind.go                | 149 +++++++++++++++++
 server/statistics/queue.go               |  15 ++
 server/statistics/queue_test.go          |  11 ++
 9 files changed, 731 insertions(+)
 create mode 100644 pkg/movingaverage/median_filter.go
 create mode 100644 pkg/movingaverage/time_median.go
 create mode 100644 server/statistics/kind.go

diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go
new file mode 100644
index 000000000000..921470c1b730
--- /dev/null
+++ b/pkg/movingaverage/median_filter.go
@@ -0,0 +1,85 @@
+// Copyright 2020 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package movingaverage
+
+import "github.com/montanaflynn/stats"
+
+// MedianFilter works as a median filter with a specified window size.
+// It keeps at most `size` data points for the calculation.
+// Reference: https://en.wikipedia.org/wiki/Median_filter.
+type MedianFilter struct {
+	records       []float64
+	size          uint64
+	count         uint64
+	instantaneous float64
+}
+
+// NewMedianFilter returns a MedianFilter.
+func NewMedianFilter(size int) *MedianFilter {
+	return &MedianFilter{
+		records: make([]float64, size),
+		size:    uint64(size),
+	}
+}
+
+// Add adds a data point.
+func (r *MedianFilter) Add(n float64) {
+	r.instantaneous = n
+	r.records[r.count%r.size] = n
+	r.count++
+}
+
+// Get returns the median of the data set.
+func (r *MedianFilter) Get() float64 {
+	if r.count == 0 {
+		return 0
+	}
+	records := r.records
+	if r.count < r.size {
+		records = r.records[:r.count]
+	}
+	median, _ := stats.Median(records)
+	return median
+}
+
+// Reset cleans the data set.
+func (r *MedianFilter) Reset() {
+	r.instantaneous = 0
+	r.count = 0
+}
+
+// Set = Reset + Add.
+func (r *MedianFilter) Set(n float64) {
+	r.instantaneous = n
+	r.records[0] = n
+	r.count = 1
+}
+
+// GetInstantaneous returns the value just added.
+func (r *MedianFilter) GetInstantaneous() float64 {
+	return r.instantaneous
+}
+
+// Clone returns a copy of MedianFilter
+func (r *MedianFilter) Clone() *MedianFilter {
+	records := make([]float64, len(r.records))
+	copy(records, r.records)
+	return &MedianFilter{
+		records:       records,
+		size:          r.size,
+		count:         r.count,
+		instantaneous: r.instantaneous,
+	}
+}
diff --git a/pkg/movingaverage/time_median.go b/pkg/movingaverage/time_median.go
new file mode 100644
index 000000000000..1c9f9595f352
--- /dev/null
+++ b/pkg/movingaverage/time_median.go
@@ -0,0 +1,79 @@
+// Copyright 2020 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package movingaverage
+
+import "time"
+
+// TimeMedian is AvgOverTime + MedianFilter.
+// The size of MedianFilter should be more than double the size of AvgOverTime to denoise.
+// The delay is aotSize * mfSize * reportInterval / 4,
+// and the minimum filled period is aotSize * reportInterval, which is unrelated to mfSize.
+type TimeMedian struct {
+	aot           *AvgOverTime
+	mf            *MedianFilter
+	aotSize       int
+	mfSize        int
+	instantaneous float64
+}
+
+// NewTimeMedian returns a TimeMedian with the given sizes.
+func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
+	return &TimeMedian{
+		aot:     NewAvgOverTime(time.Duration(aotSize) * reportInterval),
+		mf:      NewMedianFilter(mfSize),
+		aotSize: aotSize,
+		mfSize:  mfSize,
+	}
+}
+
+// Get returns the change rate as the median of several recent intervals.
+func (t *TimeMedian) Get() float64 {
+	return t.mf.Get()
+}
+
+// Add adds a recent change to TimeMedian.
+func (t *TimeMedian) Add(delta float64, interval time.Duration) {
+	t.instantaneous = delta / interval.Seconds()
+	t.aot.Add(delta, interval)
+	if t.aot.IsFull() {
+		t.mf.Add(t.aot.Get())
+	}
+}
+
+// Set sets the given average.
+func (t *TimeMedian) Set(avg float64) {
+	t.mf.Set(avg)
+}
+
+// GetFilledPeriod returns the filled period.
+func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated to mfSize
+	return t.aotSize
+}
+
+// GetInstantaneous returns the instantaneous speed.
+func (t *TimeMedian) GetInstantaneous() float64 {
+	return t.instantaneous
+}
+
+// Clone returns a copy of TimeMedian
+func (t *TimeMedian) Clone() *TimeMedian {
+	return &TimeMedian{
+		aot:           t.aot.Clone(),
+		mf:            t.mf.Clone(),
+		aotSize:       t.aotSize,
+		mfSize:        t.mfSize,
+		instantaneous: t.instantaneous,
+	}
+}
diff --git a/server/statistics/avg_over_time.go b/server/statistics/avg_over_time.go
index 894c2502daab..73786e31f684 100644
--- a/server/statistics/avg_over_time.go
+++ b/server/statistics/avg_over_time.go
@@ -116,3 +116,19 @@ func (t *TimeMedian) Add(delta float64, interval time.Duration) {
 func (t *TimeMedian) Set(avg float64) {
 	t.mf.Set(avg)
 }
+
+// Clone returns a copy of AvgOverTime
+func (aot *AvgOverTime) Clone() *AvgOverTime {
+	que := aot.que.Clone()
+	margin := deltaWithInterval{
+		delta:    aot.margin.delta,
+		interval: aot.margin.interval,
+	}
+	return &AvgOverTime{
+		que:         que,
+		margin:      margin,
+		deltaSum:    aot.deltaSum,
+		intervalSum: aot.intervalSum,
+		avgInterval: aot.avgInterval,
+	}
+}
diff --git a/server/statistics/hot_peer.go b/server/statistics/hot_peer.go
index 241a2110354b..4be09f912631 100644
--- a/server/statistics/hot_peer.go
+++ b/server/statistics/hot_peer.go
@@ -21,6 +21,56 @@ const (
 	dimLen
 )
 
+<<<<<<< HEAD
+=======
+type dimStat struct {
+	typ         RegionStatKind
+	Rolling     *movingaverage.TimeMedian  // Rolling is used to calculate the hot degree and the average speed.
+	LastAverage *movingaverage.AvgOverTime // LastAverage is used to obtain the average speed within the last second, as the instantaneous speed.
+} + +func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat { + return &dimStat{ + typ: typ, + Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval), + LastAverage: movingaverage.NewAvgOverTime(reportInterval), + } +} + +func (d *dimStat) Add(delta float64, interval time.Duration) { + d.LastAverage.Add(delta, interval) + d.Rolling.Add(delta, interval) +} + +func (d *dimStat) isLastAverageHot(threshold float64) bool { + return d.LastAverage.Get() >= threshold +} + +func (d *dimStat) isHot(threshold float64) bool { + return d.Rolling.Get() >= threshold +} + +func (d *dimStat) isFull() bool { + return d.LastAverage.IsFull() +} + +func (d *dimStat) clearLastAverage() { + d.LastAverage.Clear() +} + +func (d *dimStat) Get() float64 { + return d.Rolling.Get() +} + +func (d *dimStat) Clone() *dimStat { + return &dimStat{ + typ: d.typ, + Rolling: d.Rolling.Clone(), + LastAverage: d.LastAverage.Clone(), + } +} + +>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396)) // HotPeerStat records each hot peer's statistics type HotPeerStat struct { StoreID uint64 `json:"store_id"` diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go index 38f0485048fc..8c770de4f383 100644 --- a/server/statistics/hot_peer_cache.go +++ b/server/statistics/hot_peer_cache.go @@ -126,6 +126,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto tmpItem = oldItem } +<<<<<<< HEAD // This is used for the simulator. Ignore if report too fast. if !isExpired && Denoising && interval < hotRegionReportMinInterval { continue @@ -152,6 +153,57 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto if oldItem != nil { break } +======= +// CheckPeerFlow checks the flow information of a peer. +// Notice: CheckPeerFlow couldn't be used concurrently. +// CheckPeerFlow will update oldItem's rollingLoads into newItem, thus we should use write lock here. 
+func (f *hotPeerCache) CheckPeerFlow(peer *core.PeerInfo, region *core.RegionInfo) *HotPeerStat {
+	interval := peer.GetInterval()
+	if Denoising && interval < HotRegionReportMinInterval {
+		return nil
+	}
+	storeID := peer.GetStoreID()
+	deltaLoads := peer.GetLoads()
+	f.collectPeerMetrics(deltaLoads, interval)
+	loads := make([]float64, len(deltaLoads))
+	for i := range deltaLoads {
+		loads[i] = deltaLoads[i] / float64(interval)
+	}
+	justTransferLeader := f.justTransferLeader(region)
+	oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
+	thresholds := f.calcHotThresholds(storeID)
+	regionPeers := region.GetPeers()
+	peers := make([]uint64, 0, len(regionPeers))
+	for _, peer := range regionPeers {
+		peers = append(peers, peer.StoreId)
+	}
+	newItem := &HotPeerStat{
+		StoreID:            storeID,
+		RegionID:           region.GetID(),
+		Kind:               f.kind,
+		Loads:              loads,
+		LastUpdateTime:     time.Now(),
+		needDelete:         false,
+		isLeader:           region.GetLeader().GetStoreId() == storeID,
+		justTransferLeader: justTransferLeader,
+		interval:           interval,
+		peers:              peers,
+		thresholds:         thresholds,
+	}
+
+	source := direct
+	if oldItem == nil {
+		inheritItem := f.takeInheritItem(region.GetID())
+		if inheritItem != nil {
+			oldItem = inheritItem
+			source = inherit
+		} else {
+			for _, storeID := range f.getAllStoreIDs(region) {
+				oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
+				if oldItem != nil {
+					source = adopt
+					break
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
 			}
 		}
 	}
@@ -161,8 +213,12 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo, storesStats *Sto
 			ret = append(ret, newItem)
 		}
 	}
+<<<<<<< HEAD
 	return ret
+=======
+	return f.updateHotPeerStat(newItem, oldItem, source, deltaLoads, time.Duration(interval)*time.Second)
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
 }
 
 func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool {
@@ -172,7 +228,42 @@ func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool
 	case ReadFlow:
 		return f.isRegionHotWithPeer(region, region.GetLeader(), hotDegree)
 	}
+<<<<<<< HEAD
 	return false
+=======
+	for regionID := range previousHotStat {
+		if _, ok := reportRegions[regionID]; !ok {
+			oldItem := f.getOldHotPeerStat(regionID, storeID)
+			if oldItem == nil {
+				continue
+			}
+			newItem := &HotPeerStat{
+				StoreID:  storeID,
+				RegionID: regionID,
+				Kind:     f.kind,
+				// use oldItem.thresholds so that the newItem won't affect the threshold
+				Loads:              oldItem.thresholds,
+				LastUpdateTime:     time.Now(),
+				needDelete:         false,
+				isLeader:           oldItem.isLeader,
+				justTransferLeader: oldItem.justTransferLeader,
+				interval:           interval,
+				peers:              oldItem.peers,
+				thresholds:         oldItem.thresholds,
+				inCold:             true,
+			}
+			deltaLoads := make([]float64, RegionStatCount)
+			for i, loads := range oldItem.thresholds {
+				deltaLoads[i] = loads * float64(interval)
+			}
+			stat := f.updateHotPeerStat(newItem, oldItem, direct, deltaLoads, time.Duration(interval)*time.Second)
+			if stat != nil {
+				ret = append(ret, stat)
+			}
+		}
+	}
+	return
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
 }
 
 func (f *hotPeerCache) CollectMetrics(stats *StoresStats, typ string) {
@@ -292,10 +383,44 @@ func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb
 	return false
 }
 
+<<<<<<< HEAD
 func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, storesStats *StoresStats) *HotPeerStat {
 	thresholds := f.calcHotThresholds(storesStats, newItem.StoreID)
 
 	isHot := newItem.ByteRate >= thresholds[byteDim] ||
 		newItem.KeyRate >= thresholds[keyDim]
+=======
+func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
+	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(f.reportIntervalSecs)*time.Second)
+}
+
+func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, source sourceKind, deltaLoads []float64, interval time.Duration) *HotPeerStat {
+	regionStats := f.kind.RegionStats()
+	if oldItem == nil {
+		return f.updateNewHotPeerStat(newItem, deltaLoads, interval)
+	}
+
+	if source == adopt {
+		for _, dim := range oldItem.rollingLoads {
+			newItem.rollingLoads = append(newItem.rollingLoads, dim.Clone())
+		}
+	} else {
+		newItem.rollingLoads = oldItem.rollingLoads
+	}
+
+	if newItem.justTransferLeader {
+		newItem.lastTransferLeaderTime = time.Now()
+		// Skip the first heartbeat flow statistic after transferring the leader, because it was
+		// calculated by the previous leader on this store and is inaccurate.
+		// Maintain AntiCount and HotDegree to keep the store threshold and hot peers stable.
+		// For write stats, the stat is reported via the region heartbeat, so the first heartbeat is skipped.
+		// For read stats, the stat is reported via the store heartbeat, so the first heartbeat is not skipped.
+		if newItem.Kind == Write {
+			inheritItemDegree(newItem, oldItem)
+			return newItem
+		}
+	} else {
+		newItem.lastTransferLeaderTime = oldItem.lastTransferLeaderTime
+	}
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
 
 	if newItem.needDelete {
 		return newItem
diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go
index 64e066302ce3..2dfe00eecffc 100644
--- a/server/statistics/hot_peer_cache_test.go
+++ b/server/statistics/hot_peer_cache_test.go
@@ -112,7 +112,26 @@ func checkAndUpdate(c *C, cache *hotPeerCache, region *core.RegionInfo, stats *S
 	return res
 }
 
+<<<<<<< HEAD
 func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind, isHit bool) {
+=======
+func checkAndUpdateSync(cache *hotPeerCache, region *core.RegionInfo) (res []*HotPeerStat) {
+	reportInterval := region.GetInterval()
+	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
+	res = append(res, cache.CollectExpiredItems(region)...)
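+	// CheckPeerFlow may return nil (e.g. a report filtered by denoising),
+	// so only non-nil items are collected and applied to the cache.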
+	for _, peer := range region.GetPeers() {
+		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), interval)
+		item := cache.CheckPeerFlow(peerInfo, region)
+		if item != nil {
+			res = append(res, item)
+			cache.Update(item)
+		}
+	}
+	return res
+}
+
+func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind RWType, isHit bool) {
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
 	var peers []*metapb.Peer
 	if kind == ReadFlow {
 		peers = []*metapb.Peer{region.GetLeader()}
@@ -213,3 +232,185 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
 	}
 	return peers
 }
+<<<<<<< HEAD
+=======
+
+func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {
+	cache := NewHotPeerCache(Read)
+	// read peer info is collected from the store heartbeat rather than the region heartbeat
+	m := RegionHeartBeatReportInterval / StoreHeartBeatReportInterval
+
+	// skip interval=0
+	newItem := &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read}
+	newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 0)
+	c.Check(newItem, IsNil)
+
+	// new peer, interval is larger than report interval, but not hot
+	newItem = &HotPeerStat{needDelete: false, thresholds: []float64{1.0, 1.0, 1.0}, Kind: Read}
+	newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{0.0, 0.0, 0.0}, 10*time.Second)
+	c.Check(newItem, IsNil)
+
+	// new peer, interval is less than report interval
+	newItem = &HotPeerStat{needDelete: false, thresholds: []float64{0.0, 0.0, 0.0}, Kind: Read}
+	newItem = cache.updateHotPeerStat(newItem, nil, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
+	c.Check(newItem, NotNil)
+	c.Check(newItem.HotDegree, Equals, 0)
+	c.Check(newItem.AntiCount, Equals, 0)
+	// sum of intervals is less than the report interval
+	oldItem := newItem
+	newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
+	c.Check(newItem.HotDegree, Equals, 0)
+	c.Check(newItem.AntiCount, Equals, 0)
+	// sum of intervals is larger than the report interval, and hot
+	oldItem = newItem
+	newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
+	c.Check(newItem.HotDegree, Equals, 1)
+	c.Check(newItem.AntiCount, Equals, 2*m)
+	// sum of intervals is less than the report interval
+	oldItem = newItem
+	newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 4*time.Second)
+	c.Check(newItem.HotDegree, Equals, 1)
+	c.Check(newItem.AntiCount, Equals, 2*m)
+	// sum of intervals is larger than the report interval, and hot
+	oldItem = newItem
+	newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
+	c.Check(newItem.HotDegree, Equals, 2)
+	c.Check(newItem.AntiCount, Equals, 2*m)
+	// sum of intervals is larger than the report interval, and cold
+	oldItem = newItem
+	newItem.thresholds = []float64{10.0, 10.0, 10.0}
+	newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
+	c.Check(newItem.HotDegree, Equals, 1)
+	c.Check(newItem.AntiCount, Equals, 2*m-1)
+	// sum of intervals is larger than the report interval, and cold
+	for i := 0; i < 2*m-1; i++ {
+		oldItem = newItem
+		newItem = cache.updateHotPeerStat(newItem, oldItem, direct, []float64{60.0, 60.0, 60.0}, 10*time.Second)
+	}
+	c.Check(newItem.HotDegree, Less, 0)
+	c.Check(newItem.AntiCount, Equals, 0)
+	c.Check(newItem.needDelete, IsTrue)
+}
+
+func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) {
+	byteRate := minHotThresholds[RegionReadBytes] * 2
+	expectThreshold := byteRate * HotThresholdRatio
+	t.testMetrics(c, 120., byteRate, expectThreshold)
+	t.testMetrics(c, 60., byteRate, expectThreshold)
+	t.testMetrics(c, 30., byteRate, expectThreshold)
+	t.testMetrics(c, 17., byteRate, expectThreshold)
+	t.testMetrics(c, 1., byteRate, expectThreshold)
+}
+
+func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold float64) {
+	cache := NewHotPeerCache(Read)
+	storeID := uint64(1)
+	c.Assert(byteRate, GreaterEqual, minHotThresholds[RegionReadBytes])
+	for i := uint64(1); i < TopNN+10; i++ {
+		var oldItem *HotPeerStat
+		for {
+			thresholds := cache.calcHotThresholds(storeID)
+			newItem := &HotPeerStat{
+				Kind:       cache.kind,
+				StoreID:    storeID,
+				RegionID:   i,
+				needDelete: false,
+				thresholds: thresholds,
+				Loads:      make([]float64, DimLen),
+			}
+			newItem.Loads[RegionReadBytes] = byteRate
+			newItem.Loads[RegionReadKeys] = 0
+			oldItem = cache.getOldHotPeerStat(i, storeID)
+			if oldItem != nil && oldItem.rollingLoads[RegionReadBytes].isHot(thresholds[RegionReadBytes]) {
+				break
+			}
+			item := cache.updateHotPeerStat(newItem, oldItem, direct, []float64{byteRate * interval, 0.0, 0.0}, time.Duration(interval)*time.Second)
+			cache.Update(item)
+		}
+		thresholds := cache.calcHotThresholds(storeID)
+		if i < TopNN {
+			c.Assert(thresholds[RegionReadBytes], Equals, minHotThresholds[RegionReadBytes])
+		} else {
+			c.Assert(thresholds[RegionReadBytes], Equals, expectThreshold)
+		}
+	}
+}
+
+func (t *testHotPeerCache) TestRemoveFromCache(c *C) {
+	peerCounts := []int{3, 5}
+	intervals := []uint64{120, 60, 10}
+	for _, peerCount := range peerCounts {
+		for _, interval := range intervals {
+			cache := NewHotPeerCache(Write)
+			peers := newPeers(peerCount,
+				func(i int) uint64 { return uint64(10000 + i) },
+				func(i int) uint64 { return uint64(i) })
+			meta := &metapb.Region{
+				Id:          1000,
+				Peers:       peers,
+				StartKey:    []byte(""),
+				EndKey:      []byte(""),
+				RegionEpoch: &metapb.RegionEpoch{ConfVer: 6, Version: 6},
+			}
+			region := core.NewRegionInfo(
+				meta,
+				peers[0],
+				core.SetReportInterval(interval),
+				core.SetWrittenBytes(10*1024*1024*interval),
+				core.SetWrittenKeys(10*1024*1024*interval),
+				core.SetWrittenQuery(1024*interval),
+			)
+			for i := 1; i <= 200; i++ {
+				checkAndUpdate(c, cache, region, peerCount)
+			}
+			c.Assert(cache.storesOfRegion[region.GetID()], HasLen, peerCount)
+			var isClear bool
+			region = region.Clone(core.SetWrittenBytes(0), core.SetWrittenKeys(0), core.SetWrittenQuery(0))
+			for i := 1; i <= 200; i++ {
+				checkAndUpdateSync(cache, region)
+				if len(cache.storesOfRegion[region.GetID()]) == 0 {
+					isClear = true
+					break
+				}
+			}
+			c.Assert(isClear, IsTrue)
+		}
+	}
+}
+
+func BenchmarkCheckRegionFlow(b *testing.B) {
+	cache := NewHotPeerCache(Read)
+	region := core.NewRegionInfo(&metapb.Region{
+		Id: 1,
+		Peers: []*metapb.Peer{
+			{Id: 101, StoreId: 1},
+			{Id: 102, StoreId: 2},
+			{Id: 103, StoreId: 3},
+		},
+	},
+		&metapb.Peer{Id: 101, StoreId: 1},
+	)
+	newRegion := region.Clone(
+		core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}),
+		core.SetReadBytes(30000*10),
+		core.SetReadKeys(300000*10))
+	peerInfos := make([]*core.PeerInfo, 0)
+	for _, peer := range newRegion.GetPeers() {
+		peerInfo := core.NewPeerInfo(peer, region.GetLoads(), 10)
+		peerInfos = append(peerInfos, peerInfo)
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		items := make([]*HotPeerStat, 0)
+		for _, peerInfo := range peerInfos {
+			item := cache.CheckPeerFlow(peerInfo, region)
+			if item != nil {
+				items = append(items, item)
+			}
+		}
+		for _, ret := range items {
+			cache.Update(ret)
+		}
+	}
+}
+>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
diff --git a/server/statistics/kind.go b/server/statistics/kind.go
new file mode 100644
index 000000000000..e663d1ce56c6
--- /dev/null
+++ b/server/statistics/kind.go
@@ -0,0 +1,149 @@
+// Copyright 2020 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package statistics
+
+// RegionStatKind represents the statistics type of a region.
+type RegionStatKind int
+
+// Different region statistics kinds.
+const (
+	RegionReadBytes RegionStatKind = iota
+	RegionReadKeys
+	RegionReadQuery
+	RegionWriteBytes
+	RegionWriteKeys
+	RegionWriteQuery
+
+	RegionStatCount
+)
+
+func (k RegionStatKind) String() string {
+	switch k {
+	case RegionReadBytes:
+		return "read_bytes"
+	case RegionReadKeys:
+		return "read_keys"
+	case RegionWriteBytes:
+		return "write_bytes"
+	case RegionWriteKeys:
+		return "write_keys"
+	case RegionReadQuery:
+		return "read_query"
+	case RegionWriteQuery:
+		return "write_query"
+	}
+	return "unknown RegionStatKind"
+}
+
+// StoreStatKind represents the statistics type of a store.
+type StoreStatKind int
+
+// Different store statistics kinds.
+const (
+	StoreReadBytes StoreStatKind = iota
+	StoreReadKeys
+	StoreWriteBytes
+	StoreWriteKeys
+	StoreReadQuery
+	StoreWriteQuery
+	StoreCPUUsage
+	StoreDiskReadRate
+	StoreDiskWriteRate
+
+	StoreRegionsWriteBytes // Same as StoreWriteBytes, but it is counted by RegionHeartbeat.
+	StoreRegionsWriteKeys  // Same as StoreWriteKeys, but it is counted by RegionHeartbeat.
+
+	StoreStatCount
+)
+
+func (k StoreStatKind) String() string {
+	switch k {
+	case StoreReadBytes:
+		return "store_read_bytes"
+	case StoreReadKeys:
+		return "store_read_keys"
+	case StoreWriteBytes:
+		return "store_write_bytes"
+	case StoreReadQuery:
+		return "store_read_query"
+	case StoreWriteQuery:
+		return "store_write_query"
+	case StoreWriteKeys:
+		return "store_write_keys"
+	case StoreCPUUsage:
+		return "store_cpu_usage"
+	case StoreDiskReadRate:
+		return "store_disk_read_rate"
+	case StoreDiskWriteRate:
+		return "store_disk_write_rate"
+	case StoreRegionsWriteBytes:
+		return "store_regions_write_bytes"
+	case StoreRegionsWriteKeys:
+		return "store_regions_write_keys"
+	}
+
+	return "unknown StoreStatKind"
+}
+
+// sourceKind represents the statistics item source.
+type sourceKind int
+
+const (
+	direct  sourceKind = iota // there is a corresponding peer in this store.
+	inherit                   // there is no corresponding peer in this store, but a peer was just deleted.
+	adopt                     // there is no corresponding peer in this store and no peer was just deleted; we need to copy from other stores.
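+	// updateHotPeerStat treats these sources differently: adopt clones the old
+	// item's rollingLoads, while direct and inherit reuse them as-is.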
+)
+
+func (k sourceKind) String() string {
+	switch k {
+	case direct:
+		return "direct"
+	case inherit:
+		return "inherit"
+	case adopt:
+		return "adopt"
+	}
+	return "unknown"
+}
+
+// RWType identifies the hot region type: read or write.
+type RWType uint32
+
+// Flags for r/w type.
+const (
+	Write RWType = iota
+	Read
+)
+
+func (k RWType) String() string {
+	switch k {
+	case Write:
+		return "write"
+	case Read:
+		return "read"
+	}
+	return "unimplemented"
+}
+
+// RegionStats returns hot items according to the kind
+func (k RWType) RegionStats() []RegionStatKind {
+	switch k {
+	case Write:
+		return []RegionStatKind{RegionWriteBytes, RegionWriteKeys, RegionWriteQuery}
+	case Read:
+		return []RegionStatKind{RegionReadBytes, RegionReadKeys, RegionReadQuery}
+	}
+	return nil
+}
diff --git a/server/statistics/queue.go b/server/statistics/queue.go
index f61fa023e8ae..559d87367adc 100644
--- a/server/statistics/queue.go
+++ b/server/statistics/queue.go
@@ -52,3 +52,18 @@ func (sq *SafeQueue) PopFront() interface{} {
 	defer sq.mu.Unlock()
 	return sq.que.PopFront()
 }
+
+// Clone returns a copy of SafeQueue
+func (sq *SafeQueue) Clone() *SafeQueue {
+	sq.mu.Lock()
+	defer sq.mu.Unlock()
+	q := queue.New().Init()
+	for i := 0; i < sq.que.Len(); i++ {
+		v := sq.que.PopFront()
+		sq.que.PushBack(v)
+		q.PushBack(v)
+	}
+	return &SafeQueue{
+		que: q,
+	}
+}
diff --git a/server/statistics/queue_test.go b/server/statistics/queue_test.go
index 455f1002ce92..4ddffd089a6e 100644
--- a/server/statistics/queue_test.go
+++ b/server/statistics/queue_test.go
@@ -26,3 +26,14 @@ func (t *testMovingAvg) TestQueue(c *C) {
 	c.Assert(1, Equals, v1.(int))
 	c.Assert(2, Equals, v2.(int))
 }
+
+func (t *testMovingAvg) TestClone(c *C) {
+	s1 := NewSafeQueue()
+	s1.PushBack(1)
+	s1.PushBack(2)
+	s2 := s1.Clone()
+	s2.PopFront()
+	s2.PopFront()
+	c.Assert(s1.que.Len(), Equals, 2)
+	c.Assert(s2.que.Len(), Equals, 0)
+}
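
For reference, a minimal usage sketch (not part of the patch) of how the new moving-average primitives compose. It assumes PD's module path `github.com/tikv/pd/pkg/movingaverage`; the sizes and rates are illustrative, not the `DefaultAotSize`/`rollingWindowsSize` constants the hot peer cache actually uses:

	package main

	import (
		"fmt"
		"time"

		"github.com/tikv/pd/pkg/movingaverage"
	)

	func main() {
		// aotSize=2, mfSize=5: each median sample is an average over two
		// 10s reports, and the median is taken over five such samples.
		tm := movingaverage.NewTimeMedian(2, 5, 10*time.Second)

		// Report 1000 bytes per 10s interval, i.e. a steady 100 B/s flow.
		for i := 0; i < 10; i++ {
			tm.Add(1000, 10*time.Second)
		}

		fmt.Println(tm.Get())              // median of the rolling averages: 100
		fmt.Println(tm.GetInstantaneous()) // rate of the latest report: 100
	}

Because AvgOverTime must accumulate a full aotSize*reportInterval window before it feeds the median filter, short or partial reports only show up in GetInstantaneous() until the window fills, which is the "minimum filled period" noted in time_median.go.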