Skip to content

Commit

Permalink
Expose prometheus metrics for ScaledJob resources (#4913)
Browse files Browse the repository at this point in the history
* Expose prometheus metrics at ScaledJob like ScaledObject

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Use assertion for the scaledjob e2e error

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Change to scaledjob for opentelemetry

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Resolve conflicts after rebase

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Fix e2e test failing issue

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Fix a duplicated variable

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Delete unrelated line at CHANGELOG.md

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Use metric name to keda_scaledjob_errors_total

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Add time.Sleep between creating and deleting wrong scaled resources

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Add missed sacledjob error metric value case

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

* Use original Help message after deciding not to use different label

Signed-off-by: Yoon Park <yoongon.park@gmail.com>

---------

Signed-off-by: Yoon Park <yoongon.park@gmail.com>
  • Loading branch information
yoongon committed Jan 15, 2024
1 parent 3e12e20 commit b220384
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ New deprecation(s):

- **General**: Adds support for GCP Secret Manager as a source for TriggerAuthentication ([#4831](https://github.com/kedacore/keda/issues/4831))
- **General**: Introduce new AWS Authentication ([#4134](https://github.com/kedacore/keda/issues/4134))
- **Prometheus Metrics**: Expose prometheus metrics for ScaledJob resources ([#4798](https://github.com/kedacore/keda/issues/4798))

#### Experimental

Expand Down
38 changes: 24 additions & 14 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,29 @@ var (
)

type MetricsCollector interface {
RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64)
RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalerLatency create a measurement of the latency to external metric
RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64)
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64)

// RecordScalerActive create a measurement of the activity of the scaler
RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool)
RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool)

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
RecordScaledObjectPaused(namespace string, scaledObject string, active bool)

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error)
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error)

// RecordScaledObjectError counts the number of errors with the scaled object
RecordScaledObjectError(namespace string, scaledObject string, err error)

// RecordScaledJobError counts the number of errors with the scaled job
RecordScaledJobError(namespace string, scaledJob string, err error)

IncrementTriggerTotal(triggerType string)

DecrementTriggerTotal(triggerType string)
Expand Down Expand Up @@ -82,16 +85,16 @@ func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetri
}

// RecordScalerMetric create a measurement of the external metric used by the HPA
func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
for _, element := range collectors {
element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, value)
element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
}
}

// RecordScalerLatency create a measurement of the latency to external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
for _, element := range collectors {
element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, value)
element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
}
}

Expand All @@ -103,9 +106,9 @@ func RecordScalableObjectLatency(namespace string, name string, isScaledObject b
}

// RecordScalerActive create a measurement of the activity of the scaler
func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
for _, element := range collectors {
element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, active)
element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, active)
}
}

Expand All @@ -116,10 +119,10 @@ func RecordScaledObjectPaused(namespace string, scaledObject string, active bool
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
for _, element := range collectors {
element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, err)
element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, err)
}
}

Expand All @@ -130,6 +133,13 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) {
}
}

// RecordScaledJobError counts the number of errors with the scaled job
func RecordScaledJobError(namespace string, scaledJob string, err error) {
for _, element := range collectors {
element.RecordScaledJobError(namespace, scaledJob, err)
}
}

func IncrementTriggerTotal(triggerType string) {
for _, element := range collectors {
element.IncrementTriggerTotal(triggerType)
Expand Down
50 changes: 38 additions & 12 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

Expand Down Expand Up @@ -89,6 +90,11 @@ func initMeters() {
otLog.Error(err, msg)
}

otScaledJobErrorsCounter, err = meter.Int64Counter("keda.scaledjob.errors", api.WithDescription("Number of scaled job errors"))
if err != nil {
otLog.Error(err, msg)
}

otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
if err != nil {
otLog.Error(err, msg)
Expand Down Expand Up @@ -188,9 +194,9 @@ func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) err
return nil
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricVal.val = value
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand All @@ -202,9 +208,9 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer)
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricsLatencyVal.val = value
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down Expand Up @@ -240,14 +246,14 @@ func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
}

// RecordScalerActive create a measurement of the activity of the scaler
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := -1
if active {
activeVal = 1
}

otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand Down Expand Up @@ -276,11 +282,11 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric))
o.RecordScaledObjectError(namespace, scaledObject, err)
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
o.RecordScaledObjectError(namespace, scaledResource, err)
return
}
}
Expand All @@ -296,6 +302,17 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

// RecordScaledJobError counts the number of errors with the scaled job
func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
opt := api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledJob").String(scaledJob))
if err != nil {
otScaledJobErrorsCounter.Add(context.Background(), 1, opt)
return
}
}

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
Expand Down Expand Up @@ -332,10 +349,19 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
otCrdTotalsCounter.Add(context.Background(), -1, opt)
}

func getScalerMeasurementOption(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) api.MeasurementOption {
func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption {
if isScaledObject {
return api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledResource),
attribute.Key("scaler").String(scaler),
attribute.Key("scalerIndex").String(strconv.Itoa(triggerIndex)),
attribute.Key("metric").String(metric),
)
}
return api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledObject),
attribute.Key("scaledJob").String(scaledResource),
attribute.Key("scaler").String(scaler),
attribute.Key("triggerIndex").String(strconv.Itoa(triggerIndex)),
attribute.Key("metric").String(metric),
Expand Down
66 changes: 47 additions & 19 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var log = logf.Log.WithName("prometheus_server")

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex"}
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex", "type"}
buildInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Expand Down Expand Up @@ -102,6 +102,15 @@ var (
},
[]string{"namespace", "scaledObject"},
)
scaledJobErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_job",
Name: "errors",
Help: "Number of scaled job errors",
},
[]string{"namespace", "scaledJob"},
)

triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -165,6 +174,7 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
metrics.Registry.MustRegister(scaledObjectPaused)
metrics.Registry.MustRegister(scaledJobErrors)

metrics.Registry.MustRegister(triggerTotalsGaugeVec)
metrics.Registry.MustRegister(crdTotalsGaugeVec)
Expand All @@ -183,32 +193,28 @@ func RecordBuildInfo() {
}

// RecordScalerMetric create a measurement of the external metric used by the HPA
func (p *PromMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value)
func (p *PromMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value)
}

// RecordScalerLatency create a measurement of the latency to external metric
func (p *PromMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value)
func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value)
}

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
}
internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value)
internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
}

// RecordScalerActive create a measurement of the activity of the scaler
func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func (p *PromMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := 0
if active {
activeVal = 1
}

scalerActive.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(float64(activeVal))
scalerActive.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(float64(activeVal))
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand All @@ -223,16 +229,16 @@ func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
scaledObjectPaused.With(labels).Set(float64(activeVal))
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func (p *PromMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Inc()
p.RecordScaledObjectError(namespace, scaledObject, err)
scalerErrors.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Inc()
p.RecordScaledObjectError(namespace, scaledResource, err)
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, triggerIndex, metric))
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
if errscaler != nil {
log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
}
Expand All @@ -253,8 +259,30 @@ func (p *PromMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "triggerIndex": strconv.Itoa(triggerIndex), "metric": metric}
// RecordScaledJobError counts the number of errors with the scaled job
func (p *PromMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
if err != nil {
scaledJobErrors.With(labels).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaledjob := scaledJobErrors.GetMetricWith(labels)
if errscaledjob != nil {
log.Error(err, "Unable to write to metrics to Prometheus Server: %v")
return
}
}

func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "triggerIndex": strconv.Itoa(triggerIndex), "metric": metric, "type": getResourceType(isScaledObject)}
}

func getResourceType(isScaledObject bool) string {
if isScaledObject {
return "scaledobject"
}
return "scaledjob"
}

func (p *PromMetrics) IncrementTriggerTotal(triggerType string) {
Expand Down
Loading

0 comments on commit b220384

Please sign in to comment.