Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide explicitly pausing annotations of ScaledObjects at the current replica count #4809

Merged
merged 15 commits into from
Sep 25, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
- **General**: Add ScaledObject/ScaledJob names to output of `kubectl get triggerauthentication/clustertriggerauthentication` ([#796](https://github.com/kedacore/keda/issues/796))
- **General**: Add standalone CRD generation to release workflow ([#2726](https://github.com/kedacore/keda/issues/2726))
- **General**: Adding a changelog validating script to check for formatting and order ([#3190](https://github.com/kedacore/keda/issues/3190))
- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledObject to pause autoscaling ([#3303](https://github.com/kedacore/keda/issues/3304))
SpiritZhou marked this conversation as resolved.
Show resolved Hide resolved
- **General**: Update golangci-lint version documented in CONTRIBUTING.md since old version doesn't support go 1.20 (N/A)
- **Azure Pod Identity**: Introduce validation to prevent usage of empty identity ID for Azure identity providers ([#4528](https://github.com/kedacore/keda/issues/4528))

Expand Down
18 changes: 18 additions & 0 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type ScaledObject struct {

const ScaledObjectOwnerAnnotation = "scaledobject.keda.sh/name"
const ScaledObjectTransferHpaOwnershipAnnotation = "scaledobject.keda.sh/transfer-hpa-ownership"
const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"
const PausedAnnotation = "autoscaling.keda.sh/paused"

// HealthStatus is the status for a ScaledObject's health
type HealthStatus struct {
Expand Down Expand Up @@ -167,3 +169,19 @@ func init() {
func (so *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", so.Namespace, so.Name)
}

func (so *ScaledObject) HasPausedAnnotation() bool {
SpiritZhou marked this conversation as resolved.
Show resolved Hide resolved
_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
return pausedAnnotationFound || pausedReplicasAnnotationFound
}

func (so *ScaledObject) NeedPaused() bool {
SpiritZhou marked this conversation as resolved.
Show resolved Hide resolved
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
if pausedReplicasAnnotationFound {
return so.Status.PausedReplicaCount != nil
}

_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
return pausedAnnotationFound
}
2 changes: 1 addition & 1 deletion controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log

// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop.
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
_, pausedAnnotation := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation]
_, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation]
pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue
if pausedAnnotation {
if !pausedStatus {
Expand Down
44 changes: 39 additions & 5 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"strconv"
"sync"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -123,6 +124,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, options cont
// so reconcile loop is not started on Status updates
For(&kedav1alpha1.ScaledObject{}, builder.WithPredicates(
predicate.Or(
kedacontrollerutil.PausedPredicate{},
kedacontrollerutil.PausedReplicasPredicate{},
kedacontrollerutil.ScaleObjectReadyConditionPredicate{},
predicate.GenerationChangedPredicate{},
Expand Down Expand Up @@ -206,12 +208,17 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
_, pausedAnnotationFound := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
pausedAnnotationFound := scaledObject.HasPausedAnnotation()
if pausedAnnotationFound {
scaledToPausedCount := true
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
// If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again.
scaledToPausedCount = r.checkIfTargetResourceReachPausedCount(ctx, logger, scaledObject)
if scaledToPausedCount {
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}
}
if scaledObject.Status.PausedReplicaCount != nil {
if scaledObject.NeedPaused() && scaledToPausedCount {
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
msg = "failed to stop the scale loop for paused ScaledObject"
Expand Down Expand Up @@ -279,7 +286,6 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}

if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
}
Expand All @@ -303,6 +309,34 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, lo
return r.Client.Update(ctx, scaledObject)
}

func (r *ScaledObjectReconciler) checkIfTargetResourceReachPausedCount(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) bool {
pausedReplicaCount, pausedReplicasAnnotationFound := scaledObject.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
if !pausedReplicasAnnotationFound {
return true
}
pausedReplicaCountNum, err := strconv.ParseInt(pausedReplicaCount, 10, 32)
if err != nil {
return true
}

gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
return true
}
gvkString := gvkr.GVKString()
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// check if we already know.
var scale *autoscalingv1.Scale
gr := gvkr.GroupResource()
scale, errScale := (r.ScaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
return true
}
return scale.Spec.Replicas == int32(pausedReplicaCountNum)
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedav1alpha1.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
Expand All @@ -316,7 +350,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Conte
logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkString, "Resource", gvkr.Resource)

// do we need the scale to update the status later?
_, present := scaledObject.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
present := scaledObject.HasPausedAnnotation()
removePausedStatus := scaledObject.Status.PausedReplicaCount != nil && !present
wantStatusUpdate := scaledObject.Status.ScaleTargetKind != gvkString || scaledObject.Status.OriginalReplicaCount == nil || removePausedStatus

Expand Down
11 changes: 5 additions & 6 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
"github.com/kedacore/keda/v2/pkg/mock/mock_scaling"
"github.com/kedacore/keda/v2/pkg/scalers"
Expand Down Expand Up @@ -912,7 +911,7 @@ var _ = Describe("ScaledObjectController", func() {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
annotations := make(map[string]string)
annotations[kedacontrollerutil.PausedReplicasAnnotation] = "1"
annotations[kedav1alpha1.PausedReplicasAnnotation] = "1"
so.SetAnnotations(annotations)
pollingInterval := int32(6)
so.Spec.PollingInterval = &pollingInterval
Expand All @@ -924,8 +923,7 @@ var _ = Describe("ScaledObjectController", func() {
Eventually(func() bool {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
_, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
return paused
return so.HasPausedAnnotation()
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())

Eventually(func() metav1.ConditionStatus {
Expand All @@ -952,7 +950,7 @@ var _ = Describe("ScaledObjectController", func() {
Name: soName,
Namespace: "default",
Annotations: map[string]string{
kedacontrollerutil.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
kedav1alpha1.PausedReplicasAnnotation: pausedReplicasCountForAnnotation,
},
},
Spec: kedav1alpha1.ScaledObjectSpec{
Expand Down Expand Up @@ -992,7 +990,8 @@ var _ = Describe("ScaledObjectController", func() {
// validate annotation is set correctly
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: soName, Namespace: "default"}, so)
Expect(err).ToNot(HaveOccurred())
pausedReplicasCount, paused := so.GetAnnotations()[kedacontrollerutil.PausedReplicasAnnotation]
paused := so.HasPausedAnnotation()
pausedReplicasCount := so.GetAnnotations()[kedav1alpha1.PausedReplicasAnnotation]
Expect(paused).To(Equal(true))
Expect(pausedReplicasCount).To(Equal(pausedReplicasCountForAnnotation))

Expand Down
12 changes: 4 additions & 8 deletions controllers/keda/util/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,6 @@ import (
kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas"

const PausedAnnotation = "autoscaling.keda.sh/paused"

type PausedReplicasPredicate struct {
predicate.Funcs
}
Expand All @@ -27,11 +23,11 @@ func (PausedReplicasPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedReplicasAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedReplicasAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedReplicasAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down Expand Up @@ -84,11 +80,11 @@ func (PausedPredicate) Update(e event.UpdateEvent) bool {
oldPausedValue := ""

if newAnnotations != nil {
newPausedValue = newAnnotations[PausedAnnotation]
newPausedValue = newAnnotations[kedav1alpha1.PausedAnnotation]
}

if oldAnnotations != nil {
oldPausedValue = oldAnnotations[PausedAnnotation]
oldPausedValue = oldAnnotations[kedav1alpha1.PausedAnnotation]
}

return newPausedValue != oldPausedValue
Expand Down
3 changes: 1 addition & 2 deletions pkg/scaling/executor/scale_scaledobjects.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util"
"github.com/kedacore/keda/v2/pkg/eventreason"
kedastatus "github.com/kedacore/keda/v2/pkg/status"
)
Expand Down Expand Up @@ -362,7 +361,7 @@ func getIdleOrMinimumReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (bool
// If not paused, it returns nil.
func GetPausedReplicaCount(scaledObject *kedav1alpha1.ScaledObject) (*int32, error) {
if scaledObject.Annotations != nil {
if val, ok := scaledObject.Annotations[kedacontrollerutil.PausedReplicasAnnotation]; ok {
if val, ok := scaledObject.Annotations[kedav1alpha1.PausedReplicasAnnotation]; ok {
conv, err := strconv.ParseInt(val, 10, 32)
if err != nil {
return nil, err
Expand Down
Loading