Skip to content

Fix autoscaler behaviour when encountering zero-valued recommendations #2350

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 25 additions & 40 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package autoscaler
import (
"fmt"
"math"
"sync"
"time"

"github.com/cortexlabs/cortex/pkg/lib/cron"
"github.com/cortexlabs/cortex/pkg/lib/errors"
libmath "github.com/cortexlabs/cortex/pkg/lib/math"
"github.com/cortexlabs/cortex/pkg/lib/pointer"
"github.com/cortexlabs/cortex/pkg/lib/telemetry"
libtime "github.com/cortexlabs/cortex/pkg/lib/time"
"github.com/cortexlabs/cortex/pkg/types/spec"
Expand All @@ -44,19 +44,18 @@ type Scaler interface {
}

// Autoscaler holds the per-API autoscaling state: the cron job driving each
// API's autoscaling loop, the Scaler implementation registered for each API
// kind, and bookkeeping used when deciding whether to scale down.
//
// NOTE(review): this listing appears to interleave the pre-change and
// post-change field lists of a diff (logger, crons and scalers occur twice);
// confirm against the actual file which set of fields is current.
type Autoscaler struct {
// Guards lastAwakenTimestamp, which is written by Awaken and read by the
// autoscaling loop.
sync.Mutex
logger *zap.SugaredLogger
// crons is indexed by API name.
crons map[string]cron.Cron
// scalers is indexed by API kind.
scalers map[userconfig.Kind]Scaler
// lastAwakenTimestamp is indexed by API name; Awaken stores time.Now() here
// after scaling the API to one.
lastAwakenTimestamp map[string]time.Time
logger *zap.SugaredLogger
crons map[string]cron.Cron
scalers map[userconfig.Kind]Scaler
// recs is indexed by API name and holds that API's recorded recommendations.
recs map[string]*recommendations
}

// New returns an Autoscaler that logs through the given logger and whose maps
// are all initialised empty.
//
// NOTE(review): the composite literal below appears to interleave the
// pre-change and post-change initialisers of a diff (logger, crons and scalers
// occur twice); confirm against the actual file which lines are current.
func New(logger *zap.SugaredLogger) *Autoscaler {
return &Autoscaler{
logger: logger,
crons: make(map[string]cron.Cron),
scalers: make(map[userconfig.Kind]Scaler),
lastAwakenTimestamp: make(map[string]time.Time),
logger: logger,
crons: make(map[string]cron.Cron),
scalers: make(map[userconfig.Kind]Scaler),
recs: make(map[string]*recommendations),
}
}

Expand All @@ -65,9 +64,6 @@ func (a *Autoscaler) AddScaler(scaler Scaler, kind userconfig.Kind) {
}

func (a *Autoscaler) Awaken(api userconfig.Resource) error {
a.Lock()
defer a.Unlock()

scaler, ok := a.scalers[api.Kind]
if !ok {
return errors.ErrorUnexpected(
Expand All @@ -94,7 +90,7 @@ func (a *Autoscaler) Awaken(api userconfig.Resource) error {
return errors.Wrap(err, "failed to scale api to one")
}

a.lastAwakenTimestamp[api.Name] = time.Now()
a.recs[api.Name].add(1)

return nil
}
Expand All @@ -104,11 +100,6 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
return nil
}

autoscaleFn, err := a.autoscaleFn(api)
if err != nil {
return err
}

errorHandler := func(err error) {
log := a.logger.With(
zap.String("apiName", api.Name),
Expand All @@ -119,12 +110,12 @@ func (a *Autoscaler) AddAPI(api userconfig.Resource) error {
telemetry.Error(err)
}

a.crons[api.Name] = cron.Run(autoscaleFn, errorHandler, spec.AutoscalingTickInterval)
autoscaleFn, err := a.autoscaleFn(api)
if err != nil {
return err
}

// make sure there is no awaken call registered to an older API with the same name
a.Lock()
delete(a.lastAwakenTimestamp, api.Name)
a.Unlock()
a.crons[api.Name] = cron.Run(autoscaleFn, errorHandler, spec.AutoscalingTickInterval)

return nil
}
Expand All @@ -140,10 +131,7 @@ func (a *Autoscaler) RemoveAPI(api userconfig.Resource) {
delete(a.crons, api.Name)
}

a.Lock()
delete(a.lastAwakenTimestamp, api.Name)
a.Unlock()

delete(a.recs, api.Name)
log.Info("autoscaler stop")
}

Expand All @@ -170,7 +158,7 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
log.Info("autoscaler init")

var startTime time.Time
recs := make(recommendations)
a.recs[api.Name] = newRecommendations()

return func() error {
autoscalingSpec, err := scaler.GetAutoscalingSpec(api.Name)
Expand Down Expand Up @@ -227,6 +215,8 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)
recommendation = autoscalingSpec.MaxReplicas
}

recs := a.recs[api.Name]

// Rule of thumb: any modifications that don't consider historical recommendations should be performed before
// recording the recommendation; any modifications that use historical recommendations should be performed after.
recs.add(recommendation)
Expand All @@ -240,25 +230,20 @@ func (a *Autoscaler) autoscaleFn(api userconfig.Resource) (func() error, error)

if request < currentReplicas {
downscaleStabilizationFloor = recs.maxSince(autoscalingSpec.DownscaleStabilizationPeriod)
if downscaleStabilizationFloor != nil {
downscaleStabilizationFloor = pointer.Int32(libmath.MinInt32(*downscaleStabilizationFloor, currentReplicas))
}
if time.Since(startTime) < autoscalingSpec.DownscaleStabilizationPeriod {
request = currentReplicas
} else if downscaleStabilizationFloor != nil && request < *downscaleStabilizationFloor {
request = *downscaleStabilizationFloor
}

// awaken state: was scaled from zero
// This needs to be protected by a Mutex because an Awaken call will also modify it
a.Lock()
lastAwakenTimestamp := a.lastAwakenTimestamp[api.Name]

// Make sure we don't scale down to zero if the API was recently awakened
if time.Since(lastAwakenTimestamp) < autoscalingSpec.DownscaleStabilizationPeriod {
request = libmath.MaxInt32(request, 1)
}
a.Unlock()
}
if request > currentReplicas {
upscaleStabilizationCeil = recs.minSince(autoscalingSpec.UpscaleStabilizationPeriod)
if upscaleStabilizationCeil != nil {
upscaleStabilizationCeil = pointer.Int32(libmath.MaxInt32(*upscaleStabilizationCeil, currentReplicas))
}
if time.Since(startTime) < autoscalingSpec.UpscaleStabilizationPeriod {
request = currentReplicas
} else if upscaleStabilizationCeil != nil && request > *upscaleStabilizationCeil {
Expand Down
Loading