From c425b531a569b84ba1c59cc90334e442271ffd84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Markus=20Th=C3=B6mmes?= Date: Thu, 9 May 2019 18:57:37 +0200 Subject: [PATCH] Autoscaler fetches metrics via a client from the collector. (#4007) * Autoscaler fetches metrics via a client from the collector. * Add some documentation. * Remove 'Size' function. * Add constant for bucketSize. * Return an error if no data is available yet instead of 0. * Add special case for 'no data available ' error * Review feedback. --- cmd/autoscaler/main.go | 11 +- cmd/autoscaler/main_test.go | 8 +- pkg/autoscaler/autoscaler.go | 87 +---- pkg/autoscaler/autoscaler_test.go | 556 ++++++--------------------- pkg/autoscaler/collector.go | 131 ++++++- pkg/autoscaler/collector_test.go | 81 +++- pkg/autoscaler/multiscaler.go | 12 +- pkg/autoscaler/multiscaler_test.go | 44 +-- pkg/autoscaler/stats_scraper_test.go | 6 +- 9 files changed, 331 insertions(+), 605 deletions(-) diff --git a/cmd/autoscaler/main.go b/cmd/autoscaler/main.go index 1504410f615d..5b43ed9be0f5 100644 --- a/cmd/autoscaler/main.go +++ b/cmd/autoscaler/main.go @@ -89,11 +89,11 @@ func main() { serviceInformer := kubeInformerFactory.Core().V1().Services() hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers() - collector := autoscaler.NewMetricCollector(statsScraperFactoryFunc(endpointsInformer.Lister()), statsCh, logger) + collector := autoscaler.NewMetricCollector(statsScraperFactoryFunc(endpointsInformer.Lister()), logger) // Set up scalers. // uniScalerFactory depends endpointsInformer to be set. - multiScaler := autoscaler.NewMultiScaler(stopCh, uniScalerFactoryFunc(endpointsInformer), logger) + multiScaler := autoscaler.NewMultiScaler(stopCh, uniScalerFactoryFunc(endpointsInformer, collector), logger) controllers := []*controller.Impl{ kpa.NewController(&opt, paInformer, sksInformer, serviceInformer, endpointsInformer, multiScaler, collector), @@ -135,7 +135,8 @@ func main() { if !ok { break } - multiScaler.RecordStat(sm.Key, sm.Stat) + multiScaler.Poke(sm.Key, sm.Stat) + collector.Record(sm.Key, sm.Stat) } }() @@ -166,7 +167,7 @@ func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) { return logging.NewLoggerFromConfig(loggingConfig, component) } -func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) { +func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer, metricClient autoscaler.MetricClient) func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) { return func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) { if v, ok := decider.Labels[serving.ConfigurationLabelKey]; !ok || v == "" { return nil, fmt.Errorf("label %q not found or empty in Decider %s", serving.ConfigurationLabelKey, decider.Name) @@ -184,7 +185,7 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) f return nil, err } - return autoscaler.New(decider.Namespace, endpointsInformer, decider.Spec, reporter) + return autoscaler.New(decider.Namespace, decider.Name, metricClient, endpointsInformer, decider.Spec, reporter) } } diff --git a/cmd/autoscaler/main_test.go b/cmd/autoscaler/main_test.go index b0c5a2c993e7..311b01cadaad 100644 --- a/cmd/autoscaler/main_test.go +++ b/cmd/autoscaler/main_test.go @@ -127,5 +127,11 @@ func TestUniScalerFactoryFunc(t *testing.T) { func getTestUniScalerFactory() func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) { kubeClient := fakeK8s.NewSimpleClientset() kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0) - return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints()) + return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints(), &testMetricClient{}) +} + +type testMetricClient struct{} + +func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) { + return 1.0, 1.0, nil } diff --git a/pkg/autoscaler/autoscaler.go b/pkg/autoscaler/autoscaler.go index f7c832a28a88..14f5f3a3fd97 100644 --- a/pkg/autoscaler/autoscaler.go +++ b/pkg/autoscaler/autoscaler.go @@ -24,7 +24,6 @@ import ( "time" "github.com/knative/pkg/logging" - "github.com/knative/serving/pkg/autoscaler/aggregation" "github.com/knative/serving/pkg/resources" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -32,43 +31,11 @@ import ( corev1listers "k8s.io/client-go/listers/core/v1" ) -const ( - // bucketSize is the size of the buckets of stats we create. - bucketSize time.Duration = 2 * time.Second -) - -// Stat defines a single measurement at a point in time -type Stat struct { - // The time the data point was received by autoscaler. - Time *time.Time - - // The unique identity of this pod. Used to count how many pods - // are contributing to the metrics. - PodName string - - // Average number of requests currently being handled by this pod. - AverageConcurrentRequests float64 - - // Part of AverageConcurrentRequests, for requests going through a proxy. - AverageProxiedConcurrentRequests float64 - - // Number of requests received since last Stat (approximately QPS). - RequestCount int32 - - // Part of RequestCount, for requests going through a proxy. - ProxiedRequestCount int32 -} - -// StatMessage wraps a Stat with identifying information so it can be routed -// to the correct receiver. -type StatMessage struct { - Key string - Stat Stat -} - // Autoscaler stores current state of an instance of an autoscaler type Autoscaler struct { namespace string + revision string + metricClient MetricClient endpointsLister corev1listers.EndpointsLister reporter StatsReporter @@ -81,13 +48,13 @@ type Autoscaler struct { // specMux guards the current DeciderSpec. specMux sync.RWMutex deciderSpec DeciderSpec - - buckets *aggregation.TimedFloat64Buckets } // New creates a new instance of autoscaler func New( namespace string, + revision string, + metricClient MetricClient, endpointsInformer corev1informers.EndpointsInformer, deciderSpec DeciderSpec, reporter StatsReporter) (*Autoscaler, error) { @@ -103,9 +70,10 @@ func New( return &Autoscaler{ namespace: namespace, + revision: revision, + metricClient: metricClient, endpointsLister: endpointsInformer.Lister(), deciderSpec: deciderSpec, - buckets: aggregation.NewTimedFloat64Buckets(bucketSize), reporter: reporter, }, nil } @@ -118,21 +86,10 @@ func (a *Autoscaler) Update(deciderSpec DeciderSpec) error { return nil } -// Record a data point. -func (a *Autoscaler) Record(ctx context.Context, stat Stat) { - if stat.Time == nil { - logger := logging.FromContext(ctx) - logger.Errorf("Missing time from stat: %+v", stat) - return - } - - // Proxied requests have been counted at the activator. Subtract - // AverageProxiedConcurrentRequests to avoid double counting. - a.buckets.Record(*stat.Time, stat.PodName, stat.AverageConcurrentRequests-stat.AverageProxiedConcurrentRequests) -} - // Scale calculates the desired scale based on current statistics given the current time. -func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { +// desiredPodCount is the calculated pod count the autoscaler would like to set. +// validScale signifies whether the desiredPodCount should be applied or not. +func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, validScale bool) { logger := logging.FromContext(ctx) spec := a.currentSpec() @@ -148,22 +105,17 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { // Use 1 if there are zero current pods. readyPodsCount := math.Max(1, float64(originalReadyPodsCount)) - // Remove outdated data. - a.buckets.RemoveOlderThan(now.Add(-spec.MetricSpec.StableWindow)) - if a.buckets.IsEmpty() { - logger.Debug("No data to scale on.") + metricKey := NewMetricKey(a.namespace, a.revision) + observedStableConcurrency, observedPanicConcurrency, err := a.metricClient.StableAndPanicConcurrency(metricKey) + if err != nil { + if err == ErrNoData { + logger.Debug("No data to scale on yet") + } else { + logger.Errorw("Failed to obtain metrics", zap.Error(err)) + } return 0, false } - // Compute data to scale on. - panicAverage := aggregation.Average{} - stableAverage := aggregation.Average{} - a.buckets.ForEachBucket( - aggregation.YoungerThan(now.Add(-spec.MetricSpec.PanicWindow), panicAverage.Accumulate), - stableAverage.Accumulate, // No need to add a YoungerThan condition as we already deleted all outdated stats above. - ) - observedStableConcurrency := stableAverage.Value() - observedPanicConcurrency := panicAverage.Value() desiredStablePodCount := int32(math.Min(math.Ceil(observedStableConcurrency/spec.TargetConcurrency), spec.MaxScaleUpRate*readyPodsCount)) desiredPanicPodCount := int32(math.Min(math.Ceil(observedPanicConcurrency/spec.TargetConcurrency), spec.MaxScaleUpRate*readyPodsCount)) @@ -171,8 +123,8 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrency) a.reporter.ReportTargetRequestConcurrency(spec.TargetConcurrency) - logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds.", observedStableConcurrency, spec.MetricSpec.StableWindow) - logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds.", observedPanicConcurrency, spec.MetricSpec.PanicWindow) + logger.Debugf("STABLE: Observed average %0.3f concurrency, targeting %v.", observedStableConcurrency, spec.TargetConcurrency) + logger.Debugf("PANIC: Observed average %0.3f concurrency, targeting %v.", observedPanicConcurrency, spec.TargetConcurrency) isOverPanicThreshold := observedPanicConcurrency/readyPodsCount >= spec.PanicThreshold @@ -191,7 +143,6 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) { a.reporter.ReportPanic(0) } - var desiredPodCount int32 if a.panicTime != nil { logger.Debug("Operating in panic mode.") // We do not scale down while in panic mode. Only increases will be applied. diff --git a/pkg/autoscaler/autoscaler_test.go b/pkg/autoscaler/autoscaler_test.go index a9987a7ab0c3..2e3fc6e8e06f 100644 --- a/pkg/autoscaler/autoscaler_test.go +++ b/pkg/autoscaler/autoscaler_test.go @@ -17,6 +17,7 @@ limitations under the License. package autoscaler import ( + "errors" "fmt" "testing" "time" @@ -43,7 +44,7 @@ var ( func TestNew_ErrorWhenGivenNilInterface(t *testing.T) { var endpointsInformer corev1informers.EndpointsInformer - _, err := New(testNamespace, endpointsInformer, DeciderSpec{TargetConcurrency: 10, ServiceName: testService}, &mockReporter{}) + _, err := New(testNamespace, testRevision, &testMetricClient{}, endpointsInformer, DeciderSpec{TargetConcurrency: 10, ServiceName: testService}, &mockReporter{}) if err == nil { t.Error("Expected error when EndpointsInformer interface is nil, but got none.") } @@ -52,7 +53,7 @@ func TestNew_ErrorWhenGivenNilInterface(t *testing.T) { func TestNew_ErrorWhenGivenNilStatsReporter(t *testing.T) { var reporter StatsReporter - _, err := New(testNamespace, kubeInformer.Core().V1().Endpoints(), + _, err := New(testNamespace, testRevision, &testMetricClient{}, kubeInformer.Core().V1().Endpoints(), DeciderSpec{TargetConcurrency: 10, ServiceName: testService}, reporter) if err == nil { t.Error("Expected error when EndpointsInformer interface is nil, but got none.") @@ -61,401 +62,137 @@ func TestNew_ErrorWhenGivenNilStatsReporter(t *testing.T) { func TestAutoscaler_NoData_NoAutoscale(t *testing.T) { defer ClearAll() - a := newTestAutoscaler(10.0) - a.expectScale(t, roundedNow(), 0, false) + metrics := &testMetricClient{ + err: errors.New("no metrics"), + } + + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 0, false) } func TestAutoscaler_NoDataAtZero_NoAutoscale(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 0, - endConcurrency: 0, - duration: stableWindow, - podCount: 1, - }) - - a.expectScale(t, now, 0, true) - now = now.Add(2 * time.Minute) - a.expectScale(t, now, 0, false) // do nothing + a := newTestAutoscaler(10.0, &testMetricClient{}) + a.expectScale(t, time.Now(), 0, true) } func TestAutoscaler_StableMode_NoChange(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 10, - }) - a.expectScale(t, now, 10, true) + metrics := &testMetricClient{stableConcurrency: 50.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 5, true) } -func TestAutoscaler_StableMode_SlowIncrease(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 20, - duration: stableWindow, - podCount: 10, - }) - a.expectScale(t, now, 15, true) -} +func TestAutoscaler_StableMode_Increase(t *testing.T) { + metrics := &testMetricClient{stableConcurrency: 50.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 5, true) -func TestAutoscaler_StableMode_SlowDecrease(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 20, - endConcurrency: 10, - duration: stableWindow, - podCount: 10, - }) - a.expectScale(t, now, 15, true) + metrics.stableConcurrency = 100.0 + a.expectScale(t, time.Now(), 10, true) } -func TestAutoscaler_StableModeLowPodCount_NoChange(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 1, - }) - a.expectScale(t, now, 1, true) +func TestAutoscaler_StableMode_Decrease(t *testing.T) { + metrics := &testMetricClient{stableConcurrency: 100.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 10, true) + + metrics.stableConcurrency = 50.0 + a.expectScale(t, time.Now(), 5, true) } func TestAutoscaler_StableModeNoTraffic_ScaleToZero(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 1, - endConcurrency: 1, - duration: stableWindow, - podCount: 1, - }) - a.expectScale(t, now, 1, true) - - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 0, - endConcurrency: 0, - duration: stableWindow + bucketSize, - podCount: 1, - }) - a.expectScale(t, now, 0, true) - - // Should not scale to zero again if there is no more traffic. - // Note: scale of 1 will be ignored since the autoscaler is not responsible for scaling from 0. - a.expectScale(t, now, 0, true) -} + metrics := &testMetricClient{stableConcurrency: 1.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 1, true) -func TestAutoscaler_StableModeLowTraffic_NoChange(t *testing.T) { - a := newTestAutoscaler(10.0) - - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 1, - endConcurrency: 1, - duration: time.Second, - podCount: 1, - }) - a.expectScale(t, now, 1, true) - - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 0, - endConcurrency: 0, - duration: stableWindow - bucketSize, - podCount: 1, - }) - a.expectScale(t, now, 1, true) + metrics.stableConcurrency = 0.0 + a.expectScale(t, time.Now(), 0, true) } func TestAutoscaler_PanicMode_DoublePodCount(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 10, - }) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 20, - endConcurrency: 20, - duration: panicWindow, - podCount: 10, - }) - a.expectScale(t, now, 20, true) + metrics := &testMetricClient{stableConcurrency: 50.0, panicConcurrency: 100.0} + a := newTestAutoscaler(10.0, metrics) + + // PanicConcurrency takes precedence. + a.expectScale(t, time.Now(), 10, true) } // QPS is increasing exponentially. Each scaling event bring concurrency // back to the target level (1.0) but then traffic continues to increase. // At 1296 QPS traffic stablizes. func TestAutoscaler_PanicModeExponential_TrackAndStablize(t *testing.T) { - a := newTestAutoscaler(1.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 1, - endConcurrency: 10, - duration: panicWindow, - podCount: 1, - }) - a.expectScale(t, now, 6, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, - endConcurrency: 10, - duration: panicWindow, - podCount: 6, - }) - a.expectScale(t, now, 36, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, - endConcurrency: 10, - duration: panicWindow, - podCount: 36, - }) - a.expectScale(t, now, 216, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, - endConcurrency: 10, - duration: panicWindow, - podCount: 216, - }) - a.expectScale(t, now, 1296, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, - endConcurrency: 1, // achieved desired concurrency - duration: panicWindow, - podCount: 1296, - }) - a.expectScale(t, now, 1296, true) -} - -func TestAutoscaler_PanicThenUnPanic_ScaleDown(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 10, - }) - a.expectScale(t, now, 10, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 100, - endConcurrency: 100, - duration: panicWindow, - podCount: 10, - }) - a.expectScale(t, now, 100, true) - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, // traffic drops off - endConcurrency: 1, - duration: 30 * time.Second, - podCount: 100, - }) - a.expectScale(t, now, 100, true) // still in panic mode--no decrease - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1, - endConcurrency: 1, - duration: 31 * time.Second, - podCount: 100, - }) - a.expectScale(t, now, 10, true) // back to stable mode -} + metrics := &testMetricClient{panicConcurrency: 6.0} + a := newTestAutoscaler(1.0, metrics) + a.expectScale(t, time.Now(), 6, true) + endpoints(6) -func TestAutoscaler_Activator_CausesInstantScale(t *testing.T) { - a := newTestAutoscaler(10.0) + metrics.panicConcurrency = 36.0 + a.expectScale(t, time.Now(), 36, true) + endpoints(36) - now := roundedNow() - now = a.recordMetric(t, Stat{ - Time: &now, - PodName: activatorPodName, - RequestCount: 0, - AverageConcurrentRequests: 100.0, - }) + metrics.panicConcurrency = 216.0 + a.expectScale(t, time.Now(), 216, true) + endpoints(216) - a.expectScale(t, now, 10, true) + metrics.panicConcurrency = 1296.0 + a.expectScale(t, time.Now(), 1296, true) + endpoints(1296) } -func TestAutoscaler_Activator_MultipleInstancesAreAggregated(t *testing.T) { - a := newTestAutoscaler(10.0) - - now := roundedNow() - now = a.recordMetric(t, Stat{ - Time: &now, - PodName: activatorPodName + "-0", - RequestCount: 0, - AverageConcurrentRequests: 50.0, - }) - now = a.recordMetric(t, Stat{ - Time: &now, - PodName: activatorPodName + "-1", - RequestCount: 0, - AverageConcurrentRequests: 50.0, - }) +func TestAutoscaler_PanicThenUnPanic_ScaleDown(t *testing.T) { + metrics := &testMetricClient{stableConcurrency: 100.0, panicConcurrency: 100.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 10, true) + endpoints(10) - a.expectScale(t, now, 10, true) -} + panicTime := time.Now() + metrics.panicConcurrency = 1000.0 + a.expectScale(t, panicTime, 100, true) -// Autoscaler should drop data after 60 seconds. -func TestAutoscaler_Stats_TrimAfterStableWindow(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 1, - }) - a.expectScale(t, now, 1, true) - now = now.Add(time.Minute) - a.expectScale(t, now, 0, false) -} + // Traffic dropped off, scale stays as we're still in panic. + metrics.panicConcurrency = 1.0 + metrics.stableConcurrency = 1.0 + a.expectScale(t, panicTime.Add(30*time.Second), 100, true) -func TestAutoscaler_Stats_DenyNoTime(t *testing.T) { - a := newTestAutoscaler(10.0) - stat := Stat{ - Time: nil, - PodName: "pod-1", - AverageConcurrentRequests: 1.0, - RequestCount: 5, - } - a.Record(TestContextWithLogger(t), stat) - a.expectScale(t, roundedNow(), 0, false) + // Scale down after the StableWindow + a.expectScale(t, panicTime.Add(61*time.Second), 1, true) } func TestAutoscaler_RateLimit_ScaleUp(t *testing.T) { - a := newTestAutoscaler(10.0) - - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 1000, - endConcurrency: 1000, - duration: time.Second, - podCount: 1, - }) + metrics := &testMetricClient{stableConcurrency: 1000.0} + a := newTestAutoscaler(10.0, metrics) + endpoints(1) // Need 100 pods but only scale x10 - a.expectScale(t, now, 10, true) - - now = a.recordLinearSeries( - t, - now, - linearSeries{ - startConcurrency: 1000, - endConcurrency: 1000, - duration: time.Second, - podCount: 10, - }) + a.expectScale(t, time.Now(), 10, true) + endpoints(10) // Scale x10 again - a.expectScale(t, now, 100, true) + a.expectScale(t, time.Now(), 100, true) } -func TestAutoscaler_UseOnePodAsMinimunIfEndpointsNotFound(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 1000, - endConcurrency: 1000, - duration: time.Second, - podCount: 2, - }) - ep := makeEndpoints() - kubeClient.CoreV1().Endpoints(testNamespace).Update(ep) - kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Update(ep) +func TestAutoscaler_UseOnePodAsMinimumIfEndpointsNotFound(t *testing.T) { + metrics := &testMetricClient{stableConcurrency: 1000.0} + a := newTestAutoscaler(10.0, metrics) + + endpoints(0) // 2*10 as the rate limited if we can get the actual pods number. // 1*10 as the rate limited since no read pods are there from K8S API. - a.expectScale(t, now, 10, true) - - now = a.recordLinearSeries( - t, - now.Add(60*time.Second), - linearSeries{ - startConcurrency: 1000, - endConcurrency: 1000, - duration: time.Second, - podCount: 2, - }) - kubeClient.CoreV1().Endpoints(testNamespace).Delete(ep.Name, nil) + a.expectScale(t, time.Now(), 10, true) + + ep, _ := kubeClient.CoreV1().Endpoints(testNamespace).Get(testService, metav1.GetOptions{}) + kubeClient.CoreV1().Endpoints(testNamespace).Delete(testService, nil) kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Delete(ep) // 2*10 as the rate limited if we can get the actual pods number. // 1*10 as the rate limited since no Endpoints object is there from K8S API. - a.expectScale(t, now, 10, true) + a.expectScale(t, time.Now(), 10, true) } func TestAutoscaler_UpdateTarget(t *testing.T) { - a := newTestAutoscaler(10.0) - now := a.recordLinearSeries( - t, - roundedNow(), - linearSeries{ - startConcurrency: 10, - endConcurrency: 10, - duration: stableWindow, - podCount: 10, - }) - a.expectScale(t, now, 10, true) + metrics := &testMetricClient{stableConcurrency: 100.0} + a := newTestAutoscaler(10.0, metrics) + a.expectScale(t, time.Now(), 10, true) + endpoints(10) + a.Update(DeciderSpec{ TargetConcurrency: 1.0, PanicThreshold: 2.0, @@ -463,42 +200,7 @@ func TestAutoscaler_UpdateTarget(t *testing.T) { MetricSpec: a.deciderSpec.MetricSpec, ServiceName: testService, }) - a.expectScale(t, now, 100, true) -} - -func TestAutoScaler_NotCountProxied(t *testing.T) { - a := newTestAutoscaler(1.0) - now := roundedNow() - stat := Stat{ - Time: &now, - PodName: "activator", - AverageConcurrentRequests: 1.0, - RequestCount: 1, - } - a.Record(TestContextWithLogger(t), stat) - // This stat indicate 3 pending requests, one of the which is proxied. - // So the concurrency from this stat is 2.0, because the proxied one - // has been counted at the activator. - stat = Stat{ - Time: &now, - PodName: "pod1", - AverageConcurrentRequests: 3.0, - AverageProxiedConcurrentRequests: 1.0, - RequestCount: 4, - ProxiedRequestCount: 2, - } - a.Record(TestContextWithLogger(t), stat) - // The total concurrency is 3.0, with 1.0 from "activator" and 2.0 from - // "pod1". With target concurrency of 1.0, this results in 3 desired pods. - a.expectScale(t, now, 3, true) -} - -type linearSeries struct { - startConcurrency int - endConcurrency int - duration time.Duration - podCount int - podIDOffset int + a.expectScale(t, time.Now(), 100, true) } type mockReporter struct{} @@ -543,7 +245,7 @@ func (r *mockReporter) ReportPanic(v int64) error { return nil } -func newTestAutoscaler(containerConcurrency int) *Autoscaler { +func newTestAutoscaler(containerConcurrency int, metrics MetricClient) *Autoscaler { deciderSpec := DeciderSpec{ TargetConcurrency: float64(containerConcurrency), PanicThreshold: 2 * float64(containerConcurrency), @@ -555,50 +257,10 @@ func newTestAutoscaler(containerConcurrency int) *Autoscaler { ServiceName: testService, } - a, _ := New(testNamespace, kubeInformer.Core().V1().Endpoints(), deciderSpec, &mockReporter{}) + a, _ := New(testNamespace, testRevision, metrics, kubeInformer.Core().V1().Endpoints(), deciderSpec, &mockReporter{}) return a } -// Record a data point every second, for every pod, for duration of the -// linear series, on the line from start to end concurrency. -func (a *Autoscaler) recordLinearSeries(test *testing.T, now time.Time, s linearSeries) time.Time { - points := make([]int32, 0, int(s.duration.Seconds()+1)) - for i := 1; i <= int(s.duration.Seconds()); i++ { - points = append(points, - int32( - float64(s.startConcurrency)+ - float64(s.endConcurrency-s.startConcurrency)*(float64(i)/s.duration.Seconds()))) - } - test.Logf("Recording points: %v.", points) - for _, point := range points { - t := now - now = now.Add(time.Second) - for j := 1; j <= s.podCount; j++ { - t = t.Add(time.Millisecond) - requestCount := 0 - if point > 0 { - requestCount = 1 - } - stat := Stat{ - Time: &t, - PodName: fmt.Sprintf("pod-%v", j+s.podIDOffset), - AverageConcurrentRequests: float64(point), - RequestCount: int32(requestCount), - } - a.Record(TestContextWithLogger(test), stat) - } - } - // Change the IP count according to podCount - createEndpoints(addIps(makeEndpoints(), s.podCount)) - return now -} - -// Record a single datapoint -func (a *Autoscaler) recordMetric(test *testing.T, stat Stat) time.Time { - a.Record(TestContextWithLogger(test), stat) - return *stat.Time -} - func (a *Autoscaler) expectScale(t *testing.T, now time.Time, expectScale int32, expectOk bool) { t.Helper() scale, ok := a.Scale(TestContextWithLogger(t), now) @@ -610,32 +272,32 @@ func (a *Autoscaler) expectScale(t *testing.T, now time.Time, expectScale int32, } } -func makeEndpoints() *corev1.Endpoints { - return &corev1.Endpoints{ +type testMetricClient struct { + stableConcurrency float64 + panicConcurrency float64 + err error +} + +func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) { + return t.stableConcurrency, t.panicConcurrency, t.err +} + +func endpoints(count int) { + epAddresses := make([]corev1.EndpointAddress, count) + for i := 0; i < count; i++ { + ip := fmt.Sprintf("127.0.0.%v", i+1) + epAddresses[i] = corev1.EndpointAddress{IP: ip} + } + + ep := &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNamespace, Name: testService, }, + Subsets: []corev1.EndpointSubset{{ + Addresses: epAddresses, + }}, } -} - -func addIps(ep *corev1.Endpoints, ipCount int) *corev1.Endpoints { - epAddresses := []corev1.EndpointAddress{} - for i := 1; i <= ipCount; i++ { - ip := fmt.Sprintf("127.0.0.%v", i) - epAddresses = append(epAddresses, corev1.EndpointAddress{IP: ip}) - } - ep.Subsets = []corev1.EndpointSubset{{ - Addresses: epAddresses, - }} - return ep -} - -func createEndpoints(ep *corev1.Endpoints) { kubeClient.CoreV1().Endpoints(testNamespace).Create(ep) kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(ep) } - -func roundedNow() time.Time { - return time.Now().Truncate(bucketSize) -} diff --git a/pkg/autoscaler/collector.go b/pkg/autoscaler/collector.go index 8342cc6f6ff5..d31eb6287b32 100644 --- a/pkg/autoscaler/collector.go +++ b/pkg/autoscaler/collector.go @@ -18,9 +18,12 @@ package autoscaler import ( "context" + "errors" "sync" "time" + "github.com/knative/serving/pkg/autoscaler/aggregation" + kpa "github.com/knative/serving/pkg/apis/autoscaling/v1alpha1" "go.uber.org/zap" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -32,6 +35,14 @@ const ( // all pods of a revision. // TODO(yanweiguo): tuning this value. To be based on pod population? scrapeTickInterval = time.Second / 3 + + // bucketSize is the size of the buckets of stats we create. + bucketSize = 2 * time.Second +) + +var ( + // ErrNoData denotes that the collector could not calculate data. + ErrNoData = errors.New("no data available") ) // Metric represents a resource to configure the metric collector with. @@ -54,24 +65,59 @@ type MetricStatus struct{} // StatsScraperFactory creates a StatsScraper for a given Metric. type StatsScraperFactory func(*Metric) (StatsScraper, error) +// Stat defines a single measurement at a point in time +type Stat struct { + // The time the data point was received by autoscaler. + Time *time.Time + + // The unique identity of this pod. Used to count how many pods + // are contributing to the metrics. + PodName string + + // Average number of requests currently being handled by this pod. + AverageConcurrentRequests float64 + + // Part of AverageConcurrentRequests, for requests going through a proxy. + AverageProxiedConcurrentRequests float64 + + // Number of requests received since last Stat (approximately QPS). + RequestCount int32 + + // Part of RequestCount, for requests going through a proxy. + ProxiedRequestCount int32 +} + +// StatMessage wraps a Stat with identifying information so it can be routed +// to the correct receiver. +type StatMessage struct { + Key string + Stat Stat +} + +// MetricClient surfaces the metrics that can be obtained via the collector. +type MetricClient interface { + // StableAndPanicConcurrency returns both the stable and the panic concurrency. + StableAndPanicConcurrency(key string) (float64, float64, error) +} + // MetricCollector manages collection of metrics for many entities. type MetricCollector struct { logger *zap.SugaredLogger statsScraperFactory StatsScraperFactory - statsCh chan *StatMessage collections map[string]*collection collectionsMutex sync.RWMutex } +var _ MetricClient = &MetricCollector{} + // NewMetricCollector creates a new metric collector. -func NewMetricCollector(statsScraperFactory StatsScraperFactory, statsCh chan *StatMessage, logger *zap.SugaredLogger) *MetricCollector { +func NewMetricCollector(statsScraperFactory StatsScraperFactory, logger *zap.SugaredLogger) *MetricCollector { collector := &MetricCollector{ logger: logger, collections: make(map[string]*collection), statsScraperFactory: statsScraperFactory, - statsCh: statsCh, } return collector @@ -107,7 +153,7 @@ func (c *MetricCollector) Create(ctx context.Context, metric *Metric) (*Metric, if err != nil { return nil, err } - coll = newCollection(metric, scraper, c.statsCh, c.logger) + coll = newCollection(metric, scraper, c.logger) c.collections[key] = coll } @@ -143,18 +189,43 @@ func (c *MetricCollector) Delete(ctx context.Context, namespace, name string) er return nil } +// Record records a stat that's been generated outside of the metric collector. +func (c *MetricCollector) Record(key string, stat Stat) { + c.collectionsMutex.RLock() + defer c.collectionsMutex.RUnlock() + + if collection, exists := c.collections[key]; exists { + collection.record(stat) + } +} + +// StableAndPanicConcurrency returns both the stable and the panic concurrency. +func (c *MetricCollector) StableAndPanicConcurrency(key string) (float64, float64, error) { + collection, exists := c.collections[key] + if !exists { + return 0, 0, k8serrors.NewNotFound(kpa.Resource("Metrics"), key) + } + + return collection.stableAndPanicConcurrency(time.Now()) +} + // collection represents the collection of metrics for one specific entity. type collection struct { - metricMux sync.RWMutex - metric *Metric + metricMutex sync.RWMutex + metric *Metric + + buckets *aggregation.TimedFloat64Buckets grp sync.WaitGroup stopCh chan struct{} } -func newCollection(metric *Metric, scraper StatsScraper, statsCh chan *StatMessage, logger *zap.SugaredLogger) *collection { +// newCollection creates a new collection. +func newCollection(metric *Metric, scraper StatsScraper, logger *zap.SugaredLogger) *collection { c := &collection{ - metric: metric, + metric: metric, + buckets: aggregation.NewTimedFloat64Buckets(bucketSize), + stopCh: make(chan struct{}), } @@ -174,7 +245,7 @@ func newCollection(metric *Metric, scraper StatsScraper, statsCh chan *StatMessa logger.Errorw("Failed to scrape metrics", zap.Error(err)) } if message != nil { - statsCh <- message + c.record(message.Stat) } } } @@ -183,13 +254,51 @@ func newCollection(metric *Metric, scraper StatsScraper, statsCh chan *StatMessa return c } +// updateMetric safely updates the metric stored in the collection. func (c *collection) updateMetric(metric *Metric) { - c.metricMux.Lock() - defer c.metricMux.Unlock() + c.metricMutex.Lock() + defer c.metricMutex.Unlock() c.metric = metric } +// currentMetric safely returns the current metric stored in the collection. +func (c *collection) currentMetric() *Metric { + c.metricMutex.RLock() + defer c.metricMutex.RUnlock() + + return c.metric +} + +// record adds a stat to the current collection. +func (c *collection) record(stat Stat) { + // Proxied requests have been counted at the activator. Subtract + // AverageProxiedConcurrentRequests to avoid double counting. + c.buckets.Record(*stat.Time, stat.PodName, stat.AverageConcurrentRequests-stat.AverageProxiedConcurrentRequests) +} + +// stableAndPanicConcurrency calculates both stable and panic concurrency based on the +// current stats. +func (c *collection) stableAndPanicConcurrency(now time.Time) (float64, float64, error) { + spec := c.currentMetric().Spec + + c.buckets.RemoveOlderThan(now.Add(-spec.StableWindow)) + + if c.buckets.IsEmpty() { + return 0, 0, ErrNoData + } + + panicAverage := aggregation.Average{} + stableAverage := aggregation.Average{} + c.buckets.ForEachBucket( + aggregation.YoungerThan(now.Add(-spec.PanicWindow), panicAverage.Accumulate), + stableAverage.Accumulate, // No need to add a YoungerThan condition as we already deleted all outdated stats above. + ) + + return stableAverage.Value(), panicAverage.Value(), nil +} + +// close stops collecting metrics, stops the scraper. func (c *collection) close() { close(c.stopCh) c.grp.Wait() diff --git a/pkg/autoscaler/collector_test.go b/pkg/autoscaler/collector_test.go index daa4d61eb993..ffd8786273de 100644 --- a/pkg/autoscaler/collector_test.go +++ b/pkg/autoscaler/collector_test.go @@ -25,7 +25,9 @@ import ( "github.com/google/go-cmp/cmp" . "github.com/knative/pkg/logging/testing" + k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -36,6 +38,10 @@ var ( Namespace: defaultNamespace, Name: defaultName, }, + Spec: MetricSpec{ + StableWindow: 60 * time.Second, + PanicWindow: 6 * time.Second, + }, } ) @@ -49,10 +55,9 @@ func TestMetricCollectorCrud(t *testing.T) { return nil, nil }) factory := scraperFactory(scraper, nil) - statsCh := make(chan *StatMessage) t.Run("error on mismatch", func(t *testing.T) { - coll := NewMetricCollector(factory, statsCh, logger) + coll := NewMetricCollector(factory, logger) coll.Create(ctx, &Metric{ ObjectMeta: metav1.ObjectMeta{ Namespace: "another-namespace", @@ -71,7 +76,7 @@ func TestMetricCollectorCrud(t *testing.T) { want := errors.New("factory failure") failingFactory := scraperFactory(nil, want) - coll := NewMetricCollector(failingFactory, statsCh, logger) + coll := NewMetricCollector(failingFactory, logger) _, got := coll.Create(ctx, defaultMetric) if got != want { @@ -80,7 +85,7 @@ func TestMetricCollectorCrud(t *testing.T) { }) t.Run("full crud", func(t *testing.T) { - coll := NewMetricCollector(factory, statsCh, logger) + coll := NewMetricCollector(factory, logger) coll.Create(ctx, defaultMetric) got, err := coll.Get(ctx, defaultNamespace, defaultName) @@ -111,32 +116,74 @@ func TestMetricCollectorScraper(t *testing.T) { logger := TestLogger(t) ctx := context.Background() - want := &StatMessage{ - Key: NewMetricKey(defaultNamespace, defaultName), + now := time.Now() + metricKey := NewMetricKey(defaultNamespace, defaultName) + want := 10.0 + stat := &StatMessage{ + Key: metricKey, Stat: Stat{ - PodName: "testPod", + Time: &now, + PodName: "testPod", + AverageConcurrentRequests: 10.0, }, } scraper := testScraper(func() (*StatMessage, error) { - return want, nil + return stat, nil }) factory := scraperFactory(scraper, nil) - statsCh := make(chan *StatMessage) - coll := NewMetricCollector(factory, statsCh, logger) + coll := NewMetricCollector(factory, logger) coll.Create(ctx, defaultMetric) - got := <-statsCh + // stable concurrency should eventually be equal to the stat. + var got float64 + wait.PollImmediate(10*time.Millisecond, 1*time.Second, func() (bool, error) { + got, _, _ = coll.StableAndPanicConcurrency(metricKey) + return got == want, nil + }) if got != want { - t.Errorf("<-statsCh = %v, want %v", got, want) + t.Errorf("StableAndPanicConcurrency() = %v, want %v", got, want) } coll.Delete(ctx, defaultNamespace, defaultName) - select { - case <-time.After(scrapeTickInterval * 2): - // All good! - case <-statsCh: - t.Error("Got unexpected metric after stopping collection") + _, _, err := coll.StableAndPanicConcurrency(metricKey) + if !k8serrors.IsNotFound(err) { + t.Errorf("StableAndPanicConcurrency() = %v, want a not found error", err) + } +} + +func TestMetricCollectorRecord(t *testing.T) { + defer ClearAll() + + logger := TestLogger(t) + ctx := context.Background() + + now := time.Now() + metricKey := NewMetricKey(defaultNamespace, defaultName) + want := 10.0 + stat := Stat{ + Time: &now, + PodName: "testPod", + AverageConcurrentRequests: want + 10, + AverageProxiedConcurrentRequests: 10, // this should be subtracted from the above. + } + scraper := testScraper(func() (*StatMessage, error) { + return nil, nil + }) + factory := scraperFactory(scraper, nil) + + coll := NewMetricCollector(factory, logger) + + // Freshly created collection does not contain any metrics and should return an error. + coll.Create(ctx, defaultMetric) + if _, _, err := coll.StableAndPanicConcurrency(metricKey); err == nil { + t.Error("StableAndPanicConcurrency() = nil, wanted an error") + } + + // After adding a stat the concurrencies are calculated correctly. + coll.Record(metricKey, stat) + if stable, panic, err := coll.StableAndPanicConcurrency(metricKey); stable != panic && stable != want && err != nil { + t.Errorf("StableAndPanicConcurrency() = %v, %v, %v; want %v, %v, nil", stable, panic, err, want, want) } } diff --git a/pkg/autoscaler/multiscaler.go b/pkg/autoscaler/multiscaler.go index e5ad35288bde..24f8a95f9f18 100644 --- a/pkg/autoscaler/multiscaler.go +++ b/pkg/autoscaler/multiscaler.go @@ -23,7 +23,6 @@ import ( "time" "github.com/knative/pkg/logging" - "github.com/knative/pkg/logging/logkey" kpa "github.com/knative/serving/pkg/apis/autoscaling/v1alpha1" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/errors" @@ -59,9 +58,6 @@ type DeciderStatus struct { // UniScaler records statistics for a particular Decider and proposes the scale for the Decider's target based on those statistics. type UniScaler interface { - // Record records the given statistics. - Record(context.Context, Stat) - // Scale either proposes a number of replicas or skips proposing. The proposal is requested at the given time. // The returned boolean is true if and only if a proposal was returned. Scale(context.Context, time.Time) (int32, bool) @@ -275,8 +271,8 @@ func (m *MultiScaler) tickScaler(ctx context.Context, scaler UniScaler, runner * } } -// RecordStat records some statistics for the given Decider. -func (m *MultiScaler) RecordStat(key string, stat Stat) { +// Poke checks if the autoscaler needs to be run immediately. +func (m *MultiScaler) Poke(key string, stat Stat) { m.scalersMutex.RLock() defer m.scalersMutex.RUnlock() @@ -285,10 +281,6 @@ func (m *MultiScaler) RecordStat(key string, stat Stat) { return } - logger := m.logger.With(zap.String(logkey.Key, key)) - ctx := logging.WithLogger(context.Background(), logger) - - scaler.scaler.Record(ctx, stat) if scaler.getLatestScale() == 0 && stat.AverageConcurrentRequests != 0 { scaler.pokeCh <- struct{}{} } diff --git a/pkg/autoscaler/multiscaler_test.go b/pkg/autoscaler/multiscaler_test.go index 67d61a9f784f..e84627507ad9 100644 --- a/pkg/autoscaler/multiscaler_test.go +++ b/pkg/autoscaler/multiscaler_test.go @@ -200,7 +200,7 @@ func TestMultiScalerScaleFromZero(t *testing.T) { AverageConcurrentRequests: 1, RequestCount: 1, } - ms.RecordStat(testKPAKey, testStat) + ms.Poke(testKPAKey, testStat) // Verify that we see a "tick" if err := verifyTick(errCh); err != nil { @@ -252,48 +252,6 @@ func TestMultiScalerIgnoreNegativeScale(t *testing.T) { } } -func TestMultiScalerRecordsStatistics(t *testing.T) { - ctx := context.Background() - ms, stopCh, statCh, uniScaler := createMultiScaler(t) - defer close(stopCh) - defer close(statCh) - - decider := newDecider() - - uniScaler.setScaleResult(1, true) - - _, err := ms.Create(ctx, decider) - if err != nil { - t.Errorf("Create() = %v", err) - } - - now := time.Now() - testStat := Stat{ - Time: &now, - PodName: "test-pod", - AverageConcurrentRequests: 3.5, - RequestCount: 20, - } - - ms.RecordStat(testKPAKey, testStat) - uniScaler.checkLastStat(t, testStat) - - testStat.RequestCount = 10 - ms.RecordStat(testKPAKey, testStat) - uniScaler.checkLastStat(t, testStat) - - err = ms.Delete(ctx, decider.Namespace, decider.Name) - if err != nil { - t.Errorf("Delete() = %v", err) - } - - // Should not continue to record statistics after the KPA has been deleted. - newStat := testStat - newStat.RequestCount = 30 - ms.RecordStat(testKPAKey, newStat) - uniScaler.checkLastStat(t, testStat) -} - func TestMultiScalerUpdate(t *testing.T) { ctx := context.Background() ms, stopCh, statCh, uniScaler := createMultiScaler(t) diff --git a/pkg/autoscaler/stats_scraper_test.go b/pkg/autoscaler/stats_scraper_test.go index 6bf9dd057c4a..3663f4fcfa01 100644 --- a/pkg/autoscaler/stats_scraper_test.go +++ b/pkg/autoscaler/stats_scraper_test.go @@ -113,7 +113,7 @@ func TestScrape_HappyCase(t *testing.T) { } // Make an Endpoints with 2 pods. - createEndpoints(addIps(makeEndpoints(), 2)) + endpoints(2) got, err := scraper.Scrape() if err != nil { t.Fatalf("unexpected error from scraper.Scrape(): %v", err) @@ -154,7 +154,7 @@ func TestScrape_PopulateErrorFromScrapeClient(t *testing.T) { } // Make an Endpoints with 2 pods. - createEndpoints(addIps(makeEndpoints(), 2)) + endpoints(2) if _, err := scraper.Scrape(); err != nil { if got, want := err.Error(), errMsg; got != want { @@ -174,7 +174,7 @@ func TestScrape_DoNotScrapeIfNoPodsFound(t *testing.T) { } // Override the Endpoints with 0 pods. - createEndpoints(addIps(makeEndpoints(), 0)) + endpoints(0) stat, err := scraper.Scrape() if err != nil {