Skip to content

Commit

Permalink
This is an automated cherry-pick of #4396
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 authored and ti-chi-bot committed Dec 7, 2021
1 parent 3a23716 commit e46a4e5
Show file tree
Hide file tree
Showing 9 changed files with 731 additions and 0 deletions.
85 changes: 85 additions & 0 deletions pkg/movingaverage/median_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package movingaverage

import "github.com/montanaflynn/stats"

// MedianFilter works as a median filter with specified window size.
// There are at most `size` data points for calculating.
// References: https://en.wikipedia.org/wiki/Median_filter.
type MedianFilter struct {
records []float64
size uint64
count uint64
instantaneous float64
}

// NewMedianFilter returns a MedianFilter.
func NewMedianFilter(size int) *MedianFilter {
return &MedianFilter{
records: make([]float64, size),
size: uint64(size),
}
}

// Add adds a data point.
func (r *MedianFilter) Add(n float64) {
r.instantaneous = n
r.records[r.count%r.size] = n
r.count++
}

// Get returns the median of the data set.
func (r *MedianFilter) Get() float64 {
if r.count == 0 {
return 0
}
records := r.records
if r.count < r.size {
records = r.records[:r.count]
}
median, _ := stats.Median(records)
return median
}

// Reset cleans the data set.
func (r *MedianFilter) Reset() {
r.instantaneous = 0
r.count = 0
}

// Set = Reset + Add.
func (r *MedianFilter) Set(n float64) {
r.instantaneous = n
r.records[0] = n
r.count = 1
}

// GetInstantaneous returns the value just added.
func (r *MedianFilter) GetInstantaneous() float64 {
return r.instantaneous
}

// Clone returns a copy of MedianFilter
func (r *MedianFilter) Clone() *MedianFilter {
records := make([]float64, len(r.records))
copy(records, r.records)
return &MedianFilter{
records: records,
size: r.size,
count: r.count,
instantaneous: r.instantaneous,
}
}
79 changes: 79 additions & 0 deletions pkg/movingaverage/time_median.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright 2020 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package movingaverage

import "time"

// TimeMedian is AvgOverTime + MedianFilter
// Size of MedianFilter should be larger than double size of AvgOverTime to denoisy.
// Delay is aotSize * mfSize * reportInterval/4
// and the min filled period is aotSize * reportInterval, which is not related with mfSize
type TimeMedian struct {
aot *AvgOverTime
mf *MedianFilter
aotSize int
mfSize int
instantaneous float64
}

// NewTimeMedian returns a TimeMedian with given size.
func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
return &TimeMedian{
aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval),
mf: NewMedianFilter(mfSize),
aotSize: aotSize,
mfSize: mfSize,
}
}

// Get returns change rate in the median of the several intervals.
func (t *TimeMedian) Get() float64 {
return t.mf.Get()
}

// Add adds recent change to TimeMedian.
func (t *TimeMedian) Add(delta float64, interval time.Duration) {
t.instantaneous = delta / interval.Seconds()
t.aot.Add(delta, interval)
if t.aot.IsFull() {
t.mf.Add(t.aot.Get())
}
}

// Set sets the given average.
func (t *TimeMedian) Set(avg float64) {
t.mf.Set(avg)
}

// GetFilledPeriod returns filled period.
func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
return t.aotSize
}

// GetInstantaneous returns instantaneous speed
func (t *TimeMedian) GetInstantaneous() float64 {
return t.instantaneous
}

// Clone returns a copy of TimeMedian
func (t *TimeMedian) Clone() *TimeMedian {
return &TimeMedian{
aot: t.aot.Clone(),
mf: t.mf.Clone(),
aotSize: t.aotSize,
mfSize: t.mfSize,
instantaneous: t.instantaneous,
}
}
16 changes: 16 additions & 0 deletions server/statistics/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,19 @@ func (t *TimeMedian) Add(delta float64, interval time.Duration) {
func (t *TimeMedian) Set(avg float64) {
t.mf.Set(avg)
}

// Clone returns a copy of AvgOverTime
func (aot *AvgOverTime) Clone() *AvgOverTime {
que := aot.que.Clone()
margin := deltaWithInterval{
delta: aot.margin.delta,
interval: aot.margin.interval,
}
return &AvgOverTime{
que: que,
margin: margin,
deltaSum: aot.deltaSum,
intervalSum: aot.intervalSum,
avgInterval: aot.avgInterval,
}
}
50 changes: 50 additions & 0 deletions server/statistics/hot_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,56 @@ const (
dimLen
)

<<<<<<< HEAD
=======
type dimStat struct {
typ RegionStatKind
Rolling *movingaverage.TimeMedian // it's used to statistic hot degree and average speed.
LastAverage *movingaverage.AvgOverTime // it's used to obtain the average speed in last second as instantaneous speed.
}

func newDimStat(typ RegionStatKind, reportInterval time.Duration) *dimStat {
return &dimStat{
typ: typ,
Rolling: movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, reportInterval),
LastAverage: movingaverage.NewAvgOverTime(reportInterval),
}
}

func (d *dimStat) Add(delta float64, interval time.Duration) {
d.LastAverage.Add(delta, interval)
d.Rolling.Add(delta, interval)
}

func (d *dimStat) isLastAverageHot(threshold float64) bool {
return d.LastAverage.Get() >= threshold
}

func (d *dimStat) isHot(threshold float64) bool {
return d.Rolling.Get() >= threshold
}

func (d *dimStat) isFull() bool {
return d.LastAverage.IsFull()
}

func (d *dimStat) clearLastAverage() {
d.LastAverage.Clear()
}

func (d *dimStat) Get() float64 {
return d.Rolling.Get()
}

func (d *dimStat) Clone() *dimStat {
return &dimStat{
typ: d.typ,
Rolling: d.Rolling.Clone(),
LastAverage: d.LastAverage.Clone(),
}
}

>>>>>>> 2246ef626 (statistics: fix the problem that the hot cache cannot be emptied when the interval is less than 60 (#4396))
// HotPeerStat records each hot peer's statistics
type HotPeerStat struct {
StoreID uint64 `json:"store_id"`
Expand Down
Loading

0 comments on commit e46a4e5

Please sign in to comment.