Skip to content

Commit

Permalink
fix: stop scale loop for pause Scaledbject (issue kedacore#4253). err…
Browse files Browse the repository at this point in the history
…or msg and new condition msg

Signed-off-by: Tobo Atchou <tobo.atchou@gmail.com>
  • Loading branch information
tobotg committed Jun 4, 2023
1 parent 80323d4 commit 949bb08
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 30 deletions.
2 changes: 2 additions & 0 deletions apis/keda/v1alpha1/condition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ const (
ScaledObjectConditionReadySucccesReason = "ScaledObjectReady"
// ScaledObjectConditionReadySuccessMessage defines the default Message for correct ScaledObject
ScaledObjectConditionReadySuccessMessage = "ScaledObject is defined correctly and is ready for scaling"
// ScaledObjectConditionPausedMessage defines the default Message for paused ScaledObject
ScaledObjectConditionPausedMessage = "ScaledObject is paused"
)

// Condition to store the condition state
Expand Down
3 changes: 3 additions & 0 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ func updateHealthStatus(scaledObject *kedav1alpha1.ScaledObject, externalMetricN

// getHPAName returns generated HPA name for ScaledObject specified in the parameter
func getHPAName(scaledObject *kedav1alpha1.ScaledObject) string {
if scaledObject.Status.HpaName != "" {
return scaledObject.Status.HpaName
}
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.Name != "" {
return scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.Name
}
Expand Down
50 changes: 20 additions & 30 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaledObject")
reqLogger.Error(err, "failed to get ScaledObject")
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -204,12 +204,12 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
if paused {
logger.Info("ScaledObject is paused, so skipping the request.")
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
return "Failed to stop the scale loop for paused ScaledObject", err
return "failed to stop the scale loop for paused ScaledObject", err
}
if deleted, err := r.ensureHPAForScaledObjectIsDeleted(ctx, logger, scaledObject); !deleted {
return "Failed to delete HPA for paused ScaledObject", err
return "failed to delete HPA for paused ScaledObject", err
}
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
return kedav1alpha1.ScaledObjectConditionPausedMessage, nil
}

// Check scale target Name is specified
Expand All @@ -221,7 +221,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// Check the label needed for Metrics servers is present on ScaledObject
err := r.ensureScaledObjectLabel(ctx, logger, scaledObject)
if err != nil {
return "Failed to update ScaledObject with scaledObjectName label", err
return "failed to update ScaledObject with scaledObjectName label", err
}

// Check if resource targeted for scaling exists and exposes /scale subresource
Expand All @@ -243,7 +243,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
if err != nil {
return "Failed to ensure HPA is correctly created for ScaledObject", err
return "failed to ensure HPA is correctly created for ScaledObject", err
}
scaleObjectSpecChanged := false
if !newHPACreated {
Expand All @@ -252,14 +252,14 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
// (we can omit this check if a new HPA was created, which fires new ScaleLoop anyway)
scaleObjectSpecChanged, err = r.scaledObjectGenerationChanged(logger, scaledObject)
if err != nil {
return "Failed to check whether ScaledObject's Generation was changed", err
return "failed to check whether ScaledObject's Generation was changed", err
}
}

// Notify ScaleHandler if a new HPA was created or if ScaledObject was updated
if newHPACreated || scaleObjectSpecChanged {
if r.requestScaleLoop(ctx, logger, scaledObject) != nil {
return "Failed to start a new scale loop with scaling logic", err
return "failed to start a new scale loop with scaling logic", err
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}
Expand Down Expand Up @@ -287,7 +287,7 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
return gvkr, err
}
gvkString := gvkr.GVKString()
Expand All @@ -313,11 +313,11 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
logger.Error(err, "target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
}
// resource exist but doesn't expose /scale subresource
logger.Error(errScale, "Target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
logger.Error(errScale, "target resource doesn't expose /scale subresource", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, errScale
}
isScalableCache.Store(gr.String(), true)
Expand Down Expand Up @@ -409,12 +409,7 @@ func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *k

// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
var hpaName string
if scaledObject.Status.HpaName != "" {
hpaName = scaledObject.Status.HpaName
} else {
hpaName = getHPAName(scaledObject)
}
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
Expand All @@ -428,7 +423,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
// new HPA created successfully -> notify Reconcile function so it could fire a new ScaleLoop
return true, nil
} else if err != nil {
logger.Error(err, "Failed to get HPA from cluster")
logger.Error(err, "failed to get HPA from cluster")
return false, err
}

Expand All @@ -445,7 +440,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
// HPA was found -> let's check if we need to update it
err = r.updateHPAIfNeeded(ctx, logger, scaledObject, foundHpa, gvkr)
if err != nil {
logger.Error(err, "Failed to check HPA for possible update")
logger.Error(err, "failed to check HPA for possible update")
return false, err
}

Expand All @@ -454,24 +449,19 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont

// ensureHPAForScaledObjectIsDeleted ensures that in cluster any HPA for specified ScaledObject is deleted, returns true if no HPA exists
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectIsDeleted(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
var hpaName string
if scaledObject.Status.HpaName != "" {
hpaName = scaledObject.Status.HpaName
} else {
hpaName = getHPAName(scaledObject)
}
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
return true, nil
} else if err != nil {
logger.Error(err, "Failed to get HPA from cluster")
logger.Error(err, "failed to get HPA from cluster")
return false, err
}

if err := r.deleteHPA(ctx, logger, scaledObject, foundHpa); err != nil {
logger.Error(err, "Failed to delete HPA from cluster")
logger.Error(err, "failed to delete HPA from cluster")
return false, err
}
return true, nil
Expand All @@ -492,7 +482,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo

key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return err
}

Expand All @@ -510,7 +500,7 @@ func (r *ScaledObjectReconciler) requestScaleLoop(ctx context.Context, logger lo
func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return err
}

Expand All @@ -526,7 +516,7 @@ func (r *ScaledObjectReconciler) stopScaleLoop(ctx context.Context, logger logr.
func (r *ScaledObjectReconciler) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) {
key, err := cache.MetaNamespaceKeyFunc(scaledObject)
if err != nil {
logger.Error(err, "Error getting key for scaledObject")
logger.Error(err, "error getting key for scaledObject")
return true, err
}

Expand Down

0 comments on commit 949bb08

Please sign in to comment.