From 5a6a897c26ab12f5f8dbef3ad237322b7b4f5fd2 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 1 Oct 2024 18:38:00 +0000 Subject: [PATCH] ExemplarProvider takes attributes as an argument --- sdk/metric/exemplar/fixed_size_reservoir.go | 2 +- sdk/metric/exemplar/histogram_reservoir.go | 2 +- sdk/metric/exemplar/reservoir.go | 7 ++++++- sdk/metric/internal/aggregate/aggregate.go | 4 ++-- sdk/metric/internal/aggregate/aggregate_test.go | 4 ++-- sdk/metric/internal/aggregate/drop.go | 4 +++- sdk/metric/internal/aggregate/drop_test.go | 3 ++- sdk/metric/internal/aggregate/exponential_histogram.go | 6 +++--- sdk/metric/internal/aggregate/histogram.go | 8 ++++---- sdk/metric/internal/aggregate/lastvalue.go | 8 ++++---- sdk/metric/internal/aggregate/sum.go | 10 +++++----- 11 files changed, 33 insertions(+), 25 deletions(-) diff --git a/sdk/metric/exemplar/fixed_size_reservoir.go b/sdk/metric/exemplar/fixed_size_reservoir.go index a10685a3c49..d4aab0aad4f 100644 --- a/sdk/metric/exemplar/fixed_size_reservoir.go +++ b/sdk/metric/exemplar/fixed_size_reservoir.go @@ -14,7 +14,7 @@ import ( // FixedSizeReservoirProvider returns a provider of [FixedSizeReservoir]. func FixedSizeReservoirProvider(k int) ReservoirProvider { - return func() Reservoir { + return func(_ attribute.Set) Reservoir { return NewFixedSizeReservoir(k) } } diff --git a/sdk/metric/exemplar/histogram_reservoir.go b/sdk/metric/exemplar/histogram_reservoir.go index 7791d5136b3..3b76cf305a4 100644 --- a/sdk/metric/exemplar/histogram_reservoir.go +++ b/sdk/metric/exemplar/histogram_reservoir.go @@ -16,7 +16,7 @@ import ( func HistogramReservoirProvider(bounds []float64) ReservoirProvider { cp := slices.Clone(bounds) slices.Sort(cp) - return func() Reservoir { + return func(_ attribute.Set) Reservoir { return NewHistogramReservoir(cp) } } diff --git a/sdk/metric/exemplar/reservoir.go b/sdk/metric/exemplar/reservoir.go index 1273f5b9b08..ba5cd1a6b3d 100644 --- a/sdk/metric/exemplar/reservoir.go +++ b/sdk/metric/exemplar/reservoir.go @@ -32,4 +32,9 @@ type Reservoir interface { } // ReservoirProvider creates new [Reservoir]s. -type ReservoirProvider func() Reservoir +// +// The attributes provided are attributes which are kept by the aggregation, and +// are exclusive with attributes passed to Offer. The combination of these +// attributes and the attributes passed to Offer is the complete set of +// attributes a measurement was made with. +type ReservoirProvider func(attr attribute.Set) Reservoir diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 25de3b05086..fde21933389 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -38,7 +38,7 @@ type Builder[N int64 | float64] struct { // // If this is not provided a default factory function that returns an // dropReservoir reservoir will be used. - ReservoirFunc func() FilteredExemplarReservoir[N] + ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -49,7 +49,7 @@ type Builder[N int64 | float64] struct { AggregationLimit int } -func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] { +func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { if b.ReservoirFunc != nil { return b.ReservoirFunc } diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index c7e242c041f..b0034010861 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -72,8 +72,8 @@ func (c *clock) Register() (unregister func()) { return func() { now = orig } } -func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] { - return dropReservoir[N]() +func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] { + return dropReservoir[N](attr) } func TestBuilderFilter(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/drop.go b/sdk/metric/internal/aggregate/drop.go index dfc5a033395..76d52839b60 100644 --- a/sdk/metric/internal/aggregate/drop.go +++ b/sdk/metric/internal/aggregate/drop.go @@ -11,7 +11,9 @@ import ( ) // dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. -func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} } +func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] { + return &dropRes[N]{} +} type dropRes[N int64 | float64] struct{} diff --git a/sdk/metric/internal/aggregate/drop_test.go b/sdk/metric/internal/aggregate/drop_test.go index fee90adc777..38781e4c6ad 100644 --- a/sdk/metric/internal/aggregate/drop_test.go +++ b/sdk/metric/internal/aggregate/drop_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" ) @@ -17,7 +18,7 @@ func TestDrop(t *testing.T) { } func testDropFiltered[N int64 | float64](t *testing.T) { - r := dropReservoir[N]() + r := dropReservoir[N](*attribute.EmptySet()) var dest []exemplar.Exemplar r.Collect(&dest) diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go index a4de5674ba1..b7aa721651e 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram.go +++ b/sdk/metric/internal/aggregate/exponential_histogram.go @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) { // newExponentialHistogram returns an Aggregator that summarizes a set of // measurements as an exponential histogram. Each histogram is scoped by attributes // and the aggregation cycle the measurements were made in. -func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] { +func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] { return &expoHistogram[N]{ noSum: noSum, noMinMax: noMinMax, @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct { maxSize int maxScale int32 - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*expoHistogramDataPoint[N]] values map[attribute.Distinct]*expoHistogramDataPoint[N] valuesMu sync.Mutex @@ -327,7 +327,7 @@ func (e *expoHistogram[N]) measure(ctx context.Context, value N, fltrAttr attrib v, ok := e.values[attr.Equivalent()] if !ok { v = newExpoHistogramDataPoint[N](attr, e.maxSize, e.maxScale, e.noMinMax, e.noSum) - v.res = e.newRes() + v.res = e.newRes(attr) e.values[attr.Equivalent()] = v } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index 35d020378bd..d577ae2c198 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -93,7 +93,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // // buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞) b = newBuckets[N](attr, len(s.bounds)+1) - b.res = s.newRes() + b.res = s.newRes(attr) // Ensure min and max are recorded values (not zero), for new buckets. b.min, b.max = value, value @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index a7b5fe572be..d3a93f085c9 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -19,7 +19,7 @@ type datapoint[N int64 | float64] struct { res FilteredExemplarReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservo type lastValue[N int64 | float64] struct { sync.Mutex - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time @@ -45,7 +45,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. attr := s.limit.Attributes(fltrAttr, s.values) d, ok := s.values[attr.Equivalent()] if !ok { - d.res = s.newRes() + d.res = s.newRes(attr) } d.attrs = attr @@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. -func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] { +func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index c3b591c37c0..8e132ad6181 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -21,12 +21,12 @@ type sumValue[N int64 | float64] struct { // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func() FilteredExemplarReservoir[N] + newRes func(attribute.Set) FilteredExemplarReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -41,7 +41,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S attr := s.limit.Attributes(fltrAttr, s.values) v, ok := s.values[attr.Equivalent()] if !ok { - v.res = s.newRes() + v.res = s.newRes(attr) } v.attrs = attr @@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. -func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func() FilteredExemplarReservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic,