diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index bd0c63f639d..41f3519e779 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -389,7 +389,7 @@ func (mc *Cluster) AddRegionWithReadInfo( r = r.Clone(core.SetReadKeys(readKeys)) r = r.Clone(core.SetReportInterval(0, reportInterval)) r = r.Clone(core.SetReadQuery(readQuery)) - filledNum := mc.HotCache.GetFilledPeriod(statistics.Read) + filledNum := statistics.DefaultAotSize if len(filledNums) > 0 { filledNum = filledNums[0] } @@ -410,7 +410,7 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat { r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...) r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(0, reportInterval)) - filledNum := mc.HotCache.GetFilledPeriod(statistics.Read) + filledNum := statistics.DefaultAotSize if len(filledNums) > 0 { filledNum = filledNums[0] } @@ -438,7 +438,7 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo( r = r.Clone(core.SetReadKeys(readKeys)) r = r.Clone(core.SetReadQuery(readQuery)) r = r.Clone(core.SetReportInterval(0, reportInterval)) - filledNum := mc.HotCache.GetFilledPeriod(statistics.Read) + filledNum := statistics.DefaultAotSize if len(filledNums) > 0 { filledNum = filledNums[0] } @@ -466,7 +466,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo( r = r.Clone(core.SetReportInterval(0, reportInterval)) r = r.Clone(core.SetWrittenQuery(writtenQuery)) - filledNum := mc.HotCache.GetFilledPeriod(statistics.Write) + filledNum := statistics.DefaultAotSize if len(filledNums) > 0 { filledNum = filledNums[0] } diff --git a/pkg/movingaverage/avg_over_time.go b/pkg/movingaverage/avg_over_time.go index 12a6a4b25c2..227ee32e8c7 100644 --- a/pkg/movingaverage/avg_over_time.go +++ b/pkg/movingaverage/avg_over_time.go @@ -131,3 +131,11 @@ func (aot *AvgOverTime) Clone() *AvgOverTime { func (aot *AvgOverTime) GetIntervalSum() time.Duration { return aot.intervalSum } + +// GetInstantaneous returns the value just added. +func (aot *AvgOverTime) GetInstantaneous() float64 { + if aot.que.Len() == 0 || aot.que.Back() == nil { + return 0 + } + return aot.que.Back().(deltaWithInterval).delta +} diff --git a/pkg/movingaverage/avg_over_time_test.go b/pkg/movingaverage/avg_over_time_test.go index 13b7da27aef..a0787f5af81 100644 --- a/pkg/movingaverage/avg_over_time_test.go +++ b/pkg/movingaverage/avg_over_time_test.go @@ -84,7 +84,7 @@ func TestMinFilled(t *testing.T) { for aotSize := 2; aotSize < 10; aotSize++ { for mfSize := 2; mfSize < 10; mfSize++ { tm := NewTimeMedian(aotSize, mfSize, interval) - for i := 0; i < tm.GetFilledPeriod(); i++ { + for i := 0; i < aotSize; i++ { re.Equal(0.0, tm.Get()) tm.Add(rate*interval.Seconds(), interval) } diff --git a/pkg/movingaverage/median_filter.go b/pkg/movingaverage/median_filter.go index 5195f0c9253..7041ae53b29 100644 --- a/pkg/movingaverage/median_filter.go +++ b/pkg/movingaverage/median_filter.go @@ -23,12 +23,11 @@ type MedianFilter struct { // It is not thread safe to read and write records at the same time. // If there are concurrent read and write, the read may get an old value. // And we should avoid concurrent write. - records []float64 - size uint64 - count uint64 - instantaneous float64 - isUpdated bool - result float64 + records []float64 + size uint64 + count uint64 + isUpdated bool + result float64 } // NewMedianFilter returns a MedianFilter. @@ -43,7 +42,6 @@ func NewMedianFilter(size int) *MedianFilter { // Add adds a data point. func (r *MedianFilter) Add(n float64) { - r.instantaneous = n r.records[r.count%r.size] = n r.count++ r.isUpdated = true @@ -68,14 +66,12 @@ func (r *MedianFilter) Get() float64 { // Reset cleans the data set. func (r *MedianFilter) Reset() { - r.instantaneous = 0 r.count = 0 r.isUpdated = true } // Set = Reset + Add. func (r *MedianFilter) Set(n float64) { - r.instantaneous = n r.records[0] = n r.count = 1 r.isUpdated = true @@ -83,7 +79,7 @@ func (r *MedianFilter) Set(n float64) { // GetInstantaneous returns the value just added. func (r *MedianFilter) GetInstantaneous() float64 { - return r.instantaneous + return r.records[(r.count-1)%r.size] } // Clone returns a copy of MedianFilter @@ -91,11 +87,10 @@ func (r *MedianFilter) Clone() *MedianFilter { records := make([]float64, len(r.records)) copy(records, r.records) return &MedianFilter{ - records: records, - size: r.size, - count: r.count, - instantaneous: r.instantaneous, - isUpdated: r.isUpdated, - result: r.result, + records: records, + size: r.size, + count: r.count, + isUpdated: r.isUpdated, + result: r.result, } } diff --git a/pkg/movingaverage/moving_average_test.go b/pkg/movingaverage/moving_average_test.go index 173eea773bb..37aaba28e47 100644 --- a/pkg/movingaverage/moving_average_test.go +++ b/pkg/movingaverage/moving_average_test.go @@ -43,6 +43,7 @@ func checkAdd(re *require.Assertions, ma MovingAvg, data []float64, expected []f re.Len(data, len(expected)) for i, x := range data { ma.Add(x) + re.Equal(x, ma.GetInstantaneous()) re.LessOrEqual(math.Abs(ma.Get()-expected[i]), 1e-7) } } diff --git a/pkg/movingaverage/time_median.go b/pkg/movingaverage/time_median.go index 1c9f9595f35..88ad562a3cf 100644 --- a/pkg/movingaverage/time_median.go +++ b/pkg/movingaverage/time_median.go @@ -21,20 +21,15 @@ import "time" // Delay is aotSize * mfSize * reportInterval/4 // and the min filled period is aotSize * reportInterval, which is not related with mfSize type TimeMedian struct { - aot *AvgOverTime - mf *MedianFilter - aotSize int - mfSize int - instantaneous float64 + aot *AvgOverTime + mf *MedianFilter } // NewTimeMedian returns a TimeMedian with given size. func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian { return &TimeMedian{ - aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval), - mf: NewMedianFilter(mfSize), - aotSize: aotSize, - mfSize: mfSize, + aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval), + mf: NewMedianFilter(mfSize), } } @@ -45,7 +40,6 @@ func (t *TimeMedian) Get() float64 { // Add adds recent change to TimeMedian. func (t *TimeMedian) Add(delta float64, interval time.Duration) { - t.instantaneous = delta / interval.Seconds() t.aot.Add(delta, interval) if t.aot.IsFull() { t.mf.Add(t.aot.Get()) @@ -57,23 +51,15 @@ func (t *TimeMedian) Set(avg float64) { t.mf.Set(avg) } -// GetFilledPeriod returns filled period. -func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize - return t.aotSize -} - // GetInstantaneous returns instantaneous speed func (t *TimeMedian) GetInstantaneous() float64 { - return t.instantaneous + return t.aot.GetInstantaneous() } // Clone returns a copy of TimeMedian func (t *TimeMedian) Clone() *TimeMedian { return &TimeMedian{ - aot: t.aot.Clone(), - mf: t.mf.Clone(), - aotSize: t.aotSize, - mfSize: t.mfSize, - instantaneous: t.instantaneous, + aot: t.aot.Clone(), + mf: t.mf.Clone(), } } diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index 66121e8d026..464c937a874 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -16,9 +16,7 @@ package statistics import ( "context" - "time" - "github.com/tikv/pd/pkg/movingaverage" "github.com/tikv/pd/server/core" ) @@ -190,19 +188,6 @@ func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat { return w.writeCache.collectExpiredItems(region) } -// GetFilledPeriod returns filled period. -// This is used for mockcluster, for test purpose. -func (w *HotCache) GetFilledPeriod(kind RWType) int { - var reportIntervalSecs int - switch kind { - case Write: - reportIntervalSecs = w.writeCache.kind.ReportInterval() - case Read: - reportIntervalSecs = w.readCache.kind.ReportInterval() - } - return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod() -} - // GetThresholds returns thresholds. // This is used for test purpose. func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {