From cc6a5bb6ea7108f44991a4bdfb3e23ee869c62fa Mon Sep 17 00:00:00 2001 From: mishamo Date: Wed, 14 Jul 2021 13:54:50 +0100 Subject: [PATCH] Add fallback functionality (#1910) Signed-off-by: misha --- CHANGELOG.md | 1 + api/v1alpha1/condition_types.go | 29 +- api/v1alpha1/scaledobject_types.go | 34 ++ api/v1alpha1/zz_generated.deepcopy.go | 52 +++ config/crd/bases/keda.sh_scaledjobs.yaml | 3 + config/crd/bases/keda.sh_scaledobjects.yaml | 32 ++ controllers/hpa.go | 15 + controllers/hpa_test.go | 127 ++++++ controllers/scaledobject_controller.go | 6 +- controllers/scaledobject_finalizer.go | 4 +- pkg/mock/mock_scale/mock_interfaces.go | 122 +++++ pkg/mock/mock_scaler/mock_scaler.go | 189 ++++++++ pkg/provider/fallback.go | 120 +++++ pkg/provider/fallback_test.go | 425 ++++++++++++++++++ pkg/provider/provider.go | 4 +- pkg/scaling/executor/scale_executor.go | 26 +- pkg/scaling/executor/scale_scaledobjects.go | 29 +- .../executor/scale_scaledobjects_test.go | 78 ++++ pkg/scaling/scale_hander_test.go | 51 --- pkg/scaling/scale_handler.go | 21 +- pkg/scaling/scale_handler_test.go | 122 +++++ 21 files changed, 1412 insertions(+), 78 deletions(-) create mode 100644 controllers/hpa_test.go create mode 100644 pkg/mock/mock_scale/mock_interfaces.go create mode 100644 pkg/mock/mock_scaler/mock_scaler.go create mode 100644 pkg/provider/fallback.go create mode 100644 pkg/provider/fallback_test.go create mode 100644 pkg/scaling/executor/scale_scaledobjects_test.go delete mode 100644 pkg/scaling/scale_hander_test.go create mode 100644 pkg/scaling/scale_handler_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c2599d8f666..bba14397e9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ - Extend Azure Monitor scaler to support custom metrics ([#1883](https://github.com/kedacore/keda/pull/1883)) - Support non-public cloud environments in the Azure Storage Queue and Azure Storage Blob scalers ([#1863](https://github.com/kedacore/keda/pull/1863)) - Show HashiCorp Vault Address when using `kubectl get ta` or `kubectl get cta` ([#1862](https://github.com/kedacore/keda/pull/1862)) +- Add fallback functionality ([#1872](https://github.com/kedacore/keda/issues/1872)) ### Improvements diff --git a/api/v1alpha1/condition_types.go b/api/v1alpha1/condition_types.go index 2e1c68d450a..0e1b33cf02d 100644 --- a/api/v1alpha1/condition_types.go +++ b/api/v1alpha1/condition_types.go @@ -14,6 +14,8 @@ const ( // ConditionActive specifies that the resource has finished. // For resource which run to completion. ConditionActive ConditionType = "Active" + // ConditionFallback specifies that the resource has a fallback active. + ConditionFallback ConditionType = "Fallback" ) // Condition to store the condition state @@ -44,6 +46,7 @@ type Conditions []Condition func (c *Conditions) AreInitialized() bool { foundReady := false foundActive := false + foundFallback := false if *c != nil { for _, condition := range *c { if condition.Type == ConditionReady { @@ -57,14 +60,20 @@ func (c *Conditions) AreInitialized() bool { break } } + for _, condition := range *c { + if condition.Type == ConditionFallback { + foundFallback = true + break + } + } } - return foundReady && foundActive + return foundReady && foundActive && foundFallback } // GetInitializedConditions returns Conditions initialized to the default -> Status: Unknown func GetInitializedConditions() *Conditions { - return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}} + return &Conditions{{Type: ConditionReady, Status: metav1.ConditionUnknown}, {Type: ConditionActive, Status: metav1.ConditionUnknown}, {Type: ConditionFallback, Status: metav1.ConditionUnknown}} } // IsTrue is true if the condition is True @@ -107,6 +116,14 @@ func (c *Conditions) SetActiveCondition(status metav1.ConditionStatus, reason st c.setCondition(ConditionActive, status, reason, message) } +// SetFallbackCondition modifies Fallback Condition according to input parameters +func (c *Conditions) SetFallbackCondition(status metav1.ConditionStatus, reason string, message string) { + if *c == nil { + c = GetInitializedConditions() + } + c.setCondition(ConditionFallback, status, reason, message) +} + // GetActiveCondition returns Condition of type Active func (c *Conditions) GetActiveCondition() Condition { if *c == nil { @@ -123,6 +140,14 @@ func (c *Conditions) GetReadyCondition() Condition { return c.getCondition(ConditionReady) } +// GetFallbackCondition returns Condition of type Ready +func (c *Conditions) GetFallbackCondition() Condition { + if *c == nil { + c = GetInitializedConditions() + } + return c.getCondition(ConditionFallback) +} + func (c Conditions) getCondition(conditionType ConditionType) Condition { for i := range c { if c[i].Type == conditionType { diff --git a/api/v1alpha1/scaledobject_types.go b/api/v1alpha1/scaledobject_types.go index b55d543cd90..f45a287a32d 100644 --- a/api/v1alpha1/scaledobject_types.go +++ b/api/v1alpha1/scaledobject_types.go @@ -17,6 +17,7 @@ import ( // +kubebuilder:printcolumn:name="Authentication",type="string",JSONPath=".spec.triggers[*].authenticationRef.name" // +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.conditions[?(@.type==\"Ready\")].status" // +kubebuilder:printcolumn:name="Active",type="string",JSONPath=".status.conditions[?(@.type==\"Active\")].status" +// +kubebuilder:printcolumn:name="Fallback",type="string",JSONPath=".status.conditions[?(@.type==\"Fallback\")].status" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // ScaledObject is a specification for a ScaledObject resource @@ -29,6 +30,25 @@ type ScaledObject struct { Status ScaledObjectStatus `json:"status,omitempty"` } +// HealthStatus is the status for a ScaledObject's health +type HealthStatus struct { + // +optional + NumberOfFailures *int32 `json:"numberOfFailures,omitempty"` + // +optional + Status HealthStatusType `json:"status,omitempty"` +} + +// HealthStatusType is an indication of whether the health status is happy or failing +type HealthStatusType string + +const ( + // HealthStatusHappy means the status of the health object is happy + HealthStatusHappy HealthStatusType = "Happy" + + // HealthStatusFailing means the status of the health object is failing + HealthStatusFailing HealthStatusType = "Failing" +) + // ScaledObjectSpec is the spec for a ScaledObject resource type ScaledObjectSpec struct { ScaleTargetRef *ScaleTarget `json:"scaleTargetRef"` @@ -44,6 +64,14 @@ type ScaledObjectSpec struct { Advanced *AdvancedConfig `json:"advanced,omitempty"` Triggers []ScaleTriggers `json:"triggers"` + // +optional + Fallback *Fallback `json:"fallback,omitempty"` +} + +// Fallback is the spec for fallback options +type Fallback struct { + FailureThreshold int32 `json:"failureThreshold"` + Replicas int32 `json:"replicas"` } // AdvancedConfig specifies advance scaling options @@ -79,8 +107,12 @@ type ScaleTriggers struct { Metadata map[string]string `json:"metadata"` // +optional AuthenticationRef *ScaledObjectAuthRef `json:"authenticationRef,omitempty"` + // +optional + FallbackReplicas *int32 `json:"fallback,omitempty"` } +// +k8s:openapi-gen=true + // ScaledObjectStatus is the status for a ScaledObject resource // +optional type ScaledObjectStatus struct { @@ -98,6 +130,8 @@ type ScaledObjectStatus struct { ResourceMetricNames []string `json:"resourceMetricNames,omitempty"` // +optional Conditions Conditions `json:"conditions,omitempty"` + // +optional + Health map[string]HealthStatus `json:"health,omitempty"` } // +kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index b5afe3ee424..3ec46460583 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -198,6 +198,21 @@ func (in *Credential) DeepCopy() *Credential { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Fallback) DeepCopyInto(out *Fallback) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Fallback. +func (in *Fallback) DeepCopy() *Fallback { + if in == nil { + return nil + } + out := new(Fallback) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GroupVersionKindResource) DeepCopyInto(out *GroupVersionKindResource) { *out = *in @@ -238,6 +253,26 @@ func (in *HashiCorpVault) DeepCopy() *HashiCorpVault { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *HealthStatus) DeepCopyInto(out *HealthStatus) { + *out = *in + if in.NumberOfFailures != nil { + in, out := &in.NumberOfFailures, &out.NumberOfFailures + *out = new(int32) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new HealthStatus. +func (in *HealthStatus) DeepCopy() *HealthStatus { + if in == nil { + return nil + } + out := new(HealthStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HorizontalPodAutoscalerConfig) DeepCopyInto(out *HorizontalPodAutoscalerConfig) { *out = *in @@ -288,6 +323,11 @@ func (in *ScaleTriggers) DeepCopyInto(out *ScaleTriggers) { *out = new(ScaledObjectAuthRef) **out = **in } + if in.FallbackReplicas != nil { + in, out := &in.FallbackReplicas, &out.FallbackReplicas + *out = new(int32) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaleTriggers. @@ -545,6 +585,11 @@ func (in *ScaledObjectSpec) DeepCopyInto(out *ScaledObjectSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Fallback != nil { + in, out := &in.Fallback, &out.Fallback + *out = new(Fallback) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaledObjectSpec. @@ -589,6 +634,13 @@ func (in *ScaledObjectStatus) DeepCopyInto(out *ScaledObjectStatus) { *out = make(Conditions, len(*in)) copy(*out, *in) } + if in.Health != nil { + in, out := &in.Health, &out.Health + *out = make(map[string]HealthStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScaledObjectStatus. diff --git a/config/crd/bases/keda.sh_scaledjobs.yaml b/config/crd/bases/keda.sh_scaledjobs.yaml index f5f23014ecf..9dd16b5dac8 100644 --- a/config/crd/bases/keda.sh_scaledjobs.yaml +++ b/config/crd/bases/keda.sh_scaledjobs.yaml @@ -6760,6 +6760,9 @@ spec: required: - name type: object + fallback: + format: int32 + type: integer metadata: additionalProperties: type: string diff --git a/config/crd/bases/keda.sh_scaledobjects.yaml b/config/crd/bases/keda.sh_scaledobjects.yaml index 7fc61cf1bf7..ea80e62c940 100644 --- a/config/crd/bases/keda.sh_scaledobjects.yaml +++ b/config/crd/bases/keda.sh_scaledobjects.yaml @@ -43,6 +43,9 @@ spec: - jsonPath: .status.conditions[?(@.type=="Active")].status name: Active type: string + - jsonPath: .status.conditions[?(@.type=="Fallback")].status + name: Fallback + type: string - jsonPath: .metadata.creationTimestamp name: Age type: date @@ -200,6 +203,19 @@ spec: cooldownPeriod: format: int32 type: integer + fallback: + description: Fallback is the spec for fallback options + properties: + failureThreshold: + format: int32 + type: integer + replicas: + format: int32 + type: integer + required: + - failureThreshold + - replicas + type: object maxReplicaCount: format: int32 type: integer @@ -242,6 +258,9 @@ spec: required: - name type: object + fallback: + format: int32 + type: integer metadata: additionalProperties: type: string @@ -290,6 +309,19 @@ spec: items: type: string type: array + health: + additionalProperties: + description: HealthStatus is the status for a ScaledObject's health + properties: + numberOfFailures: + format: int32 + type: integer + status: + description: HealthStatusType is an indication of whether the + health status is happy or failing + type: string + type: object + type: object lastActiveTime: format: date-time type: string diff --git a/controllers/hpa.go b/controllers/hpa.go index 13b1d545e6f..550709b628b 100644 --- a/controllers/hpa.go +++ b/controllers/hpa.go @@ -184,6 +184,9 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, status := scaledObject.Status.DeepCopy() status.ExternalMetricNames = externalMetricNames status.ResourceMetricNames = resourceMetricNames + + updateHealthStatus(scaledObject, externalMetricNames, status) + err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status) if err != nil { logger.Error(err, "Error updating scaledObject status with used externalMetricNames") @@ -193,6 +196,18 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, return scaledObjectMetricSpecs, nil } +func updateHealthStatus(scaledObject *kedav1alpha1.ScaledObject, externalMetricNames []string, status *kedav1alpha1.ScaledObjectStatus) { + health := scaledObject.Status.Health + newHealth := make(map[string]kedav1alpha1.HealthStatus) + for _, metricName := range externalMetricNames { + entry, exists := health[metricName] + if exists { + newHealth[metricName] = entry + } + } + status.Health = newHealth +} + // checkMinK8sVersionforHPABehavior min version (k8s v1.18) for HPA Behavior func (r *ScaledObjectReconciler) checkMinK8sVersionforHPABehavior(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) { if r.kubeVersion.MinorVersion < 18 { diff --git a/controllers/hpa_test.go b/controllers/hpa_test.go new file mode 100644 index 00000000000..e320d3412f5 --- /dev/null +++ b/controllers/hpa_test.go @@ -0,0 +1,127 @@ +package controllers + +import ( + "github.com/go-logr/logr" + "github.com/golang/mock/gomock" + "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" + kedascalers "github.com/kedacore/keda/v2/pkg/scalers" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/api/autoscaling/v2beta2" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var _ = Describe("hpa", func() { + var ( + reconciler ScaledObjectReconciler + scaleHandler *mock_scaling.MockScaleHandler + client *mock_client.MockClient + statusWriter *mock_client.MockStatusWriter + scaler *mock_scalers.MockScaler + logger logr.Logger + ctrl *gomock.Controller + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + client = mock_client.NewMockClient(ctrl) + scaleHandler = mock_scaling.NewMockScaleHandler(ctrl) + scaler = mock_scalers.NewMockScaler(ctrl) + statusWriter = mock_client.NewMockStatusWriter(ctrl) + logger = logr.DiscardLogger{} + reconciler = ScaledObjectReconciler{ + Client: client, + scaleHandler: scaleHandler, + } + }) + + AfterEach(func() { + ctrl.Finish() + }) + + It("should remove deleted metric from health status", func() { + numberOfFailures := int32(87) + health := make(map[string]v1alpha1.HealthStatus) + health["another metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + scaledObject := setupTest(health, scaler, scaleHandler) + + var capturedScaledObject v1alpha1.ScaledObject + client.EXPECT().Status().Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}, opts ...interface{}) { + capturedScaledObject = *scaledObject + }) + + _, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject) + + Expect(err).ToNot(HaveOccurred()) + Expect(capturedScaledObject.Status.Health).To(BeEmpty()) + }) + + It("should not remove existing metric from health status", func() { + numberOfFailures := int32(87) + health := make(map[string]v1alpha1.HealthStatus) + health["another metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + health["some metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + scaledObject := setupTest(health, scaler, scaleHandler) + + var capturedScaledObject v1alpha1.ScaledObject + client.EXPECT().Status().Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(arg interface{}, scaledObject *v1alpha1.ScaledObject, anotherArg interface{}, opts ...interface{}) { + capturedScaledObject = *scaledObject + }) + + _, err := reconciler.getScaledObjectMetricSpecs(logger, scaledObject) + + expectedHealth := make(map[string]v1alpha1.HealthStatus) + expectedHealth["some metric name"] = v1alpha1.HealthStatus{ + NumberOfFailures: &numberOfFailures, + Status: v1alpha1.HealthStatusFailing, + } + + Expect(err).ToNot(HaveOccurred()) + Expect(capturedScaledObject.Status.Health).To(HaveLen(1)) + Expect(capturedScaledObject.Status.Health).To(Equal(expectedHealth)) + }) + +}) + +func setupTest(health map[string]v1alpha1.HealthStatus, scaler *mock_scalers.MockScaler, scaleHandler *mock_scaling.MockScaleHandler) *v1alpha1.ScaledObject { + scaledObject := &v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "some scaled object name", + }, + Status: v1alpha1.ScaledObjectStatus{ + Health: health, + }, + } + + scalers := []kedascalers.Scaler{scaler} + metricSpec := v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: "some metric name", + }, + }, + } + metricSpecs := []v2beta2.MetricSpec{metricSpec} + scaler.EXPECT().GetMetricSpecForScaling().Return(metricSpecs) + scaler.EXPECT().Close() + scaleHandler.EXPECT().GetScalers(gomock.Eq(scaledObject)).Return(scalers, nil) + + return scaledObject +} diff --git a/controllers/scaledobject_controller.go b/controllers/scaledobject_controller.go index 19d391ba347..161bacbd075 100644 --- a/controllers/scaledobject_controller.go +++ b/controllers/scaledobject_controller.go @@ -51,7 +51,7 @@ type ScaledObjectReconciler struct { GlobalHTTPTimeout time.Duration Recorder record.EventRecorder - scaleClient *scale.ScalesGetter + scaleClient scale.ScalesGetter restMapper meta.RESTMapper scaledObjectsGenerations *sync.Map scaleHandler scaling.ScaleHandler @@ -90,7 +90,7 @@ func (r *ScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error { // Create Scale Client scaleClient := initScaleClient(mgr, clientset) - r.scaleClient = &scaleClient + r.scaleClient = scaleClient // Init the rest of ScaledObjectReconciler r.restMapper = mgr.GetRESTMapper() @@ -266,7 +266,7 @@ func (r *ScaledObjectReconciler) checkTargetResourceIsScalable(logger logr.Logge // not cached, let's try to detect /scale subresource // also rechecks when we need to update the status. var errScale error - scale, errScale = (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, errScale = (r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), gr, scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if errScale != nil { // not able to get /scale subresource -> let's check if the resource even exist in the cluster unstruct := &unstructured.Unstructured{} diff --git a/controllers/scaledobject_finalizer.go b/controllers/scaledobject_finalizer.go index a4e3c72b5c7..a0ac0bc4949 100644 --- a/controllers/scaledobject_finalizer.go +++ b/controllers/scaledobject_finalizer.go @@ -30,7 +30,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled // if enabled, scale scaleTarget back to the original replica count (to the state it was before scaling with KEDA) if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.RestoreToOriginalReplicaCount { - scale, err := (*r.scaleClient).Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + scale, err := r.scaleClient.Scales(scaledObject.Namespace).Get(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { logger.V(1).Info("Failed to get scaleTarget's scale status, because it was probably deleted", "error", err) @@ -39,7 +39,7 @@ func (r *ScaledObjectReconciler) finalizeScaledObject(logger logr.Logger, scaled } } else { scale.Spec.Replicas = *scaledObject.Status.OriginalReplicaCount - _, err = (*r.scaleClient).Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) + _, err = r.scaleClient.Scales(scaledObject.Namespace).Update(context.TODO(), scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) if err != nil { logger.Error(err, "Failed to restore scaleTarget's replica count back to the original", "finalizer", scaledObjectFinalizer) } diff --git a/pkg/mock/mock_scale/mock_interfaces.go b/pkg/mock/mock_scale/mock_interfaces.go new file mode 100644 index 00000000000..08aa971a0fa --- /dev/null +++ b/pkg/mock/mock_scale/mock_interfaces.go @@ -0,0 +1,122 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: /home/misha/go/pkg/mod/k8s.io/client-go@v0.20.7/scale/interfaces.go + +// Package mock_scale is a generated GoMock package. +package mock_scale + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v1 "k8s.io/api/autoscaling/v1" + v10 "k8s.io/apimachinery/pkg/apis/meta/v1" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + scale "k8s.io/client-go/scale" +) + +// MockScalesGetter is a mock of ScalesGetter interface. +type MockScalesGetter struct { + ctrl *gomock.Controller + recorder *MockScalesGetterMockRecorder +} + +// MockScalesGetterMockRecorder is the mock recorder for MockScalesGetter. +type MockScalesGetterMockRecorder struct { + mock *MockScalesGetter +} + +// NewMockScalesGetter creates a new mock instance. +func NewMockScalesGetter(ctrl *gomock.Controller) *MockScalesGetter { + mock := &MockScalesGetter{ctrl: ctrl} + mock.recorder = &MockScalesGetterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScalesGetter) EXPECT() *MockScalesGetterMockRecorder { + return m.recorder +} + +// Scales mocks base method. +func (m *MockScalesGetter) Scales(namespace string) scale.ScaleInterface { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Scales", namespace) + ret0, _ := ret[0].(scale.ScaleInterface) + return ret0 +} + +// Scales indicates an expected call of Scales. +func (mr *MockScalesGetterMockRecorder) Scales(namespace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Scales", reflect.TypeOf((*MockScalesGetter)(nil).Scales), namespace) +} + +// MockScaleInterface is a mock of ScaleInterface interface. +type MockScaleInterface struct { + ctrl *gomock.Controller + recorder *MockScaleInterfaceMockRecorder +} + +// MockScaleInterfaceMockRecorder is the mock recorder for MockScaleInterface. +type MockScaleInterfaceMockRecorder struct { + mock *MockScaleInterface +} + +// NewMockScaleInterface creates a new mock instance. +func NewMockScaleInterface(ctrl *gomock.Controller) *MockScaleInterface { + mock := &MockScaleInterface{ctrl: ctrl} + mock.recorder = &MockScaleInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScaleInterface) EXPECT() *MockScaleInterfaceMockRecorder { + return m.recorder +} + +// Get mocks base method. +func (m *MockScaleInterface) Get(ctx context.Context, resource schema.GroupResource, name string, opts v10.GetOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", ctx, resource, name, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockScaleInterfaceMockRecorder) Get(ctx, resource, name, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockScaleInterface)(nil).Get), ctx, resource, name, opts) +} + +// Patch mocks base method. +func (m *MockScaleInterface) Patch(ctx context.Context, gvr schema.GroupVersionResource, name string, pt types.PatchType, data []byte, opts v10.PatchOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Patch", ctx, gvr, name, pt, data, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Patch indicates an expected call of Patch. +func (mr *MockScaleInterfaceMockRecorder) Patch(ctx, gvr, name, pt, data, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Patch", reflect.TypeOf((*MockScaleInterface)(nil).Patch), ctx, gvr, name, pt, data, opts) +} + +// Update mocks base method. +func (m *MockScaleInterface) Update(ctx context.Context, resource schema.GroupResource, scale *v1.Scale, opts v10.UpdateOptions) (*v1.Scale, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Update", ctx, resource, scale, opts) + ret0, _ := ret[0].(*v1.Scale) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Update indicates an expected call of Update. +func (mr *MockScaleInterfaceMockRecorder) Update(ctx, resource, scale, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Update", reflect.TypeOf((*MockScaleInterface)(nil).Update), ctx, resource, scale, opts) +} diff --git a/pkg/mock/mock_scaler/mock_scaler.go b/pkg/mock/mock_scaler/mock_scaler.go new file mode 100644 index 00000000000..917d39b3cdc --- /dev/null +++ b/pkg/mock/mock_scaler/mock_scaler.go @@ -0,0 +1,189 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/scalers/scaler.go + +// Package mock_scalers is a generated GoMock package. +package mock_scalers + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + v2beta2 "k8s.io/api/autoscaling/v2beta2" + labels "k8s.io/apimachinery/pkg/labels" + external_metrics "k8s.io/metrics/pkg/apis/external_metrics" +) + +// MockScaler is a mock of Scaler interface. +type MockScaler struct { + ctrl *gomock.Controller + recorder *MockScalerMockRecorder +} + +// MockScalerMockRecorder is the mock recorder for MockScaler. +type MockScalerMockRecorder struct { + mock *MockScaler +} + +// NewMockScaler creates a new mock instance. +func NewMockScaler(ctrl *gomock.Controller) *MockScaler { + mock := &MockScaler{ctrl: ctrl} + mock.recorder = &MockScalerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockScaler) EXPECT() *MockScalerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockScaler) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockScalerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockScaler)(nil).Close)) +} + +// GetMetricSpecForScaling mocks base method. +func (m *MockScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricSpecForScaling") + ret0, _ := ret[0].([]v2beta2.MetricSpec) + return ret0 +} + +// GetMetricSpecForScaling indicates an expected call of GetMetricSpecForScaling. +func (mr *MockScalerMockRecorder) GetMetricSpecForScaling() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricSpecForScaling", reflect.TypeOf((*MockScaler)(nil).GetMetricSpecForScaling)) +} + +// GetMetrics mocks base method. +func (m *MockScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetrics", ctx, metricName, metricSelector) + ret0, _ := ret[0].([]external_metrics.ExternalMetricValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMetrics indicates an expected call of GetMetrics. +func (mr *MockScalerMockRecorder) GetMetrics(ctx, metricName, metricSelector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetrics", reflect.TypeOf((*MockScaler)(nil).GetMetrics), ctx, metricName, metricSelector) +} + +// IsActive mocks base method. +func (m *MockScaler) IsActive(ctx context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsActive", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsActive indicates an expected call of IsActive. +func (mr *MockScalerMockRecorder) IsActive(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsActive", reflect.TypeOf((*MockScaler)(nil).IsActive), ctx) +} + +// MockPushScaler is a mock of PushScaler interface. +type MockPushScaler struct { + ctrl *gomock.Controller + recorder *MockPushScalerMockRecorder +} + +// MockPushScalerMockRecorder is the mock recorder for MockPushScaler. +type MockPushScalerMockRecorder struct { + mock *MockPushScaler +} + +// NewMockPushScaler creates a new mock instance. +func NewMockPushScaler(ctrl *gomock.Controller) *MockPushScaler { + mock := &MockPushScaler{ctrl: ctrl} + mock.recorder = &MockPushScalerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPushScaler) EXPECT() *MockPushScalerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPushScaler) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPushScalerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPushScaler)(nil).Close)) +} + +// GetMetricSpecForScaling mocks base method. +func (m *MockPushScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetricSpecForScaling") + ret0, _ := ret[0].([]v2beta2.MetricSpec) + return ret0 +} + +// GetMetricSpecForScaling indicates an expected call of GetMetricSpecForScaling. +func (mr *MockPushScalerMockRecorder) GetMetricSpecForScaling() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetricSpecForScaling", reflect.TypeOf((*MockPushScaler)(nil).GetMetricSpecForScaling)) +} + +// GetMetrics mocks base method. +func (m *MockPushScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetMetrics", ctx, metricName, metricSelector) + ret0, _ := ret[0].([]external_metrics.ExternalMetricValue) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetMetrics indicates an expected call of GetMetrics. +func (mr *MockPushScalerMockRecorder) GetMetrics(ctx, metricName, metricSelector interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetMetrics", reflect.TypeOf((*MockPushScaler)(nil).GetMetrics), ctx, metricName, metricSelector) +} + +// IsActive mocks base method. +func (m *MockPushScaler) IsActive(ctx context.Context) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsActive", ctx) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IsActive indicates an expected call of IsActive. +func (mr *MockPushScalerMockRecorder) IsActive(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsActive", reflect.TypeOf((*MockPushScaler)(nil).IsActive), ctx) +} + +// Run mocks base method. +func (m *MockPushScaler) Run(ctx context.Context, active chan<- bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run", ctx, active) +} + +// Run indicates an expected call of Run. +func (mr *MockPushScalerMockRecorder) Run(ctx, active interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockPushScaler)(nil).Run), ctx, active) +} diff --git a/pkg/provider/fallback.go b/pkg/provider/fallback.go new file mode 100644 index 00000000000..6bda098bace --- /dev/null +++ b/pkg/provider/fallback.go @@ -0,0 +1,120 @@ +package provider + +import ( + "context" + "fmt" + + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/scalers" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" +) + +func isFallbackEnabled(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool { + return scaledObject.Spec.Fallback != nil && metricSpec.External.Target.Type == v2beta2.AverageValueMetricType +} + +func (p *KedaProvider) getMetricsWithFallback(scaler scalers.Scaler, metricName string, metricSelector labels.Selector, scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) ([]external_metrics.ExternalMetricValue, error) { + initHealthStatus(scaledObject) + metrics, err := scaler.GetMetrics(context.TODO(), metricName, metricSelector) + healthStatus := getHealthStatus(scaledObject, metricName) + + if err == nil { + zero := int32(0) + healthStatus.NumberOfFailures = &zero + healthStatus.Status = kedav1alpha1.HealthStatusHappy + scaledObject.Status.Health[metricName] = *healthStatus + + p.updateStatus(scaledObject, metricSpec) + return metrics, nil + } + + healthStatus.Status = kedav1alpha1.HealthStatusFailing + *healthStatus.NumberOfFailures++ + scaledObject.Status.Health[metricName] = *healthStatus + + p.updateStatus(scaledObject, metricSpec) + + switch { + case !isFallbackEnabled(scaledObject, metricSpec): + return nil, err + case !validateFallback(scaledObject): + logger.Info("Failed to validate ScaledObject Spec. Please check that parameters are positive integers") + return nil, err + case *healthStatus.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold: + return doFallback(scaledObject, metricSpec, metricName, err), nil + default: + return nil, err + } +} + +func fallbackExistsInScaledObject(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) bool { + if !isFallbackEnabled(scaledObject, metricSpec) || !validateFallback(scaledObject) { + return false + } + + for _, element := range scaledObject.Status.Health { + if element.Status == kedav1alpha1.HealthStatusFailing && *element.NumberOfFailures > scaledObject.Spec.Fallback.FailureThreshold { + return true + } + } + + return false +} + +func validateFallback(scaledObject *kedav1alpha1.ScaledObject) bool { + return scaledObject.Spec.Fallback.FailureThreshold >= 0 && + scaledObject.Spec.Fallback.Replicas >= 0 +} + +func doFallback(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec, metricName string, suppressedError error) []external_metrics.ExternalMetricValue { + replicas := int64(scaledObject.Spec.Fallback.Replicas) + normalisationValue, _ := metricSpec.External.Target.AverageValue.AsInt64() + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(normalisationValue*replicas, resource.DecimalSI), + Timestamp: metav1.Now(), + } + fallbackMetrics := []external_metrics.ExternalMetricValue{metric} + + logger.Info(fmt.Sprintf("Suppressing error %s, falling back to %d replicas", suppressedError, replicas)) + return fallbackMetrics +} + +func (p *KedaProvider) updateStatus(scaledObject *kedav1alpha1.ScaledObject, metricSpec v2beta2.MetricSpec) { + if fallbackExistsInScaledObject(scaledObject, metricSpec) { + scaledObject.Status.Conditions.SetFallbackCondition(metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object") + } else { + scaledObject.Status.Conditions.SetFallbackCondition(metav1.ConditionFalse, "NoFallbackFound", "No fallbacks are active on this scaled object") + } + + err := p.client.Status().Update(context.TODO(), scaledObject) + if err != nil { + logger.Error(err, "Error updating ScaledObject status", "Error") + } +} + +func getHealthStatus(scaledObject *kedav1alpha1.ScaledObject, metricName string) *kedav1alpha1.HealthStatus { + // Get health status for a specific metric + _, healthStatusExists := scaledObject.Status.Health[metricName] + if !healthStatusExists { + zero := int32(0) + status := kedav1alpha1.HealthStatus{ + NumberOfFailures: &zero, + Status: kedav1alpha1.HealthStatusHappy, + } + scaledObject.Status.Health[metricName] = status + } + healthStatus := scaledObject.Status.Health[metricName] + return &healthStatus +} + +func initHealthStatus(scaledObject *kedav1alpha1.ScaledObject) { + // Init health status if missing + if scaledObject.Status.Health == nil { + scaledObject.Status.Health = make(map[string]kedav1alpha1.HealthStatus) + } +} diff --git a/pkg/provider/fallback_test.go b/pkg/provider/fallback_test.go new file mode 100644 index 00000000000..66f55b2c79f --- /dev/null +++ b/pkg/provider/fallback_test.go @@ -0,0 +1,425 @@ +package provider + +import ( + "errors" + "fmt" + "testing" + + "github.com/go-logr/logr" + "github.com/golang/mock/gomock" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/mock/mock_scaling" + "github.com/kubernetes-sigs/custom-metrics-apiserver/pkg/provider" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "github.com/onsi/gomega/types" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/metrics/pkg/apis/external_metrics" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" +) + +const metricName = "some_metric_name" + +func TestFallback(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecsWithDefaultAndCustomReporters(t, + "Controller Suite", + []Reporter{printer.NewlineReporter{}}) +} + +var _ = Describe("fallback", func() { + var ( + scaleHandler *mock_scaling.MockScaleHandler + client *mock_client.MockClient + providerUnderTest *KedaProvider + scaler *mock_scalers.MockScaler + ctrl *gomock.Controller + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + scaleHandler = mock_scaling.NewMockScaleHandler(ctrl) + client = mock_client.NewMockClient(ctrl) + providerUnderTest = &KedaProvider{ + values: make(map[provider.CustomMetricInfo]int64), + externalMetrics: make([]externalMetric, 2, 10), + client: client, + scaleHandler: scaleHandler, + watchedNamespace: "", + } + scaler = mock_scalers.NewMockScaler(ctrl) + + logger = logr.DiscardLogger{} + }) + + AfterEach(func() { + ctrl.Finish() + }) + + It("should return the expected metric when fallback is disabled", func() { + + expectedMetricValue := int64(5) + primeGetMetrics(scaler, expectedMetricValue) + so := buildScaledObject(nil, nil) + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + }) + + It("should reset the health status when scaler metrics are available", func() { + expectedMetricValue := int64(6) + startingNumberOfFailures := int32(5) + primeGetMetrics(scaler, expectedMetricValue) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusFailing, + }, + }, + }, + ) + + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(0, kedav1alpha1.HealthStatusHappy)) + }) + + It("should propagate the error when fallback is disabled", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + + so := buildScaledObject(nil, nil) + metricSpec := createMetricSpec(3) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + }) + + It("should bump the number of failures when metrics call fails", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(0) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(1, kedav1alpha1.HealthStatusFailing)) + }) + + It("should return a normalised metric when number of failures are beyond threshold", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + expectedMetricValue := int64(100) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing)) + }) + + It("should behave as if fallback is disabled when the metrics spec target type is not average value metric", func() { + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, nil, + ) + + qty := resource.NewQuantity(int64(3), resource.DecimalSI) + metricsSpec := v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + Type: v2beta2.UtilizationMetricType, + Value: qty, + }, + }, + } + + isEnabled := isFallbackEnabled(so, metricsSpec) + Expect(isEnabled).Should(BeFalse()) + }) + + It("should ignore error if we fail to update kubernetes status", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + expectedMetricValue := int64(100) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + + statusWriter := mock_client.NewMockStatusWriter(ctrl) + statusWriter.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errors.New("Some error")) + client.EXPECT().Status().Return(statusWriter) + + metrics, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ToNot(HaveOccurred()) + value, _ := metrics[0].Value.AsInt64() + Expect(value).Should(Equal(expectedMetricValue)) + Expect(so.Status.Health[metricName]).To(haveFailureAndStatus(4, kedav1alpha1.HealthStatusFailing)) + }) + + It("should return error when fallback is enabled but scaledobject has invalid parameter", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(-3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + }) + + It("should set the fallback condition when a fallback exists in the scaled object", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + failingNumberOfFailures := int32(6) + anotherMetricName := "another metric name" + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + anotherMetricName: { + NumberOfFailures: &failingNumberOfFailures, + Status: kedav1alpha1.HealthStatusFailing, + }, + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + Expect(err).ToNot(HaveOccurred()) + condition := so.Status.Conditions.GetFallbackCondition() + Expect(condition.IsTrue()).Should(BeTrue()) + }) + + It("should set the fallback condition to false if the config is invalid", func() { + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return(nil, errors.New("Some error")) + startingNumberOfFailures := int32(3) + failingNumberOfFailures := int32(6) + anotherMetricName := "another metric name" + + so := buildScaledObject( + &kedav1alpha1.Fallback{ + FailureThreshold: int32(-3), + Replicas: int32(10), + }, + &kedav1alpha1.ScaledObjectStatus{ + Health: map[string]kedav1alpha1.HealthStatus{ + anotherMetricName: { + NumberOfFailures: &failingNumberOfFailures, + Status: kedav1alpha1.HealthStatusFailing, + }, + metricName: { + NumberOfFailures: &startingNumberOfFailures, + Status: kedav1alpha1.HealthStatusHappy, + }, + }, + }, + ) + metricSpec := createMetricSpec(10) + expectStatusUpdate(ctrl, client) + + _, err := providerUnderTest.getMetricsWithFallback(scaler, metricName, nil, so, metricSpec) + Expect(err).ShouldNot(BeNil()) + Expect(err.Error()).Should(Equal("Some error")) + condition := so.Status.Conditions.GetFallbackCondition() + Expect(condition.IsTrue()).Should(BeFalse()) + }) +}) + +func haveFailureAndStatus(numberOfFailures int, status kedav1alpha1.HealthStatusType) types.GomegaMatcher { + return &healthStatusMatcher{numberOfFailures: numberOfFailures, status: status} +} + +type healthStatusMatcher struct { + numberOfFailures int + status kedav1alpha1.HealthStatusType +} + +func (h *healthStatusMatcher) Match(actual interface{}) (success bool, err error) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return *v.NumberOfFailures == int32(h.numberOfFailures) && v.Status == h.status, nil + default: + return false, fmt.Errorf("expected kedav1alpha1.HealthStatus, got %v", actual) + } +} + +func (h *healthStatusMatcher) FailureMessage(actual interface{}) (message string) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return fmt.Sprintf("expected HealthStatus with NumberOfFailures %d and Status %s, but got NumberOfFailures %d and Status %s", h.numberOfFailures, h.status, *v.NumberOfFailures, v.Status) + default: + return "unexpected error" + } +} + +func (h *healthStatusMatcher) NegatedFailureMessage(actual interface{}) (message string) { + switch v := actual.(type) { + case kedav1alpha1.HealthStatus: + return fmt.Sprintf("did not expect HealthStatus with NumberOfFailures %d and Status %s, but got NumberOfFailures %d and Status %s", h.numberOfFailures, h.status, *v.NumberOfFailures, v.Status) + default: + return "unexpected error" + } +} + +func expectStatusUpdate(ctrl *gomock.Controller, client *mock_client.MockClient) { + statusWriter := mock_client.NewMockStatusWriter(ctrl) + statusWriter.EXPECT().Update(gomock.Any(), gomock.Any()) + client.EXPECT().Status().Return(statusWriter) +} + +func buildScaledObject(fallbackConfig *kedav1alpha1.Fallback, status *kedav1alpha1.ScaledObjectStatus) *kedav1alpha1.ScaledObject { + scaledObject := &kedav1alpha1.ScaledObject{ + ObjectMeta: metav1.ObjectMeta{Name: "clean-up-test", Namespace: "default"}, + Spec: kedav1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &kedav1alpha1.ScaleTarget{ + Name: "myapp", + }, + Triggers: []kedav1alpha1.ScaleTriggers{ + { + Type: "cron", + Metadata: map[string]string{ + "timezone": "UTC", + "start": "0 * * * *", + "end": "1 * * * *", + "desiredReplicas": "1", + }, + }, + }, + Fallback: fallbackConfig, + }, + } + + if status != nil { + scaledObject.Status = *status + } + + scaledObject.Status.Conditions = *kedav1alpha1.GetInitializedConditions() + + return scaledObject +} + +func primeGetMetrics(scaler *mock_scalers.MockScaler, value int64) { + expectedMetric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(value, resource.DecimalSI), + Timestamp: metav1.Now(), + } + + scaler.EXPECT().GetMetrics(gomock.Any(), gomock.Eq(metricName), gomock.Any()).Return([]external_metrics.ExternalMetricValue{expectedMetric}, nil) +} + +func createMetricSpec(averageValue int) v2beta2.MetricSpec { + qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) + return v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: qty, + }, + }, + } +} diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index adf02a9f965..ab0bb020cbb 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -94,7 +94,8 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels } // Filter only the desired metric if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { - metrics, err := scaler.GetMetrics(context.TODO(), info.Metric, metricSelector) + metrics, err := p.getMetricsWithFallback(scaler, info.Metric, metricSelector, scaledObject, metricSpec) + if err != nil { logger.Error(err, "error getting metric for scaler", "scaledObject.Namespace", scaledObject.Namespace, "scaledObject.Name", scaledObject.Name, "scaler", scaler) } else { @@ -107,7 +108,6 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels metricsServer.RecordHPAScalerError(namespace, scaledObject.Name, scalerName, scalerIndex, info.Metric, err) } } - scaler.Close() } diff --git a/pkg/scaling/executor/scale_executor.go b/pkg/scaling/executor/scale_executor.go index b8fd1d144ed..0d503817cc5 100644 --- a/pkg/scaling/executor/scale_executor.go +++ b/pkg/scaling/executor/scale_executor.go @@ -24,19 +24,19 @@ const ( // ScaleExecutor contains methods RequestJobScale and RequestScale type ScaleExecutor interface { RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) - RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool) + RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) } type scaleExecutor struct { client client.Client - scaleClient *scale.ScalesGetter + scaleClient scale.ScalesGetter reconcilerScheme *runtime.Scheme logger logr.Logger recorder record.EventRecorder } // NewScaleExecutor creates a ScaleExecutor object -func NewScaleExecutor(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { +func NewScaleExecutor(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, recorder record.EventRecorder) ScaleExecutor { return &scaleExecutor{ client: client, scaleClient: scaleClient, @@ -71,17 +71,17 @@ func (e *scaleExecutor) updateLastActiveTime(ctx context.Context, logger logr.Lo return err } -func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, mesage string) error { +func (e *scaleExecutor) setCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string, setCondition func(kedav1alpha1.Conditions, metav1.ConditionStatus, string, string)) error { var patch client.Patch runtimeObj := object.(runtime.Object) switch obj := runtimeObj.(type) { case *kedav1alpha1.ScaledObject: patch = client.MergeFrom(obj.DeepCopy()) - obj.Status.Conditions.SetActiveCondition(status, reason, mesage) + setCondition(obj.Status.Conditions, status, reason, message) case *kedav1alpha1.ScaledJob: patch = client.MergeFrom(obj.DeepCopy()) - obj.Status.Conditions.SetActiveCondition(status, reason, mesage) + setCondition(obj.Status.Conditions, status, reason, message) default: err := fmt.Errorf("unknown scalable object type %v", obj) logger.Error(err, "Failed to patch Objects Status") @@ -94,3 +94,17 @@ func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logg } return err } + +func (e *scaleExecutor) setActiveCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error { + active := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) { + conditions.SetActiveCondition(status, reason, message) + } + return e.setCondition(ctx, logger, object, status, reason, message, active) +} + +func (e *scaleExecutor) setFallbackCondition(ctx context.Context, logger logr.Logger, object interface{}, status metav1.ConditionStatus, reason string, message string) error { + fallback := func(conditions kedav1alpha1.Conditions, status metav1.ConditionStatus, reason string, message string) { + conditions.SetFallbackCondition(status, reason, message) + } + return e.setCondition(ctx, logger, object, status, reason, message, fallback) +} diff --git a/pkg/scaling/executor/scale_scaledobjects.go b/pkg/scaling/executor/scale_scaledobjects.go index c863db288a0..52be6e11b3d 100644 --- a/pkg/scaling/executor/scale_scaledobjects.go +++ b/pkg/scaling/executor/scale_scaledobjects.go @@ -16,7 +16,7 @@ import ( kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" ) -func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool) { +func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, isActive bool, isError bool) { logger := e.logger.WithValues("scaledobject.Name", scaledObject.Name, "scaledObject.Namespace", scaledObject.Namespace, "scaleTarget.Name", scaledObject.Spec.ScaleTargetRef.Name) @@ -59,6 +59,17 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al // current replica count is 0, but there is an active trigger. // scale the ScaleTarget up e.scaleFromZero(ctx, logger, scaledObject, currentScale) + case !isActive && + isError && + scaledObject.Spec.Fallback != nil && + scaledObject.Spec.Fallback.Replicas != 0: + // there are no active triggers, but a scaler responded with an error + // AND + // there is a fallback replicas count defined + + // Scale to the fallback replicas count + e.doFallbackScaling(ctx, scaledObject, currentScale, logger, currentReplicas) + case !isActive && currentReplicas > 0 && (scaledObject.Spec.MinReplicaCount == nil || *scaledObject.Spec.MinReplicaCount == 0): @@ -109,6 +120,18 @@ func (e *scaleExecutor) RequestScale(ctx context.Context, scaledObject *kedav1al } } +func (e *scaleExecutor) doFallbackScaling(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, currentScale *autoscalingv1.Scale, logger logr.Logger, currentReplicas int32) { + _, err := e.updateScaleOnScaleTarget(ctx, scaledObject, currentScale, scaledObject.Spec.Fallback.Replicas) + if err == nil { + logger.Info("Successfully set ScaleTarget replicas count to ScaledObject fallback.replicas", + "Original Replicas Count", currentReplicas, + "New Replicas Count", scaledObject.Spec.Fallback.Replicas) + } + if e := e.setFallbackCondition(ctx, logger, scaledObject, metav1.ConditionTrue, "FallbackExists", "At least one trigger is falling back on this scaled object"); e != nil { + logger.Error(e, "Error setting fallback condition") + } +} + // An object will be scaled down to 0 only if it's passed its cooldown period // or if LastActiveTime is nil func (e *scaleExecutor) scaleToZero(ctx context.Context, logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) { @@ -178,7 +201,7 @@ func (e *scaleExecutor) scaleFromZero(ctx context.Context, logger logr.Logger, s } func (e *scaleExecutor) getScaleTargetScale(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv1.Scale, error) { - return (*e.scaleClient).Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) + return e.scaleClient.Scales(scaledObject.Namespace).Get(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name, metav1.GetOptions{}) } func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale, replicas int32) (int32, error) { @@ -195,6 +218,6 @@ func (e *scaleExecutor) updateScaleOnScaleTarget(ctx context.Context, scaledObje currentReplicas := scale.Spec.Replicas scale.Spec.Replicas = replicas - _, err := (*e.scaleClient).Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) + _, err := e.scaleClient.Scales(scaledObject.Namespace).Update(ctx, scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale, metav1.UpdateOptions{}) return currentReplicas, err } diff --git a/pkg/scaling/executor/scale_scaledobjects_test.go b/pkg/scaling/executor/scale_scaledobjects_test.go new file mode 100644 index 00000000000..61f8bfe27d3 --- /dev/null +++ b/pkg/scaling/executor/scale_scaledobjects_test.go @@ -0,0 +1,78 @@ +package executor + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + "github.com/kedacore/keda/v2/pkg/mock/mock_scale" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/record" +) + +func TestScaleToFallbackReplicasWhenNotActiveAndIsError(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + mockScaleClient := mock_scale.NewMockScalesGetter(ctrl) + mockScaleInterface := mock_scale.NewMockScaleInterface(ctrl) + statusWriter := mock_client.NewMockStatusWriter(ctrl) + + scaleExecutor := NewScaleExecutor(client, mockScaleClient, nil, recorder) + + scaledObject := v1alpha1.ScaledObject{ + ObjectMeta: v1.ObjectMeta{ + Name: "some name", + Namespace: "some namespace", + }, + Spec: v1alpha1.ScaledObjectSpec{ + ScaleTargetRef: &v1alpha1.ScaleTarget{ + Name: "some name", + }, + Fallback: &v1alpha1.Fallback{ + FailureThreshold: 3, + Replicas: 5, + }, + }, + Status: v1alpha1.ScaledObjectStatus{ + ScaleTargetGVKR: &v1alpha1.GroupVersionKindResource{ + Group: "apps", + Kind: "Deployment", + }, + }, + } + + scaledObject.Status.Conditions = *v1alpha1.GetInitializedConditions() + + numberOfReplicas := int32(2) + + client.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).SetArg(2, appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Replicas: &numberOfReplicas, + }, + }) + + scale := &autoscalingv1.Scale{ + Spec: autoscalingv1.ScaleSpec{ + Replicas: numberOfReplicas, + }, + } + + mockScaleClient.EXPECT().Scales(gomock.Any()).Return(mockScaleInterface).Times(2) + mockScaleInterface.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(scale, nil) + mockScaleInterface.EXPECT().Update(gomock.Any(), gomock.Any(), gomock.Eq(scale), gomock.Any()) + + client.EXPECT().Status().Times(2).Return(statusWriter) + statusWriter.EXPECT().Patch(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) + + scaleExecutor.RequestScale(context.TODO(), &scaledObject, false, true) + + assert.Equal(t, int32(5), scale.Spec.Replicas) + condition := scaledObject.Status.Conditions.GetFallbackCondition() + assert.Equal(t, true, condition.IsTrue()) +} diff --git a/pkg/scaling/scale_hander_test.go b/pkg/scaling/scale_hander_test.go deleted file mode 100644 index b3b78b421b7..00000000000 --- a/pkg/scaling/scale_hander_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package scaling - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/api/autoscaling/v2beta2" - "k8s.io/apimachinery/pkg/api/resource" -) - -func TestTargetAverageValue(t *testing.T) { - // count = 0 - specs := []v2beta2.MetricSpec{} - targetAverageValue := getTargetAverageValue(specs) - assert.Equal(t, int64(0), targetAverageValue) - // 1 1 - specs = []v2beta2.MetricSpec{ - createMetricSpec(1), - createMetricSpec(1), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(1), targetAverageValue) - // 5 5 3 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) - - // 5 5 4 - specs = []v2beta2.MetricSpec{ - createMetricSpec(5), - createMetricSpec(5), - createMetricSpec(3), - } - targetAverageValue = getTargetAverageValue(specs) - assert.Equal(t, int64(4), targetAverageValue) -} - -func createMetricSpec(averageValue int) v2beta2.MetricSpec { - qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) - return v2beta2.MetricSpec{ - External: &v2beta2.ExternalMetricSource{ - Target: v2beta2.MetricTarget{ - AverageValue: qty, - }, - }, - } -} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 1c1e7fe027d..62776b15b38 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -41,7 +41,7 @@ type scaleHandler struct { } // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme, globalHTTPTimeout time.Duration, recorder record.EventRecorder) ScaleHandler { return &scaleHandler{ client: client, logger: logf.Log.WithName("scalehandler"), @@ -178,7 +178,7 @@ func (h *scaleHandler) startPushScalers(ctx context.Context, withTriggers *kedav scalingMutex.Lock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - h.scaleExecutor.RequestScale(ctx, obj, active) + h.scaleExecutor.RequestScale(ctx, obj, active, false) case *kedav1alpha1.ScaledJob: h.logger.Info("Warning: External Push Scaler does not support ScaledJob", "object", scalableObject) } @@ -202,7 +202,8 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac defer scalingMutex.Unlock() switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: - h.scaleExecutor.RequestScale(ctx, obj, h.isScaledObjectActive(ctx, scalers, obj)) + isActive, isError := h.isScaledObjectActive(ctx, scalers, obj) + h.scaleExecutor.RequestScale(ctx, obj, isActive, isError) case *kedav1alpha1.ScaledJob: scaledJob := scalableObject.(*kedav1alpha1.ScaledJob) isActive, scaleTo, maxScale := h.isScaledJobActive(ctx, scalers, scaledJob) @@ -210,29 +211,31 @@ func (h *scaleHandler) checkScalers(ctx context.Context, scalableObject interfac } } -func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) bool { +func (h *scaleHandler) isScaledObjectActive(ctx context.Context, scalers []scalers.Scaler, scaledObject *kedav1alpha1.ScaledObject) (bool, bool) { isActive := false + isError := false for i, scaler := range scalers { isTriggerActive, err := scaler.IsActive(ctx) scaler.Close() if err != nil { h.logger.V(1).Info("Error getting scale decision", "Error", err) + isError = true h.recorder.Event(scaledObject, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) continue } else if isTriggerActive { isActive = true - if scaler.GetMetricSpecForScaling()[0].External != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].External.Metric.Name) + if externalMetricsSpec := scaler.GetMetricSpecForScaling()[0].External; externalMetricsSpec != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", externalMetricsSpec.Metric.Name) } - if scaler.GetMetricSpecForScaling()[0].Resource != nil { - h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", scaler.GetMetricSpecForScaling()[0].Resource.Name) + if resourceMetricsSpec := scaler.GetMetricSpecForScaling()[0].Resource; resourceMetricsSpec != nil { + h.logger.V(1).Info("Scaler for scaledObject is active", "Metrics Name", resourceMetricsSpec.Name) } closeScalers(scalers[i+1:]) break } } - return isActive + return isActive, isError } func (h *scaleHandler) isScaledJobActive(ctx context.Context, scalers []scalers.Scaler, scaledJob *kedav1alpha1.ScaledJob) (bool, int64, int64) { diff --git a/pkg/scaling/scale_handler_test.go b/pkg/scaling/scale_handler_test.go new file mode 100644 index 00000000000..f4d68a38af5 --- /dev/null +++ b/pkg/scaling/scale_handler_test.go @@ -0,0 +1,122 @@ +package scaling + +import ( + "context" + "errors" + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + kedav1alpha1 "github.com/kedacore/keda/v2/api/v1alpha1" + "github.com/kedacore/keda/v2/pkg/mock/mock_client" + mock_scalers "github.com/kedacore/keda/v2/pkg/mock/mock_scaler" + "github.com/kedacore/keda/v2/pkg/scalers" + "github.com/kedacore/keda/v2/pkg/scaling/executor" + "k8s.io/client-go/tools/record" + logf "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/stretchr/testify/assert" + "k8s.io/api/autoscaling/v2beta2" + "k8s.io/apimachinery/pkg/api/resource" +) + +func TestTargetAverageValue(t *testing.T) { + // count = 0 + specs := []v2beta2.MetricSpec{} + targetAverageValue := getTargetAverageValue(specs) + assert.Equal(t, int64(0), targetAverageValue) + // 1 1 + specs = []v2beta2.MetricSpec{ + createMetricSpec(1), + createMetricSpec(1), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(1), targetAverageValue) + // 5 5 3 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) + + // 5 5 4 + specs = []v2beta2.MetricSpec{ + createMetricSpec(5), + createMetricSpec(5), + createMetricSpec(3), + } + targetAverageValue = getTargetAverageValue(specs) + assert.Equal(t, int64(4), targetAverageValue) +} + +func TestCheckScaledObjectScalersWithError(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + + scaleHandler := &scaleHandler{ + client: client, + logger: logf.Log.WithName("scalehandler"), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), + globalHTTPTimeout: 5 * time.Second, + recorder: recorder, + } + scaler := mock_scalers.NewMockScaler(ctrl) + scalers := []scalers.Scaler{scaler} + scaledObject := &kedav1alpha1.ScaledObject{} + + scaler.EXPECT().IsActive(gomock.Any()).Return(false, errors.New("Some error")) + scaler.EXPECT().Close() + + isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + + assert.Equal(t, false, isActive) + assert.Equal(t, true, isError) +} + +func TestCheckScaledObjectFindFirstActiveIgnoringOthers(t *testing.T) { + ctrl := gomock.NewController(t) + client := mock_client.NewMockClient(ctrl) + recorder := record.NewFakeRecorder(1) + + scaleHandler := &scaleHandler{ + client: client, + logger: logf.Log.WithName("scalehandler"), + scaleLoopContexts: &sync.Map{}, + scaleExecutor: executor.NewScaleExecutor(client, nil, nil, recorder), + globalHTTPTimeout: 5 * time.Second, + recorder: recorder, + } + + activeScaler := mock_scalers.NewMockScaler(ctrl) + failingScaler := mock_scalers.NewMockScaler(ctrl) + scalers := []scalers.Scaler{activeScaler, failingScaler} + scaledObject := &kedav1alpha1.ScaledObject{} + + metricsSpecs := []v2beta2.MetricSpec{createMetricSpec(1)} + + activeScaler.EXPECT().IsActive(gomock.Any()).Return(true, nil) + activeScaler.EXPECT().GetMetricSpecForScaling().Times(2).Return(metricsSpecs) + activeScaler.EXPECT().Close() + failingScaler.EXPECT().Close() + + isActive, isError := scaleHandler.isScaledObjectActive(context.TODO(), scalers, scaledObject) + + assert.Equal(t, true, isActive) + assert.Equal(t, false, isError) +} + +func createMetricSpec(averageValue int) v2beta2.MetricSpec { + qty := resource.NewQuantity(int64(averageValue), resource.DecimalSI) + return v2beta2.MetricSpec{ + External: &v2beta2.ExternalMetricSource{ + Target: v2beta2.MetricTarget{ + AverageValue: qty, + }, + }, + } +}