Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Properly propagate context #2249

Merged
merged 2 commits into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

### Other

- Ensure that `context.Context` values are passed down the stack from all scaler gRPC handler implementation to scaler implementation code ([#2202](https://github.com/kedacore/keda/pull/2202))
- Ensure that `context.Context` values are properly passed down the stack ([#2202](https://github.com/kedacore/keda/pull/2202)|[#2249](https://github.com/kedacore/keda/pull/2249))
- Migrate to Kubebuilder v3 ([#2082](https://github.com/kedacore/keda/pull/2082))
- API path has been changed: `github.com/kedacore/keda/v2/api/v1alpha1` -> `github.com/kedacore/keda/v2/apis/keda/v1alpha1`
- Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))
Expand Down
1 change: 1 addition & 0 deletions adapter/generated/openapi/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logg
return err
}

err = r.Client.Create(context.TODO(), hpa)
err = r.Client.Create(ctx, hpa)
if err != nil {
logger.Error(err, "Failed to create new HPA in cluster", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
return err
Expand Down Expand Up @@ -130,7 +130,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l
// DeepDerivative ignores extra entries in arrays which makes removing the last trigger not update things, so trigger and update any time the metrics count is different.
if len(hpa.Spec.Metrics) != len(foundHpa.Spec.Metrics) || !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) {
logger.V(1).Info("Found difference in the HPA spec accordint to ScaledObject", "currentHPA", foundHpa.Spec, "newHPA", hpa.Spec)
if r.Client.Update(context.TODO(), hpa) != nil {
if r.Client.Update(ctx, hpa) != nil {
foundHpa.Spec = hpa.Spec
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand All @@ -143,7 +143,7 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger l

if !equality.Semantic.DeepDerivative(hpa.ObjectMeta.Labels, foundHpa.ObjectMeta.Labels) {
logger.V(1).Info("Found difference in the HPA labels accordint to ScaledObject", "currentHPA", foundHpa.ObjectMeta.Labels, "newHPA", hpa.ObjectMeta.Labels)
if r.Client.Update(context.TODO(), hpa) != nil {
if r.Client.Update(ctx, hpa) != nil {
foundHpa.ObjectMeta.Labels = hpa.ObjectMeta.Labels
logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name)
return err
Expand Down Expand Up @@ -203,7 +203,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context,

updateHealthStatus(scaledObject, externalMetricNames, status)

err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status)
err = kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// Check if the ScaledJob instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledJob.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledJob(reqLogger, scaledJob)
return ctrl.Result{}, r.finalizeScaledJob(ctx, reqLogger, scaledJob)
}

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledJob); err != nil {
if err := r.ensureFinalizer(ctx, reqLogger, scaledJob); err != nil {
return ctrl.Result{}, err
}

// ensure Status Conditions are initialized
if !scaledJob.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledJobReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledJob, &conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledJob, &conditions); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/scaledjob_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
)

// finalizeScaledJob runs finalization logic on ScaledJob if there's finalizer
func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
func (r *ScaledJobReconciler) finalizeScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
if util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
// Run finalization logic for scaledJobFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand All @@ -44,7 +44,7 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
// Remove scaledJobFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledJob.SetFinalizers(util.Remove(scaledJob.GetFinalizers(), scaledJobFinalizer))
if err := r.Client.Update(context.TODO(), scaledJob); err != nil {
if err := r.Client.Update(ctx, scaledJob); err != nil {
logger.Error(err, "Failed to update ScaledJob after removing a finalizer", "finalizer", scaledJobFinalizer)
return err
}
Expand All @@ -56,13 +56,13 @@ func (r *ScaledJobReconciler) finalizeScaledJob(logger logr.Logger, scaledJob *k
}

// ensureFinalizer check there is finalizer present on the ScaledJob, if not it adds one
func (r *ScaledJobReconciler) ensureFinalizer(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
func (r *ScaledJobReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
if !util.Contains(scaledJob.GetFinalizers(), scaledJobFinalizer) {
logger.Info("Adding Finalizer for the ScaledJob")
scaledJob.SetFinalizers(append(scaledJob.GetFinalizers(), scaledJobFinalizer))

// Update CR
err := r.Client.Update(context.TODO(), scaledJob)
err := r.Client.Update(ctx, scaledJob)
if err != nil {
logger.Error(err, "Failed to update ScaledJob with a finalizer", "finalizer", scaledJobFinalizer)
return err
Expand Down
26 changes: 13 additions & 13 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,18 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledObject.GetDeletionTimestamp() != nil {
return ctrl.Result{}, r.finalizeScaledObject(reqLogger, scaledObject)
return ctrl.Result{}, r.finalizeScaledObject(ctx, reqLogger, scaledObject)
}

// ensure finalizer is set on this CR
if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil {
if err := r.ensureFinalizer(ctx, reqLogger, scaledObject); err != nil {
return ctrl.Result{}, err
}

// ensure Status Conditions are initialized
if !scaledObject.Status.Conditions.AreInitialized() {
conditions := kedav1alpha1.GetInitializedConditions()
if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, conditions); err != nil {
return ctrl.Result{}, err
}
}
Expand All @@ -191,7 +191,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
conditions.SetReadyCondition(metav1.ConditionTrue, "ScaledObjectReady", msg)
}

if err := kedacontrollerutil.SetStatusConditions(r.Client, reqLogger, scaledObject, &conditions); err != nil {
if err := kedacontrollerutil.SetStatusConditions(ctx, r.Client, reqLogger, scaledObject, &conditions); err != nil {
return ctrl.Result{}, err
}

Expand All @@ -207,13 +207,13 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg
}

// Check the label needed for Metrics servers is present on ScaledObject
err := r.ensureScaledObjectLabel(logger, scaledObject)
err := r.ensureScaledObjectLabel(ctx, logger, scaledObject)
if err != nil {
return "Failed to update ScaledObject with scaledObjectName label", err
}

// Check if resource targeted for scaling exists and exposes /scale subresource
gvkr, err := r.checkTargetResourceIsScalable(logger, scaledObject)
gvkr, err := r.checkTargetResourceIsScalable(ctx, logger, scaledObject)
if err != nil {
return "ScaledObject doesn't have correct scaleTargetRef specification", err
}
Expand Down Expand Up @@ -251,7 +251,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logg

// ensureScaledObjectLabel ensures that scaledobject.keda.sh/name=<scaledObject.Name> label exist in the ScaledObject
// This is how the MetricsAdapter will know which ScaledObject a metric is for when the HPA queries it.
func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) ensureScaledObjectLabel(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
const labelScaledObjectName = "scaledobject.keda.sh/name"

if scaledObject.Labels == nil {
Expand All @@ -265,11 +265,11 @@ func (r *ScaledObjectReconciler) ensureScaledObjectLabel(logger logr.Logger, sca
}

logger.V(1).Info("Adding \"scaledobject.keda.sh/name\" label on ScaledObject", "value", scaledObject.Name)
return r.Client.Update(context.TODO(), scaledObject)
return r.Client.Update(ctx, scaledObject)
}

// checkTargetResourceIsScalable checks if resource targeted for scaling exists and exposes /scale subresource
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (kedav1alpha1.GroupVersionKindResource, error) {
gvkr, err := kedautil.ParseGVKR(r.restMapper, scaledObject.Spec.ScaleTargetRef.APIVersion, scaledObject.Spec.ScaleTargetRef.Kind)
if err != nil {
logger.Error(err, "Failed to parse Group, Version, Kind, Resource", "apiVersion", scaledObject.Spec.ScaleTargetRef.APIVersion, "kind", scaledObject.Spec.ScaleTargetRef.Kind)
Expand All @@ -289,12 +289,12 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
// not cached, let's try to detect /scale subresource
// also rechecks when we need to update the status.
var errScale error
scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(ctx, gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if errScale != nil {
// not able to get /scale subresource -> let's check if the resource even exist in the cluster
unstruct := &unstructured.Unstructured{}
unstruct.SetGroupVersionKind(gvkr.GroupVersionKind())
if err := r.Client.Get(context.TODO(), client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
if err := r.Client.Get(ctx, client.ObjectKey{Namespace: scaledObject.Namespace, Name: scaledObject.Spec.ScaleTargetRef.Name}, unstruct); err != nil {
// resource doesn't exist
logger.Error(err, "Target resource doesn't exist", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
return gvkr, err
Expand All @@ -319,7 +319,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge
status.OriginalReplicaCount = &scale.Spec.Replicas
}

if err := kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status); err != nil {
if err := kedacontrollerutil.UpdateScaledObjectStatus(ctx, r.Client, logger, scaledObject, status); err != nil {
return gvkr, err
}
logger.Info("Detected resource targeted for scaling", "resource", gvkString, "name", scaledObject.Spec.ScaleTargetRef.Name)
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Cont
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
err := r.Client.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
// HPA wasn't found -> let's create a new one
err = r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
Expand Down
12 changes: 6 additions & 6 deletions controllers/keda/scaledobject_finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
)

// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer
func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) finalizeScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
if util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
// Run finalization logic for scaledObjectFinalizer. If the
// finalization logic fails, don't remove the finalizer so
Expand All @@ -45,7 +45,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled

// if enabled, scale scaleTarget back to the original replica count (to the state it was before scaling with KEDA)
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.RestoreToOriginalReplicaCount {
scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err)
Expand All @@ -54,7 +54,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}
} else {
scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount
_, err = r.scaleClient.Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
_, err = r.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{})
if err != nil {
logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer)
}
Expand All @@ -65,7 +65,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
// Remove scaledObjectFinalizer. Once all finalizers have been
// removed, the object will be deleted.
scaledObject.SetFinalizers(util.Remove(scaledObject.GetFinalizers(), scaledObjectFinalizer))
if err := r.Client.Update(context.TODO(), scaledObject); err != nil {
if err := r.Client.Update(ctx, scaledObject); err != nil {
logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer)
return err
}
Expand All @@ -77,13 +77,13 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled
}

// ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one
func (r *ScaledObjectReconciler) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
func (r *ScaledObjectReconciler) ensureFinalizer(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error {
if !util.Contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) {
logger.Info("Adding Finalizer for the ScaledObject")
scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer))

// Update CR
err := r.Client.Update(context.TODO(), scaledObject)
err := r.Client.Update(ctx, scaledObject)
if err != nil {
logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer)
return err
Expand Down
8 changes: 4 additions & 4 deletions controllers/keda/util/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

// SetStatusConditions patches given object with passed list of conditions based on the object's type or returns an error.
func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
func SetStatusConditions(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, object interface{}, conditions *kedav1alpha1.Conditions) error {
var patch runtimeclient.Patch

runtimeObj := object.(runtimeclient.Object)
Expand All @@ -44,18 +44,18 @@ func SetStatusConditions(client runtimeclient.StatusClient, logger logr.Logger,
return err
}

err := client.Status().Patch(context.TODO(), runtimeObj, patch)
err := client.Status().Patch(ctx, runtimeObj, patch)
if err != nil {
logger.Error(err, "Failed to patch Objects Status with Conditions")
}
return err
}

// UpdateScaledObjectStatus patches the given ScaledObject with the updated status passed to it or returns an error.
func UpdateScaledObjectStatus(client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
func UpdateScaledObjectStatus(ctx context.Context, client runtimeclient.StatusClient, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, status *kedav1alpha1.ScaledObjectStatus) error {
patch := runtimeclient.MergeFrom(scaledObject.DeepCopy())
scaledObject.Status = *status
err := client.Status().Patch(context.TODO(), scaledObject, patch)
err := client.Status().Patch(ctx, scaledObject, patch)
if err != nil {
logger.Error(err, "Failed to patch ScaledObjects Status")
}
Expand Down
Loading