diff --git a/CHANGELOG.md b/CHANGELOG.md index 127fc1c5eb0..72b81a7c0e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,6 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio ### Improvements - **General**: Metrics Adapter: remove deprecated Prometheus Metrics and non-gRPC code ([#3930](https://github.com/kedacore/keda/issues/3930)) +- **General**: Add a Prometheus metric for measuring the processing loop lag ([#4702](https://github.com/kedacore/keda/issues/4702)) - **Azure Data Exporer Scaler**: Use azidentity SDK ([#4489](https://github.com/kedacore/keda/issues/4489)) - **External Scaler**: Add tls options in TriggerAuth metadata. ([#3565](https://github.com/kedacore/keda/issues/3565)) - **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243)) diff --git a/pkg/prommetrics/prommetrics.go b/pkg/prommetrics/prommetrics.go index 613d29aaa4c..2f0861e3057 100644 --- a/pkg/prommetrics/prommetrics.go +++ b/pkg/prommetrics/prommetrics.go @@ -109,12 +109,34 @@ var ( }, []string{"type", "namespace"}, ) + + scaledObjectLatency = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_object", + Name: "latency", + Help: "ScaledObject Latency", + }, + []string{"namespace", "scaledObject"}, + ) + + scaledJobLatency = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: DefaultPromMetricsNamespace, + Subsystem: "scaled_job", + Name: "latency", + Help: "ScaledJob Latency", + }, + []string{"namespace", "scaledJob"}, + ) ) func init() { metrics.Registry.MustRegister(scalerErrorsTotal) metrics.Registry.MustRegister(scalerMetricsValue) metrics.Registry.MustRegister(scalerMetricsLatency) + metrics.Registry.MustRegister(scaledObjectLatency) + metrics.Registry.MustRegister(scaledJobLatency) metrics.Registry.MustRegister(scalerActive) metrics.Registry.MustRegister(scalerErrors) metrics.Registry.MustRegister(scaledObjectErrors) @@ -133,6 +155,15 @@ func
RecordScalerLatency(namespace string, scaledObject string, scaler string, s scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value) } +// RecordScalableObjectLatency creates a measurement of the latency executing scalable object loop +func RecordScalableObjectLatency(namespace string, scalableObject string, isScaledObject bool, value float64) { + if isScaledObject { + scaledObjectLatency.WithLabelValues(namespace, scalableObject).Set(value) + } else { + scaledJobLatency.WithLabelValues(namespace, scalableObject).Set(value) + } +} + // RecordScalerActive create a measurement of the activity of the scaler func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) { activeVal := 0 diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 031155b1384..b7d28962bf8 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -118,10 +118,10 @@ func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject switch obj := scalableObject.(type) { case *kedav1alpha1.ScaledObject: go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex) - go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex) + go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex, true) case *kedav1alpha1.ScaledJob: go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex) - go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex) + go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex, false) } return nil } @@ -155,16 +155,25 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject } // startScaleLoop blocks forever and checks the scalableObject based on its pollingInterval -func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) { +func (h
*scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker, isScaledObject bool) { logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name) pollingInterval := withTriggers.GetPollingInterval() logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval) + next := time.Now() + for { + // we calculate the next execution time based on the pollingInterval and record the difference + // between the expected execution time and the real execution time + delay := time.Since(next) + milliseconds := delay.Milliseconds() + prommetrics.RecordScalableObjectLatency(withTriggers.Namespace, withTriggers.Name, isScaledObject, float64(milliseconds)) + next = time.Now().Add(pollingInterval) + tmr := time.NewTimer(pollingInterval) h.checkScalers(ctx, scalableObject, scalingMutex) select { case <-tmr.C: tmr.Stop() diff --git a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go index c3637e5186d..add798df7a8 100644 --- a/tests/sequential/prometheus_metrics/prometheus_metrics_test.go +++ b/tests/sequential/prometheus_metrics/prometheus_metrics_test.go @@ -264,6 +264,7 @@ func TestPrometheusMetrics(t *testing.T) { testScalerErrorsTotal(t, data) testOperatorMetrics(t, kc, data) testWebhookMetrics(t, data) + testScalableObjectMetrics(t) // cleanup DeleteKubernetesResources(t, testNamespace, data, templates) @@ -466,6 +467,44 @@ func testScalerMetricLatency(t *testing.T) { } } +func testScalableObjectMetrics(t *testing.T) { + t.Log("--- testing scalable objects latency ---") + + family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL)) + + if val, ok := family["keda_scaled_object_latency"]; ok { + var found bool + metrics := val.GetMetric() + for _, metric :=
range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == labelScaledObject && *label.Value == scaledObjectName { + found = true + } + } + } + assert.Equal(t, true, found) + } else { + t.Errorf("keda_scaled_object_latency metric not available") + } + + if val, ok := family["keda_scaled_job_latency"]; ok { + var found bool + metrics := val.GetMetric() + for _, metric := range metrics { + labels := metric.GetLabel() + for _, label := range labels { + if *label.Name == "scaledJob" && *label.Value == cronScaledJobName { + found = true + } + } + } + assert.Equal(t, true, found) + } else { + t.Errorf("keda_scaled_job_latency metric not available") + } +} + func testScalerActiveMetric(t *testing.T) { t.Log("--- testing scaler active metric ---")