diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go
index 7c3217de1ab1..4e7aca62bc3f 100644
--- a/pkg/autoscaler/autoscaler.go
+++ b/pkg/autoscaler/autoscaler.go
@@ -45,6 +45,62 @@ type statKey struct {
 	time time.Time
 }
 
+// Creates a new totalAggregation
+func newTotalAggregation() *totalAggregation {
+	return &totalAggregation{
+		perPodAggregations: make(map[string]*perPodAggregation),
+	}
+}
+
+// Holds an aggregation across all pods
+type totalAggregation struct {
+	perPodAggregations map[string]*perPodAggregation
+	probeCount         int32
+}
+
+// Aggregates a given stat to the correct pod-aggregation
+func (agg *totalAggregation) aggregate(stat Stat) {
+	current, exists := agg.perPodAggregations[stat.PodName]
+	if !exists {
+		current = &perPodAggregation{}
+		agg.perPodAggregations[stat.PodName] = current
+	}
+	current.aggregate(stat.AverageConcurrentRequests)
+	agg.probeCount += 1
+}
+
+// The number of pods that are observable via stats
+func (agg *totalAggregation) observedPods() int {
+	return len(agg.perPodAggregations)
+}
+
+// The observed concurrency per pod (sum of all average concurrencies
+// distributed over the observed pods)
+func (agg *totalAggregation) observedConcurrencyPerPod() float64 {
+	accumulatedConcurrency := float64(0)
+	for _, perPod := range agg.perPodAggregations {
+		accumulatedConcurrency += perPod.calculateAverage()
+	}
+	return accumulatedConcurrency / float64(agg.observedPods())
+}
+
+// Holds an aggregation per pod
+type perPodAggregation struct {
+	accumulatedConcurrency float64
+	probeCount             int32
+}
+
+// Aggregates the given concurrency
+func (agg *perPodAggregation) aggregate(concurrency float64) {
+	agg.accumulatedConcurrency += concurrency
+	agg.probeCount += 1
+}
+
+// Calculates the average concurrency over all values given
+func (agg *perPodAggregation) calculateAverage() float64 {
+	return agg.accumulatedConcurrency / float64(agg.probeCount)
+}
+
 var (
 	lastRequestTime = time.Now()
 )
@@ -96,36 +152,33 @@ func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
 func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
 	logger := logging.FromContext(ctx)
 	// 60 second window
-	stableTotal := float64(0)
-	stableCount := float64(0)
-	stablePods := make(map[string]bool)
+	stableData := newTotalAggregation()
 
 	// 6 second window
-	panicTotal := float64(0)
-	panicCount := float64(0)
-	panicPods := make(map[string]bool)
+	panicData := newTotalAggregation()
 
 	// Last stat per Pod
 	lastStat := make(map[string]Stat)
 
+	// accumulate stats into their respective buckets
 	for key, stat := range a.stats {
 		instant := key.time
 		if instant.Add(*a.PanicWindow.Get()).After(now) {
-			panicTotal = panicTotal + stat.AverageConcurrentRequests
-			panicCount = panicCount + 1
-			panicPods[stat.PodName] = true
+			panicData.aggregate(stat)
 		}
 		if instant.Add(*a.StableWindow.Get()).After(now) {
-			stableTotal = stableTotal + stat.AverageConcurrentRequests
-			stableCount = stableCount + 1
-			stablePods[stat.PodName] = true
+			stableData.aggregate(stat)
 
+			// If there's no last stat for this pod, set it
 			if _, ok := lastStat[stat.PodName]; !ok {
 				lastStat[stat.PodName] = stat
 			}
+			// If the current last stat is older than the new one, override
 			if lastStat[stat.PodName].Time.Before(*stat.Time) {
 				lastStat[stat.PodName] = stat
 			}
+			// Update lastRequestTime if the current stat is newer and
+			// actually contains requests
 			if lastRequestTime.Before(*stat.Time) && stat.RequestCount > 0 {
 				lastRequestTime = *stat.Time
 			}
@@ -135,10 +188,17 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
 		}
 	}
 
+	// Scale to zero if the last request is from too long ago
 	if lastRequestTime.Add(*a.ScaleToZeroThreshold.Get()).Before(now) {
 		return 0, true
 	}
 
+	// Do nothing when we have no data.
+	if stableData.observedPods() == 0 {
+		logger.Debug("No data to scale on.")
+		return 0, false
+	}
+
 	// Log system totals
 	totalCurrentQPS := int32(0)
 	totalCurrentConcurrency := float64(0)
@@ -148,6 +208,26 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
 	}
 	logger.Debugf("Current QPS: %v Current concurrent clients: %v", totalCurrentQPS, totalCurrentConcurrency)
 
+	observedStableConcurrencyPerPod := stableData.observedConcurrencyPerPod()
+	observedPanicConcurrencyPerPod := panicData.observedConcurrencyPerPod()
+	// Desired scaling ratio is observed concurrency over desired (stable) concurrency.
+	// Rate limited to within MaxScaleUpRate.
+	desiredStableScalingRatio := a.rateLimited(observedStableConcurrencyPerPod / a.TargetConcurrency.Get())
+	desiredPanicScalingRatio := a.rateLimited(observedPanicConcurrencyPerPod / a.TargetConcurrency.Get())
+
+	desiredStablePodCount := desiredStableScalingRatio * float64(stableData.observedPods())
+	desiredPanicPodCount := desiredPanicScalingRatio * float64(stableData.observedPods())
+
+	a.reporter.Report(ObservedPodCountM, float64(stableData.observedPods()))
+	a.reporter.Report(ObservedStableConcurrencyM, observedStableConcurrencyPerPod)
+	a.reporter.Report(ObservedPanicConcurrencyM, observedPanicConcurrencyPerPod)
+	a.reporter.Report(TargetConcurrencyM, a.TargetConcurrency.Get())
+
+	logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
+		observedStableConcurrencyPerPod, a.StableWindow.Get(), stableData.probeCount, stableData.observedPods())
+	logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
+		observedPanicConcurrencyPerPod, a.PanicWindow.Get(), panicData.probeCount, panicData.observedPods())
+
 	// Stop panicking after the surge has made its way into the stable metric.
 	if a.panicking && a.panicTime.Add(*a.StableWindow.Get()).Before(now) {
 		logger.Info("Un-panicking.")
@@ -157,35 +237,8 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
 		a.maxPanicPods = 0
 	}
 
-	// Do nothing when we have no data.
-	if len(stablePods) == 0 {
-		logger.Debug("No data to scale on.")
-		return 0, false
-	}
-
-	a.reporter.Report(ObservedPodCountM, float64(len(stablePods)))
-	// Observed concurrency is the average of all data points in each window
-	observedStableConcurrency := stableTotal / stableCount
-	a.reporter.Report(ObservedStableConcurrencyM, observedStableConcurrency)
-	observedPanicConcurrency := panicTotal / panicCount
-	a.reporter.Report(ObservedPanicConcurrencyM, observedPanicConcurrency)
-
-	a.reporter.Report(TargetConcurrencyM, a.TargetConcurrency.Get())
-	// Desired scaling ratio is observed concurrency over desired
-	// (stable) concurrency. Rate limited to within MaxScaleUpRate.
-	desiredStableScalingRatio := a.rateLimited(observedStableConcurrency / a.TargetConcurrency.Get())
-	desiredPanicScalingRatio := a.rateLimited(observedPanicConcurrency / a.TargetConcurrency.Get())
-
-	desiredStablePodCount := desiredStableScalingRatio * float64(len(stablePods))
-	desiredPanicPodCount := desiredPanicScalingRatio * float64(len(stablePods))
-
-	logger.Debugf("Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
-		observedStableConcurrency, a.StableWindow.Get(), stableCount, len(stablePods))
-	logger.Debugf("Observed average %0.3f concurrency over %v seconds over %v samples over %v pods.",
-		observedPanicConcurrency, a.PanicWindow.Get(), panicCount, len(panicPods))
-
 	// Begin panicking when we cross the 6 second concurrency threshold.
-	if !a.panicking && len(panicPods) > 0 && observedPanicConcurrency >= (a.TargetConcurrency.Get()*2) {
+	if !a.panicking && panicData.observedPods() > 0 && observedPanicConcurrencyPerPod >= (a.TargetConcurrency.Get()*2) {
 		logger.Info("PANICKING")
 		a.reporter.Report(PanicM, 1)
 		a.panicking = true
@@ -195,7 +248,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
 	if a.panicking {
 		logger.Debug("Operating in panic mode.")
 		if desiredPanicPodCount > a.maxPanicPods {
-			logger.Infof("Increasing pods from %v to %v.", len(panicPods), int(desiredPanicPodCount))
+			logger.Infof("Increasing pods from %v to %v.", panicData.observedPods(), int(desiredPanicPodCount))
 			a.panicTime = &now
 			a.maxPanicPods = desiredPanicPodCount
 		}
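The refactor above changes how observed concurrency is computed: the old code averaged every sample in the window in one pass (stableTotal / stableCount), while the new totalAggregation/perPodAggregation types average each pod's samples first and then average those per-pod results (observedConcurrencyPerPod). The standalone sketch below is not part of the diff; the stat struct is a simplified stand-in for the autoscaler's Stat type and the numbers are made up, purely to show how the two computations diverge when pods report different numbers of samples.

package main

import "fmt"

// stat is a simplified, hypothetical stand-in for the autoscaler's Stat type.
type stat struct {
	podName                   string
	averageConcurrentRequests float64
}

func main() {
	// pod-a reports twice in the window, pod-b reports once.
	stats := []stat{{"pod-a", 10}, {"pod-a", 2}, {"pod-b", 3}}

	// Old behavior: one average over every sample in the window.
	total := 0.0
	for _, s := range stats {
		total += s.averageConcurrentRequests
	}
	globalAvg := total / float64(len(stats)) // (10+2+3)/3 = 5.0

	// New behavior (mirrors totalAggregation/perPodAggregation):
	// average each pod's samples first, then average across pods.
	perPodTotal := map[string]float64{}
	perPodCount := map[string]int{}
	for _, s := range stats {
		perPodTotal[s.podName] += s.averageConcurrentRequests
		perPodCount[s.podName]++
	}
	sumOfAverages := 0.0
	for pod, sum := range perPodTotal {
		sumOfAverages += sum / float64(perPodCount[pod])
	}
	perPodAvg := sumOfAverages / float64(len(perPodTotal)) // ((10+2)/2 + 3)/2 = 4.5

	fmt.Printf("global average: %.2f, per-pod average: %.2f\n", globalAvg, perPodAvg)
}

With unequal sample counts, the old global average weights chattier pods more heavily (5.0 here), while the per-pod average gives every observed pod equal weight (4.5).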
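The shape of the scaling decision is unchanged: the desired pod count is the rate-limited ratio of observed per-pod concurrency to the target concurrency, multiplied by the number of observed pods, and panic mode begins once the short-window concurrency reaches twice the target. The sketch below walks through that arithmetic with hypothetical numbers; a.rateLimited is not shown in this diff, so the clamp against maxScaleUpRate is only an assumption based on the "Rate limited to within MaxScaleUpRate" comment.

package main

import "fmt"

// maxScaleUpRate is a hypothetical stand-in for the autoscaler's
// MaxScaleUpRate configuration.
const maxScaleUpRate = 10.0

// rateLimited assumes the autoscaler clamps the scaling ratio to
// maxScaleUpRate; the real a.rateLimited is not part of this diff.
func rateLimited(desiredRatio float64) float64 {
	if desiredRatio > maxScaleUpRate {
		return maxScaleUpRate
	}
	return desiredRatio
}

func main() {
	const targetConcurrency = 1.0 // hypothetical TargetConcurrency

	observedPods := 4           // pods seen in the stable window
	observedStablePerPod := 2.5 // stable-window concurrency per pod
	observedPanicPerPod := 2.5  // panic-window concurrency per pod

	// Desired scaling ratio: observed per-pod concurrency over the target,
	// clamped to the maximum scale-up rate.
	ratio := rateLimited(observedStablePerPod / targetConcurrency)
	desiredPods := ratio * float64(observedPods) // 2.5 * 4 = 10

	// Panic mode begins when the short-window concurrency is at least
	// twice the target.
	panicking := observedPanicPerPod >= targetConcurrency*2

	fmt.Printf("desired pods: %.0f, panicking: %v\n", desiredPods, panicking)
}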