Skip to content

Commit

Permalink
Fix issue where paused annotation being set to false still leads to s…
Browse files Browse the repository at this point in the history
…caled objects/jobs being paused (#5257)

Signed-off-by: Nate Appelson <nate.appelson@regrow.ag>
  • Loading branch information
nappelson authored Dec 22, 2023
1 parent 64e7012 commit df16ac1
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ Here is an overview of all new **experimental** features:
- **General**: Add parameter queryParameters to prometheus-scaler ([#4962](https://github.com/kedacore/keda/issues/4962))
- **General**: Add validations for replica counts when creating ScaledObjects ([#5288](https://github.com/kedacore/keda/issues/5288))
- **General**: Bubble up AuthRef TriggerAuthentication errors as ScaledObject events ([#5190](https://github.com/kedacore/keda/issues/5190))
- **General**: Fix issue where paused annotation being set to false still leads to scaled objects/jobs being paused ([#5215](https://github.com/kedacore/keda/issues/5215))
- **General**: Support TriggerAuthentication properties from ConfigMap ([#4830](https://github.com/kedacore/keda/issues/4830))
- **General**: Use client-side round-robin load balancing for grpc calls ([#5224](https://github.com/kedacore/keda/issues/5224))
- **GCP pubsub scaler**: Support distribution-valued metrics and metrics from topics ([#5070](https://github.com/kedacore/keda/issues/5070))
Expand Down
18 changes: 16 additions & 2 deletions apis/keda/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1alpha1
import (
"fmt"
"reflect"
"strconv"

autoscalingv2 "k8s.io/api/autoscaling/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -193,6 +194,11 @@ func (so *ScaledObject) GenerateIdentifier() string {
return GenerateIdentifier("ScaledObject", so.Namespace, so.Name)
}

func (so *ScaledObject) HasPausedReplicaAnnotation() bool {
_, pausedReplicasAnnotationFound := so.GetAnnotations()[PausedReplicasAnnotation]
return pausedReplicasAnnotationFound
}

// HasPausedAnnotition returns whether this ScaledObject has PausedAnnotation or PausedReplicasAnnotation
func (so *ScaledObject) HasPausedAnnotation() bool {
_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
Expand All @@ -207,8 +213,16 @@ func (so *ScaledObject) NeedToBePausedByAnnotation() bool {
return so.Status.PausedReplicaCount != nil
}

_, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
return pausedAnnotationFound
pausedAnnotationValue, pausedAnnotationFound := so.GetAnnotations()[PausedAnnotation]
if !pausedAnnotationFound {
return false
}
shouldPause, err := strconv.ParseBool(pausedAnnotationValue)
if err != nil {
// if annotation value is not a boolean, we assume user wants to pause the ScaledObject
return true
}
return shouldPause
}

// IsUsingModifiers determines whether scalingModifiers are defined or not
Expand Down
12 changes: 10 additions & 2 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package keda
import (
"context"
"fmt"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -216,9 +217,17 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log

// checkIfPaused checks the presence of "autoscaling.keda.sh/paused" annotation on the scaledJob and stop the scale loop.
func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) {
_, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation]
pausedAnnotationValue, pausedAnnotation := scaledJob.GetAnnotations()[kedav1alpha1.PausedAnnotation]
pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue
shouldPause := false
if pausedAnnotation {
var err error
shouldPause, err = strconv.ParseBool(pausedAnnotationValue)
if err != nil {
shouldPause = true
}
}
if shouldPause {
if !pausedStatus {
logger.Info("ScaledJob is paused, stopping scaling loop.")
msg := kedav1alpha1.ScaledJobConditionPausedMessage
Expand Down Expand Up @@ -286,7 +295,6 @@ func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context
// requestScaleLoop request ScaleLoop handler for the respective ScaledJob
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
logger.V(1).Info("Starting a new ScaleLoop")

key, err := cache.MetaNamespaceKeyFunc(scaledJob)
if err != nil {
logger.Error(err, "Error getting key for scaledJob")
Expand Down
181 changes: 181 additions & 0 deletions controllers/keda/scaledjob_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package keda

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
)

var _ = Describe("ScaledJobController", func() {

var (
testLogger = zap.New(zap.UseDevMode(true), zap.WriteTo(GinkgoWriter))
)

Describe("functional tests", func() {
It("scaledjob paused condition status changes to true on annotation", func() {
jobName := "toggled-to-paused-annotation-name"
sjName := "sj-" + jobName

sj := &kedav1alpha1.ScaledJob{
ObjectMeta: metav1.ObjectMeta{
Name: sjName,
Namespace: "default",
},
Spec: kedav1alpha1.ScaledJobSpec{
JobTargetRef: generateJobSpec(jobName),
Triggers: []kedav1alpha1.ScaleTriggers{
{
Type: "cron",
Metadata: map[string]string{
"timezone": "UTC",
"start": "0 * * * *",
"end": "1 * * * *",
"desiredReplicas": "1",
},
},
},
},
}
pollingInterval := int32(5)
sj.Spec.PollingInterval = &pollingInterval
err := k8sClient.Create(context.Background(), sj)
Expect(err).ToNot(HaveOccurred())

Eventually(func() metav1.ConditionStatus {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
if err != nil {
return metav1.ConditionTrue
}
return sj.Status.Conditions.GetPausedCondition().Status
}, 5*time.Second).Should(Or(Equal(metav1.ConditionFalse), Equal(metav1.ConditionUnknown)))

// set annotation
Eventually(func() error {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
Expect(err).ToNot(HaveOccurred())
annotations := make(map[string]string)
annotations[kedav1alpha1.PausedAnnotation] = "true"
sj.SetAnnotations(annotations)
pollingInterval := int32(6)
sj.Spec.PollingInterval = &pollingInterval
return k8sClient.Update(context.Background(), sj)
}).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred())
testLogger.Info("annotation is set")

// validate annotation is set correctly
Eventually(func() bool {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
Expect(err).ToNot(HaveOccurred())
_, hasAnnotation := sj.GetAnnotations()[kedav1alpha1.PausedAnnotation]
return hasAnnotation
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())

Eventually(func() metav1.ConditionStatus {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
if err != nil {
return metav1.ConditionUnknown
}
return sj.Status.Conditions.GetPausedCondition().Status
}).WithTimeout(2 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionTrue))
})
It("scaledjob paused status stays false when annotation is set to false", func() {
jobName := "turn-off-paused-annotation-name"
sjName := "sj-" + jobName
// create object already paused
sj := &kedav1alpha1.ScaledJob{
ObjectMeta: metav1.ObjectMeta{
Name: sjName,
Namespace: "default",
},
Spec: kedav1alpha1.ScaledJobSpec{
JobTargetRef: generateJobSpec(jobName),
Triggers: []kedav1alpha1.ScaleTriggers{
{
Type: "cron",
Metadata: map[string]string{
"timezone": "UTC",
"start": "0 * * * *",
"end": "1 * * * *",
"desiredReplicas": "1",
},
},
},
},
}
pollingInterval := int32(5)
sj.Spec.PollingInterval = &pollingInterval
err := k8sClient.Create(context.Background(), sj)
Expect(err).ToNot(HaveOccurred())
falseAnnotationValue := "false"
// set annotation
Eventually(func() error {
err = k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
Expect(err).ToNot(HaveOccurred())
annotations := make(map[string]string)
annotations[kedav1alpha1.PausedAnnotation] = falseAnnotationValue
sj.SetAnnotations(annotations)
pollingInterval := int32(6)
sj.Spec.PollingInterval = &pollingInterval
return k8sClient.Update(context.Background(), sj)
}).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).ShouldNot(HaveOccurred())
testLogger.Info("annotation is set")

// validate annotation is set correctly
Eventually(func() bool {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
Expect(err).ToNot(HaveOccurred())
value, hasPausedAnnotation := sj.GetAnnotations()[kedav1alpha1.PausedAnnotation]
if !hasPausedAnnotation {
return false
}
return value == falseAnnotationValue
}).WithTimeout(1 * time.Minute).WithPolling(2 * time.Second).Should(BeTrue())

// TODO(nappelson) - update assertion to be ConditionFalse
// https://github.com/kedacore/keda/issues/5251 prevents Condition from updating appropriately
Eventually(func() metav1.ConditionStatus {
err := k8sClient.Get(context.Background(), types.NamespacedName{Name: sjName, Namespace: "default"}, sj)
if err != nil {
return metav1.ConditionUnknown
}
return sj.Status.Conditions.GetPausedCondition().Status
}).WithTimeout(1 * time.Minute).WithPolling(10 * time.Second).Should(Equal(metav1.ConditionUnknown))
})

})
})

func generateJobSpec(name string) *batchv1.JobSpec {
return &batchv1.JobSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": name,
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": name,
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: name,
Image: name,
},
},
},
},
}
}
10 changes: 5 additions & 5 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request

// reconcileScaledObject implements reconciler logic for ScaledObject
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, conditions *kedav1alpha1.Conditions) (string, error) {
// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledObject (since the presence of this annotation will pause
// Check the presence of "autoscaling.keda.sh/paused" annotation on the scaledObject (since the presence of this annotation will pause
// autoscaling no matter what number of replicas is provided), and if so, stop the scale loop and delete the HPA on the scaled object.
pausedAnnotationFound := scaledObject.HasPausedAnnotation()
if pausedAnnotationFound {
needsToPause := scaledObject.NeedToBePausedByAnnotation()
if needsToPause {
scaledToPausedCount := true
if conditions.GetPausedCondition().Status == metav1.ConditionTrue {
// If scaledobject is in paused condition but replica count is not equal to paused replica count, the following scaling logic needs to be trigger again.
Expand All @@ -228,7 +228,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
}
}
if scaledObject.NeedToBePausedByAnnotation() && scaledToPausedCount {
if scaledToPausedCount {
msg := kedav1alpha1.ScaledObjectConditionPausedMessage
if err := r.stopScaleLoop(ctx, logger, scaledObject); err != nil {
msg = "failed to stop the scale loop for paused ScaledObject"
Expand Down Expand Up @@ -298,7 +298,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}
logger.Info("Initializing Scaling logic according to ScaledObject Specification")
}
if pausedAnnotationFound && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
if scaledObject.HasPausedReplicaAnnotation() && conditions.GetPausedCondition().Status != metav1.ConditionTrue {
return "ScaledObject paused replicas are being scaled", fmt.Errorf("ScaledObject paused replicas are being scaled")
}
return kedav1alpha1.ScaledObjectConditionReadySuccessMessage, nil
Expand Down
Loading

0 comments on commit df16ac1

Please sign in to comment.