Skip to content

Commit

Permalink
Improve the implementation of OpenTelemetry integration (kedacore#5116)
Browse files Browse the repository at this point in the history
Signed-off-by: SpiritZhou <iammrzhouzhenghan@gmail.com>
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
Co-authored-by: Tom Kerkhove <kerkhove.tom@gmail.com>
Co-authored-by: Zbynek Roubalik <zroubalik@gmail.com>
Signed-off-by: anton.lysina <alysina@gmail.com>
  • Loading branch information
3 people authored and toniiiik committed Jan 15, 2024
1 parent 78c8a21 commit 5c0baa0
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ New deprecation(s):

- **General**: Fix CVE-2023-45142 in OpenTelemetry ([#5089](https://github.com/kedacore/keda/issues/5089))
- **General**: Fix logger in OpenTelemetry collector ([#5094](https://github.com/kedacore/keda/issues/5094))
- **General**: Reduce amount of gauge creations for OpenTelemetry metrics ([#5101](https://github.com/kedacore/keda/issues/5101))
- **General**: Support profiling for KEDA components ([#4789](https://github.com/kedacore/keda/issues/4789))

## v2.12.0
Expand Down
186 changes: 119 additions & 67 deletions pkg/metricscollector/opentelemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,28 @@ var (
otScaledObjectErrorsCounter api.Int64Counter
otTriggerTotalsCounter api.Int64UpDownCounter
otCrdTotalsCounter api.Int64UpDownCounter

otelScalerMetricVal OtelMetricFloat64Val
otelScalerMetricsLatencyVal OtelMetricFloat64Val
otelInternalLoopLatencyVal OtelMetricFloat64Val
otelBuildInfoVal OtelMetricInt64Val

otelScalerActiveVal OtelMetricFloat64Val
)

// OtelMetrics is the receiver type for the OpenTelemetry Record* methods in
// this file (for example RecordBuildInfo). It declares no fields; the values
// those methods record are kept in package-level variables.
type OtelMetrics struct {
}

// OtelMetricInt64Val pairs an int64 measurement with the measurement option
// that is handed to the observer together with it.
type OtelMetricInt64Val struct {
// val is the value passed to Observe.
val int64
// measurementOption is passed to Observe alongside val; callbacks in this
// file treat a nil option as "nothing recorded".
measurementOption api.MeasurementOption
}

// OtelMetricFloat64Val pairs a float64 measurement with the measurement option
// that is handed to the observer together with it.
type OtelMetricFloat64Val struct {
// val is the value passed to Observe.
val float64
// measurementOption is passed to Observe alongside val; callbacks in this
// file treat a nil option as "nothing recorded".
measurementOption api.MeasurementOption
}

func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
// create default options with env
if options == nil {
Expand All @@ -48,14 +65,14 @@ func NewOtelMetrics(options ...metric.Option) *OtelMetrics {
otel.SetMeterProvider(meterProvider)

meter = meterProvider.Meter(meterName)
initCounter()
initMeters()

otel := &OtelMetrics{}
otel.RecordBuildInfo()
return otel
}

func initCounter() {
func initMeters() {
var err error
msg := "create opentelemetry counter failed"

Expand All @@ -78,37 +95,107 @@ func initCounter() {
if err != nil {
otLog.Error(err, msg)
}
}

func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.value",
api.WithDescription("Metric Value used for HPA"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricValueCallback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics value", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}
}

// RecordScalerLatency create a measurement of the latency to external metric
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
_, err = meter.Float64ObservableGauge(
"keda.scaler.metrics.latency",
api.WithDescription("Scaler Metrics Latency"),
api.WithFloat64Callback(cback),
api.WithFloat64Callback(ScalerMetricsLatencyCallback),
)
if err != nil {
otLog.Error(err, "failed to register scaler metrics latency", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(ScalableObjectLatencyCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(ScalerActiveCallback),
)
if err != nil {
otLog.Error(err, msg)
}

_, err = meter.Int64ObservableGauge(
"keda.build.info",
api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
api.WithInt64Callback(BuildInfoCallback),
)
if err != nil {
otLog.Error(err, msg)
}
}

// BuildInfoCallback is the observable-gauge callback registered for
// "keda.build.info". It reports the value stored by RecordBuildInfo together
// with its attributes, and reports nothing when no value has been stored yet.
//
// The stored value is deliberately left in place after it has been observed.
// RecordBuildInfo is invoked from NewOtelMetrics at construction time rather
// than before every collection, so clearing the value here would make this
// constant gauge emit a single data point and then stay silent on every
// later collection.
func BuildInfoCallback(_ context.Context, obsrv api.Int64Observer) error {
	if otelBuildInfoVal.measurementOption == nil {
		// Nothing recorded yet; skip the observation.
		return nil
	}
	obsrv.Observe(otelBuildInfoVal.val, otelBuildInfoVal.measurementOption)
	return nil
}

// RecordBuildInfo stores the KEDA build and runtime details (version, git
// commit, Go version, OS and architecture) as the attributes of a constant
// value of 1, which BuildInfoCallback then reports through the info gauge.
func (o *OtelMetrics) RecordBuildInfo() {
	buildAttrs := []attribute.KeyValue{
		attribute.Key("version").String(version.Version),
		attribute.Key("git_commit").String(version.GitCommit),
		attribute.Key("goversion").String(runtime.Version()),
		attribute.Key("goos").String(runtime.GOOS),
		attribute.Key("goarch").String(runtime.GOARCH),
	}
	otelBuildInfoVal = OtelMetricInt64Val{
		val:               1,
		measurementOption: api.WithAttributes(buildAttrs...),
	}
}

// ScalerMetricValueCallback is the observable-gauge callback registered for
// "keda.scaler.metrics.value". It reports the value last stored by
// RecordScalerMetric, if any, and then clears the stored value.
func ScalerMetricValueCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerMetricVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same measurement is not reported again.
	otelScalerMetricVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerMetric stores the given scaler metric value, together with the
// measurement option built from the scaler's identifying labels, so that
// ScalerMetricValueCallback can report it.
func (o *OtelMetrics) RecordScalerMetric(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
	otelScalerMetricVal = OtelMetricFloat64Val{
		val:               value,
		measurementOption: getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric),
	}
}

// ScalerMetricsLatencyCallback is the observable-gauge callback registered for
// "keda.scaler.metrics.latency". It reports the value last stored by
// RecordScalerLatency, if any, and then clears the stored value.
func ScalerMetricsLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerMetricsLatencyVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same measurement is not reported again.
	otelScalerMetricsLatencyVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerLatency stores the latency measured for a scaler's metric,
// together with the measurement option built from the scaler's identifying
// labels, so that ScalerMetricsLatencyCallback can report it.
func (o *OtelMetrics) RecordScalerLatency(namespace string, scaledObject string, scaler string, scalerIndex int, metric string, value float64) {
	otelScalerMetricsLatencyVal = OtelMetricFloat64Val{
		val:               value,
		measurementOption: getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric),
	}
}

// ScalableObjectLatencyCallback is the observable-gauge callback registered
// for "keda.internal.scale.loop.latency". It reports the value currently held
// in otelInternalLoopLatencyVal, if any, and then clears it.
func ScalableObjectLatencyCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelInternalLoopLatencyVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same measurement is not reported again.
	otelInternalLoopLatencyVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalableObjectLatency creates a measurement of the latency of executing the scalable object loop
Expand All @@ -123,18 +210,16 @@ func (o *OtelMetrics) RecordScalableObjectLatency(namespace string, name string,
attribute.Key("type").String(resourceType),
attribute.Key("name").String(name))

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(value, opt)
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.internal.scale.loop.latency",
api.WithDescription("Internal latency of ScaledObject/ScaledJob loop execution"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register internal scale loop latency", "namespace", namespace, resourceType, name)
otelInternalLoopLatencyVal.val = value
otelInternalLoopLatencyVal.measurementOption = opt
}

// ScalerActiveCallback is the observable-gauge callback registered for
// "keda.scaler.active". It reports the value currently held in
// otelScalerActiveVal, if any, and then clears it.
func ScalerActiveCallback(_ context.Context, obsrv api.Float64Observer) error {
	pending := otelScalerActiveVal
	if pending.measurementOption != nil {
		obsrv.Observe(pending.val, pending.measurementOption)
	}
	// Clear the slot so the same measurement is not reported again.
	otelScalerActiveVal = OtelMetricFloat64Val{}
	return nil
}

// RecordScalerActive creates a measurement of the activity of the scaler
Expand All @@ -144,18 +229,8 @@ func (o *OtelMetrics) RecordScalerActive(namespace string, scaledObject string,
activeVal = 1
}

cback := func(ctx context.Context, obsrv api.Float64Observer) error {
obsrv.Observe(float64(activeVal), getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric))
return nil
}
_, err := meter.Float64ObservableGauge(
"keda.scaler.active",
api.WithDescription("Activity of a Scaler Metric"),
api.WithFloat64Callback(cback),
)
if err != nil {
otLog.Error(err, "failed to register scaler activity", "namespace", namespace, "scaledObject", scaledObject, "scaler", scaler, "scalerIndex", scalerIndex, "metric", metric)
}
otelScalerActiveVal.val = float64(activeVal)
otelScalerActiveVal.measurementOption = getScalerMeasurementOption(namespace, scaledObject, scaler, scalerIndex, metric)
}

// RecordScaledObjectPaused marks whether the current ScaledObject is paused.
Expand Down Expand Up @@ -204,29 +279,6 @@ func (o *OtelMetrics) RecordScaledObjectError(namespace string, scaledObject str
}
}

// RecordBuildInfo registers the "keda.build.info" observable gauge, whose
// callback always observes 1 labelled with the KEDA version, git commit, Go
// version, OS and architecture. A registration failure is logged, not returned.
func (o *OtelMetrics) RecordBuildInfo() {
	buildAttrs := api.WithAttributes(
		attribute.Key("version").String(version.Version),
		attribute.Key("git_commit").String(version.GitCommit),
		attribute.Key("goversion").String(runtime.Version()),
		attribute.Key("goos").String(runtime.GOOS),
		attribute.Key("goarch").String(runtime.GOARCH),
	)
	observeBuildInfo := func(_ context.Context, obsrv api.Int64Observer) error {
		obsrv.Observe(1, buildAttrs)
		return nil
	}
	if _, err := meter.Int64ObservableGauge(
		"keda.build.info",
		api.WithDescription("A metric with a constant '1' value labeled by version, git_commit and goversion from which KEDA was built."),
		api.WithInt64Callback(observeBuildInfo),
	); err != nil {
		otLog.Error(err, "failed to register build info")
	}
}

func (o *OtelMetrics) IncrementTriggerTotal(triggerType string) {
if triggerType != "" {
otTriggerTotalsCounter.Add(context.Background(), 1, api.WithAttributes(attribute.Key("type").String(triggerType)))
Expand Down

0 comments on commit 5c0baa0

Please sign in to comment.