Skip to content

Commit

Permalink
feat: add prometheus metric
Browse files Browse the repository at this point in the history
Signed-off-by: Jorge Turrado <jorge_turrado@hotmail.es>
  • Loading branch information
JorTurFer committed Jun 16, 2023
1 parent d917094 commit 1eaee4b
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### Improvements

- **General**: Metrics Adapter: remove deprecated Prometheus Metrics and non-gRPC code ([#3930](https://github.com/kedacore/keda/issues/3930))
- **General**: Add a Prometheus metric for measuring the processing loop lag ([#4702](https://github.com/kedacore/keda/issues/4702))
- **Azure Data Explorer Scaler**: Use azidentity SDK ([#4489](https://github.com/kedacore/keda/issues/4489))
- **External Scaler**: Add tls options in TriggerAuth metadata. ([#3565](https://github.com/kedacore/keda/issues/3565))
- **GCP PubSub Scaler**: Make it more flexible for metrics ([#4243](https://github.com/kedacore/keda/issues/4243))
Expand Down
30 changes: 30 additions & 0 deletions pkg/prommetrics/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,33 @@ var (
},
[]string{"type", "namespace"},
)

// scaledObjectLatency reports, per {namespace, scaledObject}, the latency of
// the ScaledObject processing loop. The value is whatever the caller passes to
// RecordScalableObjectLatency; the scale handler passes milliseconds of lag
// between the expected and actual loop start.
scaledObjectLatency = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_object",
Name: "latency",
Help: "ScaledObject Latency",
},
// NOTE(review): "scaledObject" is camelCase; Prometheus label-naming
// convention is snake_case — confirm this matches the labels used elsewhere.
[]string{"namespace", "scaledObject"},
)

// scaledJobLatency is the ScaledJob counterpart of scaledObjectLatency:
// per {namespace, scaledJob} processing-loop latency, set by
// RecordScalableObjectLatency when isScaledObject is false.
scaledJobLatency = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_job",
Name: "latency",
Help: "ScaledJob Latency",
},
[]string{"namespace", "scaledJob"},
)
)
)

func init() {
metrics.Registry.MustRegister(scalerErrorsTotal)
metrics.Registry.MustRegister(scalerMetricsValue)
metrics.Registry.MustRegister(scalerMetricsLatency)
metrics.Registry.MustRegister(scaledObjectLatency)
metrics.Registry.MustRegister(scalerActive)
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
Expand All @@ -133,6 +154,15 @@ func RecordScalerLatency(namespace string, scaledObject string, scaler string, s
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, scalerIndex, metric)).Set(value)
}

// RecordScalableObjectLatency records a measurement of the latency of the
// scalable-object processing loop. The sample is routed to the ScaledObject
// gauge when isScaledObject is true, otherwise to the ScaledJob gauge; in both
// cases it is labeled with the object's namespace and name.
func RecordScalableObjectLatency(namespace string, scalableObject string, isScaledObject bool, value float64) {
	if isScaledObject {
		scaledObjectLatency.WithLabelValues(namespace, scalableObject).Set(value)
	} else {
		scaledJobLatency.WithLabelValues(namespace, scalableObject).Set(value)
	}
}

// RecordScalerActive create a measurement of the activity of the scaler
func RecordScalerActive(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, active bool) {
activeVal := 0
Expand Down
16 changes: 13 additions & 3 deletions pkg/scaling/scale_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func (h *scaleHandler) HandleScalableObject(ctx context.Context, scalableObject
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex, true)
case *kedav1alpha1.ScaledJob:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex, false)
}
return nil
}
Expand Down Expand Up @@ -155,16 +155,26 @@ func (h *scaleHandler) DeleteScalableObject(ctx context.Context, scalableObject
}

// startScaleLoop blocks forever and checks the scalableObject based on its pollingInterval
func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker) {
func (h *scaleHandler) startScaleLoop(ctx context.Context, withTriggers *kedav1alpha1.WithTriggers, scalableObject interface{}, scalingMutex sync.Locker, isScaledObject bool) {
logger := log.WithValues("type", withTriggers.Kind, "namespace", withTriggers.Namespace, "name", withTriggers.Name)

pollingInterval := withTriggers.GetPollingInterval()
logger.V(1).Info("Watching with pollingInterval", "PollingInterval", pollingInterval)

next := time.Now()

for {
// we calculate the next execution time based on the pollingInterval and record the difference
// between the expected execution time and the real execution time
delay := time.Now().Sub(next)
miliseconds := delay.Milliseconds()
prommetrics.RecordScalableObjectLatency(withTriggers.Namespace, withTriggers.Name, isScaledObject, float64(miliseconds))
next = time.Now().Add(pollingInterval)

tmr := time.NewTimer(pollingInterval)
h.checkScalers(ctx, scalableObject, scalingMutex)

time.Sleep(1300 * time.Millisecond)
select {
case <-tmr.C:
tmr.Stop()
Expand Down
39 changes: 39 additions & 0 deletions tests/sequential/prometheus_metrics/prometheus_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func TestPrometheusMetrics(t *testing.T) {
testScalerErrorsTotal(t, data)
testOperatorMetrics(t, kc, data)
testWebhookMetrics(t, data)
testScalableObjectMetrics(t)

// cleanup
DeleteKubernetesResources(t, testNamespace, data, templates)
Expand Down Expand Up @@ -466,6 +467,44 @@ func testScalerMetricLatency(t *testing.T) {
}
}

// testScalableObjectMetrics checks that the operator exposes the
// keda_scaled_object_latency and keda_scaled_job_latency metric families and
// that each contains at least one sample labeled with the name of the test's
// ScaledObject / ScaledJob.
func testScalableObjectMetrics(t *testing.T) {
	t.Log("--- testing scalable objects latency ---")

	family := fetchAndParsePrometheusMetrics(t, fmt.Sprintf("curl --insecure %s", kedaOperatorPrometheusURL))

	// assertLatencySample fails the test unless the named metric family is
	// present and holds a sample whose scalable-object label matches
	// expectedName. Extracted as a closure because the ScaledObject and
	// ScaledJob checks were previously two identical copies of this logic.
	assertLatencySample := func(metricName, expectedName string) {
		val, ok := family[metricName]
		if !ok {
			// Name the missing family so a failure is attributable; the old
			// duplicated "metric not available" message was ambiguous.
			t.Errorf("metric %q not available", metricName)
			return
		}
		found := false
		for _, metric := range val.GetMetric() {
			for _, label := range metric.GetLabel() {
				// NOTE(review): both families are matched on labelScaledObject,
				// but the scaled-job gauge is declared with a "scaledJob" label —
				// confirm labelScaledObject is the right constant for the job case.
				if *label.Name == labelScaledObject && *label.Value == expectedName {
					found = true
				}
			}
		}
		assert.True(t, found, "no %q sample found for %q", metricName, expectedName)
	}

	assertLatencySample("keda_scaled_object_latency", scaledObjectName)
	assertLatencySample("keda_scaled_job_latency", cronScaledJobName)
}

func testScalerActiveMetric(t *testing.T) {
t.Log("--- testing scaler active metric ---")

Expand Down

0 comments on commit 1eaee4b

Please sign in to comment.