diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b1a7a0562..b06e2172b4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ Main (unreleased) - Update Public preview `remotecfg` to use `alloy-remote-config` instead of `agent-remote-config`. The API has been updated to use the term `collector` over `agent`. (@erikbaranowski) +### Enhancements + +- (_Public preview_) Add native histogram support to `otelcol.receiver.prometheus`. (@wildum) + ### Bugfixes - Fixed an issue with `prometheus.scrape` in which targets that move from one diff --git a/internal/cmd/integration-tests/tests/scrape-prom-metrics/config.alloy b/internal/cmd/integration-tests/tests/scrape-prom-metrics/config.alloy index cf546412a5..682b6ecc1d 100644 --- a/internal/cmd/integration-tests/tests/scrape-prom-metrics/config.alloy +++ b/internal/cmd/integration-tests/tests/scrape-prom-metrics/config.alloy @@ -2,7 +2,7 @@ prometheus.scrape "scrape_prom_metrics" { targets = [ {"__address__" = "localhost:9001"}, ] - forward_to = [prometheus.remote_write.scrape_prom_metrics.receiver] + forward_to = [prometheus.remote_write.scrape_prom_metrics.receiver, otelcol.receiver.prometheus.scrape_prom_metrics_to_otlp.receiver] scrape_classic_histograms = true enable_protobuf_negotiation = true scrape_interval = "1s" @@ -24,3 +24,31 @@ prometheus.remote_write "scrape_prom_metrics" { test_name = "scrape_prom_metrics", } } + +otelcol.receiver.prometheus "scrape_prom_metrics_to_otlp" { + output { + metrics = [otelcol.processor.attributes.scrape_prom_metrics_to_otlp.input] + } +} + +otelcol.processor.attributes "scrape_prom_metrics_to_otlp" { + action { + key = "test_name" + value = "scrape_prom_metrics_to_otlp" + action = "insert" + } + + output { + metrics = [otelcol.exporter.otlphttp.scrape_prom_metrics_to_otlp.input] + } +} + +otelcol.exporter.otlphttp "scrape_prom_metrics_to_otlp" { + client { + endpoint = "http://localhost:9009/otlp" + tls { + insecure = true + insecure_skip_verify = true + } + } +} \ No 
newline at end of file diff --git a/internal/cmd/integration-tests/tests/scrape-prom-metrics/scrape_prom_metrics_to_otlp_test.go b/internal/cmd/integration-tests/tests/scrape-prom-metrics/scrape_prom_metrics_to_otlp_test.go new file mode 100644 index 0000000000..13c069810d --- /dev/null +++ b/internal/cmd/integration-tests/tests/scrape-prom-metrics/scrape_prom_metrics_to_otlp_test.go @@ -0,0 +1,13 @@ +//go:build !windows + +package main + +import ( + "testing" + + "github.com/grafana/alloy/internal/cmd/integration-tests/common" +) + +func TestScrapePromMetricsToOtlp(t *testing.T) { + common.MimirMetricsTest(t, common.PromDefaultMetrics, common.PromDefaultHistogramMetric, "scrape_prom_metrics_to_otlp") +} diff --git a/internal/cmd/integration-tests/utils.go b/internal/cmd/integration-tests/utils.go index 37ef8d731a..b47ac056fe 100644 --- a/internal/cmd/integration-tests/utils.go +++ b/internal/cmd/integration-tests/utils.go @@ -56,7 +56,7 @@ func runSingleTest(testDir string, port int) { dirName := filepath.Base(testDir) var alloyLogBuffer bytes.Buffer - cmd := exec.Command(alloyBinaryPath, "run", "config.alloy", "--server.http.listen-addr", fmt.Sprintf("0.0.0.0:%d", port)) + cmd := exec.Command(alloyBinaryPath, "run", "config.alloy", "--server.http.listen-addr", fmt.Sprintf("0.0.0.0:%d", port), "--stability.level", "experimental") cmd.Dir = testDir cmd.Stdout = &alloyLogBuffer cmd.Stderr = &alloyLogBuffer diff --git a/internal/component/otelcol/receiver/prometheus/internal/appendable.go b/internal/component/otelcol/receiver/prometheus/internal/appendable.go index d8b26a2900..98a3ed7b29 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/appendable.go +++ b/internal/component/otelcol/receiver/prometheus/internal/appendable.go @@ -16,14 +16,14 @@ import ( ) // appendable translates Prometheus scraping diffs into OpenTelemetry format. 
- type appendable struct { - sink consumer.Metrics - metricAdjuster MetricsAdjuster - useStartTimeMetric bool - trimSuffixes bool - startTimeMetricRegex *regexp.Regexp - externalLabels labels.Labels + sink consumer.Metrics + metricAdjuster MetricsAdjuster + useStartTimeMetric bool + enableNativeHistograms bool + trimSuffixes bool + startTimeMetricRegex *regexp.Regexp + externalLabels labels.Labels settings receiver.CreateSettings obsrecv *receiverhelper.ObsReport @@ -37,6 +37,7 @@ func NewAppendable( useStartTimeMetric bool, startTimeMetricRegex *regexp.Regexp, useCreatedMetric bool, + enableNativeHistograms bool, externalLabels labels.Labels, trimSuffixes bool) (storage.Appendable, error) { @@ -53,17 +54,18 @@ func NewAppendable( } return &appendable{ - sink: sink, - settings: set, - metricAdjuster: metricAdjuster, - useStartTimeMetric: useStartTimeMetric, - startTimeMetricRegex: startTimeMetricRegex, - externalLabels: externalLabels, - obsrecv: obsrecv, - trimSuffixes: trimSuffixes, + sink: sink, + settings: set, + metricAdjuster: metricAdjuster, + useStartTimeMetric: useStartTimeMetric, + enableNativeHistograms: enableNativeHistograms, + startTimeMetricRegex: startTimeMetricRegex, + externalLabels: externalLabels, + obsrecv: obsrecv, + trimSuffixes: trimSuffixes, }, nil } func (o *appendable) Appender(ctx context.Context) storage.Appender { - return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes) + return newTransaction(ctx, o.metricAdjuster, o.sink, o.externalLabels, o.settings, o.obsrecv, o.trimSuffixes, o.enableNativeHistograms) } diff --git a/internal/component/otelcol/receiver/prometheus/internal/doc.go b/internal/component/otelcol/receiver/prometheus/internal/doc.go index f4b8aefe77..652c2c6893 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/doc.go +++ b/internal/component/otelcol/receiver/prometheus/internal/doc.go @@ -1,5 +1,5 @@ // Package internal is a near copy of -// 
https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.87.0/receiver/prometheusreceiver/internal +// https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/90603afc2fe0c44c9398822e4afa3a4e045f4524/receiver/prometheusreceiver/internal // A copy was made because the upstream package is internal. If it is ever made // public, our copy can be removed. // diff --git a/internal/component/otelcol/receiver/prometheus/internal/logger.go b/internal/component/otelcol/receiver/prometheus/internal/logger.go index 726d236574..cd9ac7f20a 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/logger.go +++ b/internal/component/otelcol/receiver/prometheus/internal/logger.go @@ -30,10 +30,10 @@ type zapToGokitLogAdapter struct { type logData struct { level level.Value msg string - otherFields []interface{} + otherFields []any } -func (w *zapToGokitLogAdapter) Log(keyvals ...interface{}) error { +func (w *zapToGokitLogAdapter) Log(keyvals ...any) error { // expecting key value pairs, the number of items need to be even if len(keyvals)%2 == 0 { // Extract log level and message and log them using corresponding zap function @@ -47,7 +47,7 @@ func (w *zapToGokitLogAdapter) Log(keyvals ...interface{}) error { return nil } -func extractLogData(keyvals []interface{}) logData { +func extractLogData(keyvals []any) logData { ld := logData{ level: level.InfoValue(), // default } @@ -78,7 +78,7 @@ func extractLogData(keyvals []interface{}) logData { } // check if a given key-value pair represents go-kit log message and return it -func matchLogMessage(key interface{}, val interface{}) (string, bool) { +func matchLogMessage(key any, val any) (string, bool) { if strKey, ok := key.(string); !ok || strKey != msgKey { return "", false } @@ -91,7 +91,7 @@ func matchLogMessage(key interface{}, val interface{}) (string, bool) { } // check if a given key-value pair represents go-kit log level and return it -func matchLogLevel(key interface{}, val 
interface{}) (level.Value, bool) { +func matchLogLevel(key any, val any) (level.Value, bool) { strKey, ok := key.(string) if !ok || strKey != levelKey { return nil, false @@ -107,7 +107,7 @@ func matchLogLevel(key interface{}, val interface{}) (level.Value, bool) { //revive:disable:error-return // check if a given key-value pair represents an error and return it -func matchError(key interface{}, val interface{}) (error, bool) { +func matchError(key any, val any) (error, bool) { strKey, ok := key.(string) if !ok || strKey != errKey { return nil, false @@ -123,7 +123,7 @@ func matchError(key interface{}, val interface{}) (error, bool) { //revive:enable:error-return // find a matching zap logging function to be used for a given level -func levelToFunc(logger *zap.SugaredLogger, lvl level.Value) func(string, ...interface{}) { +func levelToFunc(logger *zap.SugaredLogger, lvl level.Value) func(string, ...any) { switch lvl { case level.DebugValue(): return logger.Debugw diff --git a/internal/component/otelcol/receiver/prometheus/internal/logger_test.go b/internal/component/otelcol/receiver/prometheus/internal/logger_test.go index 5a17fd051a..037e23d948 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/logger_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/logger_test.go @@ -20,13 +20,13 @@ import ( func TestLog(t *testing.T) { tcs := []struct { name string - input []interface{} + input []any wantLevel zapcore.Level wantMessage string }{ { name: "Starting provider", - input: []interface{}{ + input: []any{ "level", level.DebugValue(), "msg", @@ -41,7 +41,7 @@ func TestLog(t *testing.T) { }, { name: "Scrape failed", - input: []interface{}{ + input: []any{ "level", level.ErrorValue(), "scrape_pool", @@ -84,10 +84,10 @@ func TestLog(t *testing.T) { func TestExtractLogData(t *testing.T) { tcs := []struct { name string - input []interface{} + input []any wantLevel level.Value wantMessage string - wantOutput []interface{} + wantOutput 
[]any }{ { name: "nil fields", @@ -98,14 +98,14 @@ func TestExtractLogData(t *testing.T) { }, { name: "empty fields", - input: []interface{}{}, + input: []any{}, wantLevel: level.InfoValue(), // Default wantMessage: "", wantOutput: nil, }, { name: "info level", - input: []interface{}{ + input: []any{ "level", level.InfoValue(), }, @@ -115,7 +115,7 @@ func TestExtractLogData(t *testing.T) { }, { name: "warn level", - input: []interface{}{ + input: []any{ "level", level.WarnValue(), }, @@ -125,7 +125,7 @@ func TestExtractLogData(t *testing.T) { }, { name: "error level", - input: []interface{}{ + input: []any{ "level", level.ErrorValue(), }, @@ -135,7 +135,7 @@ func TestExtractLogData(t *testing.T) { }, { name: "debug level + extra fields", - input: []interface{}{ + input: []any{ "timestamp", 1596604719, "level", @@ -145,13 +145,13 @@ func TestExtractLogData(t *testing.T) { }, wantLevel: level.DebugValue(), wantMessage: "http client error", - wantOutput: []interface{}{ + wantOutput: []any{ "timestamp", 1596604719, }, }, { name: "missing level field", - input: []interface{}{ + input: []any{ "timestamp", 1596604719, "msg", @@ -159,18 +159,18 @@ func TestExtractLogData(t *testing.T) { }, wantLevel: level.InfoValue(), // Default wantMessage: "http client error", - wantOutput: []interface{}{ + wantOutput: []any{ "timestamp", 1596604719, }, }, { name: "invalid level type", - input: []interface{}{ + input: []any{ "level", "warn", // String is not recognized }, wantLevel: level.InfoValue(), // Default - wantOutput: []interface{}{ + wantOutput: []any{ "level", "warn", // Field is preserved }, }, diff --git a/internal/component/otelcol/receiver/prometheus/internal/metricfamily.go b/internal/component/otelcol/receiver/prometheus/internal/metricfamily.go index e501ee5da3..ba87700d04 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/metricfamily.go +++ b/internal/component/otelcol/receiver/prometheus/internal/metricfamily.go @@ -11,6 +11,7 @@ import ( "strings" 
"github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/scrape" @@ -49,6 +50,8 @@ type metricGroup struct { hasSum bool created float64 value float64 + hValue *histogram.Histogram + fhValue *histogram.FloatHistogram complexValue []*dataPoint exemplars pmetric.ExemplarSlice } @@ -156,6 +159,118 @@ func (mg *metricGroup) toDistributionPoint(dest pmetric.HistogramDataPointSlice) mg.setExemplars(point.Exemplars()) } +// toExponentialHistogramDataPoints is based on +// https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#exponential-histograms +func (mg *metricGroup) toExponentialHistogramDataPoints(dest pmetric.ExponentialHistogramDataPointSlice) { + if !mg.hasCount { + return + } + point := dest.AppendEmpty() + point.SetTimestamp(timestampFromMs(mg.ts)) + + // We do not set Min or Max as native histograms don't have that information. + switch { + case mg.fhValue != nil: + fh := mg.fhValue + + if value.IsStaleNaN(fh.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(fh.Schema) + // Input is a float native histogram. This conversion will lose + // precision,but we don't actually expect float histograms in scrape, + // since these are typically the result of operations on integer + // native histograms in the database. 
+ point.SetCount(uint64(fh.Count)) + point.SetSum(fh.Sum) + point.SetZeroThreshold(fh.ZeroThreshold) + point.SetZeroCount(uint64(fh.ZeroCount)) + + if len(fh.PositiveSpans) > 0 { + point.Positive().SetOffset(fh.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.PositiveSpans, fh.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(fh.NegativeSpans) > 0 { + point.Negative().SetOffset(fh.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertAbsoluteBuckets(fh.NegativeSpans, fh.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + case mg.hValue != nil: + h := mg.hValue + + if value.IsStaleNaN(h.Sum) { + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + // The count and sum are initialized to 0, so we don't need to set them. + } else { + point.SetScale(h.Schema) + point.SetCount(h.Count) + point.SetSum(h.Sum) + point.SetZeroThreshold(h.ZeroThreshold) + point.SetZeroCount(h.ZeroCount) + + if len(h.PositiveSpans) > 0 { + point.Positive().SetOffset(h.PositiveSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.PositiveSpans, h.PositiveBuckets, point.Positive().BucketCounts()) + } + if len(h.NegativeSpans) > 0 { + point.Negative().SetOffset(h.NegativeSpans[0].Offset - 1) // -1 because OTEL offset are for the lower bound, not the upper bound + convertDeltaBuckets(h.NegativeSpans, h.NegativeBuckets, point.Negative().BucketCounts()) + } + } + + default: + // This should never happen. 
+ return + } + + tsNanos := timestampFromMs(mg.ts) + if mg.created != 0 { + point.SetStartTimestamp(timestampFromFloat64(mg.created)) + } else { + // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp + point.SetStartTimestamp(tsNanos) + } + point.SetTimestamp(tsNanos) + populateAttributes(pmetric.MetricTypeHistogram, mg.ls, point.Attributes()) + mg.setExemplars(point.Exemplars()) +} + +func convertDeltaBuckets(spans []histogram.Span, deltas []int64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(deltas)) + bucketIdx := 0 + bucketCount := int64(0) + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + bucketCount += deltas[bucketIdx] + bucketIdx++ + buckets.Append(uint64(bucketCount)) + } + } +} + +func convertAbsoluteBuckets(spans []histogram.Span, counts []float64, buckets pcommon.UInt64Slice) { + buckets.EnsureCapacity(len(counts)) + bucketIdx := 0 + for spanIdx, span := range spans { + if spanIdx > 0 { + for i := int32(0); i < span.Offset; i++ { + buckets.Append(uint64(0)) + } + } + for i := uint32(0); i < span.Length; i++ { + buckets.Append(uint64(counts[bucketIdx])) + bucketIdx++ + } + } +} + func (mg *metricGroup) setExemplars(exemplars pmetric.ExemplarSlice) { if mg == nil { return @@ -240,19 +355,19 @@ func populateAttributes(mType pmetric.MetricType, ls labels.Labels, dest pcommon dest.EnsureCapacity(ls.Len()) names := getSortedNotUsefulLabels(mType) j := 0 - for i := range ls { - for j < len(names) && names[j] < ls[i].Name { + ls.Range(func(l labels.Label) { + for j < len(names) && names[j] < l.Name { j++ } - if j < len(names) && ls[i].Name == names[j] { - continue + if j < len(names) && l.Name == names[j] { + return } - if ls[i].Value == "" { + if l.Value == "" { // empty label values should be omitted - continue + return } - dest.PutStr(ls[i].Name, ls[i].Value) - } + dest.PutStr(l.Name, 
l.Value) + }) } func (mf *metricFamily) loadMetricGroupOrCreate(groupKey uint64, ls labels.Labels, ts int64) *metricGroup { @@ -287,7 +402,7 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels mg.ts = t mg.count = v mg.hasCount = true - case strings.HasSuffix(metricName, metricSuffixCreated): + case metricName == mf.metadata.Metric+metricSuffixCreated: mg.created = v default: boundary, err := getBoundary(mf.mtype, ls) @@ -296,13 +411,17 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels } mg.complexValue = append(mg.complexValue, &dataPoint{value: v, boundary: boundary}) } + case pmetric.MetricTypeExponentialHistogram: + if metricName == mf.metadata.Metric+metricSuffixCreated { + mg.created = v + } case pmetric.MetricTypeSum: - if strings.HasSuffix(metricName, metricSuffixCreated) { + if metricName == mf.metadata.Metric+metricSuffixCreated { mg.created = v } else { mg.value = v } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: mg.value = v @@ -311,6 +430,37 @@ func (mf *metricFamily) addSeries(seriesRef uint64, metricName string, ls labels return nil } +func (mf *metricFamily) addExponentialHistogramSeries(seriesRef uint64, metricName string, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) error { + mg := mf.loadMetricGroupOrCreate(seriesRef, ls, t) + if mg.ts != t { + return fmt.Errorf("inconsistent timestamps on metric points for metric %v", metricName) + } + if mg.mtype != pmetric.MetricTypeExponentialHistogram { + return fmt.Errorf("metric type mismatch for exponential histogram metric %v type %s", metricName, mg.mtype.String()) + } + switch { + case fh != nil: + if mg.hValue != nil { + return fmt.Errorf("exponential histogram %v already has float counts", metricName) + } + mg.count = fh.Count + mg.sum = fh.Sum + mg.hasCount = true + 
mg.hasSum = true + mg.fhValue = fh + case h != nil: + if mg.fhValue != nil { + return fmt.Errorf("exponential histogram %v already has integer counts", metricName) + } + mg.count = float64(h.Count) + mg.sum = h.Sum + mg.hasCount = true + mg.hasSum = true + mg.hValue = h + } + return nil +} + func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes bool) { metric := pmetric.NewMetric() // Trims type and unit suffixes from metric name @@ -352,7 +502,16 @@ func (mf *metricFamily) appendMetric(metrics pmetric.MetricSlice, trimSuffixes b } pointCount = sdpL.Len() - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + hdpL := histogram.DataPoints() + for _, mg := range mf.groupOrders { + mg.toExponentialHistogramDataPoints(hdpL) + } + pointCount = hdpL.Len() + + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: fallthrough default: // Everything else should be set to a Gauge. 
gauge := metric.SetEmptyGauge() @@ -382,8 +541,8 @@ func (mf *metricFamily) addExemplar(seriesRef uint64, e exemplar.Exemplar) { func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) { e.SetTimestamp(timestampFromMs(pe.Ts)) e.SetDoubleValue(pe.Value) - e.FilteredAttributes().EnsureCapacity(len(pe.Labels)) - for _, lb := range pe.Labels { + e.FilteredAttributes().EnsureCapacity(pe.Labels.Len()) + pe.Labels.Range(func(lb labels.Label) { switch strings.ToLower(lb.Name) { case traceIDKey: var tid [16]byte @@ -404,7 +563,7 @@ func convertExemplar(pe exemplar.Exemplar, e pmetric.Exemplar) { default: e.FilteredAttributes().PutStr(lb.Name, lb.Value) } - } + }) } /* diff --git a/internal/component/otelcol/receiver/prometheus/internal/metricfamily_test.go b/internal/component/otelcol/receiver/prometheus/internal/metricfamily_test.go index 10c0f95794..759ba0bea6 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/metricfamily_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/metricfamily_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/textparse" "github.com/prometheus/prometheus/model/value" @@ -40,6 +41,12 @@ var mc = testMetadataStore{ Help: "This is some help for a counter", Unit: "By", }, + "counter_created": scrape.MetricMetadata{ + Metric: "counter", + Type: textparse.MetricTypeCounter, + Help: "This is some help for a counter", + Unit: "By", + }, "gauge": scrape.MetricMetadata{ Metric: "ge", Type: textparse.MetricTypeGauge, @@ -59,7 +66,7 @@ var mc = testMetadataStore{ Unit: "ms", }, "histogram_with_created": scrape.MetricMetadata{ - Metric: "hg", + Metric: "histogram_with_created", Type: textparse.MetricTypeHistogram, Help: "This is some help for a histogram", Unit: "ms", @@ -77,7 +84,7 @@ var mc = testMetadataStore{ Unit: "ms", }, "summary_with_created": scrape.MetricMetadata{ - 
Metric: "s", + Metric: "summary_with_created", Type: textparse.MetricTypeSummary, Help: "This is some help for a summary", Unit: "ms", @@ -287,6 +294,199 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) { } } +func TestMetricGroupData_toExponentialDistributionUnitTest(t *testing.T) { + type scrape struct { + at int64 + metric string + extraLabel labels.Label + + // Only one kind of value should be set. + value float64 + integerHistogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram // TODO: add tests for float histograms. + } + tests := []struct { + name string + metricName string + labels labels.Labels + scrapes []*scrape + want func() pmetric.ExponentialHistogramDataPoint + wantErr bool + intervalStartTimeMs int64 + }{ + { + name: "integer histogram with startTimestamp", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Schema: 1, + ZeroThreshold: 0.42, + ZeroCount: 1, + Count: 66, + Sum: 1004.78, + PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + PositiveBuckets: []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 30=(3+27) -> 65 + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{1}, // Delta encoded counts: 1 + }, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetCount(66) + point.SetSum(1004.78) + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. 
+ point.SetScale(1) + point.SetZeroThreshold(0.42) + point.SetZeroCount(1) + point.Positive().SetOffset(0) + point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29}) + point.Negative().SetOffset(-1) + point.Negative().BucketCounts().FromRaw([]uint64{1}) + attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + { + name: "integer histogram with startTimestamp from _created", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + CounterResetHint: histogram.UnknownCounterReset, + Schema: 1, + ZeroThreshold: 0.42, + ZeroCount: 1, + Count: 66, + Sum: 1004.78, + PositiveSpans: []histogram.Span{{Offset: 1, Length: 2}, {Offset: 3, Length: 1}}, + PositiveBuckets: []int64{33, -30, 26}, // Delta encoded counts: 33, 3=(33-30), 30=(3+27) -> 65 + NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}}, + NegativeBuckets: []int64{1}, // Delta encoded counts: 1 + }, + }, + { + at: 11, + metric: "request_duration_seconds_created", + value: 600.78, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetCount(66) + point.SetSum(1004.78) + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetStartTimestamp(timestampFromFloat64(600.78)) // the time in milliseconds -> nanoseconds. 
+ point.SetScale(1) + point.SetZeroThreshold(0.42) + point.SetZeroCount(1) + point.Positive().SetOffset(0) + point.Positive().BucketCounts().FromRaw([]uint64{33, 3, 0, 0, 0, 29}) + point.Negative().SetOffset(-1) + point.Negative().BucketCounts().FromRaw([]uint64{1}) + attributes := point.Attributes() + attributes.PutStr("a", "A") + return point + }, + }, + { + name: "integer histogram that is stale", + metricName: "request_duration_seconds", + intervalStartTimeMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + { + at: 11, + metric: "request_duration_seconds", + integerHistogram: &histogram.Histogram{ + Sum: math.Float64frombits(value.StaleNaN), + }, + }, + }, + want: func() pmetric.ExponentialHistogramDataPoint { + point := pmetric.NewExponentialHistogramDataPoint() + point.SetTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. + point.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true)) + point.SetStartTimestamp(pcommon.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds. 
+ attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + mp := newMetricFamily(tt.metricName, mc, zap.NewNop()) + for i, tv := range tt.scrapes { + var lbls labels.Labels + if tv.extraLabel.Name != "" { + lbls = labels.NewBuilder(tt.labels).Set(tv.extraLabel.Name, tv.extraLabel.Value).Labels() + } else { + lbls = tt.labels.Copy() + } + + var err error + switch { + case tv.integerHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, tv.integerHistogram, nil) + case tv.floatHistogram != nil: + mp.mtype = pmetric.MetricTypeExponentialHistogram + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addExponentialHistogramSeries(sRef, tv.metric, lbls, tv.at, nil, tv.floatHistogram) + default: + sRef, _ := getSeriesRef(nil, lbls, mp.mtype) + err = mp.addSeries(sRef, tv.metric, lbls, tv.at, tv.value) + } + if tt.wantErr { + if i != 0 { + require.Error(t, err) + } + } else { + require.NoError(t, err) + } + } + if tt.wantErr { + // Don't check the result if we got an error + return + } + + require.Len(t, mp.groups, 1) + + sl := pmetric.NewMetricSlice() + mp.appendMetric(sl, false) + + require.Equal(t, 1, sl.Len(), "Exactly one metric expected") + metric := sl.At(0) + require.Equal(t, mc[tt.metricName].Help, metric.Description(), "Expected help metadata in metric description") + require.Equal(t, mc[tt.metricName].Unit, metric.Unit(), "Expected unit metadata in metric") + + hdpL := metric.ExponentialHistogram().DataPoints() + require.Equal(t, 1, hdpL.Len(), "Exactly one point expected") + got := hdpL.At(0) + want := tt.want() + require.Equal(t, want, got, "Expected the points to be equal") + }) + } +} + func TestMetricGroupData_toSummaryUnitTest(t *testing.T) { type scrape struct { at int64 @@ 
-598,6 +798,29 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) { {at: 13, value: 33.7, metric: "value"}, {at: 13, value: 150, metric: "value_created"}, }, + want: func() pmetric.NumberDataPoint { + point := pmetric.NewNumberDataPoint() + point.SetDoubleValue(150) + + // the time in milliseconds -> nanoseconds. + point.SetTimestamp(pcommon.Timestamp(13 * time.Millisecond)) + point.SetStartTimestamp(pcommon.Timestamp(13 * time.Millisecond)) + + attributes := point.Attributes() + attributes.PutStr("a", "A") + attributes.PutStr("b", "B") + return point + }, + }, + { + metricKind: "counter_created", + name: "counter:: startTimestampMs from _created", + intervalStartTimestampMs: 11, + labels: labels.FromMap(map[string]string{"a": "A", "b": "B"}), + scrapes: []*scrape{ + {at: 13, value: 33.7, metric: "counter"}, + {at: 13, value: 150, metric: "counter_created"}, + }, want: func() pmetric.NumberDataPoint { point := pmetric.NewNumberDataPoint() point.SetDoubleValue(33.7) diff --git a/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go b/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go index a483b9588c..26825fd6ae 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go +++ b/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster.go @@ -102,11 +102,17 @@ func (tsm *timeseriesMap) get(metric pmetric.Metric, kv pcommon.Map) (*timeserie name: name, attributes: getAttributesSignature(kv), } - if metric.Type() == pmetric.MetricTypeHistogram { + switch metric.Type() { + case pmetric.MetricTypeHistogram: // There are 2 types of Histograms whose aggregation temporality needs distinguishing: // * CumulativeHistogram // * GaugeHistogram key.aggTemporality = metric.Histogram().AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + // There are 2 types of ExponentialHistograms whose aggregation temporality needs distinguishing: + // * CumulativeHistogram 
+ // * GaugeHistogram + key.aggTemporality = metric.ExponentialHistogram().AggregationTemporality() } tsm.mark = true @@ -285,7 +291,10 @@ func (a *initialPointAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { case pmetric.MetricTypeSum: a.adjustMetricSum(tsm, metric) - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + a.adjustMetricExponentialHistogram(tsm, metric) + + case pmetric.MetricTypeEmpty: fallthrough default: @@ -313,7 +322,54 @@ func (a *initialPointAdjuster) adjustMetricHistogram(tsm *timeseriesMap, current if a.useCreatedMetric && !currentDist.Flags().NoRecordedValue() && currentDist.StartTimestamp() < currentDist.Timestamp() { + continue + } + + tsi, found := tsm.get(current, currentDist.Attributes()) + if !found { + // initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + + if currentDist.Flags().NoRecordedValue() { + // TODO: Investigate why this does not reset. + currentDist.SetStartTimestamp(tsi.histogram.startTime) + continue + } + + if currentDist.Count() < tsi.histogram.previousCount || currentDist.Sum() < tsi.histogram.previousSum { + // reset re-initialize everything. + tsi.histogram.startTime = currentDist.StartTimestamp() + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + continue + } + // Update only previous values. + tsi.histogram.previousCount = currentDist.Count() + tsi.histogram.previousSum = currentDist.Sum() + currentDist.SetStartTimestamp(tsi.histogram.startTime) + } +} + +func (a *initialPointAdjuster) adjustMetricExponentialHistogram(tsm *timeseriesMap, current pmetric.Metric) { + histogram := current.ExponentialHistogram() + if histogram.AggregationTemporality() != pmetric.AggregationTemporalityCumulative { + // Only dealing with CumulativeDistributions. 
+ return + } + + currentPoints := histogram.DataPoints() + for i := 0; i < currentPoints.Len(); i++ { + currentDist := currentPoints.At(i) + + // start timestamp was set from _created + if a.useCreatedMetric && + !currentDist.Flags().NoRecordedValue() && + currentDist.StartTimestamp() < currentDist.Timestamp() { continue } @@ -356,7 +412,6 @@ func (a *initialPointAdjuster) adjustMetricSum(tsm *timeseriesMap, current pmetr if a.useCreatedMetric && !currentSum.Flags().NoRecordedValue() && currentSum.StartTimestamp() < currentSum.Timestamp() { - continue } @@ -397,7 +452,6 @@ func (a *initialPointAdjuster) adjustMetricSummary(tsm *timeseriesMap, current p if a.useCreatedMetric && !currentSummary.Flags().NoRecordedValue() && currentSummary.StartTimestamp() < currentSummary.Timestamp() { - continue } diff --git a/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster_test.go b/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster_test.go index df38dea9e9..d80dcf512a 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/metrics_adjuster_test.go @@ -25,10 +25,11 @@ var ( bounds0 = []float64{1, 2, 4} percent0 = []float64{10, 50, 90} - sum1 = "sum1" - gauge1 = "gauge1" - histogram1 = "histogram1" - summary1 = "summary1" + sum1 = "sum1" + gauge1 = "gauge1" + histogram1 = "histogram1" + summary1 = "summary1" + exponentialHistogram1 = "exponentialHistogram1" k1v1k2v2 = []*kv{ {"k1", "v1"}, @@ -246,6 +247,67 @@ func TestHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) } +// In TestExponentHistogram we exclude negative buckets on purpose as they are +// not considered the main use case - response times that are most commonly +// observed are never negative. Negative buckets would make the Sum() non +// monotonic and cause unexpected resets. 
+func TestExponentialHistogram(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Exponential Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 3, 1, 0, []uint64{}, -2, []uint64{4, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t2, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + adjusted: metrics(exponentialHistogramMetric(exponentialHistogram1, exponentialHistogramPoint(k1v1k2v2, t1, t2, 3, 1, 0, []uint64{}, -2, []uint64{6, 2, 3, 7}))), + }, { + description: "Exponential Histogram: round 3 - instance reset (value less than previous value), start time is reset", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t3, 3, 1, 0, []uint64{}, -2, []uint64{5, 3, 2, 7}))), + }, { + description: "Exponential Histogram: round 4 - instance adjusted based on round 3", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t4, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t3, t4, 3, 1, 0, []uint64{}, -2, []uint64{7, 4, 2, 12}))), + }, + } + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValue(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: 
"Histogram: round 1 - initial instance, start time is established", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPoint(k1v1k2v2, t1, t1, 0, 2, 2, []uint64{7, 4, 2, 12}, 3, []uint64{}))), + }, + { + description: "Histogram: round 2 - instance adjusted based on round 1", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, t1, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + +func TestExponentialHistogramFlagNoRecordedValueFirstObservation(t *testing.T) { + script := []*metricsAdjusterTest{ + { + description: "Histogram: round 1 - initial instance, start time is unknown", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t1))), + }, + { + description: "Histogram: round 2 - instance unchanged", + metrics: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + adjusted: metrics(exponentialHistogramMetric(histogram1, exponentialHistogramPointNoValue(k1v1k2v2, tUnknown, t2))), + }, + } + + runScript(t, NewInitialPointAdjuster(zap.NewNop(), time.Minute, true), "job", "0", script) +} + func TestSummaryFlagNoRecordedValueFirstObservation(t *testing.T) { script := []*metricsAdjusterTest{ { diff --git a/internal/component/otelcol/receiver/prometheus/internal/metricsutil_test.go b/internal/component/otelcol/receiver/prometheus/internal/metricsutil_test.go index 4ba25cfe84..8a0670a1d7 100644 --- 
a/internal/component/otelcol/receiver/prometheus/internal/metricsutil_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/metricsutil_test.go @@ -78,6 +78,99 @@ func histogramMetric(name string, points ...pmetric.HistogramDataPoint) pmetric. return metric } +func exponentialHistogramMetric(name string, points ...pmetric.ExponentialHistogramDataPoint) pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName(name) + histogram := metric.SetEmptyExponentialHistogram() + histogram.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + destPointL := histogram.DataPoints() + // By default the AggregationTemporality is Cumulative until it'll be changed by the caller. + for _, point := range points { + destPoint := destPointL.AppendEmpty() + point.CopyTo(destPoint) + } + + return metric +} + +func exponentialHistogramPointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint { + hdp := pmetric.NewExponentialHistogramDataPoint() + hdp.SetStartTimestamp(startTimestamp) + hdp.SetTimestamp(timestamp) + + attrs := hdp.Attributes() + for _, kv := range attributes { + attrs.PutStr(kv.Key, kv.Value) + } + + return hdp +} + +func exponentialHistogramPoint(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, negativeOffset int32, negativeBuckets []uint64, positiveOffset int32, positiveBuckets []uint64) pmetric.ExponentialHistogramDataPoint { + hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp) + hdp.SetScale(scale) + hdp.SetZeroCount(zeroCount) + hdp.Negative().SetOffset(negativeOffset) + hdp.Negative().BucketCounts().FromRaw(negativeBuckets) + hdp.Positive().SetOffset(positiveOffset) + hdp.Positive().BucketCounts().FromRaw(positiveBuckets) + + count := uint64(0) + sum := float64(0) + for i, bCount := range positiveBuckets { + count += bCount + sum += float64(bCount) * float64(i) + } + for i, bCount := range negativeBuckets { 
+		count += bCount
+		sum -= float64(bCount) * float64(i)
+	}
+	hdp.SetCount(count)
+	hdp.SetSum(sum)
+	return hdp
+}
+
+func exponentialHistogramPointNoValue(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.ExponentialHistogramDataPoint {
+	hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp)
+	hdp.SetFlags(pmetric.DefaultDataPointFlags.WithNoRecordedValue(true))
+
+	return hdp
+}
+
+// exponentialHistogramPointSimplified lets you define an exponential
+// histogram with just a few parameters.
+// Scale and ZeroCount are set to the provided values.
+// Positive and negative buckets are generated using the offset and bucketCount
+// parameters by adding buckets from offset in both positive and negative
+// directions. Bucket counts start from 1 and increase by 1 for each bucket.
+// Sum and Count will be proportional to the bucket count.
+func exponentialHistogramPointSimplified(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp, scale int32, zeroCount uint64, offset int32, bucketCount int) pmetric.ExponentialHistogramDataPoint {
+	hdp := exponentialHistogramPointRaw(attributes, startTimestamp, timestamp)
+	hdp.SetScale(scale)
+	hdp.SetZeroCount(zeroCount)
+
+	positive := hdp.Positive()
+	positive.SetOffset(offset)
+	positive.BucketCounts().EnsureCapacity(bucketCount)
+	negative := hdp.Negative()
+	negative.SetOffset(offset)
+	negative.BucketCounts().EnsureCapacity(bucketCount)
+
+	var sum float64
+	var count uint64
+	for i := 0; i < bucketCount; i++ {
+		positive.BucketCounts().Append(uint64(i + 1))
+		negative.BucketCounts().Append(uint64(i + 1))
+		count += uint64(i+1) + uint64(i+1)
+		sum += float64(i+1)*10 + float64(i+1)*10.0
+	}
+	hdp.SetCount(count)
+	hdp.SetSum(sum)
+
+	return hdp
+}
+
 func doublePointRaw(attributes []*kv, startTimestamp, timestamp pcommon.Timestamp) pmetric.NumberDataPoint {
 	ndp := pmetric.NewNumberDataPoint()
 	ndp.SetStartTimestamp(startTimestamp)
diff --git 
a/internal/component/otelcol/receiver/prometheus/internal/staleness_end_to_end_test.go b/internal/component/otelcol/receiver/prometheus/internal/staleness_end_to_end_test.go index 3d4baddbae..2a471746c2 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/staleness_end_to_end_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/staleness_end_to_end_test.go @@ -26,7 +26,7 @@ package internal_test // // 1. Setup the server that sends series that intermittently appear and disappear. // n := &atomic.Uint64{} -// scrapeServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { +// scrapeServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) { // // Increment the scrape count atomically per scrape. // i := n.Add(1) @@ -57,7 +57,7 @@ package internal_test // // 2. Set up the Prometheus RemoteWrite endpoint. // prweUploads := make(chan *prompb.WriteRequest) -// prweServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { +// prweServer := httptest.NewServer(http.HandlerFunc(func(_ http.ResponseWriter, req *http.Request) { // // Snappy decode the uploads. // payload, rerr := io.ReadAll(req.Body) // require.NoError(t, rerr) @@ -104,17 +104,17 @@ package internal_test // exporters: [prometheusremotewrite]`, serverURL.Host, prweServer.URL) // confFile, err := os.CreateTemp(os.TempDir(), "conf-") -// require.Nil(t, err) +// require.NoError(t, err) // defer os.Remove(confFile.Name()) // _, err = confFile.Write([]byte(cfg)) -// require.Nil(t, err) +// require.NoError(t, err) // // 4. Run the OpenTelemetry Collector. 
// receivers, err := receiver.MakeFactoryMap(prometheusreceiver.NewFactory()) -// require.Nil(t, err) +// require.NoError(t, err) // exporters, err := exporter.MakeFactoryMap(prometheusremotewriteexporter.NewFactory()) -// require.Nil(t, err) +// require.NoError(t, err) // processors, err := processor.MakeFactoryMap(batchprocessor.NewFactory()) -// require.Nil(t, err) +// require.NoError(t, err) // factories := otelcol.Factories{ // Receivers: receivers, @@ -149,7 +149,7 @@ package internal_test // } // app, err := otelcol.NewCollector(appSettings) -// require.Nil(t, err) +// require.NoError(t, err) // go func() { // assert.NoError(t, app.Run(context.Background())) diff --git a/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster.go b/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster.go index 9195136e78..ca7ae2a291 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster.go +++ b/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster.go @@ -68,7 +68,14 @@ func (stma *startTimeMetricAdjuster) AdjustMetrics(metrics pmetric.Metrics) erro dp.SetStartTimestamp(startTimeTs) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dataPoints := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dataPoints.Len(); l++ { + dp := dataPoints.At(l) + dp.SetStartTimestamp(startTimeTs) + } + + case pmetric.MetricTypeEmpty: fallthrough default: diff --git a/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster_test.go b/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster_test.go index 89e4b10f8e..84bdc2756e 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/starttimemetricadjuster_test.go @@ -33,6 +33,7 @@ func 
TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), startTimeMetricRegex: regexp.MustCompile("^.*_process_start_time_seconds$"), expectedStartTime: timestampFromFloat64(matchBuilderStartTime), @@ -45,6 +46,7 @@ func TestStartTimeMetricMatch(t *testing.T) { summaryMetric("test_summary_metric", summaryPoint(nil, startTime, currentTime, 10, 100, []float64{10, 50, 90}, []float64{9, 15, 48})), sumMetric("example_process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime)), sumMetric("process_start_time_seconds", doublePoint(nil, startTime, currentTime, matchBuilderStartTime+1)), + exponentialHistogramMetric("test_exponential_histogram_metric", exponentialHistogramPointSimplified(nil, startTime, currentTime, 3, 1, -5, 3)), ), expectedStartTime: timestampFromFloat64(matchBuilderStartTime + 1), }, @@ -139,7 +141,12 @@ func TestStartTimeMetricMatch(t *testing.T) { for l := 0; l < dps.Len(); l++ { assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + assert.Equal(t, tt.expectedStartTime, dps.At(l).StartTimestamp()) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } diff --git a/internal/component/otelcol/receiver/prometheus/internal/transaction.go 
b/internal/component/otelcol/receiver/prometheus/internal/transaction.go index 6b59674c19..dc1bf78dba 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/transaction.go +++ b/internal/component/otelcol/receiver/prometheus/internal/transaction.go @@ -7,7 +7,7 @@ import ( "context" "errors" "fmt" - "sort" + "math" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" @@ -35,19 +35,20 @@ const ( ) type transaction struct { - isNew bool - trimSuffixes bool - ctx context.Context - families map[scopeID]map[string]*metricFamily - mc scrape.MetricMetadataStore - sink consumer.Metrics - externalLabels labels.Labels - nodeResource pcommon.Resource - scopeAttributes map[scopeID]pcommon.Map - logger *zap.Logger - buildInfo component.BuildInfo - metricAdjuster MetricsAdjuster - obsrecv *receiverhelper.ObsReport + isNew bool + trimSuffixes bool + enableNativeHistograms bool + ctx context.Context + families map[scopeID]map[string]*metricFamily + mc scrape.MetricMetadataStore + sink consumer.Metrics + externalLabels labels.Labels + nodeResource pcommon.Resource + scopeAttributes map[scopeID]pcommon.Map + logger *zap.Logger + buildInfo component.BuildInfo + metricAdjuster MetricsAdjuster + obsrecv *receiverhelper.ObsReport // Used as buffer to calculate series ref hash. 
bufBytes []byte } @@ -66,21 +67,22 @@ func newTransaction( externalLabels labels.Labels, settings receiver.CreateSettings, obsrecv *receiverhelper.ObsReport, - trimSuffixes bool) *transaction { - + trimSuffixes bool, + enableNativeHistograms bool) *transaction { return &transaction{ - ctx: ctx, - families: make(map[scopeID]map[string]*metricFamily), - isNew: true, - trimSuffixes: trimSuffixes, - sink: sink, - metricAdjuster: metricAdjuster, - externalLabels: externalLabels, - logger: settings.Logger, - buildInfo: settings.BuildInfo, - obsrecv: obsrecv, - bufBytes: make([]byte, 0, 1024), - scopeAttributes: make(map[scopeID]pcommon.Map), + ctx: ctx, + families: make(map[scopeID]map[string]*metricFamily), + isNew: true, + trimSuffixes: trimSuffixes, + enableNativeHistograms: enableNativeHistograms, + sink: sink, + metricAdjuster: metricAdjuster, + externalLabels: externalLabels, + logger: settings.Logger, + buildInfo: settings.BuildInfo, + obsrecv: obsrecv, + bufBytes: make([]byte, 0, 1024), + scopeAttributes: make(map[scopeID]pcommon.Map), } } @@ -92,9 +94,12 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, default: } - if len(t.externalLabels) != 0 { - ls = append(ls, t.externalLabels...) - sort.Sort(ls) + if t.externalLabels.Len() != 0 { + b := labels.NewBuilder(ls) + t.externalLabels.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + ls = b.Labels() } if t.isNew { @@ -144,16 +149,42 @@ func (t *transaction) Append(_ storage.SeriesRef, ls labels.Labels, atMs int64, return 0, nil } - curMF := t.getOrCreateMetricFamily(getScopeID(ls), metricName) - err := curMF.addSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, val) + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + + if t.enableNativeHistograms && curMF.mtype == pmetric.MetricTypeExponentialHistogram { + // If a histogram has both classic and native version, the native histogram is scraped + // first. 
Getting a float sample for the same series means that `scrape_classic_histogram` + // is set to true in the scrape config. In this case, we should ignore the native histogram. + curMF.mtype = pmetric.MetricTypeHistogram + } + + seriesRef := t.getSeriesRef(ls, curMF.mtype) + err := curMF.addSeries(seriesRef, metricName, ls, atMs, val) if err != nil { - t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + // Handle special case of float sample indicating staleness of native + // histogram. This is similar to how Prometheus handles it, but we + // don't have access to the previous value so we're applying some + // heuristics to figure out if this is native histogram or not. + // The metric type will indicate histogram, but presumably there will be no + // _bucket, _count, _sum suffix or `le` label, which makes addSeries fail + // with errEmptyLeLabel. + if t.enableNativeHistograms && errors.Is(err, errEmptyLeLabel) && !existing && value.IsStaleNaN(val) && curMF.mtype == pmetric.MetricTypeHistogram { + mg := curMF.loadMetricGroupOrCreate(seriesRef, ls, atMs) + curMF.mtype = pmetric.MetricTypeExponentialHistogram + mg.mtype = pmetric.MetricTypeExponentialHistogram + _ = curMF.addExponentialHistogramSeries(seriesRef, metricName, ls, atMs, &histogram.Histogram{Sum: math.Float64frombits(value.StaleNaN)}, nil) + // ignore errors here, this is best effort. + } else { + t.logger.Warn("failed to add datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + } } return 0, nil // never return errors, as that fails the whole scrape } -func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricFamily { +// getOrCreateMetricFamily returns the metric family for the given metric name and scope, +// and true if an existing family was found. 
+func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) (*metricFamily, bool) { _, ok := t.families[scope] if !ok { t.families[scope] = make(map[string]*metricFamily) @@ -169,9 +200,10 @@ func (t *transaction) getOrCreateMetricFamily(scope scopeID, mn string) *metricF } else { curMf = newMetricFamily(mn, t.mc, t.logger) t.families[scope][curMf.name] = curMf + return curMf, false } } - return curMf + return curMf, true } func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) { @@ -198,13 +230,74 @@ func (t *transaction) AppendExemplar(_ storage.SeriesRef, l labels.Labels, e exe return 0, errMetricNameNotFound } - mf := t.getOrCreateMetricFamily(getScopeID(l), mn) + mf, _ := t.getOrCreateMetricFamily(getScopeID(l), mn) mf.addExemplar(t.getSeriesRef(l, mf.mtype), e) return 0, nil } -func (t *transaction) AppendHistogram(_ storage.SeriesRef, _ labels.Labels, _ int64, _ *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) { +func (t *transaction) AppendHistogram(_ storage.SeriesRef, ls labels.Labels, atMs int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if !t.enableNativeHistograms { + return 0, nil + } + + select { + case <-t.ctx.Done(): + return 0, errTransactionAborted + default: + } + + if t.externalLabels.Len() != 0 { + b := labels.NewBuilder(ls) + t.externalLabels.Range(func(l labels.Label) { + b.Set(l.Name, l.Value) + }) + ls = b.Labels() + } + + if t.isNew { + if err := t.initTransaction(ls); err != nil { + return 0, err + } + } + + // Any datapoint with duplicate labels MUST be rejected per: + // * https://github.com/open-telemetry/wg-prometheus/issues/44 + // * https://github.com/open-telemetry/opentelemetry-collector/issues/3407 + // as Prometheus rejects such too as of version 2.16.0, released on 2020-02-13. 
+ if dupLabel, hasDup := ls.HasDuplicateLabelNames(); hasDup { + return 0, fmt.Errorf("invalid sample: non-unique label names: %q", dupLabel) + } + + metricName := ls.Get(model.MetricNameLabel) + if metricName == "" { + return 0, errMetricNameNotFound + } + + // The `up`, `target_info`, `otel_scope_info` metrics should never generate native histograms, + // thus we don't check for them here as opposed to the Append function. + + curMF, existing := t.getOrCreateMetricFamily(getScopeID(ls), metricName) + if !existing { + curMF.mtype = pmetric.MetricTypeExponentialHistogram + } else if curMF.mtype != pmetric.MetricTypeExponentialHistogram { + // Already scraped as classic histogram. + return 0, nil + } + + if h != nil && h.CounterResetHint == histogram.GaugeType || fh != nil && fh.CounterResetHint == histogram.GaugeType { + t.logger.Warn("dropping unsupported gauge histogram datapoint", zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + err := curMF.addExponentialHistogramSeries(t.getSeriesRef(ls, curMF.mtype), metricName, ls, atMs, h, fh) + if err != nil { + t.logger.Warn("failed to add histogram datapoint", zap.Error(err), zap.String("metric_name", metricName), zap.Any("labels", ls)) + } + + return 0, nil // never return errors, as that fails the whole scrape +} + +func (t *transaction) AppendCTZeroSample(_ storage.SeriesRef, _ labels.Labels, _, _ int64) (storage.SeriesRef, error) { //TODO: implement this func return 0, nil } @@ -255,14 +348,14 @@ func (t *transaction) getMetrics(resource pcommon.Resource) (pmetric.Metrics, er func getScopeID(ls labels.Labels) scopeID { var scope scopeID - for _, lbl := range ls { + ls.Range(func(lbl labels.Label) { if lbl.Name == scopeNameLabel { scope.name = lbl.Value } if lbl.Name == scopeVersionLabel { scope.version = lbl.Value } - } + }) return scope } @@ -321,33 +414,33 @@ func (t *transaction) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ met return 0, nil } -func (t *transaction) 
AddTargetInfo(labels labels.Labels) { +func (t *transaction) AddTargetInfo(ls labels.Labels) { attrs := t.nodeResource.Attributes() - for _, lbl := range labels { + ls.Range(func(lbl labels.Label) { if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel { - continue + return } attrs.PutStr(lbl.Name, lbl.Value) - } + }) } -func (t *transaction) addScopeInfo(labels labels.Labels) { +func (t *transaction) addScopeInfo(ls labels.Labels) { attrs := pcommon.NewMap() scope := scopeID{} - for _, lbl := range labels { + ls.Range(func(lbl labels.Label) { if lbl.Name == model.JobLabel || lbl.Name == model.InstanceLabel || lbl.Name == model.MetricNameLabel { - continue + return } if lbl.Name == scopeNameLabel { scope.name = lbl.Value - continue + return } if lbl.Name == scopeVersionLabel { scope.version = lbl.Value - continue + return } attrs.PutStr(lbl.Name, lbl.Value) - } + }) t.scopeAttributes[scope] = attrs } diff --git a/internal/component/otelcol/receiver/prometheus/internal/transaction_test.go b/internal/component/otelcol/receiver/prometheus/internal/transaction_test.go index 2946384be5..2956ec2e3b 100644 --- a/internal/component/otelcol/receiver/prometheus/internal/transaction_test.go +++ b/internal/component/otelcol/receiver/prometheus/internal/transaction_test.go @@ -6,14 +6,17 @@ package internal import ( "context" "errors" + "fmt" "testing" "time" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/metadata" "github.com/prometheus/prometheus/scrape" + "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -53,42 +56,89 @@ var ( ) func TestTransactionCommitWithoutAdding(t *testing.T) { - tr := newTransaction(scrapeCtx, 
&startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitWithoutAdding(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitWithoutAdding(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Commit()) } func TestTransactionRollbackDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionRollbackDoesNothing(t, enableNativeHistograms) + }) + } +} + +func testTransactionRollbackDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) assert.NoError(t, tr.Rollback()) } func TestTransactionUpdateMetadataDoesNothing(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionUpdateMetadataDoesNothing(t, enableNativeHistograms) + }) + } +} + +func 
testTransactionUpdateMetadataDoesNothing(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.UpdateMetadata(0, labels.New(), metadata.Metadata{}) assert.NoError(t, err) } func TestTransactionAppendNoTarget(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoTarget(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoTarget(t *testing.T, enableNativeHistograms bool) { badLabels := labels.FromStrings(model.MetricNameLabel, "counter_test") - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, badLabels, time.Now().Unix()*1000, 1.0) assert.Error(t, err) } func TestTransactionAppendNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendNoMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendNoMetricName(t *testing.T, enableNativeHistograms bool) { jobNotFoundLb := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", }) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, 
&startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, jobNotFoundLb, time.Now().Unix()*1000, 1.0) assert.ErrorIs(t, err, errMetricNameNotFound) - assert.ErrorIs(t, tr.Commit(), errNoDataToBuild) } func TestTransactionAppendEmptyMetricName(t *testing.T) { - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendEmptyMetricName(t *testing.T, enableNativeHistograms bool) { + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, consumertest.NewNop(), labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test2", @@ -98,8 +148,16 @@ func TestTransactionAppendEmptyMetricName(t *testing.T) { } func TestTransactionAppendResource(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendResource(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendResource(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), 
receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -121,8 +179,16 @@ func TestTransactionAppendResource(t *testing.T) { } func TestReceiverVersionAndNameAreAttached(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testReceiverVersionAndNameAreAttached(t, enableNativeHistograms) + }) + } +} + +func testReceiverVersionAndNameAreAttached(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -143,6 +209,14 @@ func TestReceiverVersionAndNameAreAttached(t *testing.T) { } func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionCommitErrorWhenAdjusterError(t, enableNativeHistograms) + }) + } +} + +func testTransactionCommitErrorWhenAdjusterError(t *testing.T, enableNativeHistograms bool) { goodLabels := labels.FromMap(map[string]string{ model.InstanceLabel: "localhost:8080", model.JobLabel: "test", @@ -150,7 +224,7 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { }) sink := new(consumertest.MetricsSink) adjusterErr := errors.New("adjuster error") - tr := newTransaction(scrapeCtx, &errorAdjuster{err: 
adjusterErr}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &errorAdjuster{err: adjusterErr}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) _, err := tr.Append(0, goodLabels, time.Now().Unix()*1000, 1.0) assert.NoError(t, err) assert.ErrorIs(t, tr.Commit(), adjusterErr) @@ -158,8 +232,16 @@ func TestTransactionCommitErrorWhenAdjusterError(t *testing.T) { // Ensure that we reject duplicate label keys. See https://github.com/open-telemetry/wg-prometheus/issues/44. func TestTransactionAppendDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) dupLabels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -176,6 +258,14 @@ func TestTransactionAppendDuplicateLabels(t *testing.T) { } func TestTransactionAppendHistogramNoLe(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendHistogramNoLe(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendHistogramNoLe(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := 
observer.New(zap.InfoLevel) @@ -184,10 +274,11 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, - nil, + labels.EmptyLabels(), receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -206,6 +297,14 @@ func TestTransactionAppendHistogramNoLe(t *testing.T) { } func TestTransactionAppendSummaryNoQuantile(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendSummaryNoQuantile(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendSummaryNoQuantile(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -214,10 +313,11 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, - nil, + labels.EmptyLabels(), receiverSettings, nopObsRecv(t), false, + enableNativeHistograms, ) goodLabels := labels.FromStrings( @@ -236,6 +336,14 @@ func TestTransactionAppendSummaryNoQuantile(t *testing.T) { } func TestTransactionAppendValidAndInvalid(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testTransactionAppendValidAndInvalid(t, enableNativeHistograms) + }) + } +} + +func testTransactionAppendValidAndInvalid(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) receiverSettings := receivertest.NewNopCreateSettings() core, observedLogs := observer.New(zap.InfoLevel) @@ -244,10 +352,11 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, - nil, + labels.EmptyLabels(), receiverSettings, 
nopObsRecv(t), false, + enableNativeHistograms, ) // a valid counter @@ -281,8 +390,16 @@ func TestTransactionAppendValidAndInvalid(t *testing.T) { } func TestAppendExemplarWithNoMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoMetricName(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -294,8 +411,16 @@ func TestAppendExemplarWithNoMetricName(t *testing.T) { } func TestAppendExemplarWithEmptyMetricName(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyMetricName(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyMetricName(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -307,8 +432,16 @@ func TestAppendExemplarWithEmptyMetricName(t *testing.T) { } func 
TestAppendExemplarWithDuplicateLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithDuplicateLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithDuplicateLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -323,8 +456,16 @@ func TestAppendExemplarWithDuplicateLabels(t *testing.T) { } func TestAppendExemplarWithoutAddingMetric(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithoutAddingMetric(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithoutAddingMetric(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) labels := labels.FromStrings( model.InstanceLabel, "0.0.0.0:8855", @@ -337,24 +478,40 @@ func TestAppendExemplarWithoutAddingMetric(t *testing.T) { } func TestAppendExemplarWithNoLabels(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", 
enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithNoLabels(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithNoLabels(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) - _, err := tr.AppendExemplar(0, nil, exemplar.Exemplar{Value: 0}) + _, err := tr.AppendExemplar(0, labels.EmptyLabels(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) } func TestAppendExemplarWithEmptyLabelArray(t *testing.T) { + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("enableNativeHistograms=%v", enableNativeHistograms), func(t *testing.T) { + testAppendExemplarWithEmptyLabelArray(t, enableNativeHistograms) + }) + } +} + +func testAppendExemplarWithEmptyLabelArray(t *testing.T, enableNativeHistograms bool) { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) - _, err := tr.AppendExemplar(0, []labels.Label{}, exemplar.Exemplar{Value: 0}) + _, err := tr.AppendExemplar(0, labels.FromStrings(), exemplar.Exemplar{Value: 0}) assert.Equal(t, errNoJobInstance, err) } func nopObsRecv(t *testing.T) *receiverhelper.ObsReport { obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: component.NewID(component.MustNewType("prometheus")), + ReceiverID: 
component.MustNewID("prometheus"), Transport: transport, ReceiverCreateSettings: receivertest.NewNopCreateSettings(), }) @@ -402,22 +559,22 @@ func TestMetricBuilderCounters(t *testing.T) { { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}, + Labels: labels.New([]labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}...), }, }, "foo", "bar"), @@ -442,9 +599,9 @@ func TestMetricBuilderCounters(t *testing.T) { e0.SetTimestamp(timestampFromMs(1663113420863)) e0.SetDoubleValue(1) e0.FilteredAttributes().PutStr(model.MetricNameLabel, "counter_test") - e0.FilteredAttributes().PutStr(model.JobLabel, "job") - e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") 
e0.FilteredAttributes().PutStr("foo", "bar") + e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") + e0.FilteredAttributes().PutStr(model.JobLabel, "job") e1 := pt0.Exemplars().AppendEmpty() e1.SetTimestamp(timestampFromMs(1663113420863)) @@ -575,9 +732,11 @@ func TestMetricBuilderCounters(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -635,22 +794,22 @@ func TestMetricBuilderGauges(t *testing.T) { { Value: 2, Ts: 1663350815890, - Labels: []labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}, + Labels: labels.New([]labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}...), }, { Value: 2, Ts: 1663350815890, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}}...), }, { Value: 2, Ts: 1663350815890, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}...), }, { Value: 2, Ts: 1663350815890, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}, + Labels: 
labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}...), }, }, "foo", "bar"), @@ -678,9 +837,9 @@ func TestMetricBuilderGauges(t *testing.T) { e0.SetTimestamp(timestampFromMs(1663350815890)) e0.SetDoubleValue(2) e0.FilteredAttributes().PutStr(model.MetricNameLabel, "counter_test") - e0.FilteredAttributes().PutStr(model.JobLabel, "job") - e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") e0.FilteredAttributes().PutStr("foo", "bar") + e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") + e0.FilteredAttributes().PutStr(model.JobLabel, "job") e1 := pt0.Exemplars().AppendEmpty() e1.SetTimestamp(timestampFromMs(1663350815890)) @@ -798,9 +957,11 @@ func TestMetricBuilderGauges(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -894,9 +1055,11 @@ func TestMetricBuilderUntyped(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -946,27 +1109,27 @@ func TestMetricBuilderHistogram(t *testing.T) { { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}, + Labels: labels.New([]labels.Label{{Name: model.MetricNameLabel, Value: "counter_test"}, {Name: model.JobLabel, Value: "job"}, {Name: model.InstanceLabel, Value: "instance"}, {Name: "foo", Value: "bar"}}...), }, { 
Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}, {Name: "le", Value: "20"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: ""}, {Name: "span_id", Value: ""}, {Name: "le", Value: "20"}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "traceid", Value: "e3688e1aa2961786"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "10a47365b8aa04e08291fab9deca84db6170"}, {Name: "traceid", Value: "e3688e1aa2961786"}, {Name: "span_id", Value: "719cee4a669fd7d109ff"}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc880"}, {Name: "span_id", Value: "dfa4597a9d"}}...), }, { Value: 1, Ts: 1663113420863, - Labels: []labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc88"}, {Name: "span_id", Value: "dfa4597a9"}}, + Labels: labels.New([]labels.Label{{Name: "foo", Value: "bar"}, {Name: "trace_id", Value: "174137cab66dc88"}, {Name: "span_id", Value: "dfa4597a9"}}...), }, }, "foo", "bar", "le", "10"), @@ -997,9 +1160,9 @@ func TestMetricBuilderHistogram(t *testing.T) { e0.SetTimestamp(timestampFromMs(1663113420863)) e0.SetDoubleValue(1) e0.FilteredAttributes().PutStr(model.MetricNameLabel, "counter_test") - e0.FilteredAttributes().PutStr(model.JobLabel, "job") - e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") e0.FilteredAttributes().PutStr("foo", "bar") + e0.FilteredAttributes().PutStr(model.InstanceLabel, "instance") + e0.FilteredAttributes().PutStr(model.JobLabel, 
"job") e1 := pt0.Exemplars().AppendEmpty() e1.SetTimestamp(timestampFromMs(1663113420863)) @@ -1026,8 +1189,8 @@ func TestMetricBuilderHistogram(t *testing.T) { e4.SetTimestamp(timestampFromMs(1663113420863)) e4.SetDoubleValue(1) e4.FilteredAttributes().PutStr("foo", "bar") - e4.FilteredAttributes().PutStr("trace_id", "174137cab66dc88") e4.FilteredAttributes().PutStr("span_id", "dfa4597a9") + e4.FilteredAttributes().PutStr("trace_id", "174137cab66dc88") return []pmetric.Metrics{md0} }, @@ -1312,9 +1475,12 @@ func TestMetricBuilderHistogram(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{true, false} { + // None of the histograms above have native histogram versions, so enabling native hisotgrams has no effect. + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1454,9 +1620,109 @@ func TestMetricBuilderSummary(t *testing.T) { } for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.run(t) - }) + for _, enableNativeHistograms := range []bool{false, true} { + t.Run(fmt.Sprintf("%s/enableNativeHistograms=%v", tt.name, enableNativeHistograms), func(t *testing.T) { + tt.run(t, enableNativeHistograms) + }) + } + } + +} + +func TestMetricBuilderNativeHistogram(t *testing.T) { + for _, enableNativeHistograms := range []bool{false, true} { + emptyH := &histogram.Histogram{ + Schema: 1, + Count: 0, + Sum: 0, + ZeroThreshold: 0.001, + ZeroCount: 0, + } + h0 := tsdbutil.GenerateTestHistogram(0) + + tests := []buildTestData{ + { + name: "empty integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", emptyH, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := 
md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(0) + pt0.SetSum(0) + pt0.SetZeroThreshold(0.001) + pt0.SetScale(1) + + return []pmetric.Metrics{md0} + }, + }, + { + name: "integer histogram", + inputs: []*testScrapedPage{ + { + pts: []*testDataPoint{ + createHistogramDataPoint("hist_test", h0, nil, nil, "foo", "bar"), + }, + }, + }, + wants: func() []pmetric.Metrics { + md0 := pmetric.NewMetrics() + if !enableNativeHistograms { + return []pmetric.Metrics{md0} + } + mL0 := md0.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics() + m0 := mL0.AppendEmpty() + m0.SetName("hist_test") + m0.SetEmptyExponentialHistogram() + m0.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + pt0 := m0.ExponentialHistogram().DataPoints().AppendEmpty() + pt0.Attributes().PutStr("foo", "bar") + pt0.SetStartTimestamp(startTimestamp) + pt0.SetTimestamp(tsNanos) + pt0.SetCount(12) + pt0.SetSum(18.4) + pt0.SetScale(1) + pt0.SetZeroThreshold(0.001) + pt0.SetZeroCount(2) + pt0.Positive().SetOffset(-1) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(2) + pt0.Positive().BucketCounts().Append(0) + pt0.Positive().BucketCounts().Append(1) + pt0.Positive().BucketCounts().Append(1) + pt0.Negative().SetOffset(-1) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(2) + pt0.Negative().BucketCounts().Append(0) + pt0.Negative().BucketCounts().Append(1) + pt0.Negative().BucketCounts().Append(1) + + return []pmetric.Metrics{md0} + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t 
*testing.T) { + tt.run(t, enableNativeHistograms) + }) + } } } @@ -1466,17 +1732,25 @@ type buildTestData struct { wants func() []pmetric.Metrics } -func (tt buildTestData) run(t *testing.T) { +func (tt buildTestData) run(t *testing.T, enableNativeHistograms bool) { wants := tt.wants() assert.EqualValues(t, len(wants), len(tt.inputs)) st := ts for i, page := range tt.inputs { sink := new(consumertest.MetricsSink) - tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, nil, receivertest.NewNopCreateSettings(), nopObsRecv(t), false) + tr := newTransaction(scrapeCtx, &startTimeAdjuster{startTime: startTimestamp}, sink, labels.EmptyLabels(), receivertest.NewNopCreateSettings(), nopObsRecv(t), false, enableNativeHistograms) for _, pt := range page.pts { // set ts for testing pt.t = st - _, err := tr.Append(0, pt.lb, pt.t, pt.v) + var err error + switch { + case pt.fh != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, nil, pt.fh) + case pt.h != nil: + _, err = tr.AppendHistogram(0, pt.lb, pt.t, pt.h, nil) + default: + _, err = tr.Append(0, pt.lb, pt.t, pt.v) + } assert.NoError(t, err) for _, e := range pt.exemplars { @@ -1533,7 +1807,12 @@ func (s *startTimeAdjuster) AdjustMetrics(metrics pmetric.Metrics) error { for l := 0; l < dps.Len(); l++ { dps.At(l).SetStartTimestamp(s.startTime) } - case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram: + case pmetric.MetricTypeExponentialHistogram: + dps := metric.ExponentialHistogram().DataPoints() + for l := 0; l < dps.Len(); l++ { + dps.At(l).SetStartTimestamp(s.startTime) + } + case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge: } } } @@ -1545,6 +1824,8 @@ type testDataPoint struct { lb labels.Labels t int64 v float64 + h *histogram.Histogram + fh *histogram.FloatHistogram exemplars []exemplar.Exemplar } @@ -1567,6 +1848,13 @@ func createDataPoint(mname string, value float64, es []exemplar.Exemplar, tagPai } } +func createHistogramDataPoint(mname 
string, h *histogram.Histogram, fh *histogram.FloatHistogram, es []exemplar.Exemplar, tagPairs ...string) *testDataPoint { + dataPoint := createDataPoint(mname, 0, es, tagPairs...) + dataPoint.h = h + dataPoint.fh = fh + return dataPoint +} + func assertEquivalentMetrics(t *testing.T, want, got pmetric.Metrics) { require.Equal(t, want.ResourceMetrics().Len(), got.ResourceMetrics().Len()) if want.ResourceMetrics().Len() == 0 { @@ -1597,4 +1885,5 @@ func assertEquivalentMetrics(t *testing.T, want, got pmetric.Metrics) { assert.EqualValues(t, wmap, gmap) } } + } diff --git a/internal/component/otelcol/receiver/prometheus/prometheus.go b/internal/component/otelcol/receiver/prometheus/prometheus.go index fcdf7de4cd..29b659c223 100644 --- a/internal/component/otelcol/receiver/prometheus/prometheus.go +++ b/internal/component/otelcol/receiver/prometheus/prometheus.go @@ -112,6 +112,8 @@ func (c *Component) Update(newConfig component.Arguments) error { // When supported, this could be added as an arg. trimMetricSuffixes = false + enableNativeHistograms = c.opts.MinStability.Permits(featuregate.StabilityPublicPreview) + gcInterval = 5 * time.Minute ) @@ -147,6 +149,7 @@ func (c *Component) Update(newConfig component.Arguments) error { useStartTimeMetric, startTimeMetricRegex, useCreatedMetric, + enableNativeHistograms, labels.Labels{}, trimMetricSuffixes, )