Skip to content

feat: introduce percentile aggregator for window sliding #164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ import (

// HeadroomReporterOptions holds the configurations for headroom reporter in qos aware plugin
type HeadroomReporterOptions struct {
HeadroomReporterSyncPeriod time.Duration
HeadroomReporterSlidingWindowTime time.Duration
HeadroomReporterSlidingWindowMinStep general.ResourceList
HeadroomReporterSlidingWindowMaxStep general.ResourceList
HeadroomReporterSyncPeriod time.Duration
HeadroomReporterSlidingWindowTime time.Duration
HeadroomReporterSlidingWindowMinStep general.ResourceList
HeadroomReporterSlidingWindowMaxStep general.ResourceList
HeadroomReporterSlidingWindowAggregateFunction string
HeadroomReporterSlidingWindowAggregateArguments string

*CPUHeadroomManagerOptions
*MemoryHeadroomManagerOptions
Expand All @@ -52,8 +54,9 @@ func NewHeadroomReporterOptions() *HeadroomReporterOptions {
v1.ResourceCPU: resource.MustParse("4"),
v1.ResourceMemory: resource.MustParse("5Gi"),
},
CPUHeadroomManagerOptions: NewCPUHeadroomManagerOptions(),
MemoryHeadroomManagerOptions: NewMemoryHeadroomManagerOptions(),
HeadroomReporterSlidingWindowAggregateFunction: general.SmoothWindowAggFuncAvg,
CPUHeadroomManagerOptions: NewCPUHeadroomManagerOptions(),
MemoryHeadroomManagerOptions: NewMemoryHeadroomManagerOptions(),
}
}

Expand All @@ -67,6 +70,10 @@ func (o *HeadroomReporterOptions) AddFlags(fs *pflag.FlagSet) {
"the min step headroom resource need to change")
fs.Var(&o.HeadroomReporterSlidingWindowMaxStep, "headroom-reporter-sliding-window-max-step",
"the max step headroom resource can change")
fs.StringVar(&o.HeadroomReporterSlidingWindowAggregateFunction, "headroom-reporter-sliding-window-aggregate-function", o.HeadroomReporterSlidingWindowAggregateFunction,
"the aggregate function of sliding window, like average, percentile, min, max, std")
fs.StringVar(&o.HeadroomReporterSlidingWindowAggregateArguments, "headroom-reporter-sliding-window-aggregate-arguments", o.HeadroomReporterSlidingWindowAggregateArguments,
"the args of aggregator function")

o.CPUHeadroomManagerOptions.AddFlags(fs)
o.MemoryHeadroomManagerOptions.AddFlags(fs)
Expand All @@ -78,6 +85,8 @@ func (o *HeadroomReporterOptions) ApplyTo(c *reporter.HeadroomReporterConfigurat
c.HeadroomReporterSlidingWindowTime = o.HeadroomReporterSlidingWindowTime
c.HeadroomReporterSlidingWindowMinStep = v1.ResourceList(o.HeadroomReporterSlidingWindowMinStep)
c.HeadroomReporterSlidingWindowMaxStep = v1.ResourceList(o.HeadroomReporterSlidingWindowMaxStep)
c.HeadroomReporterSlidingWindowAggregateFunction = o.HeadroomReporterSlidingWindowAggregateFunction
c.HeadroomReporterSlidingWindowAggregateArguments = o.HeadroomReporterSlidingWindowAggregateArguments

var errList []error
errList = append(errList, o.CPUHeadroomManagerOptions.ApplyTo(c.CPUHeadroomManagerConfiguration))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func generateCPUWindowOptions(conf *reporter.HeadroomReporterConfiguration) Gene
SlidingWindowTime: conf.HeadroomReporterSlidingWindowTime,
MinStep: conf.HeadroomReporterSlidingWindowMinStep[v1.ResourceCPU],
MaxStep: conf.HeadroomReporterSlidingWindowMaxStep[v1.ResourceCPU],
AggregateFunc: conf.HeadroomReporterSlidingWindowAggregateFunction,
AggregateArgs: conf.HeadroomReporterSlidingWindowAggregateArguments,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ type GenericSlidingWindowOptions struct {
// MinStep min step of the value change
MinStep resource.Quantity
// MaxStep max step of the value change
MaxStep resource.Quantity
MaxStep resource.Quantity
AggregateFunc string
AggregateArgs string
}

type GenericHeadroomManager struct {
Expand Down Expand Up @@ -95,7 +97,9 @@ func NewGenericHeadroomManager(name v1.ResourceName, useMilliValue, reportMilliV
reportSlidingWindow: general.NewCappedSmoothWindow(
slidingWindowOptions.MinStep,
slidingWindowOptions.MaxStep,
general.NewAverageWithTTLSmoothWindow(slidingWindowSize, slidingWindowTTL, useMilliValue),
general.NewAggregatorSmoothWindow(general.SmoothWindowOpts{WindowSize: slidingWindowSize,
TTL: slidingWindowTTL, UsedMillValue: useMilliValue, AggregateFunc: slidingWindowOptions.AggregateFunc,
AggregateArgs: slidingWindowOptions.AggregateArgs}),
),
emitter: emitter,
getReclaimOptions: getReclaimOptions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func generateMemoryWindowOptions(conf *reporter.HeadroomReporterConfiguration) G
SlidingWindowTime: conf.HeadroomReporterSlidingWindowTime,
MinStep: conf.HeadroomReporterSlidingWindowMinStep[v1.ResourceMemory],
MaxStep: conf.HeadroomReporterSlidingWindowMaxStep[v1.ResourceMemory],
AggregateFunc: conf.HeadroomReporterSlidingWindowAggregateFunction,
AggregateArgs: conf.HeadroomReporterSlidingWindowAggregateArguments,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (

// HeadroomReporterConfiguration stores configurations of headroom reporters in qos aware plugin
type HeadroomReporterConfiguration struct {
HeadroomReporterSyncPeriod time.Duration
HeadroomReporterSlidingWindowTime time.Duration
HeadroomReporterSlidingWindowMinStep v1.ResourceList
HeadroomReporterSlidingWindowMaxStep v1.ResourceList
HeadroomReporterSyncPeriod time.Duration
HeadroomReporterSlidingWindowTime time.Duration
HeadroomReporterSlidingWindowMinStep v1.ResourceList
HeadroomReporterSlidingWindowMaxStep v1.ResourceList
HeadroomReporterSlidingWindowAggregateFunction string
HeadroomReporterSlidingWindowAggregateArguments string

*CPUHeadroomManagerConfiguration
*MemoryHeadroomManagerConfiguration
Expand Down
102 changes: 102 additions & 0 deletions pkg/util/general/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,35 @@ limitations under the License.
package general

import (
"math"
"sort"
"strconv"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/resource"
)

const (
SmoothWindowAggFuncAvg = "average"
SmoothWindowAggFuncPerc = "percentile"
)

// SmoothWindow is used to smooth the resource
type SmoothWindow interface {
// GetWindowedResources receives a sample and returns the result after smoothing,
// it can return nil if there are not enough samples in this window
GetWindowedResources(value resource.Quantity) *resource.Quantity
}

type SmoothWindowOpts struct {
WindowSize int
TTL time.Duration
UsedMillValue bool
AggregateFunc string
AggregateArgs string
}

type CappedSmoothWindow struct {
sync.Mutex
last *resource.Quantity
Expand Down Expand Up @@ -100,6 +116,21 @@ type sample struct {
timestamp time.Time
}

func NewAggregatorSmoothWindow(opts SmoothWindowOpts) SmoothWindow {
switch opts.AggregateFunc {
case SmoothWindowAggFuncAvg:
return NewAverageWithTTLSmoothWindow(opts.WindowSize, opts.TTL, opts.UsedMillValue)
case SmoothWindowAggFuncPerc:
perc, err := strconv.ParseFloat(opts.AggregateArgs, 64)
if err != nil {
Errorf("failed to parse AggregateArgs %v, fallback to default aggregator", opts.AggregateFunc)
} else {
return NewPercentileWithTTLSmoothWindow(opts.WindowSize, opts.TTL, perc, opts.UsedMillValue)
}
}
return NewAverageWithTTLSmoothWindow(opts.WindowSize, opts.TTL, opts.UsedMillValue)
}

// NewAverageWithTTLSmoothWindow create a smooth window with ttl and window size, and the window size
// is the sample count while the ttl is the valid lifetime of each sample, and the usedMillValue means
// whether calculate the result with milli-value.
Expand Down Expand Up @@ -149,3 +180,74 @@ func (w *averageWithTTLSmoothWindow) GetWindowedResources(value resource.Quantit

return resource.NewQuantity(total.Value()/count, value.Format)
}

type percentileWithTTLSmoothWindow struct {
sync.Mutex
windowSize int
percentile float64
ttl time.Duration
usedMillValue bool

index int
samples []*sample
}

// NewPercentileWithTTLSmoothWindow create a smooth window with ttl and window size, and the window size
// is the sample count while the ttl is the valid lifetime of each sample, and the usedMillValue means
// whether calculate the result with milli-value.
func NewPercentileWithTTLSmoothWindow(windowSize int, ttl time.Duration, percentile float64, usedMillValue bool) SmoothWindow {
return &percentileWithTTLSmoothWindow{
windowSize: windowSize,
percentile: percentile,
ttl: ttl,
usedMillValue: usedMillValue,
index: 0,
samples: make([]*sample, windowSize),
}
}

// GetWindowedResources inserts a sample, and returns the smoothed result by average all the valid samples.
func (w *percentileWithTTLSmoothWindow) GetWindowedResources(value resource.Quantity) *resource.Quantity {
w.Mutex.Lock()
defer w.Mutex.Unlock()

timestamp := time.Now()
w.samples[w.index] = &sample{
value: value,
timestamp: timestamp,
}

w.index++
if w.index >= w.windowSize {
w.index = 0
}

validSamples := make([]resource.Quantity, 0)
for _, s := range w.samples {
if s != nil && s.timestamp.Add(w.ttl).After(timestamp) {
validSamples = append(validSamples, s.value)
}
}

v := w.getValueByPercentile(validSamples, w.percentile)

if w.usedMillValue {
return resource.NewMilliQuantity(v.MilliValue(), value.Format)
}

return resource.NewQuantity(v.Value(), value.Format)
}

func (w *percentileWithTTLSmoothWindow) getValueByPercentile(values []resource.Quantity, percentile float64) resource.Quantity {
sort.Slice(values, func(i, j int) bool {
return values[i].Cmp(values[j]) < 0
})

percentileIndex := int(math.Ceil(float64(len(values))*percentile/100.0) - 1)
if percentileIndex < 0 {
percentileIndex = 0
} else if percentileIndex >= len(values) {
percentileIndex = len(values) - 1
}
return values[percentileIndex]
}
Loading