diff --git a/CHANGELOG.md b/CHANGELOG.md index 07dcff3692a..09f602370b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **CPU/Memory scaler**: Add support for scale to zero if there are multiple triggers([#4269](https://github.com/kedacore/keda/issues/4269)) - **Redis Scalers**: Allow scaling using redis stream length ([#4277](https://github.com/kedacore/keda/issues/4277)) - **General**: Introduce new Solr Scaler ([#4234](https://github.com/kedacore/keda/issues/4234)) +- **General**: Introduce annotation `autoscaling.keda.sh/paused: true` for ScaledJobs to pause autoscaling ([#3303](https://github.com/kedacore/keda/issues/3303)) ### Improvements diff --git a/apis/keda/v1alpha1/condition_types.go b/apis/keda/v1alpha1/condition_types.go index 59faec27a49..4f3a182ee77 100644 --- a/apis/keda/v1alpha1/condition_types.go +++ b/apis/keda/v1alpha1/condition_types.go @@ -47,6 +47,17 @@ const ( ScaledObjectConditionPausedMessage = "ScaledObject is paused" ) +const ( + // ScaledJobConditionPausedReason defines the default Reason for paused ScaledJob + ScaledJobConditionPausedReason = "ScaledJobPaused" + // ScaledJobConditionPausedReason defines the default Reason for paused ScaledJob + ScaledJobConditionUnpausedReason = "ScaledJobUnpaused" + // ScaledJobConditionPausedMessage defines the default Message for paused ScaledJob + ScaledJobConditionPausedMessage = "ScaledJob is paused" + // ScaledJobConditionPausedMessage defines the default Message for paused ScaledJob + ScaledJobConditionUnpausedMessage = "ScaledJob is unpaused" +) + // Condition to store the condition state type Condition struct { // Type of condition diff --git a/apis/keda/v1alpha1/scaledjob_types.go b/apis/keda/v1alpha1/scaledjob_types.go index d9e179bd725..d6aa1fa4852 100644 --- a/apis/keda/v1alpha1/scaledjob_types.go +++ b/apis/keda/v1alpha1/scaledjob_types.go @@ -36,6 +36,7 @@ const ( // +kubebuilder:printcolumn:name="Authentication",type="string",JSONPath=".spec.triggers[*].authenticationRef.name" // +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status" // +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" +// +kubebuilder:printcolumn:name="Paused",type="string",JSONPath=".status.conditions[?(@.type==\"Paused\")].status" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // ScaledJob is the Schema for the scaledjobs API @@ -78,6 +79,8 @@ type ScaledJobStatus struct { LastActiveTime *metav1.Time `json:"lastActiveTime,omitempty"` // +optional Conditions Conditions `json:"conditions,omitempty"` + // +optional + Paused string `json:"Paused,omitempty"` } // ScaledJobList contains a list of ScaledJob diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index 331a33a80e6..7b5037432a5 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -35,6 +35,9 @@ spec: - jsonPath: .status.conditions[?(@.type=="Active")].status name: Active type: string + - jsonPath: .status.conditions[?(@.type=="Paused")].status + name: Paused + type: string - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -8264,6 +8267,8 @@ spec: status: description: ScaledJobStatus defines the observed state of ScaledJob properties: + Paused: + type: string conditions: description: Conditions an array representation to store multiple Conditions diff --git a/controllers/keda/scaledjob_controller.go b/controllers/keda/scaledjob_controller.go index 806f095e455..4a7d1836bf1 100644 --- a/controllers/keda/scaledjob_controller.go +++ b/controllers/keda/scaledjob_controller.go @@ -39,6 +39,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" + kedacontrollerutil "github.com/kedacore/keda/v2/controllers/keda/util" "github.com/kedacore/keda/v2/pkg/eventreason" "github.com/kedacore/keda/v2/pkg/prommetrics" "github.com/kedacore/keda/v2/pkg/scaling" @@ -84,7 +85,11 @@ func (r *ScaledJobReconciler) SetupWithManager(mgr ctrl.Manager, options control WithOptions(options). // Ignore updates to ScaledJob Status (in this case metadata.Generation does not change) // so reconcile loop is not started on Status updates - For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + For(&kedav1alpha1.ScaledJob{}, builder.WithPredicates( + predicate.Or( + kedacontrollerutil.PausedPredicate{}, + predicate.GenerationChangedPredicate{}, + ))). Complete(r) } @@ -136,8 +141,8 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found") return ctrl.Result{}, err } - msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob) conditions := scaledJob.Status.Conditions.DeepCopy() + msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob, &conditions) if err != nil { reqLogger.Error(err, msg) conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) @@ -159,7 +164,15 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( } // reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob -func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) { +func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (string, error) { + isPaused, err := r.checkIfPaused(ctx, logger, scaledJob, conditions) + if err != nil { + return "Failed to check if ScaledJob was paused", err + } + if isPaused { + return "ScaledJob is paused, skipping reconcile loop", err + } + // nosemgrep: trailofbits.go.invalid-usage-of-modified-variable.invalid-usage-of-modified-variable msg, err := r.deletePreviousVersionScaleJobs(ctx, logger, scaledJob) if err != nil { @@ -193,6 +206,31 @@ func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger log return "ScaledJob is defined correctly and is ready to scaling", nil } +// Check the presence of "autoscaling.keda.sh/paused-replicas" annotation on the scaledJob and stop the scale loop. +func (r *ScaledJobReconciler) checkIfPaused(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, conditions *kedav1alpha1.Conditions) (bool, error) { + _, pausedAnnotation := scaledJob.GetAnnotations()[kedacontrollerutil.PausedAnnotation] + pausedStatus := conditions.GetPausedCondition().Status == metav1.ConditionTrue + if pausedAnnotation { + if !pausedStatus { + logger.Info("ScaledJob is paused, so skipping the request.") + msg := kedav1alpha1.ScaledJobConditionPausedMessage + if err := r.stopScaleLoop(ctx, logger, scaledJob); err != nil { + msg = "failed to stop the scale loop for paused ScaledJob" + conditions.SetPausedCondition(metav1.ConditionFalse, "ScaledJobStopScaleLoopFailed", msg) + return false, err + } + conditions.SetPausedCondition(metav1.ConditionTrue, kedav1alpha1.ScaledJobConditionPausedReason, msg) + } + return true, nil + } + if pausedStatus { + logger.Info("ScaledJob is unpausing.") + msg := kedav1alpha1.ScaledJobConditionUnpausedMessage + conditions.SetPausedCondition(metav1.ConditionFalse, kedav1alpha1.ScaledJobConditionUnpausedReason, msg) + } + return false, nil +} + // Delete Jobs owned by the previous version of the scaledJob based on the rolloutStrategy given for this scaledJob, if any func (r *ScaledJobReconciler) deletePreviousVersionScaleJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) { var rolloutStrategy string diff --git a/controllers/keda/util/predicate.go b/controllers/keda/util/predicate.go index 03aeb16c0bf..c4fc5405620 100644 --- a/controllers/keda/util/predicate.go +++ b/controllers/keda/util/predicate.go @@ -9,6 +9,8 @@ import ( const PausedReplicasAnnotation = "autoscaling.keda.sh/paused-replicas" +const PausedAnnotation = "autoscaling.keda.sh/paused" + type PausedReplicasPredicate struct { predicate.Funcs } @@ -61,3 +63,29 @@ func (ScaleObjectReadyConditionPredicate) Update(e event.UpdateEvent) bool { return false } + +type PausedPredicate struct { + predicate.Funcs +} + +func (PausedPredicate) Update(e event.UpdateEvent) bool { + if e.ObjectOld == nil || e.ObjectNew == nil { + return false + } + + newAnnotations := e.ObjectNew.GetAnnotations() + oldAnnotations := e.ObjectOld.GetAnnotations() + + newPausedValue := "" + oldPausedValue := "" + + if newAnnotations != nil { + newPausedValue = newAnnotations[PausedAnnotation] + } + + if oldAnnotations != nil { + oldPausedValue = oldAnnotations[PausedAnnotation] + } + + return newPausedValue != oldPausedValue +} diff --git a/tests/internals/pause_scaledjob/pause_scaledjob_test.go b/tests/internals/pause_scaledjob/pause_scaledjob_test.go new file mode 100644 index 00000000000..8089c102b56 --- /dev/null +++ b/tests/internals/pause_scaledjob/pause_scaledjob_test.go @@ -0,0 +1,213 @@ +//go:build e2e +// +build e2e + +// go test -v -tags e2e ./internals/pause_scaledjob/pause_scaledjob_test.go + +package pause_scaledjob_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + . "github.com/kedacore/keda/v2/tests/helper" +) + +// Load environment variables from .env file + +const ( + testName = "pause-scaledjob-test" +) + +var ( + testNamespace = fmt.Sprintf("%s-ns", testName) + serviceName = fmt.Sprintf("%s-service", testName) + scalerName = fmt.Sprintf("%s-scaler", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + minReplicaCount = 0 + maxReplicaCount = 3 + iterationCountInitial = 15 + iterationCountLatter = 30 +) + +type templateData struct { + TestNamespace string + ServiceName string + ScalerName string + ScaledJobName string + MinReplicaCount, MaxReplicaCount int + MetricThreshold, MetricValue int +} + +const ( + serviceTemplate = ` +apiVersion: v1 +kind: Service +metadata: + name: {{.ServiceName}} + namespace: {{.TestNamespace}} +spec: + ports: + - port: 6000 + targetPort: 6000 + selector: + app: {{.ScalerName}} +` + + scalerTemplate = ` +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{.ScalerName}} + namespace: {{.TestNamespace}} + labels: + app: {{.ScalerName}} +spec: + replicas: 1 + selector: + matchLabels: + app: {{.ScalerName}} + template: + metadata: + labels: + app: {{.ScalerName}} + spec: + containers: + - name: scaler + image: ghcr.io/kedacore/tests-external-scaler-e2e:latest + imagePullPolicy: Always + ports: + - containerPort: 6000 +` + + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + pollingInterval: 5 + maxReplicaCount: {{.MaxReplicaCount}} + minReplicaCount: {{.MinReplicaCount}} + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "15" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + triggers: + - type: external + metadata: + scalerAddress: {{.ServiceName}}.{{.TestNamespace}}:6000 + metricThreshold: "{{.MetricThreshold}}" + metricValue: "{{.MetricValue}}" +` +) + +// Util function +func WaitForJobByFilterCountUntilIteration(t *testing.T, kc *kubernetes.Clientset, namespace string, + target, iterations, intervalSeconds int, listOptions metav1.ListOptions) bool { + var isTargetAchieved = false + + for i := 0; i < iterations; i++ { + jobList, _ := kc.BatchV1().Jobs(namespace).List(context.Background(), listOptions) + count := len(jobList.Items) + + t.Logf("Waiting for job count to hit target. Namespace - %s, Current - %d, Target - %d", + namespace, count, target) + + if count == target { + isTargetAchieved = true + } else { + isTargetAchieved = false + } + + time.Sleep(time.Duration(intervalSeconds) * time.Second) + } + + return isTargetAchieved +} + +func TestScaler(t *testing.T) { + // setup + t.Log("--- setting up ---") + + // Create kubernetes resources + kc := GetKubernetesClient(t) + metricValue := 1 + + data, templates := getTemplateData(metricValue) + + listOptions := metav1.ListOptions{ + FieldSelector: "status.successful=0", + } + + CreateKubernetesResources(t, kc, testNamespace, data, templates) + + assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, data.MetricThreshold, iterationCountInitial, 1, listOptions), + "job count should be %d after %d iterations", data.MetricThreshold, iterationCountInitial) + + // test scaling + testPause(t, kc, listOptions) + testUnpause(t, kc, data, listOptions) + + // cleanup + DeleteKubernetesResources(t, testNamespace, data, templates) +} + +func getTemplateData(metricValue int) (templateData, []Template) { + return templateData{ + TestNamespace: testNamespace, + ScaledJobName: scaledJobName, + ScalerName: scalerName, + ServiceName: serviceName, + MinReplicaCount: minReplicaCount, + MaxReplicaCount: maxReplicaCount, + MetricThreshold: 1, + MetricValue: metricValue, + }, []Template{ + {Name: "scalerTemplate", Config: scalerTemplate}, + {Name: "serviceTemplate", Config: serviceTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, + } +} + +func testPause(t *testing.T, kc *kubernetes.Clientset, listOptions metav1.ListOptions) { + t.Log("--- testing Paused annotation ---") + + _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused=true", scaledJobName)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + t.Log("job count does not change as job is paused") + + expectedTarget := 0 + assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions), + "job count should be %d after %d iterations", expectedTarget, iterationCountLatter) +} + +func testUnpause(t *testing.T, kc *kubernetes.Clientset, data templateData, listOptions metav1.ListOptions) { + t.Log("--- testing removing Paused annotation ---") + + _, err := ExecuteCommand(fmt.Sprintf("kubectl annotate scaledjob %s autoscaling.keda.sh/paused-", scaledJobName)) + assert.NoErrorf(t, err, "cannot execute command - %s", err) + + t.Log("job count increases from zero as job is no longer paused") + + expectedTarget := data.MetricThreshold + assert.True(t, WaitForJobByFilterCountUntilIteration(t, kc, testNamespace, expectedTarget, iterationCountLatter, 1, listOptions), + "job count should be %d after %d iterations", expectedTarget, iterationCountLatter) +} diff --git a/tests/internals/pause_scaling/pause_scaling_test.go b/tests/internals/pause_scaledobject/pause_scaledobject_test.go similarity index 98% rename from tests/internals/pause_scaling/pause_scaling_test.go rename to tests/internals/pause_scaledobject/pause_scaledobject_test.go index f98d9042fa7..24635bd89dc 100644 --- a/tests/internals/pause_scaling/pause_scaling_test.go +++ b/tests/internals/pause_scaledobject/pause_scaledobject_test.go @@ -1,7 +1,7 @@ //go:build e2e // +build e2e -package pause_scaling_test +package pause_scaledobject_test import ( "fmt" @@ -16,7 +16,7 @@ import ( // Load environment variables from .env file const ( - testName = "pause-scaling-test" + testName = "pause-scaledobject-test" ) var (