Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose prometheus metrics for ScaledJob resources #4913

Merged
merged 11 commits into from
Jan 15, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
### New

- **General**: Introduce new AWS Authentication ([#4134](https://github.com/kedacore/keda/issues/4134))
- **Prometheus Metrics**: Expose prometheus metrics for ScaledJob resources ([#4798](https://github.com/kedacore/keda/issues/4798))

#### Experimental

Expand Down
38 changes: 24 additions & 14 deletions pkg/metricscollector/metricscollectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,29 @@ var (
)

type MetricsCollector interface {
RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64)
RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalerLatency create a measurement of the latency to external metric
RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64)
RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64)

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64)

// RecordScalerActive create a measurement of the activity of the scaler
RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool)
RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool)

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
RecordScaledObjectPaused(namespace string, scaledObject string, active bool)

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error)
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error)

// RecordScaledObjectError counts the number of errors with the scaled object
RecordScaledObjectError(namespace string, scaledObject string, err error)

// RecordScaledJobError counts the number of errors with the scaled job
RecordScaledJobError(namespace string, scaledJob string, err error)

IncrementTriggerTotal(triggerType string)

DecrementTriggerTotal(triggerType string)
Expand Down Expand Up @@ -82,16 +85,16 @@ func NewMetricsCollectors(enablePrometheusMetrics bool, enableOpenTelemetryMetri
}

// RecordScalerMetric create a measurement of the external metric used by the HPA
func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
for _, element := range collectors {
element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, value)
element.RecordScalerMetric(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
}
}

// RecordScalerLatency create a measurement of the latency to external metric
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
for _, element := range collectors {
element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, value)
element.RecordScalerLatency(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, value)
}
}

Expand All @@ -103,9 +106,9 @@ func RecordScalableObjectLatency(namespace string, name string, isScaledObject b
}

// RecordScalerActive create a measurement of the activity of the scaler
func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
for _, element := range collectors {
element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, active)
element.RecordScalerActive(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, active)
}
}

Expand All @@ -116,10 +119,10 @@ func RecordScaledObjectPaused(namespace string, scaledObject string, active bool
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
for _, element := range collectors {
element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, err)
element.RecordScalerError(namespace, scaledObject, scaler, triggerIndex, metric, isScaledObject, err)
}
}

Expand All @@ -130,6 +133,13 @@ func RecordScaledObjectError(namespace string, scaledObject string, err error) {
}
}

// RecordScaledJobError forwards a scaled job error to every registered
// metrics collector by calling its RecordScaledJobError method with the same
// namespace, scaled job name and error.
func RecordScaledJobError(namespace string, scaledJob string, err error) {
	for _, collector := range collectors {
		collector.RecordScaledJobError(namespace, scaledJob, err)
	}
}

func IncrementTriggerTotal(triggerType string) {
for _, element := range collectors {
element.IncrementTriggerTotal(triggerType)
Expand Down
50 changes: 38 additions & 12 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
meter api.Meter
otScalerErrorsCounter api.Int64Counter
otScaledObjectErrorsCounter api.Int64Counter
otScaledJobErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

Expand Down Expand Up @@ -89,6 +90,11 @@ func initMeters() {
otLog.Error(err, msg)
}

otScaledJobErrorsCounter, err = meter.Int64Counter("keda.scaledjob.errors", api.WithDescription("Number of scaled job errors"))
if err != nil {
otLog.Error(err, msg)
}

otTriggerTotalsCounter, err = meter.Int64UpDownCounter("keda.trigger.totals", api.WithDescription("Total triggers"))
if err != nil {
otLog.Error(err, msg)
Expand Down Expand Up @@ -188,9 +194,9 @@ func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) err
return nil
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricVal.val = value
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerMetricVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand All @@ -202,9 +208,9 @@ func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer)
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
otelScalerMetricsLatencyVal.val = value
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerMetricsLatencyVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
Expand Down Expand Up @@ -240,14 +246,14 @@ func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
}

// RecordScalerActive create a measurement of the activity of the scaler
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func (o *OtelMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := -1
if active {
activeVal = 1
}

otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand Down Expand Up @@ -276,11 +282,11 @@ func (o *OtelMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
}
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func (o *OtelMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledObject, scaler, triggerIndex, metric))
o.RecordScaledObjectError(namespace, scaledObject, err)
otScalerErrorsCounter.Add(context.Background(), 1, getScalerMeasurementOption(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
o.RecordScaledObjectError(namespace, scaledResource, err)
return
}
}
Expand All @@ -296,6 +302,17 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

// RecordScaledJobError counts the number of errors with the scaled job.
// A nil err records nothing; a non-nil err adds 1 to the scaled job error
// counter, tagged with the "namespace" and "scaledJob" attributes.
func (o *OtelMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
	// Nothing to count without an error.
	if err == nil {
		return
	}

	attrs := api.WithAttributes(
		attribute.Key("namespace").String(namespace),
		attribute.Key("scaledJob").String(scaledJob),
	)
	otScaledJobErrorsCounter.Add(context.Background(), 1, attrs)
}

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
Expand Down Expand Up @@ -332,10 +349,19 @@ func (o *OtelMetrics) DecrementCRDTotal(crdType, namespace string) {
otCrdTotalsCounter.Add(context.Background(), -1, opt)
}

func getScalerMeasurementOption(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) api.MeasurementOption {
func getScalerMeasurementOption(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool) api.MeasurementOption {
if isScaledObject {
return api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledResource),
attribute.Key("scaler").String(scaler),
attribute.Key("scalerIndex").String(strconv.Itoa(triggerIndex)),
attribute.Key("metric").String(metric),
)
}
return api.WithAttributes(
attribute.Key("namespace").String(namespace),
attribute.Key("scaledObject").String(scaledObject),
attribute.Key("scaledJob").String(scaledResource),
attribute.Key("scaler").String(scaler),
attribute.Key("triggerIndex").String(strconv.Itoa(triggerIndex)),
attribute.Key("metric").String(metric),
Expand Down
66 changes: 47 additions & 19 deletions pkg/metricscollector/prommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
var log = logf.Log.WithName("prometheus_server")

var (
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex"}
metricLabels = []string{"namespace", "metric", "scaledObject", "scaler", "triggerIndex", "type"}
buildInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: DefaultPromMetricsNamespace,
Expand Down Expand Up @@ -102,6 +102,15 @@ var (
},
[]string{"namespace", "scaledObject"},
)
scaledJobErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: DefaultPromMetricsNamespace,
Subsystem: "scaled_job",
Name: "errors",
Help: "Number of scaled job errors",
},
[]string{"namespace", "scaledJob"},
)

triggerTotalsGaugeVec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -165,6 +174,7 @@ func NewPromMetrics() *PromMetrics {
metrics.Registry.MustRegister(scalerErrors)
metrics.Registry.MustRegister(scaledObjectErrors)
metrics.Registry.MustRegister(scaledObjectPaused)
metrics.Registry.MustRegister(scaledJobErrors)

metrics.Registry.MustRegister(triggerTotalsGaugeVec)
metrics.Registry.MustRegister(crdTotalsGaugeVec)
Expand All @@ -183,32 +193,28 @@ func RecordBuildInfo() {
}

// RecordScalerMetric create a measurement of the external metric used by the HPA
func (p *PromMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value)
func (p *PromMetrics) RecordScalerMetric(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsValue.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value)
}

// RecordScalerLatency create a measurement of the latency to external metric
func (p *PromMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(value)
func (p *PromMetrics) RecordScalerLatency(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, value float64) {
scalerMetricsLatency.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(value)
}

// RecordScalableObjectLatency create a measurement of the latency executing scalable object loop
func (p *PromMetrics) RecordScalableObjectLatency(namespace string, name string, isScaledObject bool, value float64) {
resourceType := "scaledjob"
if isScaledObject {
resourceType = "scaledobject"
}
internalLoopLatency.WithLabelValues(namespace, resourceType, name).Set(value)
internalLoopLatency.WithLabelValues(namespace, getResourceType(isScaledObject), name).Set(value)
}

// RecordScalerActive create a measurement of the activity of the scaler
func (p *PromMetrics) RecordScalerActive(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, active bool) {
func (p *PromMetrics) RecordScalerActive(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, active bool) {
activeVal := 0
if active {
activeVal = 1
}

scalerActive.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Set(float64(activeVal))
scalerActive.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Set(float64(activeVal))
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand All @@ -223,16 +229,16 @@ func (p *PromMetrics) RecordScaledObjectPaused(namespace string, scaledObject st
scaledObjectPaused.With(labels).Set(float64(activeVal))
}

// RecordScalerError counts the number of errors occurred in trying get an external metric used by the HPA
func (p *PromMetrics) RecordScalerError(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, err error) {
// RecordScalerError counts the number of errors occurred in trying to get an external metric used by the HPA
func (p *PromMetrics) RecordScalerError(namespace string, scaledResource string, scaler string, triggerIndex int, metric string, isScaledObject bool, err error) {
if err != nil {
scalerErrors.With(getLabels(namespace, scaledObject, scaler, triggerIndex, metric)).Inc()
p.RecordScaledObjectError(namespace, scaledObject, err)
scalerErrors.With(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject)).Inc()
p.RecordScaledObjectError(namespace, scaledResource, err)
scalerErrorsTotal.With(prometheus.Labels{}).Inc()
return
}
// initialize metric with 0 if not already set
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledObject, scaler, triggerIndex, metric))
_, errscaler := scalerErrors.GetMetricWith(getLabels(namespace, scaledResource, scaler, triggerIndex, metric, isScaledObject))
if errscaler != nil {
log.Error(errscaler, "Unable to write to metrics to Prometheus Server: %v")
}
Expand All @@ -253,8 +259,30 @@ func (p *PromMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string) prometheus.Labels {
return prometheus.Labels{"namespace": namespace, "scaledObject": scaledObject, "scaler": scaler, "triggerIndex": strconv.Itoa(triggerIndex), "metric": metric}
// RecordScaledJobError counts the number of errors with the scaled job.
// A non-nil err increments the counter labelled with the given namespace and
// scaledJob. A nil err only looks the labelled counter up so that it is
// initialized with 0 if not already set; a failure of that lookup is logged.
func (p *PromMetrics) RecordScaledJobError(namespace string, scaledJob string, err error) {
	labels := prometheus.Labels{"namespace": namespace, "scaledJob": scaledJob}
	if err != nil {
		scaledJobErrors.With(labels).Inc()
		return
	}
	// initialize metric with 0 if not already set
	if _, errscaledjob := scaledJobErrors.GetMetricWith(labels); errscaledjob != nil {
		// err is always nil on this path, so report the lookup error itself.
		log.Error(errscaledjob, "Unable to write to metrics to Prometheus Server: %v")
	}
}

// getLabels builds the Prometheus label set used by the per-scaler metrics.
// The resource name is stored under the "scaledObject" label regardless of
// the resource kind; the "type" label carries the value returned by
// getResourceType for isScaledObject.
func getLabels(namespace string, scaledObject string, scaler string, triggerIndex int, metric string, isScaledObject bool) prometheus.Labels {
	labels := prometheus.Labels{
		"namespace":    namespace,
		"scaledObject": scaledObject,
		"scaler":       scaler,
		"triggerIndex": strconv.Itoa(triggerIndex),
		"metric":       metric,
		"type":         getResourceType(isScaledObject),
	}
	return labels
}

// getResourceType maps the isScaledObject flag to the resource type string
// used as a metric label value: "scaledobject" when the flag is true,
// "scaledjob" otherwise.
func getResourceType(isScaledObject bool) string {
	resourceType := "scaledjob"
	if isScaledObject {
		resourceType = "scaledobject"
	}
	return resourceType
}

func (p *PromMetrics) IncrementTriggerTotal(triggerType string) {
Expand Down
Loading
Loading