Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate contexts down scaler call stacks #2202

Merged
merged 9 commits into from
Oct 26, 2021
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

### New

- TODO ([#XXX](https://github.com/kedacore/keda/pull/XXX))
- ScaledJob: introduce MultipleScalersCalculation ([#2016](https://github.com/kedacore/keda/pull/2016))
- Add Graphite Scaler ([#1628](https://github.com/kedacore/keda/pull/2092))

Expand Down Expand Up @@ -53,6 +52,7 @@

### Other

- Ensure that `context.Context` values are passed down the stack from all scaler gRPC handler implementation to scaler implementation code ([#2202](https://github.com/kedacore/keda/pull/2202))
- Migrate to Kubebuilder v3 ([#2082](https://github.com/kedacore/keda/pull/2082))
- API path has been changed: `github.com/kedacore/keda/v2/api/v1alpha1` -> `github.com/kedacore/keda/v2/apis/keda/v1alpha1`
- Use Patch to set FallbackCondition on ScaledObject.Status ([#2037](https://github.com/kedacore/keda/pull/2037))
Expand Down
1 change: 0 additions & 1 deletion apis/keda/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions controllers/keda/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ const (
)

// createAndDeployNewHPA creates and deploy HPA in the cluster for specified ScaledObject
func (r *ScaledObjectReconciler) createAndDeployNewHPA(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) error {
func (r *ScaledObjectReconciler) createAndDeployNewHPA(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpaName := getHPAName(scaledObject)
logger.Info("Creating a new HPA", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
hpa, err := r.newHPAForScaledObject(ctx, logger, scaledObject, gvkr)
if err != nil {
logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", hpaName)
return err
Expand All @@ -59,8 +59,8 @@ func (r *ScaledObjectReconciler) createAndDeployNewHPA(logger logr.Logger, scale
}

// newHPAForScaledObject returns HPA as it is specified in ScaledObject
func (r *ScaledObjectReconciler) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject)
func (r *ScaledObjectReconciler) newHPAForScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) {
scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(ctx, logger, scaledObject)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -120,8 +120,8 @@ func (r *ScaledObjectReconciler) newHPAForScaledObject(logger logr.Logger, scale
}

// updateHPAIfNeeded checks whether update of HPA is needed
func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr)
func (r *ScaledObjectReconciler) updateHPAIfNeeded(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedav1alpha1.GroupVersionKindResource) error {
hpa, err := r.newHPAForScaledObject(ctx, logger, scaledObject, gvkr)
if err != nil {
logger.Error(err, "Failed to create new HPA resource", "HPA.Namespace", scaledObject.Namespace, "HPA.Name", getHPAName(scaledObject))
return err
Expand Down Expand Up @@ -155,19 +155,19 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObj
}

// getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject
func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec
var externalMetricNames []string
var resourceMetricNames []string

scalers, err := r.scaleHandler.GetScalers(scaledObject)
scalers, err := r.scaleHandler.GetScalers(ctx, scaledObject)
if err != nil {
logger.Error(err, "Error getting scalers")
return nil, err
}

for _, scaler := range scalers {
metricSpecs := scaler.GetMetricSpecForScaling()
metricSpecs := scaler.GetMetricSpecForScaling(ctx)

for _, metricSpec := range metricSpecs {
if metricSpec.Resource != nil {
Expand All @@ -187,7 +187,7 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
}
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
scaler.Close()
scaler.Close(ctx)
}

// sort metrics in ScaledObject, this way we always check the same resource in Reconcile loop and we can prevent unnecessary HPA updates,
Expand Down
13 changes: 8 additions & 5 deletions controllers/keda/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package keda

import (
"context"

"github.com/go-logr/logr"
"github.com/golang/mock/gomock"
. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -75,7 +77,7 @@ var _ = Describe("hpa", func() {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)
_, err := reconciler.getScaledObjectMetricSpecs(context.Background(), logger, scaledObject)

Expect(err).ToNot(HaveOccurred())
Expect(capturedScaledObject.Status.Health).To(BeEmpty())
Expand All @@ -102,7 +104,7 @@ var _ = Describe("hpa", func() {
capturedScaledObject = *scaledObject
})

_, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject)
_, err := reconciler.getScaledObjectMetricSpecs(context.Background(), logger, scaledObject)

expectedHealth := make(map[string]v1alpha1.HealthStatus)
expectedHealth["some metric name"] = v1alpha1.HealthStatus{
Expand Down Expand Up @@ -136,9 +138,10 @@ func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.Moc
},
}
metricSpecs := []v2beta2.MetricSpec{metricSpec}
scaler.EXPECT().GetMetricSpecForScaling().Return(metricSpecs)
scaler.EXPECT().Close()
scaleHandler.EXPECT().GetScalers(gomock.Eq(scaledObject)).Return(scalers, nil)
ctx := context.Background()
scaler.EXPECT().GetMetricSpecForScaling(ctx).Return(metricSpecs)
scaler.EXPECT().Close(ctx)
scaleHandler.EXPECT().GetScalers(context.Background(), gomock.Eq(scaledObject)).Return(scalers, nil)

return scaledObject
}
6 changes: 3 additions & 3 deletions controllers/keda/scaledjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
reqLogger.Error(err, "scaledJob.spec.jobTargetRef not found")
return ctrl.Result{}, err
}
msg, err := r.reconcileScaledJob(reqLogger, scaledJob)
msg, err := r.reconcileScaledJob(ctx, reqLogger, scaledJob)
conditions := scaledJob.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
Expand All @@ -133,14 +133,14 @@ func (r *ScaledJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}

// reconcileScaledJob implements reconciler logic for K8s Jobs based ScaledJob
func (r *ScaledJobReconciler) reconcileScaledJob(logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
func (r *ScaledJobReconciler) reconcileScaledJob(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) (string, error) {
msg, err := r.deletePreviousVersionScaleJobs(logger, scaledJob)
if err != nil {
return msg, err
}

// Check ScaledJob is Ready or not
_, err = r.scaleHandler.GetScalers(scaledJob)
_, err = r.scaleHandler.GetScalers(ctx, scaledJob)
if err != nil {
logger.Error(err, "Error getting scalers")
return "Failed to ensure ScaledJob is correctly created", err
Expand Down
12 changes: 6 additions & 6 deletions controllers/keda/scaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// reconcile ScaledObject and set status appropriately
msg, err := r.reconcileScaledObject(reqLogger, scaledObject)
msg, err := r.reconcileScaledObject(ctx, reqLogger, scaledObject)
conditions := scaledObject.Status.Conditions.DeepCopy()
if err != nil {
reqLogger.Error(err, msg)
Expand All @@ -199,7 +199,7 @@ func (r *ScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request
}

// reconcileScaledObject implements reconciler logic for ScaleObject
func (r *ScaledObjectReconciler) reconcileScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
func (r *ScaledObjectReconciler) reconcileScaledObject(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (string, error) {
// Check scale target Name is specified
if scaledObject.Spec.ScaleTargetRef.Name == "" {
err := fmt.Errorf("ScaledObject.spec.scaleTargetRef.name is missing")
Expand All @@ -224,7 +224,7 @@ func (r *ScaledObjectReconciler) reconcileScaledObject(logger logr.Logger, scale
}

// Create a new HPA or update existing one according to ScaledObject
newHPACreated, err := r.ensureHPAForScaledObjectExists(logger, scaledObject, &gvkr)
newHPACreated, err := r.ensureHPAForScaledObjectExists(ctx, logger, scaledObject, &gvkr)
if err != nil {
return "Failed to ensure HPA is correctly created for ScaledObject", err
}
Expand Down Expand Up @@ -349,14 +349,14 @@ func (r *ScaledObjectReconciler) checkReplicaCountBoundsAreValid(scaledObject *k
}

// ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedav1alpha1.GroupVersionKindResource) (bool, error) {
hpaName := getHPAName(scaledObject)
foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{}
// Check if HPA for this ScaledObject already exists
err := r.Client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa)
if err != nil && errors.IsNotFound(err) {
// HPA wasn't found -> let's create a new one
err = r.createAndDeployNewHPA(logger, scaledObject, gvkr)
err = r.createAndDeployNewHPA(ctx, logger, scaledObject, gvkr)
if err != nil {
return false, err
}
Expand All @@ -372,7 +372,7 @@ func (r *ScaledObjectReconciler) ensureHPAForScaledObjectExists(logger logr.Logg
}

// HPA was found -> let's check if we need to update it
err = r.updateHPAIfNeeded(logger, scaledObject, foundHpa, gvkr)
err = r.updateHPAIfNeeded(ctx, logger, scaledObject, foundHpa, gvkr)
if err != nil {
logger.Error(err, "Failed to check HPA for possible update")
return false, err
Expand Down
16 changes: 8 additions & 8 deletions controllers/keda/scaledobject_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,20 +100,20 @@ var _ = Describe("ScaledObjectController", func() {
}

testScalers = append(testScalers, s)
for _, metricSpec := range s.GetMetricSpecForScaling() {
for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return(testScalers, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return(testScalers, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, uniquelyNamedScaledObject)

// Test that the status was updated with metric names
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))
Expand All @@ -139,19 +139,19 @@ var _ = Describe("ScaledObjectController", func() {
if err != nil {
Fail(err.Error())
}
for _, metricSpec := range s.GetMetricSpecForScaling() {
for _, metricSpec := range s.GetMetricSpecForScaling(context.Background()) {
if metricSpec.External != nil {
expectedExternalMetricNames = append(expectedExternalMetricNames, metricSpec.External.Metric.Name)
}
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), uniquelyNamedScaledObject).Return([]scalers.Scaler{s}, nil)
mockClient.EXPECT().Status().Return(mockStatusWriter)
mockStatusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any())

// Call function to be tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, uniquelyNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, uniquelyNamedScaledObject)

// Test that the status was updated
Ω(uniquelyNamedScaledObject.Status.ExternalMetricNames).Should(Equal(expectedExternalMetricNames))
Expand Down Expand Up @@ -186,10 +186,10 @@ var _ = Describe("ScaledObjectController", func() {
}

// Set up expectations
mockScaleHandler.EXPECT().GetScalers(duplicateNamedScaledObject).Return(testScalers, nil)
mockScaleHandler.EXPECT().GetScalers(context.Background(), duplicateNamedScaledObject).Return(testScalers, nil)

// Call function tobe tested
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(testLogger, duplicateNamedScaledObject)
metricSpecs, err := metricNameTestReconciler.getScaledObjectMetricSpecs(context.Background(), testLogger, duplicateNamedScaledObject)

// Test that the status was not updated
Ω(duplicateNamedScaledObject.Status.ExternalMetricNames).Should(BeNil())
Expand Down
2 changes: 1 addition & 1 deletion pkg/mock/mock_client/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mock/mock_scale/mock_interfaces.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions pkg/mock/mock_scaler/mock_scaler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions pkg/mock/mock_scaling/mock_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading