From db92b324b6b716648953d564a3f83cd5dbbef0e8 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 26 Aug 2023 18:35:12 +0900 Subject: [PATCH 01/11] Expose prometheus metrics at ScaledJob like ScaledObject Signed-off-by: Yoon Park --- CHANGELOG.md | 2 + pkg/metricscollector/metricscollectors.go | 38 +++-- pkg/metricscollector/opentelemetry.go | 50 +++++-- pkg/metricscollector/prommetrics.go | 67 ++++++--- pkg/scaling/scale_handler.go | 93 +++++++----- .../opentelemetry_metrics_test.go | 132 +++++++++++++++++- .../prometheus_metrics_test.go | 129 ++++++++++++++++- 7 files changed, 424 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d530c62c3d8..d105247ef73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,8 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **General**: Introduce new AWS Authentication ([#4134](https://github.com/kedacore/keda/issues/4134)) +- - **Prometheus Metrics**: Expose prometheus metrics for ScaledJob resources ([#4798](https://github.com/kedacore/keda/issues/4798)) +>>>>>>> 83108fc2 (Expose prometheus metrics at ScaledJob like ScaledObject) #### Experimental diff --git a/pkg/metricscollector/metricscollectors.go b/pkg/metricscollector/metricscollectors.go index 299b287a819..296bc6b86b6 100644 --- a/pkg/metricscollector/metricscollectors.go +++ b/pkg/metricscollector/metricscollectors.go @@ -31,26 +31,29 @@ var ( ) type MetricsCollector interface { - RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) + RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) // RecordScalerLatency create a measurement of the latency to external metric - RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) + RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) // RecordScalableObjectLatency create a measurement of the latency executing scalable object loop RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) // RecordScalerActive create a measurement of the activity of the scaler - RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) + RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) // RecordScaledObjectPaused marks whether the current ScaledObject is paused. 
RecordScaledObjectPaused(namespace string, scaledObject string, active bool) - // RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA - RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) + // RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA + RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) // RecordScaledObjectError counts the number of errors with the scaled object RecordScaledObjectError(namespace string, scaledObject string, err error) + // RecordScaledJobError counts the number of errors with the scaled job + RecordScaledJobError(namespace string, scaledJob string, err error) + IncrementTriggerTotal(triggerType string) DecrementTriggerTotal(triggerType string) @@ -82,16 +85,16 @@ func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetri } // RecordScalerMetric create a measurement of the external metric used by the HPA -func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { +func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { for _, element := range collectors { - element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, value) + element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value) } } // RecordScalerLatency create a measurement of the latency to external metric -func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { +func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { for _, element := range collectors { - element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, value) + element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value) } } @@ -103,9 +106,9 @@ func RecordScalableObjectLatency(namespace string, name string, isScaledObject b } // RecordScalerActive create a measurement of the activity of the scaler -func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) { +func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) { for _, element := range collectors { - element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, active) + element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, active) } } @@ -116,10 +119,10 @@ func RecordScaledObjectPaused(namespace string, scaledObject string, active bool } } -// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA -func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) { +// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA +func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) { for _, element := range 
collectors { - element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, err) + element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, err) } } @@ -130,6 +133,13 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) { } } +// RecordScaledJobError counts the number of errors with the scaled job +func RecordScaledJobError(namespace string, scaledJob string, err error) { + for _, element := range collectors { + element.RecordScaledJobError(namespace, scaledJob, err) + } +} + func IncrementTriggerTotal(triggerType string) { for _, element := range collectors { element.IncrementTriggerTotal(triggerType) diff --git a/pkg/metricscollector/opentelemetry.go b/pkg/metricscollector/opentelemetry.go index 0c4ff961634..ae3d00f1cb0 100644 --- a/pkg/metricscollector/opentelemetry.go +++ b/pkg/metricscollector/opentelemetry.go @@ -26,6 +26,7 @@ var ( meter api.Meter otScalerErrorsCounter api.Int64Counter otScaledObjectErrorsCounter api.Int64Counter + otScaledJobErrorsCounter api.Int64Counter otTriggerTotalsCounter api.Int64UpDownCounter otCrdTotalsCounter api.Int64UpDownCounter @@ -89,6 +90,11 @@ func initMeters() { otLog.Error(err, msg) } + otScaledJobErrorsCounter, err = meter.Int64Counter("keda.scaledjob.errors", api.WithDescription("Number of scaled job errors")) + if err != nil { + otLog.Error(err, msg) + } + otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers")) if err != nil { otLog.Error(err, msg) @@ -188,9 +194,9 @@ func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) err return nil } -func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { +func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { otelScalerMetricVal.val = value - otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric) + otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) } func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error { @@ -202,9 +208,9 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) } // RecordScalerLatency create a measurement of the latency to external metric -func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { +func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { otelScalerMetricsLatencyVal.val = value - otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric) + otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) } func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error { @@ -240,14 +246,14 @@ func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error { } // RecordScalerActive create a measurement of the activity of the scaler -func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, 
scaler string, triggerIndex int, metric string, active bool) { +func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) { activeVal := -1 if active { activeVal = 1 } otelScalerActiveVal.val = float64(activeVal) - otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric) + otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject) } // RecordScaledObjectPaused marks whether the current ScaledObject is paused. @@ -276,11 +282,11 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st } } -// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA -func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) { +// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA +func (o *OtelMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) { if err != nil { - otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)) - o.RecordScaledObjectError(namespace, scaledObject, err) + otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)) + o.RecordScaledObjectError(namespace, scaledResource, err) return } } @@ -296,6 +302,17 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str } } +// RecordScaledJobError counts the number of errors with the scaled job +func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) { + opt := api.WithAttributes( + attribute.Key("namespace").String(namespace), + attribute.Key("scaledJob").String(scaledJob)) + if err != nil { + otScaledJobErrorsCounter.Add(context.Background(), 1, opt) + return + } +} + func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) { if triggerType != "" { otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType))) @@ -332,10 +349,19 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) { otCrdTotalsCounter.Add(context.Background(), -1, opt) } -func getScalerMeasurementOption(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) api.MeasurementOption { +func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption { + if isScaledObject { + return api.WithAttributes( + attribute.Key("namespace").String(namespace), + attribute.Key("scaledObject").String(scaledResource), + attribute.Key("scaler").String(scaler), + attribute.Key("scalerIndex").String(strconv.Itoa(triggerIndex)), + attribute.Key("metric").String(metric), + ) + } return api.WithAttributes( attribute.Key("namespace").String(namespace), - attribute.Key("scaledObject").String(scaledObject), + attribute.Key("scaledJob").String(scaledResource), attribute.Key("scaler").String(scaler), attribute.Key("triggerIndex").String(strconv.Itoa(triggerIndex)), attribute.Key("metric").String(metric), diff --git 
a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go index 825bceed9a7..1e5e34e3be3 100644 --- a/pkg/metricscollector/prommetrics.go +++ b/pkg/metricscollector/prommetrics.go @@ -30,7 +30,7 @@ import ( var log = logf.Log.WithName("prometheus_server") var ( - metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex"} + metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex", "type"} buildInfo = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: DefaultPromMetricsNamespace, @@ -53,7 +53,7 @@ var ( Namespace: DefaultPromMetricsNamespace, Subsystem: "scaler", Name: "metrics_value", - Help: "Metric Value used for HPA", + Help: "Metric Value used for ScaledObject calculation", }, metricLabels, ) @@ -102,6 +102,15 @@ var ( }, []string{"namespace", "scaledObject"}, ) + scaledJobErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_job", + Name: "errors", + Help: "Number of scaled job errors", + }, + []string{"namespace", "scaledJob"}, + ) triggerTotalsGaugeVec = prometheus.NewGaugeVec( prometheus.GaugeOpts{ @@ -183,32 +192,28 @@ func RecordBuildInfo() { } // RecordScalerMetric create a measurement of the external metric used by the HPA -func (p *PromMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { - scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value) +func (p *PromMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value) } // RecordScalerLatency create a measurement of the latency to external metric -func (p *PromMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) { - scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value) +func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) { + scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value) } // RecordScalableObjectLatency create a measurement of the latency executing scalable object loop func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) { - resourceType := "scaledjob" - if isScaledObject { - resourceType = "scaledobject" - } - internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value) + internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value) } // RecordScalerActive create a measurement of the activity of the scaler -func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) { +func (p *PromMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) { activeVal := 0 if active { activeVal = 1 } - scalerActive.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(float64(activeVal)) + scalerActive.With(getLabels(namespace, scaledResource, 
scaler, triggerIndex, metric, isScaledObject)).Set(float64(activeVal))
 }
 
 // RecordScaledObjectPaused marks whether the current ScaledObject is paused.
@@ -223,16 +228,16 @@ func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
 	scaledObjectPaused.With(labels).Set(float64(activeVal))
 }
 
-// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
-func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
+// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
+func (p *PromMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
 	if err != nil {
-		scalerErrors.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Inc()
-		p.RecordScaledObjectError(namespace, scaledObject, err)
+		scalerErrors.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Inc()
+		p.RecordScaledObjectError(namespace, scaledResource, err)
 		scalerErrorsTotal.With(prometheus.Labels{}).Inc()
 		return
 	}
 	// initialize metric with 0 if not already set
-	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, triggerIndex, metric))
+	_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
 	if errscaler != nil {
 		log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
 	}
@@ -253,8 +258,30 @@ func (p *PromMetrics) RecordScaledObjectError(namespace string, scaledObject str
 	}
 }
 
-func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) prometheus.Labels {
-	return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "triggerIndex": strconv.Itoa(triggerIndex), "metric": metric}
+// RecordScaledJobError counts the number of errors with the scaled job
+func (p *PromMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
+	labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
+	if err != nil {
+		scaledJobErrors.With(labels).Inc()
+		return
+	}
+	// initialize metric with 0 if not already set
+	_, errscaledjob := scaledJobErrors.GetMetricWith(labels)
+	if errscaledjob != nil {
+		log.Error(errscaledjob, "Unable to write to metrics to Prometheus Server: %v")
+		return
+	}
+}
+
+func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool) prometheus.Labels {
+	return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "triggerIndex": strconv.Itoa(triggerIndex), "metric": metric, "type": getResourceType(isScaledObject)}
+}
+
+func getResourceType(isScaledObject bool) string {
+	if isScaledObject {
+		return "scaledobject"
+	}
+	return "scaledjob"
 }
 
 func (p *PromMetrics) IncrementTriggerTotal(triggerType string) {
diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go
index 099408ddd2b..3f3f93e5db7 100644
--- a/pkg/scaling/scale_handler.go
+++ b/pkg/scaling/scale_handler.go
@@ -524,7 +524,7 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN
 			var latency int64
 			metrics, _, latency, err = cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName)
 			if latency != -1 {
-
metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, float64(latency)) + metricscollector.RecordScalerLatency(scaledObjectNamespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, float64(latency)) } logger.V(1).Info("Getting metrics from trigger", "trigger", triggerName, "metricName", metricName, "metrics", metrics, "scalerError", err) } @@ -555,13 +555,13 @@ func (h *scaleHandler) GetScaledObjectMetrics(ctx context.Context, scaledObjectN } else { for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, metric.MetricName, metricValue) + metricscollector.RecordScalerMetric(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, metric.MetricName, true, metricValue) } } if fallbackActive { isFallbackActive = true } - metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, err) + metricscollector.RecordScalerError(scaledObjectNamespace, scaledObjectName, result.triggerName, result.triggerIndex, result.metricName, true, err) matchingMetrics = append(matchingMetrics, metrics...) } // invalidate the cache for the ScaledObject, if we hit an error in any scaler @@ -675,8 +675,8 @@ func (h *scaleHandler) getScaledObjectState(ctx context.Context, scaledObject *k for _, metric := range matchingMetrics { value := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value) - metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, value > activationValue) + metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, true, value) + metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, kedav1alpha1.CompositeMetricName, 0, metric.MetricName, true, value > activationValue) if !isScaledObjectActive { isScaledObjectActive = value > activationValue } @@ -741,7 +741,7 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, var latency int64 metrics, isMetricActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, triggerIndex, metricName) if latency != -1 { - metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metricName, float64(latency)) + metricscollector.RecordScalerLatency(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, float64(latency)) } result.Metrics = append(result.Metrics, metrics...) 
logger.V(1).Info("Getting metrics and activity from scaler", "scaler", triggerName, "metricName", metricName, "metrics", metrics, "activity", isMetricActive, "scalerError", err) @@ -767,7 +767,7 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, result.IsActive = isMetricActive for _, metric := range metrics { metricValue := metric.Value.AsApproximateFloat64() - metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metric.MetricName, metricValue) + metricscollector.RecordScalerMetric(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metric.MetricName, true, metricValue) } if !scaledObject.IsUsingModifiers() { if isMetricActive { @@ -778,7 +778,7 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, logger.V(1).Info("Scaler for scaledObject is active", "scaler", triggerName, "metricName", spec.Resource.Name) } } - metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metricName, isMetricActive) + metricscollector.RecordScalerActive(scaledObject.Namespace, scaledObject.Name, triggerName, triggerIndex, metricName, true, isMetricActive) } } @@ -797,46 +797,73 @@ func (*scaleHandler) getScalerState(ctx context.Context, scaler scalers.Scaler, // getScaledJobMetrics returns metrics for specified metric name for a ScaledJob identified by its name and namespace. // It could either query the metric value directly from the scaler or from a cache, that's being stored for the scaler. func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob) []scaledjob.ScalerMetrics { + logger := log.WithValues("scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) + cache, err := h.GetScalersCache(ctx, scaledJob) + metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) if err != nil { log.Error(err, "error getting scalers cache", "scaledJob.Namespace", scaledJob.Namespace, "scaledJob.Name", scaledJob.Name) return nil } var scalersMetrics []scaledjob.ScalerMetrics - scalers, _ := cache.GetScalers() - for i, s := range scalers { + scalers, scalerConfigs := cache.GetScalers() + for scalerIndex, scaler := range scalers { + scalerName := strings.Replace(fmt.Sprintf("%T", scalers[scalerIndex]), "*scalers.", "", 1) + if scalerConfigs[scalerIndex].TriggerName != "" { + scalerName = scalerConfigs[scalerIndex].TriggerName + } isActive := false - scalerType := fmt.Sprintf("%T:", s) + scalerType := fmt.Sprintf("%T:", scaler) scalerLogger := log.WithValues("ScaledJob", scaledJob.Name, "Scaler", scalerType) - metricSpecs := s.GetMetricSpecForScaling(ctx) + metricSpecs := scaler.GetMetricSpecForScaling(ctx) - // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) - // or skip cpu/memory resource scaler - if len(metricSpecs) < 1 || metricSpecs[0].External == nil { - continue - } + for _, spec := range metricSpecs { + // skip scaler that doesn't return any metric specs (usually External scaler with incorrect metadata) + // or skip cpu/memory resource scaler + if len(metricSpecs) < 1 || spec.External == nil { + continue + } + metricName := spec.External.Metric.Name + metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) + if latency != -1 { + metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, 
float64(latency)) + } + if err != nil { + scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) + cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) + continue + } + if isTriggerActive { + isActive = true + } + queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount()) - metrics, isTriggerActive, _, err := cache.GetMetricsAndActivityForScaler(ctx, i, metricSpecs[0].External.Metric.Name) - if err != nil { - scalerLogger.V(1).Info("Error getting scaler metrics and activity, but continue", "error", err) - cache.Recorder.Event(scaledJob, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) - continue - } - if isTriggerActive { - isActive = true - } + scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) - queueLength, maxValue, targetAverageValue := scaledjob.CalculateQueueLengthAndMaxValue(metrics, metricSpecs, scaledJob.MaxReplicaCount()) + scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ + QueueLength: queueLength, + MaxValue: maxValue, + IsActive: isActive, + }) + for _, metric := range metrics { + metricValue := metric.Value.AsApproximateFloat64() + metricscollector.RecordScalerMetric(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metric.MetricName, false, metricValue) + } - scalerLogger.V(1).Info("Scaler Metric value", "isTriggerActive", isTriggerActive, metricSpecs[0].External.Metric.Name, queueLength, "targetAverageValue", targetAverageValue) + if isTriggerActive { + if spec.External != nil { + logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", metricName) + } + if spec.Resource != nil { + logger.V(1).Info("Scaler for scaledJob is active", "scaler", scalerName, "metricName", spec.Resource.Name) + } + } - scalersMetrics = append(scalersMetrics, scaledjob.ScalerMetrics{ - QueueLength: queueLength, - MaxValue: maxValue, - IsActive: isActive, - }) + metricscollector.RecordScalerError(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, err) + metricscollector.RecordScalerActive(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, isTriggerActive) + } } return scalersMetrics } diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 1373c3c9471..57f459e7f3f 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -26,6 +26,7 @@ import ( const ( testName = "opentelemetry-metrics-test" labelScaledObject = "scaledObject" + labelScaledObject = "scaledJob" labelType = "type" labelCloudEventSource = "cloudEventSource" eventsink = "eventsink" @@ -39,7 +40,9 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) - wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName) + wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName) wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName) cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName) 
clientName = fmt.Sprintf("%s-client", testName) @@ -60,7 +63,9 @@ type templateData struct { TestNamespace string DeploymentName string ScaledObjectName string + ScaledJobName string WrongScaledObjectName string + WrongScaledJobName string WrongScalerName string CronScaledJobName string MonitoredDeploymentName string @@ -164,6 +169,69 @@ spec: query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}' ` + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' +` + + wrongScaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.WrongScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 2 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: prometheus + name: {{.WrongScalerName}} + metadata: + serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080 + metricName: keda_scaler_errors_total + threshold: '1' + query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}' +` + cronScaledJobTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledJob @@ -379,6 +447,7 @@ func TestOpenTelemetryMetrics(t *testing.T) { testScalerMetricLatency(t) testScalerActiveMetric(t) testScaledObjectErrors(t, data) + testScaledJobErrors(t, data) testScalerErrors(t, data) testOperatorMetrics(t, kc, data) testScalableObjectMetrics(t) @@ -397,6 +466,8 @@ func getTemplateData() (templateData, []Template) { DeploymentName: deploymentName, ScaledObjectName: scaledObjectName, WrongScaledObjectName: wrongScaledObjectName, + ScaledJobName: scaledJobName, + WrongScaledJobName: wrongScaledJobName, WrongScalerName: wrongScalerName, MonitoredDeploymentName: monitoredDeploymentName, ClientName: clientName, @@ -410,6 +481,7 @@ func getTemplateData() (templateData, []Template) { {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, {Name: "clientTemplate", Config: clientTemplate}, {Name: "authenticatioNTemplate", Config: authenticationTemplate}, {Name: "cloudEventHTTPReceiverTemplate", Config: cloudEventHTTPReceiverTemplate}, @@ -442,7 +514,8 @@ func testScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } @@ -487,6 +560,39 @@ func testScaledObjectErrors(t *testing.T, data templateData) { 
time.Sleep(10 * time.Second) } +func testScaledJobErrors(t *testing.T, data templateData) { + t.Log("--- testing scaled job errors ---") + + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + + time.Sleep(20 * time.Second) + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + val, ok := family["keda_scaledjob_errors"] + assert.True(t, ok, "keda_scaledjob_errors not available") + if ok { + errCounterVal1 := getErrorMetricsValue(val) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(5 * time.Second) + + family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) + val, ok := family["keda_scaledjob_errors"] + assert.True(t, ok, "keda_scaledjob_errors not available") + if ok { + errCounterVal2 := getErrorMetricsValue(val) + assert.NotEqual(t, errCounterVal2, float64(0)) + assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1) + } + } + + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + // wait for 10 seconds to correctly fetch metrics. + time.Sleep(10 * time.Second) +} + func testScalerErrors(t *testing.T, data templateData) { t.Log("--- testing scaler errors ---") @@ -494,6 +600,9 @@ func testScalerErrors(t *testing.T, data templateData) { time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(15 * time.Second) family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) @@ -515,6 +624,9 @@ func testScalerErrors(t *testing.T, data templateData) { } } + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) @@ -532,6 +644,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 { } } } + case "keda_scaled_job_errors": + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName { + return *metric.Counter.Value + } + } + } case "keda_scaler_errors_total": metrics := val.GetMetric() for _, metric := range metrics { @@ -559,7 +681,8 @@ func testScalerMetricLatency(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(0), *metric.Gauge.Value) found = true } @@ -619,7 +742,8 @@ func testScalerActiveMetric(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if 
(*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(1), *metric.Gauge.Value) found = true } diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index 96ffc87a540..40d9be402f9 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -26,6 +26,7 @@ import ( const ( testName = "prometheus-metrics-test" labelScaledObject = "scaledObject" + labelScaledJob = "scaledJob" labelType = "type" labelCloudEventSource = "cloudeventsource" eventsink = "eventsink" @@ -39,7 +40,9 @@ var ( deploymentName = fmt.Sprintf("%s-deployment", testName) monitoredDeploymentName = fmt.Sprintf("%s-monitored", testName) scaledObjectName = fmt.Sprintf("%s-so", testName) - wrongScaledObjectName = fmt.Sprintf("%s-wrong", testName) + wrongScaledObjectName = fmt.Sprintf("%s-so-wrong", testName) + scaledJobName = fmt.Sprintf("%s-sj", testName) + wrongScaledJobName = fmt.Sprintf("%s-sj-wrong", testName) wrongScalerName = fmt.Sprintf("%s-wrong-scaler", testName) cronScaledJobName = fmt.Sprintf("%s-cron-sj", testName) clientName = fmt.Sprintf("%s-client", testName) @@ -59,7 +62,9 @@ type templateData struct { TestNamespace string DeploymentName string ScaledObjectName string + ScaledJobName string WrongScaledObjectName string + WrongScaledJobName string WrongScalerName string CronScaledJobName string MonitoredDeploymentName string @@ -163,6 +168,68 @@ spec: query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledObject="{{.WrongScaledObjectName}}"}' ` + scaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.ScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 5 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'app={{.MonitoredDeploymentName}}' + value: '1' +` + + wrongScaledJobTemplate = ` +apiVersion: keda.sh/v1alpha1 +kind: ScaledJob +metadata: + name: {{.WrongScaledJobName}} + namespace: {{.TestNamespace}} +spec: + jobTargetRef: + template: + spec: + containers: + - name: external-executor + image: busybox + command: + - sleep + - "30" + imagePullPolicy: IfNotPresent + restartPolicy: Never + backoffLimit: 1 + pollingInterval: 2 + maxReplicaCount: 3 + successfulJobsHistoryLimit: 0 + failedJobsHistoryLimit: 0 + triggers: + - type: prometheus + name: {{.WrongScalerName}} + metadata: + serverAddress: http://keda-prometheus.keda.svc.cluster.local:8080 + metricName: keda_scaler_errors_total + threshold: '1' + query: 'keda_scaler_errors_total{namespace="{{.TestNamespace}}",scaledJob="{{.WrongScaledJobName}}"}' +` cronScaledJobTemplate = ` apiVersion: keda.sh/v1alpha1 kind: ScaledJob @@ -374,6 +441,7 @@ func TestPrometheusMetrics(t *testing.T) { testScalerMetricLatency(t) testScalerActiveMetric(t) testScaledObjectErrors(t, data) + testScaledJobErrors(t, data) testScalerErrors(t, data) testScalerErrorsTotal(t, data) testOperatorMetrics(t, kc, data) @@ -394,6 +462,8 @@ func getTemplateData() (templateData, []Template) { DeploymentName: deploymentName, ScaledObjectName: 
scaledObjectName, WrongScaledObjectName: wrongScaledObjectName, + ScaledJobName: scaledJobName, + WrongScaledJobName: wrongScaledJobName, WrongScalerName: wrongScalerName, MonitoredDeploymentName: monitoredDeploymentName, ClientName: clientName, @@ -407,6 +477,7 @@ func getTemplateData() (templateData, []Template) { {Name: "deploymentTemplate", Config: deploymentTemplate}, {Name: "monitoredDeploymentTemplate", Config: monitoredDeploymentTemplate}, {Name: "scaledObjectTemplate", Config: scaledObjectTemplate}, + {Name: "scaledJobTemplate", Config: scaledJobTemplate}, {Name: "clientTemplate", Config: clientTemplate}, {Name: "authenticatioNTemplate", Config: authenticationTemplate}, {Name: "cloudEventHTTPReceiverTemplate", Config: cloudEventHTTPReceiverTemplate}, @@ -440,7 +511,8 @@ func testScalerMetricValue(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(4), *metric.Gauge.Value) found = true } @@ -484,6 +556,38 @@ func testScaledObjectErrors(t *testing.T, data templateData) { KubectlApplyWithTemplate(t, data, "scaledObjectTemplate", scaledObjectTemplate) } +func testScaledJobErrors(t *testing.T, data templateData) { + t.Log("--- testing scaled job errors ---") + + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(20 * time.Second) + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + if val, ok := family["keda_scaled_job_errors"]; ok { + errCounterVal1 := getErrorMetricsValue(val) + + // wait for 2 seconds as pollinginterval is 2 + time.Sleep(2 * time.Second) + + family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + if val, ok := family["keda_scaled_job_errors"]; ok { + errCounterVal2 := getErrorMetricsValue(val) + assert.NotEqual(t, errCounterVal2, float64(0)) + assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1) + } else { + t.Errorf("metric not available") + } + } else { + t.Errorf("metric not available") + } + + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) +} + func testScalerErrors(t *testing.T, data templateData) { t.Log("--- testing scaler errors ---") @@ -491,6 +595,9 @@ func testScalerErrors(t *testing.T, data templateData) { time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) + KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) val, ok := family["keda_scaler_errors"] assert.True(t, ok, "keda_scaler_errors not available") @@ -509,6 +616,8 @@ func testScalerErrors(t *testing.T, data templateData) { assert.GreaterOrEqual(t, errCounterVal2, errCounterVal1) } } + KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + KubectlApplyWithTemplate(t, data, "scaledJobTemplate", 
scaledJobTemplate) KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) time.Sleep(2 * time.Second) @@ -562,6 +671,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 { } } } + case "keda_scaled_job_errors": + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName { + return *metric.Counter.Value + } + } + } case "keda_scaler_errors": metrics := val.GetMetric() for _, metric := range metrics { @@ -614,7 +733,8 @@ func testScalerMetricLatency(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(0), *metric.Gauge.Value) found = true } @@ -674,7 +794,8 @@ func testScalerActiveMetric(t *testing.T) { for _, metric := range metrics { labels := metric.GetLabel() for _, label := range labels { - if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + if (*label.Name == labelScaledObject && *label.Value == scaledObjectName) || + (*label.Name == labelScaledJob && *label.Value == scaledJobName) { assert.Equal(t, float64(1), *metric.Gauge.Value) found = true } From 01888f42c588d712e61dc54d719ecbba73dbbea5 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Mon, 18 Dec 2023 15:37:41 +0900 Subject: [PATCH 02/11] Use assertion for the scaledjob e2e error Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 57f459e7f3f..ea549208918 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -578,8 +578,8 @@ func testScaledJobErrors(t *testing.T, data templateData) { time.Sleep(5 * time.Second) family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) - val, ok := family["keda_scaledjob_errors"] - assert.True(t, ok, "keda_scaledjob_errors not available") + val, ok := family["keda_scaled_job_errors"] + assert.True(t, ok, "keda_scaled_job_errors_total not available") if ok { errCounterVal2 := getErrorMetricsValue(val) assert.NotEqual(t, errCounterVal2, float64(0)) From e51f9cac701f1297084eaa02d3aa9284d71efd84 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Tue, 19 Dec 2023 01:28:37 +0900 Subject: [PATCH 03/11] Change to scaledjob for opentelemetry Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index ea549208918..5c7d9786bfe 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -569,8 +569,8 @@ func testScaledJobErrors(t *testing.T, data templateData) { time.Sleep(20 * time.Second) family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", 
kedaOperatorCollectorPrometheusExportURL)) - val, ok := family["keda_scaledjob_errors"] - assert.True(t, ok, "keda_scaledjob_errors not available") + val, ok := family["keda_scaled_job_errors"] + assert.True(t, ok, "keda_scaled_job_errors not available") if ok { errCounterVal1 := getErrorMetricsValue(val) From 55f667c4dff1a010eeb2141f0d0642eb5551dd98 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Fri, 12 Jan 2024 23:09:58 +0900 Subject: [PATCH 04/11] Resolve conflicts after rebase Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 5c7d9786bfe..692fb90973c 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -579,7 +579,7 @@ func testScaledJobErrors(t *testing.T, data templateData) { family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) val, ok := family["keda_scaled_job_errors"] - assert.True(t, ok, "keda_scaled_job_errors_total not available") + assert.True(t, ok, "keda_scaled_job_errors not available") if ok { errCounterVal2 := getErrorMetricsValue(val) assert.NotEqual(t, errCounterVal2, float64(0)) From ce7cce6a278e4a59926d4dc8eb967d877b341f8e Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Fri, 12 Jan 2024 23:13:29 +0900 Subject: [PATCH 05/11] Fix e2e test failing issue Signed-off-by: Yoon Park --- pkg/metricscollector/prommetrics.go | 1 + pkg/scaling/scale_handler.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go index 1e5e34e3be3..0b960adb782 100644 --- a/pkg/metricscollector/prommetrics.go +++ b/pkg/metricscollector/prommetrics.go @@ -174,6 +174,7 @@ func NewPromMetrics() *PromMetrics { metrics.Registry.MustRegister(scalerErrors) metrics.Registry.MustRegister(scaledObjectErrors) metrics.Registry.MustRegister(scaledObjectPaused) + metrics.Registry.MustRegister(scaledJobErrors) metrics.Registry.MustRegister(triggerTotalsGaugeVec) metrics.Registry.MustRegister(crdTotalsGaugeVec) diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 3f3f93e5db7..3c28fa830a9 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -827,6 +827,7 @@ func (h *scaleHandler) getScaledJobMetrics(ctx context.Context, scaledJob *kedav } metricName := spec.External.Metric.Name metrics, isTriggerActive, latency, err := cache.GetMetricsAndActivityForScaler(ctx, scalerIndex, metricName) + metricscollector.RecordScaledJobError(scaledJob.Namespace, scaledJob.Name, err) if latency != -1 { metricscollector.RecordScalerLatency(scaledJob.Namespace, scaledJob.Name, scalerName, scalerIndex, metricName, false, float64(latency)) } From f4a439ec48ef2ff4d260c8ec653345863babc413 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Fri, 12 Jan 2024 23:27:55 +0900 Subject: [PATCH 06/11] Fix a duplicated variable Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 692fb90973c..2557c6bb77a 100644 --- 
a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -26,7 +26,7 @@ import ( const ( testName = "opentelemetry-metrics-test" labelScaledObject = "scaledObject" - labelScaledObject = "scaledJob" + labelScaledJob = "scaledJob" labelType = "type" labelCloudEventSource = "cloudEventSource" eventsink = "eventsink" From 43bc8e75ed9060d39f5815191a28bde1abd64558 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 13 Jan 2024 00:20:31 +0900 Subject: [PATCH 07/11] Delete unrelated line at CHANGELOG.md Signed-off-by: Yoon Park --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d105247ef73..7057d5f4c3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,8 +52,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### New - **General**: Introduce new AWS Authentication ([#4134](https://github.com/kedacore/keda/issues/4134)) -- - **Prometheus Metrics**: Expose prometheus metrics for ScaledJob resources ([#4798](https://github.com/kedacore/keda/issues/4798)) ->>>>>>> 83108fc2 (Expose prometheus metrics at ScaledJob like ScaledObject) +- **Prometheus Metrics**: Expose prometheus metrics for ScaledJob resources ([#4798](https://github.com/kedacore/keda/issues/4798)) #### Experimental From 51b323411abc97f08852609970315e8527d953bd Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 13 Jan 2024 00:27:01 +0900 Subject: [PATCH 08/11] Use metric name to keda_scaledjob_errors_total Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 2557c6bb77a..4241b388416 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -569,8 +569,8 @@ func testScaledJobErrors(t *testing.T, data templateData) { time.Sleep(20 * time.Second) family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) - val, ok := family["keda_scaled_job_errors"] - assert.True(t, ok, "keda_scaled_job_errors not available") + val, ok := family["keda_scaledjob_errors_total"] + assert.True(t, ok, "keda_scaledjob_errors_total not available") if ok { errCounterVal1 := getErrorMetricsValue(val) @@ -578,8 +578,8 @@ func testScaledJobErrors(t *testing.T, data templateData) { time.Sleep(5 * time.Second) family = fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorCollectorPrometheusExportURL)) - val, ok := family["keda_scaled_job_errors"] - assert.True(t, ok, "keda_scaled_job_errors not available") + val, ok := family["keda_scaledjob_errors_total"] + assert.True(t, ok, "keda_scaledjob_errors_total not available") if ok { errCounterVal2 := getErrorMetricsValue(val) assert.NotEqual(t, errCounterVal2, float64(0)) From 44ffcae8061de7a47845afadb8de60d856fdf438 Mon Sep 17 00:00:00 2001 From: Yoon Park Date: Sat, 13 Jan 2024 14:42:23 +0900 Subject: [PATCH 09/11] Add time.Sleep between creating and deleting wrong scaled resources Signed-off-by: Yoon Park --- .../opentelemetry_metrics/opentelemetry_metrics_test.go | 4 ++++ .../sequential/prometheus_metrics/prometheus_metrics_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git 
a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go index 4241b388416..fac9206f456 100644 --- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go +++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go @@ -564,6 +564,7 @@ func testScaledJobErrors(t *testing.T, data templateData) { t.Log("--- testing scaled job errors ---") KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) time.Sleep(20 * time.Second) @@ -588,6 +589,7 @@ func testScaledJobErrors(t *testing.T, data templateData) { } KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) // wait for 10 seconds to correctly fetch metrics. time.Sleep(10 * time.Second) @@ -601,6 +603,7 @@ func testScalerErrors(t *testing.T, data templateData) { KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) time.Sleep(15 * time.Second) @@ -625,6 +628,7 @@ func testScalerErrors(t *testing.T, data templateData) { } KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index 40d9be402f9..e516f6b8391 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -560,6 +560,7 @@ func testScaledJobErrors(t *testing.T, data templateData) { t.Log("--- testing scaled job errors ---") KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) // wait for 2 seconds as pollinginterval is 2 @@ -585,6 +586,7 @@ func testScaledJobErrors(t *testing.T, data templateData) { } KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) } @@ -596,6 +598,7 @@ func testScalerErrors(t *testing.T, data templateData) { KubectlApplyWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) KubectlDeleteWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) @@ -617,6 +620,7 @@ func testScalerErrors(t *testing.T, data templateData) { } } KubectlDeleteWithTemplate(t, data, "wrongScaledJobTemplate", wrongScaledJobTemplate) + time.Sleep(2 * time.Second) KubectlApplyWithTemplate(t, data, "scaledJobTemplate", scaledJobTemplate) KubectlDeleteWithTemplate(t, data, "wrongScaledObjectTemplate", wrongScaledObjectTemplate) From 
75476df9115d6071234c970e904722577a9460c6 Mon Sep 17 00:00:00 2001
From: Yoon Park
Date: Sun, 14 Jan 2024 18:21:49 +0900
Subject: [PATCH 10/11] Add missing scaledjob error metric value case

Signed-off-by: Yoon Park
---
 .../opentelemetry_metrics_test.go | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
index fac9206f456..5ce2c93f7ba 100644
--- a/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
+++ b/tests/sequential/opentelemetry_metrics/opentelemetry_metrics_test.go
@@ -648,6 +648,16 @@ func getErrorMetricsValue(val *prommodel.MetricFamily) float64 {
 			}
 		}
 	}
+	case "keda_scaledjob_errors_total":
+		metrics := val.GetMetric()
+		for _, metric := range metrics {
+			labels := metric.GetLabel()
+			for _, label := range labels {
+				if *label.Name == "scaledJob" && *label.Value == wrongScaledJobName {
+					return *metric.Counter.Value
+				}
+			}
+		}
 	case "keda_scaled_job_errors":
 		metrics := val.GetMetric()
 		for _, metric := range metrics {

From 1f1d9b7ab0b00f5ac70ec19acda6a56f627fdb98 Mon Sep 17 00:00:00 2001
From: Yoon Park
Date: Sun, 14 Jan 2024 19:05:20 +0900
Subject: [PATCH 11/11] Use original Help message after deciding not to use a different label

Signed-off-by: Yoon Park
---
 pkg/metricscollector/prommetrics.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/metricscollector/prommetrics.go b/pkg/metricscollector/prommetrics.go
index 0b960adb782..6bbac8239be 100644
--- a/pkg/metricscollector/prommetrics.go
+++ b/pkg/metricscollector/prommetrics.go
@@ -53,7 +53,7 @@ var (
 			Namespace: DefaultPromMetricsNamespace,
 			Subsystem: "scaler",
 			Name:      "metrics_value",
-			Help:      "Metric Value used for ScaledObject calculation",
+			Help:      "Metric Value used for HPA",
 		},
 		metricLabels,
 	)
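
Usage sketch (illustration only, not part of the patch series): the snippet below shows roughly how the reworked metricscollector API is called from the ScaledJob path and what it ends up exporting. The namespace, resource, trigger and metric names and the values are invented for the example, the exported series names assume DefaultPromMetricsNamespace resolves to "keda", and the snippet assumes the standard errors package plus the pkg/metricscollector package from this diff are imported; the authoritative call sites are the pkg/scaling/scale_handler.go changes above.

	// ScaledJob code path: passing isScaledObject=false makes getLabels attach
	// type="scaledjob" on the Prometheus side and getScalerMeasurementOption emit
	// a "scaledJob" attribute on the OpenTelemetry side.
	metricscollector.RecordScalerMetric("demo-ns", "demo-sj", "kubernetes-workload", 0, "s0-kubernetes-workload", false, 3)
	metricscollector.RecordScalerActive("demo-ns", "demo-sj", "kubernetes-workload", 0, "s0-kubernetes-workload", false, true)
	metricscollector.RecordScaledJobError("demo-ns", "demo-sj", errors.New("scaler failed"))

	// Approximate resulting Prometheus series. Note that the resource label key
	// stays "scaledObject" even for a ScaledJob; only the "type" label tells the
	// two resources apart, while errors get their own scaledJob-labelled counter:
	//   keda_scaler_metrics_value{namespace="demo-ns",scaledObject="demo-sj",scaler="kubernetes-workload",triggerIndex="0",metric="s0-kubernetes-workload",type="scaledjob"} 3
	//   keda_scaled_job_errors{namespace="demo-ns",scaledJob="demo-sj"} 1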