From 58a252f0b70364693bfa6cf3387de0327109ad8c Mon Sep 17 00:00:00 2001 From: Shubham Kuchhal Date: Wed, 2 Jun 2021 13:19:16 +0530 Subject: [PATCH 1/4] Fix: READY AND ACTIVE fields of ScaledJob to show status. Signed-off-by: Shubham Kuchhal --- controllers/scaledjob_controller.go | 61 ++++++++++++++++++----------- pkg/scaling/executor/scale_jobs.go | 19 ++++++++- 2 files changed, 56 insertions(+), 24 deletions(-) diff --git a/controllers/scaledjob_controller.go b/controllers/scaledjob_controller.go index f048916925d..3d790c2daad 100644 --- a/controllers/scaledjob_controller.go +++ b/controllers/scaledjob_controller.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + kedacontrollerutil "github.com/kedacore/keda/v2/controllers/util" "github.com/kedacore/keda/v2/pkg/eventreason" corev1 "k8s.io/api/core/v1" @@ -80,47 +81,63 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } - var errMsg string - if scaledJob.Spec.JobTargetRef != nil { - reqLogger.Info("Detected ScaleType = Job") - conditions := scaledJob.Status.Conditions.DeepCopy() - msg, err := r.reconcileScaledJob(reqLogger, scaledJob) - if err != nil { - reqLogger.Error(err, msg) - conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) - conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") - r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg) - } else { - wasReady := conditions.GetReadyCondition() - if wasReady.IsFalse() || wasReady.IsUnknown() { - r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling") - } - reqLogger.V(1).Info(msg) - conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) + // ensure Status Conditions are initialized + if !scaledJob.Status.Conditions.AreInitialized() { + conditions := kedav1alpha1.GetInitializedConditions() + if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil { + return ctrl.Result{}, err } + } + // Check jobTargetRef is specified + if scaledJob.Spec.JobTargetRef == nil { + errMsg := "scaledJob.spec.jobTargetRef is not set" + err := fmt.Errorf(errMsg) + reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found") return ctrl.Result{}, err } + msg, err := r.reconcileScaledJob(reqLogger, scaledJob) + conditions := scaledJob.Status.Conditions.DeepCopy() + if err != nil { + reqLogger.Error(err, msg) + conditions.SetReadyCondition(metav1.ConditionFalse, "ScaledJobCheckFailed", msg) + conditions.SetActiveCondition(metav1.ConditionUnknown, "UnknownState", "ScaledJob check failed") + r.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.ScaledJobCheckFailed, msg) + } else { + wasReady := conditions.GetReadyCondition() + if wasReady.IsFalse() || wasReady.IsUnknown() { + r.Recorder.Event(scaledJob, corev1.EventTypeNormal, eventreason.ScaledJobReady, "ScaledJob is ready for scaling") + } + reqLogger.V(1).Info(msg) + conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg) + } - errMsg = "scaledJob.Spec.JobTargetRef is not set" - err = fmt.Errorf(errMsg) - reqLogger.Error(err, "scaledJob.Spec.JobTargetRef not found") + if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil { + return ctrl.Result{}, err + } return ctrl.Result{}, err } -// reconcileJobType implemets reconciler logic for K8s Jobs based ScaleObject +// reconcileJobType implemets reconciler logic for K8s Jobs based ScaledJob func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) { msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob) if err != nil { return msg, err } + // Check ScaledJob is Ready or not + _, err = r.scaleHandler.GetScalers(scaledJob) + if err != nil { + logger.Error(err, "Error getting scalers") + return "Failed to ensure ScaledJob is correctly created", err + } + // scaledJob was created or modified - let's start a new ScaleLoop err = r.requestScaleLoop(logger, scaledJob) if err != nil { return "Failed to start a new scale loop with scaling logic", err } - logger.Info("Initializing Scaling logic according to ScaledObject Specification") + logger.Info("Initializing Scaling logic according to ScaledJob Specification") return "ScaledJob is defined correctly and is ready to scaling", nil } diff --git a/pkg/scaling/executor/scale_jobs.go b/pkg/scaling/executor/scale_jobs.go index 57eb5b637d1..76b81dc9a60 100644 --- a/pkg/scaling/executor/scale_jobs.go +++ b/pkg/scaling/executor/scale_jobs.go @@ -50,6 +50,21 @@ func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1al logger.V(1).Info("No change in activity") } + condition := scaledJob.Status.Conditions.GetActiveCondition() + if condition.IsUnknown() || condition.IsTrue() != isActive { + if isActive { + if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active"); err != nil { + logger.Error(err, "Error setting active condition when triggers are active") + return + } + } else { + if err := e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active"); err != nil { + logger.Error(err, "Error setting active condition when triggers are not active") + return + } + } + } + err := e.cleanUp(scaledJob) if err != nil { logger.Error(err, "Failed to cleanUp jobs") @@ -98,10 +113,10 @@ func (e *scaleExecutor) createJobs(logger logr.Logger, scaledJob *kedav1alpha1.S job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure } - // Set ScaledObject instance as the owner and controller + // Set ScaledJob instance as the owner and controller err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme) if err != nil { - logger.Error(err, "Failed to set ScaledObject as the owner of the new Job") + logger.Error(err, "Failed to set ScaledJob as the owner of the new Job") } err = e.client.Create(context.TODO(), job) From 4c695a208589cfe66318ba473d4d77cd08ea161b Mon Sep 17 00:00:00 2001 From: Shubham Kuchhal Date: Wed, 2 Jun 2021 17:25:55 +0530 Subject: [PATCH 2/4] Change reconcileJobType implemets to reconcileScaledJob implements in comment. Signed-off-by: Shubham Kuchhal --- controllers/scaledjob_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controllers/scaledjob_controller.go b/controllers/scaledjob_controller.go index 3d790c2daad..6886f6dfde7 100644 --- a/controllers/scaledjob_controller.go +++ b/controllers/scaledjob_controller.go @@ -118,7 +118,7 @@ func (r *ScaledJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { return ctrl.Result{}, err } -// reconcileJobType implemets reconciler logic for K8s Jobs based ScaledJob +// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) { msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob) if err != nil { From c317580a1ea54688107e158db68631eb1245a692 Mon Sep 17 00:00:00 2001 From: Shubham Kuchhal Date: Wed, 2 Jun 2021 18:30:15 +0530 Subject: [PATCH 3/4] Update the changelog. Signed-off-by: Shubham Kuchhal --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 13b4ffd910a..f17475c55ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ - Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768)) - Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621)) - Fix Azure Blob scaler when using multiple triggers with the same `blobContainerName` and added a optional `metricName` field ([#1816](https://github.com/kedacore/keda/pull/1816)) +- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855)) ### Breaking Changes From 08ea5f7a7337a53a17bbb0c0fbb9e18507a634a1 Mon Sep 17 00:00:00 2001 From: Shubham Kuchhal Date: Wed, 2 Jun 2021 19:32:51 +0530 Subject: [PATCH 4/4] Move this PR link to Unreleased in CHANGELOG Signed-off-by: Shubham Kuchhal --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f17475c55ab..514674e17d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,7 @@ ### Improvements -- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX)) +- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855)) ### Breaking Changes @@ -54,7 +54,6 @@ - Fix cleanup of removed triggers ([#1768](https://github.com/kedacore/keda/pull/1768)) - Eventhub Scaler: Add trigger parameter `checkpointStrategy` to support more language-specific checkpoints ([#1621](https://github.com/kedacore/keda/pull/1621)) - Fix Azure Blob scaler when using multiple triggers with the same `blobContainerName` and added a optional `metricName` field ([#1816](https://github.com/kedacore/keda/pull/1816)) -- Fix READY and ACTIVE fields of ScaledJob to show status when we run `kubectl get sj` ([#1855](https://github.com/kedacore/keda/pull/1855)) ### Breaking Changes