Commit c425b53
Autoscaler fetches metrics via a client from the collector. (#4007)
* Autoscaler fetches metrics via a client from the collector.

* Add some documentation.

* Remove 'Size' function.

* Add constant for bucketSize.

* Return an error if no data is available yet instead of 0.

* Add special case for 'no data available' error

* Review feedback.
markusthoemmes authored and knative-prow-robot committed May 9, 2019
1 parent b4bc050 commit c425b53
Showing 9 changed files with 331 additions and 605 deletions.
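
The core of the change: the autoscaler no longer buckets stats itself but asks the metric collector for aggregated concurrency through a client. A minimal sketch of the interface implied by the diff below, with the method signature read off the test stub in cmd/autoscaler/main_test.go and the call site in pkg/autoscaler/autoscaler.go (the actual definition lives in the collector code, which is among the files not shown here):

    // MetricClient, as implied by this diff: it surfaces concurrency values
    // aggregated by the collector. ErrNoData is returned while the collector
    // has not gathered any stats for the given key yet.
    type MetricClient interface {
        // StableAndPanicConcurrency returns the observed concurrency averaged
        // over the stable and the panic window for the given metric key.
        StableAndPanicConcurrency(key string) (float64, float64, error)
    }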
11 changes: 6 additions & 5 deletions cmd/autoscaler/main.go
@@ -89,11 +89,11 @@ func main() {
     serviceInformer := kubeInformerFactory.Core().V1().Services()
     hpaInformer := kubeInformerFactory.Autoscaling().V1().HorizontalPodAutoscalers()
 
-    collector := autoscaler.NewMetricCollector(statsScraperFactoryFunc(endpointsInformer.Lister()), statsCh, logger)
+    collector := autoscaler.NewMetricCollector(statsScraperFactoryFunc(endpointsInformer.Lister()), logger)
 
     // Set up scalers.
     // uniScalerFactory depends endpointsInformer to be set.
-    multiScaler := autoscaler.NewMultiScaler(stopCh, uniScalerFactoryFunc(endpointsInformer), logger)
+    multiScaler := autoscaler.NewMultiScaler(stopCh, uniScalerFactoryFunc(endpointsInformer, collector), logger)
 
     controllers := []*controller.Impl{
         kpa.NewController(&opt, paInformer, sksInformer, serviceInformer, endpointsInformer, multiScaler, collector),
@@ -135,7 +135,8 @@ func main() {
             if !ok {
                 break
             }
-            multiScaler.RecordStat(sm.Key, sm.Stat)
+            multiScaler.Poke(sm.Key, sm.Stat)
+            collector.Record(sm.Key, sm.Stat)
         }
     }()
 
@@ -166,7 +167,7 @@ func setupLogger() (*zap.SugaredLogger, zap.AtomicLevel) {
     return logging.NewLoggerFromConfig(loggingConfig, component)
 }
 
-func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
+func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer, metricClient autoscaler.MetricClient) func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
     return func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
         if v, ok := decider.Labels[serving.ConfigurationLabelKey]; !ok || v == "" {
             return nil, fmt.Errorf("label %q not found or empty in Decider %s", serving.ConfigurationLabelKey, decider.Name)
@@ -184,7 +185,7 @@ func uniScalerFactoryFunc(endpointsInformer corev1informers.EndpointsInformer) f
             return nil, err
         }
 
-        return autoscaler.New(decider.Namespace, endpointsInformer, decider.Spec, reporter)
+        return autoscaler.New(decider.Namespace, decider.Name, metricClient, endpointsInformer, decider.Spec, reporter)
     }
 }
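With the collector owning the data, the stats loop in main.go now fans each StatMessage out to two consumers: multiScaler.Poke (as the name suggests) only nudges the revision's scaler, while collector.Record stores the stat for aggregation. A condensed sketch of that loop, assuming the statsCh wiring from the surrounding file:

    // Fan out each incoming StatMessage: the multi-scaler is merely poked,
    // the collector becomes the single owner of the recorded data.
    go func() {
        for sm := range statsCh { // sm is an autoscaler.StatMessage
            multiScaler.Poke(sm.Key, sm.Stat)
            collector.Record(sm.Key, sm.Stat)
        }
    }()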
8 changes: 7 additions & 1 deletion cmd/autoscaler/main_test.go
@@ -127,5 +127,11 @@ func TestUniScalerFactoryFunc(t *testing.T) {
 func getTestUniScalerFactory() func(decider *autoscaler.Decider) (autoscaler.UniScaler, error) {
     kubeClient := fakeK8s.NewSimpleClientset()
     kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
-    return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints())
+    return uniScalerFactoryFunc(kubeInformer.Core().V1().Endpoints(), &testMetricClient{})
 }
+
+type testMetricClient struct{}
+
+func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
+    return 1.0, 1.0, nil
+}
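
The testMetricClient above always reports a stable and panic concurrency of 1.0. A stub for the error path would look similar; this variant is hypothetical (not part of the commit) and would exercise the new 'no data available' branch in Scale:

    // errorMetricClient (hypothetical) simulates a collector that has not
    // gathered any stats yet, driving Scale into its ErrNoData branch.
    type errorMetricClient struct{}

    func (e *errorMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
        return 0, 0, autoscaler.ErrNoData
    }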
87 changes: 19 additions & 68 deletions pkg/autoscaler/autoscaler.go
@@ -24,51 +24,18 @@ import (
     "time"
 
     "github.com/knative/pkg/logging"
-    "github.com/knative/serving/pkg/autoscaler/aggregation"
     "github.com/knative/serving/pkg/resources"
+    "go.uber.org/zap"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
     corev1informers "k8s.io/client-go/informers/core/v1"
     corev1listers "k8s.io/client-go/listers/core/v1"
 )
 
-const (
-    // bucketSize is the size of the buckets of stats we create.
-    bucketSize time.Duration = 2 * time.Second
-)
-
-// Stat defines a single measurement at a point in time
-type Stat struct {
-    // The time the data point was received by autoscaler.
-    Time *time.Time
-
-    // The unique identity of this pod. Used to count how many pods
-    // are contributing to the metrics.
-    PodName string
-
-    // Average number of requests currently being handled by this pod.
-    AverageConcurrentRequests float64
-
-    // Part of AverageConcurrentRequests, for requests going through a proxy.
-    AverageProxiedConcurrentRequests float64
-
-    // Number of requests received since last Stat (approximately QPS).
-    RequestCount int32
-
-    // Part of RequestCount, for requests going through a proxy.
-    ProxiedRequestCount int32
-}
-
-// StatMessage wraps a Stat with identifying information so it can be routed
-// to the correct receiver.
-type StatMessage struct {
-    Key  string
-    Stat Stat
-}
-
 // Autoscaler stores current state of an instance of an autoscaler
 type Autoscaler struct {
     namespace       string
+    revision        string
+    metricClient    MetricClient
     endpointsLister corev1listers.EndpointsLister
     reporter        StatsReporter
@@ -81,13 +48,13 @@ type Autoscaler struct {
     // specMux guards the current DeciderSpec.
     specMux     sync.RWMutex
     deciderSpec DeciderSpec
-
-    buckets *aggregation.TimedFloat64Buckets
 }
 
 // New creates a new instance of autoscaler
 func New(
     namespace string,
+    revision string,
+    metricClient MetricClient,
     endpointsInformer corev1informers.EndpointsInformer,
     deciderSpec DeciderSpec,
     reporter StatsReporter) (*Autoscaler, error) {
@@ -103,9 +70,10 @@ func New(
 
     return &Autoscaler{
         namespace:       namespace,
+        revision:        revision,
+        metricClient:    metricClient,
         endpointsLister: endpointsInformer.Lister(),
         deciderSpec:     deciderSpec,
-        buckets:         aggregation.NewTimedFloat64Buckets(bucketSize),
         reporter:        reporter,
     }, nil
 }
@@ -118,21 +86,10 @@ func (a *Autoscaler) Update(deciderSpec DeciderSpec) error {
     return nil
 }
 
-// Record a data point.
-func (a *Autoscaler) Record(ctx context.Context, stat Stat) {
-    if stat.Time == nil {
-        logger := logging.FromContext(ctx)
-        logger.Errorf("Missing time from stat: %+v", stat)
-        return
-    }
-
-    // Proxied requests have been counted at the activator. Subtract
-    // AverageProxiedConcurrentRequests to avoid double counting.
-    a.buckets.Record(*stat.Time, stat.PodName, stat.AverageConcurrentRequests-stat.AverageProxiedConcurrentRequests)
-}
-
 // Scale calculates the desired scale based on current statistics given the current time.
-func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
+// desiredPodCount is the calculated pod count the autoscaler would like to set.
+// validScale signifies whether the desiredPodCount should be applied or not.
+func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount int32, validScale bool) {
     logger := logging.FromContext(ctx)
 
     spec := a.currentSpec()
@@ -148,31 +105,26 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
     // Use 1 if there are zero current pods.
     readyPodsCount := math.Max(1, float64(originalReadyPodsCount))
 
-    // Remove outdated data.
-    a.buckets.RemoveOlderThan(now.Add(-spec.MetricSpec.StableWindow))
-    if a.buckets.IsEmpty() {
-        logger.Debug("No data to scale on.")
+    metricKey := NewMetricKey(a.namespace, a.revision)
+    observedStableConcurrency, observedPanicConcurrency, err := a.metricClient.StableAndPanicConcurrency(metricKey)
+    if err != nil {
+        if err == ErrNoData {
+            logger.Debug("No data to scale on yet")
+        } else {
+            logger.Errorw("Failed to obtain metrics", zap.Error(err))
+        }
         return 0, false
     }
 
-    // Compute data to scale on.
-    panicAverage := aggregation.Average{}
-    stableAverage := aggregation.Average{}
-    a.buckets.ForEachBucket(
-        aggregation.YoungerThan(now.Add(-spec.MetricSpec.PanicWindow), panicAverage.Accumulate),
-        stableAverage.Accumulate, // No need to add a YoungerThan condition as we already deleted all outdated stats above.
-    )
-    observedStableConcurrency := stableAverage.Value()
-    observedPanicConcurrency := panicAverage.Value()
     desiredStablePodCount := int32(math.Min(math.Ceil(observedStableConcurrency/spec.TargetConcurrency), spec.MaxScaleUpRate*readyPodsCount))
     desiredPanicPodCount := int32(math.Min(math.Ceil(observedPanicConcurrency/spec.TargetConcurrency), spec.MaxScaleUpRate*readyPodsCount))
 
     a.reporter.ReportStableRequestConcurrency(observedStableConcurrency)
     a.reporter.ReportPanicRequestConcurrency(observedPanicConcurrency)
     a.reporter.ReportTargetRequestConcurrency(spec.TargetConcurrency)
 
-    logger.Debugf("STABLE: Observed average %0.3f concurrency over %v seconds.", observedStableConcurrency, spec.MetricSpec.StableWindow)
-    logger.Debugf("PANIC: Observed average %0.3f concurrency over %v seconds.", observedPanicConcurrency, spec.MetricSpec.PanicWindow)
+    logger.Debugf("STABLE: Observed average %0.3f concurrency, targeting %v.", observedStableConcurrency, spec.TargetConcurrency)
+    logger.Debugf("PANIC: Observed average %0.3f concurrency, targeting %v.", observedPanicConcurrency, spec.TargetConcurrency)
 
     isOverPanicThreshold := observedPanicConcurrency/readyPodsCount >= spec.PanicThreshold
 
@@ -191,7 +143,6 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (int32, bool) {
         a.reporter.ReportPanic(0)
     }
 
-    var desiredPodCount int32
     if a.panicTime != nil {
         logger.Debug("Operating in panic mode.")
         // We do not scale down while in panic mode. Only increases will be applied.
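The desired-pod-count math itself is untouched by this commit; only the source of the observed concurrency changed. As a worked example with assumed numbers (not taken from the diff): with observedStableConcurrency = 20, TargetConcurrency = 2, MaxScaleUpRate = 10 and readyPodsCount = 3, ceil(20/2) = 10 is below the scale-up cap of 10*3 = 30, so desiredStablePodCount = 10. A runnable sketch of that arithmetic:

    package main

    import (
        "fmt"
        "math"
    )

    func main() {
        // All values assumed for illustration, not taken from the commit.
        observedStableConcurrency := 20.0
        targetConcurrency := 2.0 // spec.TargetConcurrency
        maxScaleUpRate := 10.0   // spec.MaxScaleUpRate
        readyPodsCount := 3.0

        desired := int32(math.Min(math.Ceil(observedStableConcurrency/targetConcurrency), maxScaleUpRate*readyPodsCount))
        fmt.Println(desired) // prints 10
    }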
