Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Supporting batching resources scaling #32

Merged
merged 9 commits into from
Dec 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ and k8s service/s that can be used to route incoming requests.
For example, when the autoscaler decides it needs to scale some resource to zero, it executes the resource-scaler's
`SetScale` function which has the knowledge how to scale to zero its specific resource.

**The autoscaler** - Responsible for periodically checking whether some resources should be scaled to zero. this is
**The autoscaler** - Responsible for periodically checking whether some resources should be scaled to zero. This is
performed by by querying the custom metrics API. Upon deciding a resource should be scaled to zero, it uses the internal
resource-scaler module to scale the resource to zero.
The resource-scaler will first route all incoming traffic to the DLX, which in terms of K8s is done by changing a
Expand Down
2 changes: 1 addition & 1 deletion cmd/autoscaler/app/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Run(kubeconfigPath string,
func createAutoScaler(restConfig *rest.Config,
resourceScaler scaler_types.ResourceScaler,
options scaler_types.AutoScalerOptions) (*autoscaler.Autoscaler, error) {
rootLogger, err := nucliozap.NewNuclioZap("autoscaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
rootLogger, err := nucliozap.NewNuclioZap("scaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize root logger")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/dlx/app/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func Run(kubeconfigPath string,
}

func createDLX(resourceScaler scaler_types.ResourceScaler, options scaler_types.DLXOptions) (*dlx.DLX, error) {
rootLogger, err := nucliozap.NewNuclioZap("dlx", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
rootLogger, err := nucliozap.NewNuclioZap("scaler", "console", os.Stdout, os.Stderr, nucliozap.DebugLevel)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Talked offline

if err != nil {
return nil, errors.Wrap(err, "Failed to initialize root logger")
}
Expand Down
30 changes: 21 additions & 9 deletions pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewAutoScaler(parentLogger logger.Logger,
customMetricsClientSet custommetricsv1.CustomMetricsClient,
options scaler_types.AutoScalerOptions) (*Autoscaler, error) {
childLogger := parentLogger.GetChild("autoscaler")
childLogger.DebugWith("Creating Autoscaler",
childLogger.InfoWith("Creating Autoscaler",
"options", options)

return &Autoscaler{
Expand All @@ -44,6 +44,8 @@ func NewAutoScaler(parentLogger logger.Logger,
}

func (as *Autoscaler) Start() error {
as.logger.DebugWith("Starting",
"scaleInterval", as.scaleInterval)
ticker := time.NewTicker(as.scaleInterval)

go func() {
Expand Down Expand Up @@ -145,7 +147,7 @@ func (as *Autoscaler) checkResourceToScale(resource scaler_types.Resource, resou
"value", value)
}

as.logger.InfoWith("All metric values below threshold, should scale to zero", "resourceName", resource.Name)
as.logger.DebugWith("All metric values below threshold, should scale to zero", "resourceName", resource.Name)
return true
}

Expand Down Expand Up @@ -175,6 +177,7 @@ func (as *Autoscaler) checkResourcesToScale() error {
return errors.Wrap(err, "Failed to get resources metrics")
}

resourcesToScale := make([]scaler_types.Resource, 0)
for idx, resource := range activeResources {
inScaleToZeroProcess, found := as.inScaleToZeroProcessMap[resource.Name]
if found && inScaleToZeroProcess {
Expand Down Expand Up @@ -207,19 +210,28 @@ func (as *Autoscaler) checkResourcesToScale() error {
}

as.inScaleToZeroProcessMap[resource.Name] = true
go func(resource scaler_types.Resource) {
err := as.scaleResourceToZero(resource)
resourcesToScale = append(resourcesToScale, activeResources[idx])
}

if len(resourcesToScale) > 0 {
go func(resources []scaler_types.Resource) {
as.logger.InfoWith("Scaling resources to zero", "resources", resources)
err := as.scaleResourcesToZero(resources)
if err != nil {
as.logger.WarnWith("Failed to scale resource to zero", "resource", resource, "err", errors.GetErrorStackString(err, 10))
as.logger.WarnWith("Failed to scale resources to zero", "resources", resources, "err", errors.GetErrorStackString(err, 10))
}
delete(as.inScaleToZeroProcessMap, resource.Name)
}(activeResources[idx])
as.logger.InfoWith("Successfully scaled resources to zero", "resources", resources)
for _, resource := range resources {
delete(as.inScaleToZeroProcessMap, resource.Name)
}
}(resourcesToScale)
}

return nil
}

func (as *Autoscaler) scaleResourceToZero(resource scaler_types.Resource) error {
if err := as.resourceScaler.SetScale(resource, 0); err != nil {
func (as *Autoscaler) scaleResourcesToZero(resources []scaler_types.Resource) error {
if err := as.resourceScaler.SetScale(resources, 0); err != nil {
return errors.Wrap(err, "Failed to set scale")
}

Expand Down
13 changes: 7 additions & 6 deletions pkg/dlx/dlx.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ type DLX struct {
handler Handler
}

func NewDLX(logger logger.Logger,
func NewDLX(parentLogger logger.Logger,
resourceScaler scaler_types.ResourceScaler,
options scaler_types.DLXOptions) (*DLX, error) {
resourceStarter, err := NewResourceStarter(logger, resourceScaler, options.Namespace, options.ResourceReadinessTimeout)
childLogger := parentLogger.GetChild("dlx")
childLogger.InfoWith("Creating DLX", "options", options)
resourceStarter, err := NewResourceStarter(childLogger, resourceScaler, options.Namespace, options.ResourceReadinessTimeout)
if err != nil {
return nil, errors.Wrap(err, "Failed to create function starter")
}

handler, err := NewHandler(logger,
handler, err := NewHandler(childLogger,
resourceStarter,
options.TargetNameHeader,
options.TargetPathHeader,
Expand All @@ -32,15 +34,14 @@ func NewDLX(logger logger.Logger,
}

return &DLX{
logger: logger,
logger: childLogger,
listenAddress: options.ListenAddress,
handler: handler,
}, nil
}

func (d *DLX) Start() error {
d.logger.InfoWith("Starting",
"listenAddress", d.listenAddress)
d.logger.DebugWith("Starting", "listenAddress", d.listenAddress)

http.HandleFunc("/", d.handler.HandleFunc)
if err := http.ListenAndServe(d.listenAddress, nil); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/dlx/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ type Handler struct {
targetPort int
}

func NewHandler(logger logger.Logger,
func NewHandler(parentLogger logger.Logger,
resourceStarter *ResourceStarter,
targetNameHeader string,
targetPathHeader string,
targetPort int) (Handler, error) {
h := Handler{
logger: logger,
logger: parentLogger.GetChild("handler"),
resourceStarter: resourceStarter,
targetNameHeader: targetNameHeader,
targetPathHeader: targetPathHeader,
Expand Down
6 changes: 3 additions & 3 deletions pkg/dlx/resourcestarter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
// simple for now
resourceName := target

r.logger.DebugWith("Starting resource", "resource", resourceName)
r.logger.InfoWith("Starting resource", "resource", resourceName)
resourceReadyChannel := make(chan error, 1)
defer close(resourceReadyChannel)

Expand All @@ -91,7 +91,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
ResourceName: resourceName,
}
case err := <-resourceReadyChannel:
r.logger.DebugWith("Resource ready", "target", target, "err", errors.GetErrorStackString(err, 10))
r.logger.InfoWith("Resource ready", "target", target, "err", errors.GetErrorStackString(err, 10))

if err == nil {
resultStatus = ResourceStatusResult{
Expand Down Expand Up @@ -123,7 +123,7 @@ func (r *ResourceStarter) startResource(resourceSinkChannel chan responseChannel
}

func (r *ResourceStarter) waitResourceReadiness(resource scaler_types.Resource, resourceReadyChannel chan error) {
err := r.scaler.SetScale(resource, 1)
err := r.scaler.SetScale([]scaler_types.Resource{resource}, 1)
resourceReadyChannel <- err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/resourcescaler/resourcescaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ func New(kubeconfigPath string, namespace string) (scaler_types.ResourceScaler,
return &NopResourceScaler{}, nil
}

func (r *NopResourceScaler) SetScale(resource scaler_types.Resource, scale int) error {
func (r *NopResourceScaler) SetScale(resources []scaler_types.Resource, scale int) error {
return nil
}

Expand Down