Skip to content

Commit

Permalink
Improve reconcile loop (#581)
Browse files Browse the repository at this point in the history
  • Loading branch information
zroubalik authored Jan 30, 2020
1 parent 2dd1333 commit 1dd23fd
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 34 deletions.
130 changes: 96 additions & 34 deletions pkg/controller/scaledobject/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package scaledobject
import (
"context"
"fmt"
"reflect"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -44,7 +43,7 @@ func Add(mgr manager.Manager) error {

// newReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &ReconcileScaledObject{client: mgr.GetClient(), scheme: mgr.GetScheme(), scaleLoopContexts: &sync.Map{}}
return &ReconcileScaledObject{client: mgr.GetClient(), scheme: mgr.GetScheme(), scaleLoopContexts: &sync.Map{}, scaledObjectsGenerations: &sync.Map{}}
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
Expand All @@ -68,6 +67,15 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
if err != nil {
return err
}

// Watch for changes to secondary resource HPA and requeue the owner ScaledObject
err = c.Watch(&source.Kind{Type: &autoscalingv2beta1.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kedav1alpha1.ScaledObject{},
})
if err != nil {
return err
}
return nil
}

Expand All @@ -78,9 +86,10 @@ var _ reconcile.Reconciler = &ReconcileScaledObject{}
type ReconcileScaledObject struct {
// This client, initialized using mgr.Client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
client client.Client
scheme *runtime.Scheme
scaleLoopContexts *sync.Map
scaledObjectsGenerations *sync.Map
}

// Reconcile reads that state of the cluster for a ScaledObject object and makes changes based on the state read
Expand Down Expand Up @@ -137,7 +146,7 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile.
}
}

reqLogger.Info("Detecting ScaleType from ScaledObject")
reqLogger.V(1).Info("Detecting ScaleType from ScaledObject")
if scaledObject.Spec.ScaleTargetRef == nil || scaledObject.Spec.ScaleTargetRef.DeploymentName == "" {
reqLogger.Info("Detected ScaleType = Job")
return r.reconcileJobType(reqLogger, scaledObject)
Expand Down Expand Up @@ -175,7 +184,11 @@ func (r *ReconcileScaledObject) reconcileJobType(logger logr.Logger, scaledObjec
}

// ScaledObject was created or modified - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}

return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -216,7 +229,11 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
}

// ScaledObject was created - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}

// HPA created successfully & ScaleLoop started - don't requeue
return reconcile.Result{}, nil
Expand All @@ -225,31 +242,13 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
return reconcile.Result{}, err
}

// Check whether update of HPA is needed
updateHPA := false
scaledObjectMinReplicaCount := getHpaMinReplicas(scaledObject)
if foundHpa.Spec.MinReplicas != scaledObjectMinReplicaCount {
updateHPA = true
foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
}

scaledObjectMaxReplicaCount := getHpaMaxReplicas(scaledObject)
if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
updateHPA = true
foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
}

newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
// Update hpa HPA if needed
updateHpa, err := r.checkHPAForUpdate(logger, scaledObject, foundHpa, deploymentName)
if err != nil {
logger.Error(err, "Failed to create MetricSpec")
logger.Error(err, "Failed to check HPA for possible update")
return reconcile.Result{}, err
}
if !reflect.DeepEqual(foundHpa.Spec.Metrics, newMetricSpec) {
updateHPA = true
foundHpa.Spec.Metrics = newMetricSpec
}

if updateHPA {
if updateHpa {
err = r.client.Update(context.TODO(), foundHpa)
if err != nil {
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
Expand All @@ -258,8 +257,19 @@ func (r *ReconcileScaledObject) reconcileDeploymentType(logger logr.Logger, scal
logger.Info("Updated HPA according to ScaledObject", "HPA.Namespace", hpaNamespace, "HPA.Name", hpaName)
}

// ScaledObject was modified - let's start a new ScaleLoop
r.startScaleLoop(logger, scaledObject)
// Let's start a new ScaleLoop if ScaledObject's Generation was changed
updateNeeded, err := r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to check ScaledObject's Generation change")
return reconcile.Result{}, err
}
if updateNeeded {
err = r.startScaleLoop(logger, scaledObject)
if err != nil {
logger.Error(err, "Failed to start a new ScaleLoop")
return reconcile.Result{}, err
}
}

return reconcile.Result{}, nil
}
Expand Down Expand Up @@ -293,16 +303,21 @@ func checkDeploymentTypeScaledObject(scaledObject *kedav1alpha1.ScaledObject) (s
}

// startScaleLoop starts ScaleLoop handler for the respective ScaledObject
func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) {
func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {

logger.V(1).Info("Starting a new ScaleLoop")

scaleHandler := scalehandler.NewScaleHandler(r.client, r.scheme)

key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Store(key, scaledObject.Generation)

ctx, cancel := context.WithCancel(context.TODO())

// cancel the outdated ScaleLoop for the same ScaledObject (if exists)
Expand All @@ -315,6 +330,25 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject
r.scaleLoopContexts.Store(key, cancel)
}
go scaleHandler.HandleScaleLoop(ctx, scaledObject)

return nil
}

func (r *ReconcileScaledObject) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
return true, err
}

value, loaded := r.scaledObjectsGenerations.Load(key)
if loaded {
generation := value.(int64)
if generation == scaledObject.Generation {
return false, nil
}
}
return true, nil
}

// newHPAForScaledObject returns HPA as it is specified in ScaledObject
Expand Down Expand Up @@ -346,6 +380,34 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled
}, nil
}

// checkHPAForUpdate checks whether update of HPA is needed
func (r *ReconcileScaledObject) checkHPAForUpdate(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta1.HorizontalPodAutoscaler, deploymentName string) (bool, error) {
updateHPA := false
scaledObjectMinReplicaCount := getHpaMinReplicas(scaledObject)
if *foundHpa.Spec.MinReplicas != *scaledObjectMinReplicaCount {
updateHPA = true
foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount
}

scaledObjectMaxReplicaCount := getHpaMaxReplicas(scaledObject)
if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount {
updateHPA = true
foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount
}

newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject, deploymentName)
if err != nil {
logger.Error(err, "Failed to create MetricSpec")
return true, err
}
if fmt.Sprintf("%v", foundHpa.Spec.Metrics) != fmt.Sprintf("%v", newMetricSpec) {
updateHPA = true
foundHpa.Spec.Metrics = newMetricSpec
}

return updateHPA, nil
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, deploymentName string) ([]autoscalingv2beta1.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/scaledobject/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ func (r *ReconcileScaledObject) finalizeScaledObject(logger logr.Logger, scaledO
return err
}

// store ScaledObject's current Generation
r.scaledObjectsGenerations.Delete(key)

result, ok := r.scaleLoopContexts.Load(key)
if ok {
cancel, ok := result.(context.CancelFunc)
Expand Down

0 comments on commit 1dd23fd

Please sign in to comment.