Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide explicitly pausing annotations of ScaledObjects at the current replica count #4809

Merged
merged 15 commits into from
Sep 25, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190))
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledObject to pause autoscaling ([#3304](https://github.com/kedacore/keda/issues/3304))
- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A)
- **General**: Updated AWS SDK and updated all the aws scalers ([#4905](https://github.com/kedacore/keda/issues/4905))
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))
Expand Down
20 changes: 20 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type ScaledObject struct {

const ScaledObjectOwnerAnnotation = "scaledobject.keda.sh/name"
const ScaledObjectTransferHpaOwnershipAnnotation = "scaledobject.keda.sh/transfer-hpa-ownership"
const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"
const PausedAnnotation = "autoscaling.keda.sh/paused"

// HealthStatus is the status for a ScaledObject's health
type HealthStatus struct {
Expand Down Expand Up @@ -183,6 +185,24 @@ func (so *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", so.Namespace, so.Name)
}

// HasPausedAnnotition returns whether this ScaledObject has PausedAnnotation or PausedReplicasAnnotation
func (so *ScaledObject) HasPausedAnnotation() bool {
_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
return pausedAnnotationFound || pausedReplicasAnnotationFound
}

// NeedToBePausedByAnnotation will check whether ScaledObject needs to be paused based on PausedAnnotation or PausedReplicaCount
func (so *ScaledObject) NeedToBePausedByAnnotation() bool {
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
if pausedReplicasAnnotationFound {
return so.Status.PausedReplicaCount != nil
}

_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
return pausedAnnotationFound
}

// IsUsingModifiers determines whether scalingModifiers are defined or not
func (so *ScaledObject) IsUsingModifiers() bool {
return so.Spec.Advanced != nil && !reflect.DeepEqual(so.Spec.Advanced.ScalingModifiers, ScalingModifiers{})
Expand Down
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log

// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop.
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
_, pausedAnnotation := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation]
_, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation]
pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue
if pausedAnnotation {
if !pausedStatus {
Expand Down
44 changes: 39 additions & 5 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -123,6 +124,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
kedacontrollerutil.PausedReplicasPredicate{},
kedacontrollerutil.ScaleObjectReadyConditionPredicate{},
predicate.GenerationChangedPredicate{},
Expand Down Expand Up @@ -206,12 +208,17 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
pausedAnnotationFound := scaledObject.HasPausedAnnotation()
if pausedAnnotationFound {
scaledToPausedCount := true
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
// If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again.
scaledToPausedCount = r.checkIfTargetResourceReachPausedCount(ctx, logger, scaledObject)
if scaledToPausedCount {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}
}
if scaledObject.Status.PausedReplicaCount != nil {
if scaledObject.NeedToBePausedByAnnotation() && scaledToPausedCount {
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
msg = "failed to stop the scale loop for paused ScaledObject"
Expand Down Expand Up @@ -279,7 +286,6 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}

if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
}
Expand All @@ -303,6 +309,34 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
return r.Client.Update(ctx, scaledObject)
}

func (r *ScaledObjectReconciler) checkIfTargetResourceReachPausedCount(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) bool {
pausedReplicaCount, pausedReplicasAnnotationFound := scaledObject.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
if !pausedReplicasAnnotationFound {
return true
}
pausedReplicaCountNum, err := strconv.ParseInt(pausedReplicaCount, 10, 32)
if err != nil {
return true
}

gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
return true
}
gvkString := gvkr.GVKString()
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
scale, errScale := (r.ScaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
return true
}
return scale.Spec.Replicas == int32(pausedReplicaCountNum)
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
Expand All @@ -316,7 +350,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// do we need the scale to update the status later?
_, present := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
present := scaledObject.HasPausedAnnotation()
removePausedStatus := scaledObject.Status.PausedReplicaCount != nil && !present
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil || removePausedStatus

Expand Down
11 changes: 5 additions & 6 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
"github.com/kedacore/keda/v2/pkg/scalers"
Expand Down Expand Up @@ -912,7 +911,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
annotations := make(map[string]string)
annotations[kedacontrollerutil.PausedReplicasAnnotation] = "1"
annotations[kedav1alpha1.PausedReplicasAnnotation] = "1"
so.SetAnnotations(annotations)
pollingInterval := int32(6)
so.Spec.PollingInterval = &pollingInterval
Expand All @@ -924,8 +923,7 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() bool {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
_, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
return paused
return so.HasPausedAnnotation()
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())

Eventually(func() metav1.ConditionStatus {
Expand All @@ -952,7 +950,7 @@ var _ = Describe("ScaledObjectController", func() {
Name: soName,
Namespace: "default",
Annotations: map[string]string{
kedacontrollerutil.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
kedav1alpha1.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
},
},
Spec: kedav1alpha1.ScaledObjectSpec{
Expand Down Expand Up @@ -992,7 +990,8 @@ var _ = Describe("ScaledObjectController", func() {
// validate annotation is set correctly
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
pausedReplicasCount, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
paused := so.HasPausedAnnotation()
pausedReplicasCount := so.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
Expect(paused).To(Equal(true))
Expect(pausedReplicasCount).To(Equal(pausedReplicasCountForAnnotation))

Expand Down
12 changes: 4 additions & 8 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const PausedAnnotation = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand All @@ -27,11 +23,11 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedReplicasAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedReplicasAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down Expand Up @@ -84,11 +80,11 @@ func (PausedPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down
3 changes: 1 addition & 2 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -362,7 +361,7 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool
// If not paused, it returns nil.
func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
if val, ok := scaledObject.Annotations[kedav1alpha1.PausedReplicasAnnotation]; ok {
conv, err := strconv.ParseInt(val, 10, 32)
if err != nil {
return nil, err
Expand Down
Loading