Skip to content

Commit

Permalink
movingaverage: reduce memory consume (#5798)
Browse files Browse the repository at this point in the history
ref #5691

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Dec 23, 2022
1 parent 882eb1d commit 7c43985
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 57 deletions.
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (mc *Cluster) AddRegionWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetReadQuery(readQuery))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand All @@ -410,7 +410,7 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(0, reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReadQuery(readQuery))
r = r.Clone(core.SetReportInterval(0, reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down Expand Up @@ -466,7 +466,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetWrittenQuery(writtenQuery))

filledNum := mc.HotCache.GetFilledPeriod(statistics.Write)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,11 @@ func (aot *AvgOverTime) Clone() *AvgOverTime {
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
return aot.intervalSum
}

// GetInstantaneous returns the value just added.
func (aot *AvgOverTime) GetInstantaneous() float64 {
if aot.que.Len() == 0 || aot.que.Back() == nil {
return 0
}
return aot.que.Back().(deltaWithInterval).delta
}
2 changes: 1 addition & 1 deletion pkg/movingaverage/avg_over_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestMinFilled(t *testing.T) {
for aotSize := 2; aotSize < 10; aotSize++ {
for mfSize := 2; mfSize < 10; mfSize++ {
tm := NewTimeMedian(aotSize, mfSize, interval)
for i := 0; i < tm.GetFilledPeriod(); i++ {
for i := 0; i < aotSize; i++ {
re.Equal(0.0, tm.Get())
tm.Add(rate*interval.Seconds(), interval)
}
Expand Down
27 changes: 11 additions & 16 deletions pkg/movingaverage/median_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@ type MedianFilter struct {
// It is not thread safe to read and write records at the same time.
// If there are concurrent read and write, the read may get an old value.
// And we should avoid concurrent write.
records []float64
size uint64
count uint64
instantaneous float64
isUpdated bool
result float64
records []float64
size uint64
count uint64
isUpdated bool
result float64
}

// NewMedianFilter returns a MedianFilter.
Expand All @@ -43,7 +42,6 @@ func NewMedianFilter(size int) *MedianFilter {

// Add adds a data point.
func (r *MedianFilter) Add(n float64) {
r.instantaneous = n
r.records[r.count%r.size] = n
r.count++
r.isUpdated = true
Expand All @@ -68,34 +66,31 @@ func (r *MedianFilter) Get() float64 {

// Reset cleans the data set.
func (r *MedianFilter) Reset() {
r.instantaneous = 0
r.count = 0
r.isUpdated = true
}

// Set = Reset + Add.
func (r *MedianFilter) Set(n float64) {
r.instantaneous = n
r.records[0] = n
r.count = 1
r.isUpdated = true
}

// GetInstantaneous returns the value just added.
func (r *MedianFilter) GetInstantaneous() float64 {
return r.instantaneous
return r.records[(r.count-1)%r.size]
}

// Clone returns a copy of MedianFilter
func (r *MedianFilter) Clone() *MedianFilter {
records := make([]float64, len(r.records))
copy(records, r.records)
return &MedianFilter{
records: records,
size: r.size,
count: r.count,
instantaneous: r.instantaneous,
isUpdated: r.isUpdated,
result: r.result,
records: records,
size: r.size,
count: r.count,
isUpdated: r.isUpdated,
result: r.result,
}
}
1 change: 1 addition & 0 deletions pkg/movingaverage/moving_average_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func checkAdd(re *require.Assertions, ma MovingAvg, data []float64, expected []f
re.Len(data, len(expected))
for i, x := range data {
ma.Add(x)
re.Equal(x, ma.GetInstantaneous())
re.LessOrEqual(math.Abs(ma.Get()-expected[i]), 1e-7)
}
}
Expand Down
28 changes: 7 additions & 21 deletions pkg/movingaverage/time_median.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,15 @@ import "time"
// Delay is aotSize * mfSize * reportInterval/4
// and the min filled period is aotSize * reportInterval, which is not related with mfSize
type TimeMedian struct {
aot *AvgOverTime
mf *MedianFilter
aotSize int
mfSize int
instantaneous float64
aot *AvgOverTime
mf *MedianFilter
}

// NewTimeMedian returns a TimeMedian with given size.
func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
return &TimeMedian{
aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval),
mf: NewMedianFilter(mfSize),
aotSize: aotSize,
mfSize: mfSize,
aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval),
mf: NewMedianFilter(mfSize),
}
}

Expand All @@ -45,7 +40,6 @@ func (t *TimeMedian) Get() float64 {

// Add adds recent change to TimeMedian.
func (t *TimeMedian) Add(delta float64, interval time.Duration) {
t.instantaneous = delta / interval.Seconds()
t.aot.Add(delta, interval)
if t.aot.IsFull() {
t.mf.Add(t.aot.Get())
Expand All @@ -57,23 +51,15 @@ func (t *TimeMedian) Set(avg float64) {
t.mf.Set(avg)
}

// GetFilledPeriod returns filled period.
func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
return t.aotSize
}

// GetInstantaneous returns instantaneous speed
func (t *TimeMedian) GetInstantaneous() float64 {
return t.instantaneous
return t.aot.GetInstantaneous()
}

// Clone returns a copy of TimeMedian
func (t *TimeMedian) Clone() *TimeMedian {
return &TimeMedian{
aot: t.aot.Clone(),
mf: t.mf.Clone(),
aotSize: t.aotSize,
mfSize: t.mfSize,
instantaneous: t.instantaneous,
aot: t.aot.Clone(),
mf: t.mf.Clone(),
}
}
15 changes: 0 additions & 15 deletions server/statistics/hot_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ package statistics

import (
"context"
"time"

"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/server/core"
)

Expand Down Expand Up @@ -190,19 +188,6 @@ func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
return w.writeCache.collectExpiredItems(region)
}

// GetFilledPeriod returns filled period.
// This is used for mockcluster, for test purpose.
func (w *HotCache) GetFilledPeriod(kind RWType) int {
var reportIntervalSecs int
switch kind {
case Write:
reportIntervalSecs = w.writeCache.kind.ReportInterval()
case Read:
reportIntervalSecs = w.readCache.kind.ReportInterval()
}
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod()
}

// GetThresholds returns thresholds.
// This is used for test purpose.
func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {
Expand Down

0 comments on commit 7c43985

Please sign in to comment.