Improve autoscaler's observed average calculation. #1194

Merged
merged 4 commits on Jun 14, 2018
135 changes: 94 additions & 41 deletions pkg/autoscaler/autoscaler.go
@@ -45,6 +45,62 @@ type statKey struct {
time time.Time
}

// Creates a new totalAggregation
func newTotalAggregation() *totalAggregation {
return &totalAggregation{
perPodAggregations: make(map[string]*perPodAggregation),
}
}

// Holds an aggregation across all pods
type totalAggregation struct {
perPodAggregations map[string]*perPodAggregation
probeCount int32
}

// Aggregates a given stat to the correct pod-aggregation
func (agg *totalAggregation) aggregate(stat Stat) {
current, exists := agg.perPodAggregations[stat.PodName]
if !exists {
current = &perPodAggregation{}
agg.perPodAggregations[stat.PodName] = current
}
current.aggregate(stat.AverageConcurrentRequests)
agg.probeCount += 1
}

// The number of pods that are observable via stats
func (agg *totalAggregation) observedPods() int {
return len(agg.perPodAggregations)
}

// The observed concurrency per pod (sum of all average concurrencies
// distributed over the observed pods)
func (agg *totalAggregation) observedConcurrencyPerPod() float64 {
accumulatedConcurrency := float64(0)
for _, perPod := range agg.perPodAggregations {
accumulatedConcurrency += perPod.calculateAverage()
}
return accumulatedConcurrency / float64(agg.observedPods())
}

// Holds an aggregation per pod
type perPodAggregation struct {
accumulatedConcurrency float64
probeCount int32
}

// Aggregates the given concurrency
func (agg *perPodAggregation) aggregate(concurrency float64) {
agg.accumulatedConcurrency += concurrency
agg.probeCount += 1
}

// Calculates the average concurrency over all values given
func (agg *perPodAggregation) calculateAverage() float64 {
return agg.accumulatedConcurrency / float64(agg.probeCount)
}

var (
lastRequestTime = time.Now()
)
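
To make the new averaging concrete, here is a minimal, self-contained sketch (a simplified copy of perPodAggregation from this diff, with fabricated sample values): each pod's samples are averaged first, and only then averaged across pods, instead of averaging all raw samples together as the replaced code did.

package main

import "fmt"

// Simplified copy of the per-pod aggregation introduced in this diff.
type perPodAggregation struct {
	accumulatedConcurrency float64
	probeCount             int32
}

func (agg *perPodAggregation) aggregate(concurrency float64) {
	agg.accumulatedConcurrency += concurrency
	agg.probeCount++
}

func (agg *perPodAggregation) calculateAverage() float64 {
	return agg.accumulatedConcurrency / float64(agg.probeCount)
}

func main() {
	// pod-a reports twice (3.0 and 5.0), pod-b reports once (2.0).
	pods := map[string]*perPodAggregation{}
	report := func(pod string, concurrency float64) {
		if _, ok := pods[pod]; !ok {
			pods[pod] = &perPodAggregation{}
		}
		pods[pod].aggregate(concurrency)
	}
	report("pod-a", 3.0)
	report("pod-a", 5.0)
	report("pod-b", 2.0)

	// Average each pod first, then average across pods:
	// pod-a = 4.0, pod-b = 2.0, so observed concurrency per pod = 3.0.
	// The removed code averaged raw samples instead: (3+5+2)/3 ≈ 3.33,
	// over-weighting pods that happen to report more often.
	sum := 0.0
	for _, p := range pods {
		sum += p.calculateAverage()
	}
	fmt.Println(sum / float64(len(pods))) // 3
}
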
@@ -96,36 +152,33 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
logger := logging.FromContext(ctx)
// 60 second window
stableTotal := float64(0)
stableCount := float64(0)
stablePods := make(map[string]bool)
stableData := newTotalAggregation()

// 6 second window
panicTotal := float64(0)
panicCount := float64(0)
panicPods := make(map[string]bool)
panicData := newTotalAggregation()

// Last stat per Pod
lastStat := make(map[string]Stat)

// accumulate stats into their respective buckets
for key, stat := range a.stats {
instant := key.time
if instant.Add(*a.PanicWindow.Get()).After(now) {
panicTotal = panicTotal + stat.AverageConcurrentRequests
panicCount = panicCount + 1
panicPods[stat.PodName] = true
panicData.aggregate(stat)
}
if instant.Add(*a.StableWindow.Get()).After(now) {
stableTotal = stableTotal + stat.AverageConcurrentRequests
stableCount = stableCount + 1
stablePods[stat.PodName] = true
stableData.aggregate(stat)

// If there's no last stat for this pod, set it
if _, ok := lastStat[stat.PodName]; !ok {
lastStat[stat.PodName] = stat
}
// If the current last stat is older than the new one, override
if lastStat[stat.PodName].Time.Before(*stat.Time) {
lastStat[stat.PodName] = stat
}
// Update lastRequestTime if the current stat is newer and
// actually contains requests
if lastRequestTime.Before(*stat.Time) && stat.RequestCount > 0 {
lastRequestTime = *stat.Time
}
@@ -135,10 +188,17 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
}
}
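
As a rough illustration of the window check in the loop above (hypothetical window lengths and sample age; the real values come from the Autoscaler's StableWindow and PanicWindow settings), a stat falls into a window when its timestamp plus the window length is still after now:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values: a stat that is 10 seconds old, with a
	// 60 second stable window and a 6 second panic window.
	now := time.Now()
	statTime := now.Add(-10 * time.Second)
	stableWindow := 60 * time.Second
	panicWindow := 6 * time.Second

	// A stat belongs to a window if statTime + window is still after now,
	// i.e. the stat is newer than (now - window).
	fmt.Println(statTime.Add(stableWindow).After(now)) // true: 10s old, within 60s
	fmt.Println(statTime.Add(panicWindow).After(now))  // false: 10s old, outside 6s
}
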

// Scale to zero if the last request is from too long ago
if lastRequestTime.Add(*a.ScaleToZeroThreshold.Get()).Before(now) {
return 0, true
}

// Do nothing when we have no data.
if stableData.observedPods() == 0 {
logger.Debug("No data to scale on.")
return 0, false
}
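
Similarly, a minimal sketch of the scale-to-zero check with made-up numbers (the real threshold is the ScaleToZeroThreshold setting):

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values: no requests for 10 minutes, 5 minute threshold.
	now := time.Now()
	lastRequestTime := now.Add(-10 * time.Minute)
	scaleToZeroThreshold := 5 * time.Minute

	if lastRequestTime.Add(scaleToZeroThreshold).Before(now) {
		fmt.Println("scale to zero") // last request is older than the threshold
	}
}
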

// Log system totals
totalCurrentQPS := int32(0)
totalCurrentConcurrency := float64(0)
@@ -148,6 +208,26 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
}
logger.Debugf("Current QPS: %v Current concurrent clients: %v", totalCurrentQPS, totalCurrentConcurrency)

observedStableConcurrencyPerPod := stableData.observedConcurrencyPerPod()
observedPanicConcurrencyPerPod := panicData.observedConcurrencyPerPod()
// Desired scaling ratio is observed concurrency over desired (stable) concurrency.
// Rate limited to within MaxScaleUpRate.
desiredStableScalingRatio := a.rateLimited(observedStableConcurrencyPerPod / a.TargetConcurrency.Get())
desiredPanicScalingRatio := a.rateLimited(observedPanicConcurrencyPerPod / a.TargetConcurrency.Get())

desiredStablePodCount := desiredStableScalingRatio * float64(stableData.observedPods())
desiredPanicPodCount := desiredPanicScalingRatio * float64(stableData.observedPods())
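
A worked example of the scaling math above, with hypothetical numbers and a stand-in for a.rateLimited (assumed here to simply cap the ratio at the maximum scale-up rate, per the comment above):

package main

import "fmt"

// rateLimited caps the scaling ratio at a maximum scale-up rate; this is an
// assumption standing in for a.rateLimited and MaxScaleUpRate in this diff.
func rateLimited(ratio, maxScaleUpRate float64) float64 {
	if ratio > maxScaleUpRate {
		return maxScaleUpRate
	}
	return ratio
}

func main() {
	// Hypothetical numbers: 4 observed pods averaging 2.5 concurrent
	// requests each, a target of 1.0 per pod, and a max scale-up rate of 10.
	observedPods := 4.0
	observedConcurrencyPerPod := 2.5
	targetConcurrency := 1.0
	maxScaleUpRate := 10.0

	ratio := rateLimited(observedConcurrencyPerPod/targetConcurrency, maxScaleUpRate)
	desiredPods := ratio * observedPods
	fmt.Println(desiredPods) // 10: scale from 4 pods up to 10 to hit the target
}
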

a.reporter.Report(ObservedPodCountM, float64(stableData.observedPods()))
a.reporter.Report(ObservedStableConcurrencyM, observedStableConcurrencyPerPod)
a.reporter.Report(ObservedPanicConcurrencyM, observedPanicConcurrencyPerPod)
a.reporter.Report(TargetConcurrencyM, a.TargetConcurrency.Get())

logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedStableConcurrencyPerPod, a.StableWindow.Get(), stableData.probeCount, stableData.observedPods())
logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedPanicConcurrencyPerPod, a.PanicWindow.Get(), panicData.probeCount, panicData.observedPods())

// Stop panicking after the surge has made its way into the stable metric.
if a.panicking && a.panicTime.Add(*a.StableWindow.Get()).Before(now) {
logger.Info("Un-panicking.")
@@ -157,35 +237,8 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
a.maxPanicPods = 0
}

// Do nothing when we have no data.
if len(stablePods) == 0 {
logger.Debug("No data to scale on.")
return 0, false
}

a.reporter.Report(ObservedPodCountM, float64(len(stablePods)))
// Observed concurrency is the average of all data points in each window
observedStableConcurrency := stableTotal / stableCount
a.reporter.Report(ObservedStableConcurrencyM, observedStableConcurrency)
observedPanicConcurrency := panicTotal / panicCount
a.reporter.Report(ObservedPanicConcurrencyM, observedPanicConcurrency)

a.reporter.Report(TargetConcurrencyM, a.TargetConcurrency.Get())
// Desired scaling ratio is observed concurrency over desired
// (stable) concurrency. Rate limited to within MaxScaleUpRate.
desiredStableScalingRatio := a.rateLimited(observedStableConcurrency / a.TargetConcurrency.Get())
desiredPanicScalingRatio := a.rateLimited(observedPanicConcurrency / a.TargetConcurrency.Get())

desiredStablePodCount := desiredStableScalingRatio * float64(len(stablePods))
desiredPanicPodCount := desiredPanicScalingRatio * float64(len(stablePods))

logger.Debugf("Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedStableConcurrency, a.StableWindow.Get(), stableCount, len(stablePods))
logger.Debugf("Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
observedPanicConcurrency, a.PanicWindow.Get(), panicCount, len(panicPods))

// Begin panicking when we cross the 6 second concurrency threshold.
if !a.panicking && len(panicPods) > 0 && observedPanicConcurrency >= (a.TargetConcurrency.Get()*2) {
if !a.panicking && panicData.observedPods() > 0 && observedPanicConcurrencyPerPod >= (a.TargetConcurrency.Get()*2) {
logger.Info("PANICKING")
a.reporter.Report(PanicM, 1)
a.panicking = true
@@ -195,7 +248,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
if a.panicking {
logger.Debug("Operating in panic mode.")
if desiredPanicPodCount > a.maxPanicPods {
logger.Infof("Increasing pods from %v to %v.", len(panicPods), int(desiredPanicPodCount))
logger.Infof("Increasing pods from %v to %v.", panicData.observedPods(), int(desiredPanicPodCount))
a.panicTime = &now
a.maxPanicPods = desiredPanicPodCount
}
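
Finally, a small sketch of the panic trigger above with made-up numbers: with a target concurrency of 1.0, panic mode begins once the 6 second window observes at least 2.0 concurrent requests per pod.

package main

import "fmt"

func main() {
	// Hypothetical observations from the 6 second panic window.
	targetConcurrency := 1.0
	observedPanicConcurrencyPerPod := 2.5
	observedPanicPods := 3

	panicking := observedPanicPods > 0 &&
		observedPanicConcurrencyPerPod >= targetConcurrency*2
	fmt.Println(panicking) // true: 2.5 >= 2.0, so the autoscaler starts panicking
}
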