From 9180616500fb7f8d1ef5c73d3272bfc3ba147d55 Mon Sep 17 00:00:00 2001 From: Zbynek Roubalik <726523+zroubalik@users.noreply.github.com> Date: Wed, 8 Apr 2020 20:59:09 +0200 Subject: [PATCH] [v2] autoscaling v2beta1 -> v2beta2 and minor refactoring (#734) * scaledObjectFinalizer minor refactor Signed-off-by: Zbynek Roubalik * autoscaling v2beta1 -> v2beta2 Signed-off-by: Zbynek Roubalik * apiregistration.k8s.io v1beta1 -> v1 Signed-off-by: Zbynek Roubalik Signed-off-by: Amith Ganesh --- deploy/24-metrics-api_service.yaml | 2 +- pkg/controller/scaledobject/hpa.go | 53 ++++----------- .../scaledobject/scaledobject_controller.go | 68 ++++++++++--------- .../scaledobject/scaledobject_finalizer.go | 56 +++++++-------- pkg/handler/scale_handler.go | 4 +- pkg/handler/scale_loop.go | 2 +- pkg/handler/scale_scaledobjects.go | 4 +- pkg/provider/provider.go | 2 +- pkg/scalers/aws_cloudwatch_scaler.go | 21 ++++-- pkg/scalers/aws_kinesis_stream_scaler.go | 19 ++++-- pkg/scalers/aws_sqs_queue_scaler.go | 19 ++++-- pkg/scalers/azure_blob_scaler.go | 52 ++++++++------ pkg/scalers/azure_eventhub_scaler.go | 21 +++--- pkg/scalers/azure_monitor_scaler.go | 18 +++-- pkg/scalers/azure_queue_scaler.go | 18 +++-- pkg/scalers/azure_servicebus_scaler.go | 18 +++-- pkg/scalers/external_scaler.go | 19 ++++-- pkg/scalers/gcp_pub_sub_scaler.go | 19 ++++-- pkg/scalers/huawei_cloudeye_scaler.go | 23 ++++--- pkg/scalers/kafka_scaler.go | 21 +++--- pkg/scalers/liiklus_scaler.go | 21 +++--- pkg/scalers/mysql_scaler.go | 19 ++++-- pkg/scalers/postgresql_scaler.go | 19 ++++-- .../{prometheus.go => prometheus_scaler.go} | 25 ++++--- pkg/scalers/rabbitmq_scaler.go | 23 ++++--- pkg/scalers/redis_scaler.go | 20 ++++-- pkg/scalers/scaler.go | 4 +- pkg/scalers/stan_scaler.go | 25 ++++--- 28 files changed, 354 insertions(+), 261 deletions(-) rename pkg/scalers/{prometheus.go => prometheus_scaler.go} (87%) diff --git a/deploy/24-metrics-api_service.yaml b/deploy/24-metrics-api_service.yaml index 6ecfea72ad6..0eebbb4ae97 100644 --- a/deploy/24-metrics-api_service.yaml +++ b/deploy/24-metrics-api_service.yaml @@ -1,4 +1,4 @@ -apiVersion: apiregistration.k8s.io/v1beta1 +apiVersion: apiregistration.k8s.io/v1 kind: APIService metadata: labels: diff --git a/pkg/controller/scaledobject/hpa.go b/pkg/controller/scaledobject/hpa.go index 7b06302884c..1b762520325 100644 --- a/pkg/controller/scaledobject/hpa.go +++ b/pkg/controller/scaledobject/hpa.go @@ -10,7 +10,7 @@ import ( "github.com/go-logr/logr" version "github.com/kedacore/keda/version" - autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1" + autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" @@ -45,9 +45,8 @@ func (r *ReconcileScaledObject) createAndDeployNewHPA(logger logr.Logger, scaled return nil } -//FIXME unify location from where we load gvkt - param vs. scaledObject // newHPAForScaledObject returns HPA as it is specified in ScaledObject -func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta1.HorizontalPodAutoscaler, error) { +func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (*autoscalingv2beta2.HorizontalPodAutoscaler, error) { scaledObjectMetricSpecs, err := r.getScaledObjectMetricSpecs(logger, scaledObject) if err != nil { return nil, err @@ -65,12 +64,12 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled "app.kubernetes.io/managed-by": "keda-operator", } - return &autoscalingv2beta1.HorizontalPodAutoscaler{ - Spec: autoscalingv2beta1.HorizontalPodAutoscalerSpec{ + return &autoscalingv2beta2.HorizontalPodAutoscaler{ + Spec: autoscalingv2beta2.HorizontalPodAutoscalerSpec{ MinReplicas: getHPAMinReplicas(scaledObject), MaxReplicas: getHPAMaxReplicas(scaledObject), Metrics: scaledObjectMetricSpecs, - ScaleTargetRef: autoscalingv2beta1.CrossVersionObjectReference{ + ScaleTargetRef: autoscalingv2beta2.CrossVersionObjectReference{ Name: scaledObject.Spec.ScaleTargetRef.Name, Kind: gvkr.Kind, APIVersion: gvkr.GroupVersion().String(), @@ -81,38 +80,13 @@ func (r *ReconcileScaledObject) newHPAForScaledObject(logger logr.Logger, scaled Labels: labels, }, TypeMeta: metav1.TypeMeta{ - APIVersion: "v2beta1", + APIVersion: "v2beta2", }, }, nil } -// checkHPAForUpdate checks whether update of HPA is needed -func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta1.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error { - - // updateHPA := false - // scaledObjectMinReplicaCount := getHPAMinReplicas(scaledObject) - // if *foundHpa.Spec.MinReplicas != *scaledObjectMinReplicaCount { - // updateHPA = true - // foundHpa.Spec.MinReplicas = scaledObjectMinReplicaCount - // } - - // scaledObjectMaxReplicaCount := getHPAMaxReplicas(scaledObject) - // if foundHpa.Spec.MaxReplicas != scaledObjectMaxReplicaCount { - // updateHPA = true - // foundHpa.Spec.MaxReplicas = scaledObjectMaxReplicaCount - // } - - // newMetricSpec, err := r.getScaledObjectMetricSpecs(logger, scaledObject) - // if err != nil { - // logger.Error(err, "Failed to create MetricSpec") - // return err - // } - // if fmt.Sprintf("%v", foundHpa.Spec.Metrics) != fmt.Sprintf("%v", newMetricSpec) { - // updateHPA = true - // foundHpa.Spec.Metrics = newMetricSpec - // } - - //if updateHPA { +// updateHPAIfNeeded checks whether update of HPA is needed +func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, foundHpa *autoscalingv2beta2.HorizontalPodAutoscaler, gvkr *kedautil.GroupVersionKindResource) error { hpa, err := r.newHPAForScaledObject(logger, scaledObject, gvkr) if err != nil { @@ -122,7 +96,6 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje if !equality.Semantic.DeepDerivative(hpa.Spec, foundHpa.Spec) { if r.client.Update(context.TODO(), foundHpa) != nil { - //foundHpa.Spec = *hpa.Spec.DeepCopy() foundHpa.Spec = hpa.Spec logger.Error(err, "Failed to update HPA", "HPA.Namespace", foundHpa.Namespace, "HPA.Name", foundHpa.Name) return err @@ -134,8 +107,8 @@ func (r *ReconcileScaledObject) updateHPAIfNeeded(logger logr.Logger, scaledObje } // getScaledObjectMetricSpecs returns MetricSpec for HPA, generater from Triggers defitinion in ScaledObject -func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta1.MetricSpec, error) { - var scaledObjectMetricSpecs []autoscalingv2beta1.MetricSpec +func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) { + var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec var externalMetricNames []string scalers, err := scalehandler.NewScaleHandler(r.client, r.scaleClient, r.scheme).GetScaledObjectScalers(scaledObject) @@ -149,9 +122,9 @@ func (r *ReconcileScaledObject) getScaledObjectMetricSpecs(logger logr.Logger, s // add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it. for _, metricSpec := range metricSpecs { - metricSpec.External.MetricSelector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} - metricSpec.External.MetricSelector.MatchLabels["scaledObjectName"] = scaledObject.Name - externalMetricNames = append(externalMetricNames, metricSpec.External.MetricName) + metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)} + metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name + externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name) } scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...) scaler.Close() diff --git a/pkg/controller/scaledobject/scaledobject_controller.go b/pkg/controller/scaledobject/scaledobject_controller.go index 5fc4ee20c22..79de08bcb6b 100644 --- a/pkg/controller/scaledobject/scaledobject_controller.go +++ b/pkg/controller/scaledobject/scaledobject_controller.go @@ -10,7 +10,7 @@ import ( kedautil "github.com/kedacore/keda/pkg/util" "github.com/go-logr/logr" - autoscalingv2beta1 "k8s.io/api/autoscaling/v2beta1" + autoscalingv2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -41,11 +41,11 @@ func Add(mgr manager.Manager) error { if err != nil { return err } - return add(mgr, newReconciler(mgr, scaleClient)) + return add(mgr, newReconciler(mgr, &scaleClient)) } // newReconciler returns a new reconcile.Reconciler -func newReconciler(mgr manager.Manager, scaleClient scale.ScalesGetter) reconcile.Reconciler { +func newReconciler(mgr manager.Manager, scaleClient *scale.ScalesGetter) reconcile.Reconciler { return &ReconcileScaledObject{ client: mgr.GetClient(), scaleClient: scaleClient, @@ -78,7 +78,7 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { } // Watch for changes to secondary resource HPA and requeue the owner ScaledObject - err = c.Watch(&source.Kind{Type: &autoscalingv2beta1.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{ + err = c.Watch(&source.Kind{Type: &autoscalingv2beta2.HorizontalPodAutoscaler{}}, &handler.EnqueueRequestForOwner{ IsController: true, OwnerType: &kedav1alpha1.ScaledObject{}, }) @@ -110,7 +110,7 @@ type ReconcileScaledObject struct { // This client, initialized using mgr.Client() above, is a split client // that reads objects from the cache and writes to the apiserver client client.Client - scaleClient scale.ScalesGetter + scaleClient *scale.ScalesGetter restMapper meta.RESTMapper scheme *runtime.Scheme scaleLoopContexts *sync.Map @@ -143,32 +143,13 @@ func (r *ReconcileScaledObject) Reconcile(request reconcile.Request) (reconcile. // Check if the ScaledObject instance is marked to be deleted, which is // indicated by the deletion timestamp being set. - isScaledObjectMarkedToBeDeleted := scaledObject.GetDeletionTimestamp() != nil - if isScaledObjectMarkedToBeDeleted { - if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { - // Run finalization logic for scaledObjectFinalizer. If the - // finalization logic fails, don't remove the finalizer so - // that we can retry during the next reconciliation. - if err := r.finalizeScaledObject(reqLogger, scaledObject); err != nil { - return reconcile.Result{}, err - } - - // Remove scaledObjectFinalizer. Once all finalizers have been - // removed, the object will be deleted. - scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer)) - err := r.client.Update(context.TODO(), scaledObject) - if err != nil { - return reconcile.Result{}, err - } - } - return reconcile.Result{}, nil + if scaledObject.GetDeletionTimestamp() != nil { + return reconcile.Result{}, r.finalizeScaledObject(reqLogger, scaledObject) } - // Add finalizer for this CR - if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { - if err := r.addFinalizer(reqLogger, scaledObject); err != nil { - return reconcile.Result{}, err - } + // ensure finalizer is set on this CR + if err := r.ensureFinalizer(reqLogger, scaledObject); err != nil { + return reconcile.Result{}, err } return reconcile.Result{}, r.reconcileScaledObject(reqLogger, scaledObject) @@ -256,7 +237,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger logger.V(1).Info("Parsed Group, Version, Kind, Resource", "GVK", gvkr.GVKString(), "Resource", gvkr.Resource) // let's try to detect /scale subresource - _, errScale := r.scaleClient.Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name) + _, errScale := (*r.scaleClient).Scales(scaledObject.Namespace).Get(gvkr.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name) if errScale != nil { // not able to get /scale subresource -> let's check if the resource even exist in the cluster unstruct := &unstructured.Unstructured{} @@ -286,7 +267,7 @@ func (r *ReconcileScaledObject) checkTargetResourceIsScalable(logger logr.Logger // ensureHPAForScaledObjectExists ensures that in cluster exist up-to-date HPA for specified ScaledObject, returns true if a new HPA was created func (r *ReconcileScaledObject) ensureHPAForScaledObjectExists(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject, gvkr *kedautil.GroupVersionKindResource) (bool, error) { hpaName := getHPAName(scaledObject) - foundHpa := &autoscalingv2beta1.HorizontalPodAutoscaler{} + foundHpa := &autoscalingv2beta2.HorizontalPodAutoscaler{} // Check if HPA for this ScaledObject already exists err := r.client.Get(context.TODO(), types.NamespacedName{Name: hpaName, Namespace: scaledObject.Namespace}, foundHpa) if err != nil && errors.IsNotFound(err) { @@ -344,6 +325,31 @@ func (r *ReconcileScaledObject) startScaleLoop(logger logr.Logger, scaledObject return nil } +// stopScaleLoop stops ScaleLoop handler for the respective ScaleObject +func (r *ReconcileScaledObject) stopScaleLoop(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { + key, err := cache.MetaNamespaceKeyFunc(scaledObject) + if err != nil { + logger.Error(err, "Error getting key for scaledObject") + return err + } + + // delete ScaledObject's current Generation + r.scaledObjectsGenerations.Delete(key) + + result, ok := r.scaleLoopContexts.Load(key) + if ok { + cancel, ok := result.(context.CancelFunc) + if ok { + cancel() + } + r.scaleLoopContexts.Delete(key) + } else { + logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) + } + + return nil +} + // scaledObjectGenerationChanged returns true if ScaledObject's Generation was changed, ie. ScaledObject.Spec was changed func (r *ReconcileScaledObject) scaledObjectGenerationChanged(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) (bool, error) { key, err := cache.MetaNamespaceKeyFunc(scaledObject) diff --git a/pkg/controller/scaledobject/scaledobject_finalizer.go b/pkg/controller/scaledobject/scaledobject_finalizer.go index 544b97f1314..c96675a2038 100644 --- a/pkg/controller/scaledobject/scaledobject_finalizer.go +++ b/pkg/controller/scaledobject/scaledobject_finalizer.go @@ -3,52 +3,52 @@ package scaledobject import ( "context" - "github.com/go-logr/logr" - "k8s.io/client-go/tools/cache" - kedav1alpha1 "github.com/kedacore/keda/pkg/apis/keda/v1alpha1" + + "github.com/go-logr/logr" ) const ( scaledObjectFinalizer = "finalizer.keda.sh" ) -// finalizeScaledObject is stopping ScaleLoop for the respective ScaleObject +// finalizeScaledObject runs finalization logic on ScaledObject if there's finalizer func (r *ReconcileScaledObject) finalizeScaledObject(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { - key, err := cache.MetaNamespaceKeyFunc(scaledObject) - if err != nil { - logger.Error(err, "Error getting key for scaledObject (%s/%s)", scaledObject.GetNamespace(), scaledObject.GetName()) - return err - } - // store ScaledObject's current Generation - r.scaledObjectsGenerations.Delete(key) + if contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { + // Run finalization logic for scaledObjectFinalizer. If the + // finalization logic fails, don't remove the finalizer so + // that we can retry during the next reconciliation. + if err := r.stopScaleLoop(logger, scaledObject); err != nil { + return err + } - result, ok := r.scaleLoopContexts.Load(key) - if ok { - cancel, ok := result.(context.CancelFunc) - if ok { - cancel() + // Remove scaledObjectFinalizer. Once all finalizers have been + // removed, the object will be deleted. + scaledObject.SetFinalizers(remove(scaledObject.GetFinalizers(), scaledObjectFinalizer)) + if err := r.client.Update(context.TODO(), scaledObject); err != nil { + logger.Error(err, "Failed to update ScaledObject after removing a finalizer", "finalizer", scaledObjectFinalizer) + return err } - r.scaleLoopContexts.Delete(key) - } else { - logger.V(1).Info("ScaleObject was not found in controller cache", "key", key) } logger.Info("Successfully finalized ScaledObject") return nil } -// addFinalizer adds finalizer to the ScaledObject -func (r *ReconcileScaledObject) addFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { - logger.Info("Adding Finalizer for the ScaledObject") - scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer)) +// ensureFinalizer check there is finalizer present on the ScaledObject, if not it adds one +func (r *ReconcileScaledObject) ensureFinalizer(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) error { - // Update CR - err := r.client.Update(context.TODO(), scaledObject) - if err != nil { - logger.Error(err, "Failed to update ScaledObject with finalizer") - return err + if !contains(scaledObject.GetFinalizers(), scaledObjectFinalizer) { + logger.Info("Adding Finalizer for the ScaledObject") + scaledObject.SetFinalizers(append(scaledObject.GetFinalizers(), scaledObjectFinalizer)) + + // Update CR + err := r.client.Update(context.TODO(), scaledObject) + if err != nil { + logger.Error(err, "Failed to update ScaledObject with a finalizer", "finalizer", scaledObjectFinalizer) + return err + } } return nil } diff --git a/pkg/handler/scale_handler.go b/pkg/handler/scale_handler.go index 464a2b6a732..30015f75b5e 100644 --- a/pkg/handler/scale_handler.go +++ b/pkg/handler/scale_handler.go @@ -21,7 +21,7 @@ import ( // each ScaledObject and making the final scale decision and operation type ScaleHandler struct { client client.Client - scaleClient scale.ScalesGetter // TODO pointer + scaleClient *scale.ScalesGetter logger logr.Logger reconcilerScheme *runtime.Scheme } @@ -34,7 +34,7 @@ const ( ) // NewScaleHandler creates a ScaleHandler object -func NewScaleHandler(client client.Client, scaleClient scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler { +func NewScaleHandler(client client.Client, scaleClient *scale.ScalesGetter, reconcilerScheme *runtime.Scheme) *ScaleHandler { handler := &ScaleHandler{ client: client, scaleClient: scaleClient, diff --git a/pkg/handler/scale_loop.go b/pkg/handler/scale_loop.go index 59484c4a2bb..f83ad60d6f6 100644 --- a/pkg/handler/scale_loop.go +++ b/pkg/handler/scale_loop.go @@ -58,7 +58,7 @@ func (h *ScaleHandler) handleScaleJob(ctx context.Context, scaledObject *kedav1a var metricValue int64 for _, metric := range metricSpecs { - metricValue, _ = metric.External.TargetAverageValue.AsInt64() + metricValue, _ = metric.External.Target.AverageValue.AsInt64() maxValue += metricValue } scalerLogger.Info("Scaler max value", "MaxValue", maxValue) diff --git a/pkg/handler/scale_scaledobjects.go b/pkg/handler/scale_scaledobjects.go index c76a6566e8f..78c1cbea9fa 100644 --- a/pkg/handler/scale_scaledobjects.go +++ b/pkg/handler/scale_scaledobjects.go @@ -175,11 +175,11 @@ func (h *ScaleHandler) scaleFromZero(scaledObject *kedav1alpha1.ScaledObject, sc } func (h *ScaleHandler) getScaleTargetScale(scaledObject *kedav1alpha1.ScaledObject) (*autoscalingv1.Scale, error) { - return h.scaleClient.Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name) + return (*h.scaleClient).Scales(scaledObject.Namespace).Get(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scaledObject.Spec.ScaleTargetRef.Name) } func (h *ScaleHandler) updateScaleOnScaleTarget(scaledObject *kedav1alpha1.ScaledObject, scale *autoscalingv1.Scale) error { - _, err := h.scaleClient.Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale) + _, err := (*h.scaleClient).Scales(scaledObject.Namespace).Update(scaledObject.Status.ScaleTargetGVKR.GroupResource(), scale) return err } diff --git a/pkg/provider/provider.go b/pkg/provider/provider.go index 2fb1449751a..dafc440312e 100644 --- a/pkg/provider/provider.go +++ b/pkg/provider/provider.go @@ -88,7 +88,7 @@ func (p *KedaProvider) GetExternalMetric(namespace string, metricSelector labels for _, metricSpec := range metricSpecs { // Filter only the desired metric - if strings.EqualFold(metricSpec.External.MetricName, info.Metric) { + if strings.EqualFold(metricSpec.External.Metric.Name, info.Metric) { metrics, err := scaler.GetMetrics(context.TODO(), info.Metric, metricSelector) if err != nil { logger.Error(err, "error getting metric for scaler", "ScaledObject.Namespace", scaledObject.Namespace, "ScaledObject.Name", scaledObject.Name, "Scaler", scaler) diff --git a/pkg/scalers/aws_cloudwatch_scaler.go b/pkg/scalers/aws_cloudwatch_scaler.go index b19012e3acc..111cef3564c 100644 --- a/pkg/scalers/aws_cloudwatch_scaler.go +++ b/pkg/scalers/aws_cloudwatch_scaler.go @@ -12,7 +12,7 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials/stscreds" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/cloudwatch" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -172,13 +172,20 @@ func (c *awsCloudwatchScaler) GetMetrics(ctx context.Context, metricName string, return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (c *awsCloudwatchScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (c *awsCloudwatchScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetMetricValue := resource.NewQuantity(int64(c.metadata.targetMetricValue), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", strings.ReplaceAll(c.metadata.namespace, "/", "-"), - c.metadata.dimensionName, c.metadata.dimensionValue), - TargetAverageValue: targetMetricValue} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: fmt.Sprintf("%s-%s-%s", strings.ReplaceAll(c.metadata.namespace, "/", "-"), + c.metadata.dimensionName, c.metadata.dimensionValue), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } func (c *awsCloudwatchScaler) IsActive(ctx context.Context) (bool, error) { diff --git a/pkg/scalers/aws_kinesis_stream_scaler.go b/pkg/scalers/aws_kinesis_stream_scaler.go index 8740f427e36..2cda2238247 100644 --- a/pkg/scalers/aws_kinesis_stream_scaler.go +++ b/pkg/scalers/aws_kinesis_stream_scaler.go @@ -11,7 +11,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -100,12 +100,19 @@ func (s *awsKinesisStreamScaler) Close() error { return nil } -func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *awsKinesisStreamScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetShardCountQty := resource.NewQuantity(int64(s.metadata.targetShardCount), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName), - TargetAverageValue: targetShardCountQty} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: fmt.Sprintf("%s-%s-%s", "AWS-Kinesis-Stream", awsKinesisStreamMetricName, s.metadata.streamName), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetShardCountQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/aws_sqs_queue_scaler.go b/pkg/scalers/aws_sqs_queue_scaler.go index 69f18b31d0c..9e4c9dd48df 100644 --- a/pkg/scalers/aws_sqs_queue_scaler.go +++ b/pkg/scalers/aws_sqs_queue_scaler.go @@ -13,7 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -116,12 +116,19 @@ func (s *awsSqsQueueScaler) Close() error { return nil } -func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *awsSqsQueueScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s", "AWS-SQS-Queue", awsSqsQueueMetricName, s.metadata.queueName), - TargetAverageValue: targetQueueLengthQty} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: fmt.Sprintf("%s-%s-%s", "AWS-SQS-Queue", awsSqsQueueMetricName, s.metadata.queueName), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueueLengthQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/azure_blob_scaler.go b/pkg/scalers/azure_blob_scaler.go index f4a9bdd4382..91b7f4e795b 100644 --- a/pkg/scalers/azure_blob_scaler.go +++ b/pkg/scalers/azure_blob_scaler.go @@ -5,7 +5,7 @@ import ( "fmt" "strconv" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -14,15 +14,15 @@ import ( ) const ( - blobCountMetricName = "blobCount" - defaultTargetBlobCount = 5 - defaultBlobDelimiter = "/" - defaultBlobPrefix = "" + blobCountMetricName = "blobCount" + defaultTargetBlobCount = 5 + defaultBlobDelimiter = "/" + defaultBlobPrefix = "" defaultBlobConnectionSetting = "AzureWebJobsStorage" ) type azureBlobScaler struct { - metadata *azureBlobMetadata + metadata *azureBlobMetadata podIdentity string } @@ -30,7 +30,7 @@ type azureBlobMetadata struct { targetBlobCount int blobContainerName string blobDelimiter string - blobPrefix string + blobPrefix string connection string useAAdPodIdentity bool accountName string @@ -46,7 +46,7 @@ func NewAzureBlobScaler(resolvedEnv, metadata, authParams map[string]string, pod } return &azureBlobScaler{ - metadata: meta, + metadata: meta, podIdentity: podIdentity, }, nil } @@ -101,17 +101,17 @@ func parseAzureBlobMetadata(metadata, resolvedEnv, authParams map[string]string, // Found the connection in a parameter from TriggerAuthentication meta.connection = connection } else { - connectionSetting := defaultBlobConnectionSetting - if val, ok := metadata["connection"]; ok && val != "" { - connectionSetting = val + connectionSetting := defaultBlobConnectionSetting + if val, ok := metadata["connection"]; ok && val != "" { + connectionSetting = val + } + + if val, ok := resolvedEnv[connectionSetting]; ok { + meta.connection = val + } else { + return nil, "", fmt.Errorf("no connection setting given") + } } - - if val, ok := resolvedEnv[connectionSetting]; ok { - meta.connection = val - } else { - return nil, "", fmt.Errorf("no connection setting given") - } - } } else if podAuth == "azure" { // If the Use AAD Pod Identity is present then check account name if val, ok := metadata["accountName"]; ok && val != "" { @@ -150,11 +150,19 @@ func (s *azureBlobScaler) Close() error { return nil } -func (s *azureBlobScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *azureBlobScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetBlobCount := resource.NewQuantity(int64(s.metadata.targetBlobCount), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: blobCountMetricName, TargetAverageValue: targetBlobCount} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: blobCountMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetBlobCount, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 9e1de7fd70a..89256cefbe1 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -8,7 +8,7 @@ import ( eventhub "github.com/Azure/azure-event-hubs-go" "github.com/Azure/azure-storage-blob-go/azblob" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -173,16 +173,19 @@ func (scaler *AzureEventHubScaler) IsActive(ctx context.Context) (bool, error) { } // GetMetricSpecForScaling returns metric spec -func (scaler *AzureEventHubScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - MetricName: thresholdMetricName, - TargetAverageValue: resource.NewQuantity(scaler.metadata.threshold, resource.DecimalSI), - }, - Type: eventHubMetricType, +func (scaler *AzureEventHubScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricVal := resource.NewQuantity(scaler.metadata.threshold, resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: thresholdMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricVal, }, } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: eventHubMetricType} + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns metric using total number of unprocessed events in event hub diff --git a/pkg/scalers/azure_monitor_scaler.go b/pkg/scalers/azure_monitor_scaler.go index a27d83a34a7..be0788b0329 100644 --- a/pkg/scalers/azure_monitor_scaler.go +++ b/pkg/scalers/azure_monitor_scaler.go @@ -6,7 +6,7 @@ import ( "strconv" "strings" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -169,11 +169,19 @@ func (s *azureMonitorScaler) Close() error { return nil } -func (s *azureMonitorScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *azureMonitorScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetMetricVal := resource.NewQuantity(int64(s.metadata.targetValue), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: azureMonitorMetricName, TargetAverageValue: targetMetricVal} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: azureMonitorMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricVal, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/azure_queue_scaler.go b/pkg/scalers/azure_queue_scaler.go index 4fb20b33b10..5605ee3c718 100644 --- a/pkg/scalers/azure_queue_scaler.go +++ b/pkg/scalers/azure_queue_scaler.go @@ -5,7 +5,7 @@ import ( "fmt" "strconv" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -131,11 +131,19 @@ func (s *azureQueueScaler) Close() error { return nil } -func (s *azureQueueScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *azureQueueScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetQueueLengthQty := resource.NewQuantity(int64(s.metadata.targetQueueLength), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: queueLengthMetricName, TargetAverageValue: targetQueueLengthQty} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: queueLengthMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueueLengthQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/azure_servicebus_scaler.go b/pkg/scalers/azure_servicebus_scaler.go index aebfcb86303..79bcc31531c 100755 --- a/pkg/scalers/azure_servicebus_scaler.go +++ b/pkg/scalers/azure_servicebus_scaler.go @@ -8,7 +8,7 @@ import ( servicebus "github.com/Azure/azure-service-bus-go" "github.com/Azure/azure-amqp-common-go/v3/auth" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -143,11 +143,19 @@ func (s *azureServiceBusScaler) Close() error { } // Returns the metric spec to be used by the HPA -func (s *azureServiceBusScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *azureServiceBusScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetLengthQty := resource.NewQuantity(int64(s.metadata.targetLength), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: queueLengthMetricName, TargetAverageValue: targetLengthQty} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: queueLengthMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetLengthQty, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } // Returns the current metrics to be served to the HPA diff --git a/pkg/scalers/external_scaler.go b/pkg/scalers/external_scaler.go index 13865c69164..4aff2dd62c3 100644 --- a/pkg/scalers/external_scaler.go +++ b/pkg/scalers/external_scaler.go @@ -7,7 +7,7 @@ import ( pb "github.com/kedacore/keda/pkg/scalers/externalscaler" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -128,7 +128,7 @@ func (s *externalScaler) Close() error { } // GetMetricSpecForScaling returns the metric spec for the HPA -func (s *externalScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *externalScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // TODO: Pass Context ctx := context.Background() @@ -140,19 +140,24 @@ func (s *externalScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { return nil } - var result []v2beta1.MetricSpec + var result []v2beta2.MetricSpec for _, spec := range response.MetricSpecs { // Construct the target subscription size as a quantity qty := resource.NewQuantity(int64(spec.TargetSize), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{ - MetricName: spec.MetricName, - TargetAverageValue: qty, + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: spec.MetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: qty, + }, } // Create the metric spec for the HPA - metricSpec := v2beta1.MetricSpec{ + metricSpec := v2beta2.MetricSpec{ External: externalMetric, Type: externalMetricType, } diff --git a/pkg/scalers/gcp_pub_sub_scaler.go b/pkg/scalers/gcp_pub_sub_scaler.go index 3e5e2680a2f..92439a49dcf 100644 --- a/pkg/scalers/gcp_pub_sub_scaler.go +++ b/pkg/scalers/gcp_pub_sub_scaler.go @@ -5,7 +5,7 @@ import ( "fmt" "strconv" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -97,23 +97,28 @@ func (s *pubsubScaler) Close() error { } // GetMetricSpecForScaling returns the metric spec for the HPA -func (s *pubsubScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *pubsubScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { // Construct the target subscription size as a quantity targetSubscriptionSizeQty := resource.NewQuantity(int64(s.metadata.targetSubscriptionSize), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{ - MetricName: pubSubSubscriptionSizeMetricName, - TargetAverageValue: targetSubscriptionSizeQty, + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: pubSubSubscriptionSizeMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetSubscriptionSizeQty, + }, } // Create the metric spec for the HPA - metricSpec := v2beta1.MetricSpec{ + metricSpec := v2beta2.MetricSpec{ External: externalMetric, Type: externalMetricType, } - return []v2beta1.MetricSpec{metricSpec} + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics connects to Stack Driver and finds the size of the pub sub subscription diff --git a/pkg/scalers/huawei_cloudeye_scaler.go b/pkg/scalers/huawei_cloudeye_scaler.go index 12642f66a27..c40015d3c4a 100644 --- a/pkg/scalers/huawei_cloudeye_scaler.go +++ b/pkg/scalers/huawei_cloudeye_scaler.go @@ -11,7 +11,7 @@ import ( "github.com/Huawei/gophercloud/auth/aksk" "github.com/Huawei/gophercloud/openstack" "github.com/Huawei/gophercloud/openstack/ces/v1/metricdata" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -238,14 +238,21 @@ func (h *huaweiCloudeyeScaler) GetMetrics(ctx context.Context, metricName string return append([]external_metrics.ExternalMetricValue{}, metric), nil } -func (h *huaweiCloudeyeScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (h *huaweiCloudeyeScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetMetricValue := resource.NewQuantity(int64(h.metadata.targetMetricValue), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: fmt.Sprintf("%s-%s-%s-%s", strings.ReplaceAll(h.metadata.namespace, ".", "-"), - h.metadata.metricsName, - h.metadata.dimensionName, h.metadata.dimensionValue), - TargetAverageValue: targetMetricValue} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: fmt.Sprintf("%s-%s-%s-%s", strings.ReplaceAll(h.metadata.namespace, ".", "-"), + h.metadata.metricsName, + h.metadata.dimensionName, h.metadata.dimensionValue), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: externalMetricType} + return []v2beta2.MetricSpec{metricSpec} } func (h *huaweiCloudeyeScaler) IsActive(ctx context.Context) (bool, error) { diff --git a/pkg/scalers/kafka_scaler.go b/pkg/scalers/kafka_scaler.go index 7df2163c8b6..6af7c3111af 100644 --- a/pkg/scalers/kafka_scaler.go +++ b/pkg/scalers/kafka_scaler.go @@ -11,7 +11,7 @@ import ( "time" "github.com/Shopify/sarama" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -326,16 +326,19 @@ func (s *kafkaScaler) Close() error { return nil } -func (s *kafkaScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - MetricName: lagThresholdMetricName, - TargetAverageValue: resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI), - }, - Type: kafkaMetricType, +func (s *kafkaScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: lagThresholdMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, }, } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: kafkaMetricType} + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/liiklus_scaler.go b/pkg/scalers/liiklus_scaler.go index 2c37589f986..a18f67e0771 100644 --- a/pkg/scalers/liiklus_scaler.go +++ b/pkg/scalers/liiklus_scaler.go @@ -9,7 +9,7 @@ import ( liiklus_service "github.com/kedacore/keda/pkg/scalers/liiklus" "github.com/pkg/errors" "google.golang.org/grpc" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -77,16 +77,19 @@ func (s *liiklusScaler) GetMetrics(ctx context.Context, metricName string, metri } -func (s *liiklusScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - MetricName: liiklusLagThresholdMetricName, - TargetAverageValue: resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI), - }, - Type: liiklusMetricType, +func (s *liiklusScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: liiklusLagThresholdMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, }, } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: liiklusMetricType} + return []v2beta2.MetricSpec{metricSpec} } func (s *liiklusScaler) Close() error { diff --git a/pkg/scalers/mysql_scaler.go b/pkg/scalers/mysql_scaler.go index b1a2c61916b..80ee56a223d 100644 --- a/pkg/scalers/mysql_scaler.go +++ b/pkg/scalers/mysql_scaler.go @@ -5,7 +5,7 @@ import ( "database/sql" "fmt" "github.com/go-sql-driver/mysql" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -183,16 +183,21 @@ func (s *mySQLScaler) getQueryResult() (int, error) { } // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler -func (s *mySQLScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *mySQLScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetQueryValue := resource.NewQuantity(int64(s.metadata.queryValue), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{ - MetricName: mySQLMetricName, - TargetAverageValue: targetQueryValue, + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: mySQLMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueryValue, + }, } - metricSpec := v2beta1.MetricSpec{ + metricSpec := v2beta2.MetricSpec{ External: externalMetric, Type: externalMetricType, } - return []v2beta1.MetricSpec{metricSpec} + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/postgresql_scaler.go b/pkg/scalers/postgresql_scaler.go index ae3f580eae2..808db29c1cd 100644 --- a/pkg/scalers/postgresql_scaler.go +++ b/pkg/scalers/postgresql_scaler.go @@ -5,7 +5,7 @@ import ( "database/sql" "fmt" _ "github.com/lib/pq" - "k8s.io/api/autoscaling/v2beta1" + "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -182,16 +182,21 @@ func (s *postgreSQLScaler) getActiveNumber() (int, error) { } // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler -func (s *postgreSQLScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *postgreSQLScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetQueryValue := resource.NewQuantity(int64(s.metadata.targetQueryValue), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{ - MetricName: pgMetricName, - TargetAverageValue: targetQueryValue, + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: pgMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetQueryValue, + }, } - metricSpec := v2beta1.MetricSpec{ + metricSpec := v2beta2.MetricSpec{ External: externalMetric, Type: externalMetricType, } - return []v2beta1.MetricSpec{metricSpec} + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/prometheus.go b/pkg/scalers/prometheus_scaler.go similarity index 87% rename from pkg/scalers/prometheus.go rename to pkg/scalers/prometheus_scaler.go index 1eaa0133a79..b6ec3b5d0b5 100644 --- a/pkg/scalers/prometheus.go +++ b/pkg/scalers/prometheus_scaler.go @@ -10,7 +10,7 @@ import ( "strconv" "time" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -101,7 +101,7 @@ func (s *prometheusScaler) IsActive(ctx context.Context) (bool, error) { prometheusLog.Error(err, "error executing prometheus query") return false, err } - + return val > -1, nil } @@ -109,16 +109,21 @@ func (s *prometheusScaler) Close() error { return nil } -func (s *prometheusScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - TargetAverageValue: resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI), - MetricName: s.metadata.metricName, - }, - Type: externalMetricType, +func (s *prometheusScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.threshold), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: s.metadata.metricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, }, } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} } func (s *prometheusScaler) ExecutePromQuery() (float64, error) { diff --git a/pkg/scalers/rabbitmq_scaler.go b/pkg/scalers/rabbitmq_scaler.go index 97cd220c5a1..5b43aa67049 100644 --- a/pkg/scalers/rabbitmq_scaler.go +++ b/pkg/scalers/rabbitmq_scaler.go @@ -6,7 +6,7 @@ import ( "strconv" "github.com/streadway/amqp" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -133,16 +133,21 @@ func (s *rabbitMQScaler) getQueueMessages() (int, error) { } // GetMetricSpecForScaling returns the MetricSpec for the Horizontal Pod Autoscaler -func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - MetricName: rabbitQueueLengthMetricName, - TargetAverageValue: resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI), - }, - Type: rabbitMetricType, +func (s *rabbitMQScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(int64(s.metadata.queueLength), resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: rabbitQueueLengthMetricName, }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: rabbitMetricType, } + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics returns value for a supported metric and an error if there is a problem getting the metric diff --git a/pkg/scalers/redis_scaler.go b/pkg/scalers/redis_scaler.go index 5324bd7b345..c803ca346af 100644 --- a/pkg/scalers/redis_scaler.go +++ b/pkg/scalers/redis_scaler.go @@ -7,7 +7,7 @@ import ( "strconv" "github.com/go-redis/redis" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -157,11 +157,21 @@ func (s *redisScaler) Close() error { } // GetMetricSpecForScaling returns the metric spec for the HPA -func (s *redisScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { +func (s *redisScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { targetListLengthQty := resource.NewQuantity(int64(s.metadata.targetListLength), resource.DecimalSI) - externalMetric := &v2beta1.ExternalMetricSource{MetricName: listLengthMetricName, TargetAverageValue: targetListLengthQty} - metricSpec := v2beta1.MetricSpec{External: externalMetric, Type: externalMetricType} - return []v2beta1.MetricSpec{metricSpec} + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: listLengthMetricName, + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetListLengthQty, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: externalMetricType, + } + return []v2beta2.MetricSpec{metricSpec} } // GetMetrics connects to Redis and finds the length of the list diff --git a/pkg/scalers/scaler.go b/pkg/scalers/scaler.go index 10815062336..c11755d157a 100644 --- a/pkg/scalers/scaler.go +++ b/pkg/scalers/scaler.go @@ -3,7 +3,7 @@ package scalers import ( "context" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/labels" "k8s.io/metrics/pkg/apis/external_metrics" ) @@ -15,7 +15,7 @@ type Scaler interface { // Returns the metrics based on which this scaler determines that the ScaleTarget scales. This is used to contruct the HPA spec that is created for // this scaled object. The labels used should match the selectors used in GetMetrics - GetMetricSpecForScaling() []v2beta1.MetricSpec + GetMetricSpecForScaling() []v2beta2.MetricSpec IsActive(ctx context.Context) (bool, error) diff --git a/pkg/scalers/stan_scaler.go b/pkg/scalers/stan_scaler.go index ab9ea637c04..91014464823 100644 --- a/pkg/scalers/stan_scaler.go +++ b/pkg/scalers/stan_scaler.go @@ -8,7 +8,7 @@ import ( "net/http" "strconv" - v2beta1 "k8s.io/api/autoscaling/v2beta1" + v2beta2 "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -156,7 +156,7 @@ func (s *stanScaler) getMaxMsgLag() int64 { return s.channelInfo.LastSequence - maxValue } -func (s *stanScaler) hasPendingMessage() bool { +func (s *stanScaler) hasPendingMessage() bool { subscriberFound := false combinedQueueName := s.metadata.durableName + ":" + s.metadata.queueGroup @@ -179,16 +179,21 @@ func (s *stanScaler) hasPendingMessage() bool { return false } -func (s *stanScaler) GetMetricSpecForScaling() []v2beta1.MetricSpec { - return []v2beta1.MetricSpec{ - { - External: &v2beta1.ExternalMetricSource{ - MetricName: lagThresholdMetricName, - TargetAverageValue: resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI), - }, - Type: stanMetricType, +func (s *stanScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(s.metadata.lagThreshold, resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: lagThresholdMetricName, }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{ + External: externalMetric, Type: stanMetricType, } + return []v2beta2.MetricSpec{metricSpec} } //GetMetrics returns value for a supported metric and an error if there is a problem getting the metric