From 45979d88b1e4db98e7bbe52d4a3c8ff162070b04 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 18:34:31 +0000 Subject: [PATCH 1/7] move filtered exemplar reservoir into aggregate --- sdk/metric/exemplar.go | 10 ---------- sdk/metric/internal/aggregate/aggregate.go | 19 +++++++++++-------- sdk/metric/pipeline.go | 5 +++-- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 0335b8ae48e..36a0790e2c8 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -6,9 +6,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "runtime" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/exemplar" - "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" ) // ExemplarReservoirProviderSelector selects the @@ -16,14 +14,6 @@ import ( // based on the [Aggregation] of the metric. type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider -// reservoirFunc returns the appropriately configured exemplar reservoir -// creation func based on the passed InstrumentKind and filter configuration. -func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] { - return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] { - return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs)) - } -} - // DefaultExemplarReservoirProviderSelector returns the default // [exemplar.ReservoirProvider] for the // provided [Aggregation]. diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index fde21933389..032de53b0b8 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -8,6 +8,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -33,12 +34,12 @@ type Builder[N int64 | float64] struct { // Filter is the attribute filter the aggregate function will use on the // input of measurements. Filter attribute.Filter - // ReservoirFunc is the factory function used by aggregate functions to - // create new exemplar reservoirs for a new seen attribute set. - // - // If this is not provided a default factory function that returns an - // dropReservoir reservoir will be used. - ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N] + // ExemplarFilter is the filter applied to measurements before offering + // them to the exemplar Reservoir + ExemplarFilter exemplar.Filter + // ExemplarReservoirProvider is the factory function used to create a new + // exemplar Reservoir for a given attribute set. + ExemplarReservoirProvider exemplar.ReservoirProvider // AggregationLimit is the cardinality limit of measurement attributes. 
Any // measurement for new attributes once the limit has been reached will be // aggregated into a single aggregate for the "otel.metric.overflow" @@ -50,8 +51,10 @@ type Builder[N int64 | float64] struct { } func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { - if b.ReservoirFunc != nil { - return b.ReservoirFunc + if b.ExemplarFilter != nil { + return func(attrs attribute.Set) FilteredExemplarReservoir[N] { + return NewFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs)) + } } return dropReservoir diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 960b0e0ef01..af3fa19d684 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -354,8 +354,9 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum normID := id.normalize() cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ - Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.ExemplarReservoirProviderSelector(stream.Aggregation), i.pipeline.exemplarFilter), + Temporality: i.pipeline.reader.temporality(kind), + ExemplarReservoirProvider: stream.ExemplarReservoirProviderSelector(stream.Aggregation), + ExemplarFilter: i.pipeline.exemplarFilter, } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation From faebc0d76ad27b8aec51d22e9fa77d3557cfcaa5 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 18:38:43 +0000 Subject: [PATCH 2/7] newFilteredExemplarReservoir is no longer exported --- sdk/metric/internal/aggregate/aggregate.go | 2 +- sdk/metric/internal/aggregate/filtered_reservoir.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 032de53b0b8..95ad201d736 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -53,7 +53,7 @@ type Builder[N int64 | float64] struct { func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] { if b.ExemplarFilter != nil { return func(attrs attribute.Set) FilteredExemplarReservoir[N] { - return NewFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs)) + return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs)) } } diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go index 691a910608d..dcdb325df28 100644 --- a/sdk/metric/internal/aggregate/filtered_reservoir.go +++ b/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -31,9 +31,9 @@ type filteredExemplarReservoir[N int64 | float64] struct { reservoir exemplar.Reservoir } -// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values +// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values // that are allowed by the filter. 
-func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
+func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
 	return &filteredExemplarReservoir[N]{
 		filter:    f,
 		reservoir: r,

From c9cb3b49091149fc5f02db9348c2e6791a4533c8 Mon Sep 17 00:00:00 2001
From: David Ashpole
Date: Fri, 18 Oct 2024 19:03:02 +0000
Subject: [PATCH 3/7] remove unnecessary FilteredExemplarReservoir interface

---
 sdk/metric/internal/aggregate/aggregate.go   | 11 ++++----
 .../internal/aggregate/aggregate_test.go     |  5 ++--
 sdk/metric/internal/aggregate/drop.go        | 26 ------------------
 sdk/metric/internal/aggregate/drop_test.go   | 27 -------------------
 .../aggregate/exponential_histogram.go       |  6 ++---
 .../internal/aggregate/filtered_reservoir.go | 23 ++++------------
 sdk/metric/internal/aggregate/histogram.go   |  8 +++---
 sdk/metric/internal/aggregate/lastvalue.go   |  8 +++---
 sdk/metric/internal/aggregate/sum.go         | 10 +++----
 9 files changed, 29 insertions(+), 95 deletions(-)
 delete mode 100644 sdk/metric/internal/aggregate/drop.go
 delete mode 100644 sdk/metric/internal/aggregate/drop_test.go

diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index 95ad201d736..f821e0dcad5 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -50,14 +50,13 @@ type Builder[N int64 | float64] struct {
 	AggregationLimit int
 }
 
-func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
-	if b.ExemplarFilter != nil {
-		return func(attrs attribute.Set) FilteredExemplarReservoir[N] {
-			return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
+func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] {
+	return func(attrs attribute.Set) *filteredExemplarReservoir[N] {
+		if b.ExemplarReservoirProvider == nil {
+			return newFilteredExemplarReservoir[N](b.ExemplarFilter, exemplar.NewFixedSizeReservoir(0))
 		}
+		return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
 	}
-
-	return dropReservoir
 }
 
 type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go
index b0034010861..c426f1caa74 100644
--- a/sdk/metric/internal/aggregate/aggregate_test.go
+++ b/sdk/metric/internal/aggregate/aggregate_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stretchr/testify/assert"
 
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/exemplar"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 )
@@ -72,8 +73,8 @@ func (c *clock) Register() (unregister func()) {
 	return func() { now = orig }
 }
 
-func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
-	return dropReservoir[N](attr)
+func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
+	return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, exemplar.NewFixedSizeReservoir(0))
 }
 
 func TestBuilderFilter(t *testing.T) {
diff --git a/sdk/metric/internal/aggregate/drop.go b/sdk/metric/internal/aggregate/drop.go
deleted file mode 100644
index 76d52839b60..00000000000
--- a/sdk/metric/internal/aggregate/drop.go
+++ /dev/null
@@ -1,26 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
-
-import (
-	"context"
-
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/sdk/metric/exemplar"
-)
-
-// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
-func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
-	return &dropRes[N]{}
-}
-
-type dropRes[N int64 | float64] struct{}
-
-// Offer does nothing, all measurements offered will be dropped.
-func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}
-
-// Collect resets dest. No exemplars will ever be returned.
-func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
-	*dest = (*dest)[:0]
-}
diff --git a/sdk/metric/internal/aggregate/drop_test.go b/sdk/metric/internal/aggregate/drop_test.go
deleted file mode 100644
index 38781e4c6ad..00000000000
--- a/sdk/metric/internal/aggregate/drop_test.go
+++ /dev/null
@@ -1,27 +0,0 @@
-// Copyright The OpenTelemetry Authors
-// SPDX-License-Identifier: Apache-2.0
-
-package aggregate
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/sdk/metric/exemplar"
-)
-
-func TestDrop(t *testing.T) {
-	t.Run("Int64", testDropFiltered[int64])
-	t.Run("Float64", testDropFiltered[float64])
-}
-
-func testDropFiltered[N int64 | float64](t *testing.T) {
-	r := dropReservoir[N](*attribute.EmptySet())
-
-	var dest []exemplar.Exemplar
-	r.Collect(&dest)
-
-	assert.Empty(t, dest, "non-sampled context should not be offered")
-}
diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
index b7aa721651e..ee4bf50cf29 100644
--- a/sdk/metric/internal/aggregate/exponential_histogram.go
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -30,7 +30,7 @@ const (
 // expoHistogramDataPoint is a single data point in an exponential histogram.
 type expoHistogramDataPoint[N int64 | float64] struct {
 	attrs attribute.Set
-	res   FilteredExemplarReservoir[N]
+	res   *filteredExemplarReservoir[N]
 
 	count uint64
 	min   N
@@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
 // newExponentialHistogram returns an Aggregator that summarizes a set of
 // measurements as an exponential histogram. Each histogram is scoped by attributes
 // and the aggregation cycle the measurements were made in.
-func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
+func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] {
 	return &expoHistogram[N]{
 		noSum:    noSum,
 		noMinMax: noMinMax,
@@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
 	maxSize  int
 	maxScale int32
 
-	newRes   func(attribute.Set) FilteredExemplarReservoir[N]
+	newRes   func(attribute.Set) *filteredExemplarReservoir[N]
 	limit    limiter[*expoHistogramDataPoint[N]]
 	values   map[attribute.Distinct]*expoHistogramDataPoint[N]
 	valuesMu sync.Mutex
diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go
index dcdb325df28..91035c9d100 100644
--- a/sdk/metric/internal/aggregate/filtered_reservoir.go
+++ b/sdk/metric/internal/aggregate/filtered_reservoir.go
@@ -11,29 +11,16 @@ import (
 	"go.opentelemetry.io/otel/sdk/metric/exemplar"
 )
 
-// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
-type FilteredExemplarReservoir[N int64 | float64] interface {
-	// Offer accepts the parameters associated with a measurement. The
-	// parameters will be stored as an exemplar if the filter decides to
-	// sample the measurement.
-	//
-	// The passed ctx needs to contain any baggage or span that were active
-	// when the measurement was made. This information may be used by the
-	// Reservoir in making a sampling decision.
-	Offer(ctx context.Context, val N, attr []attribute.KeyValue)
-	// Collect returns all the held exemplars in the reservoir.
-	Collect(dest *[]exemplar.Exemplar)
-}
-
 // filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
 type filteredExemplarReservoir[N int64 | float64] struct {
 	filter    exemplar.Filter
 	reservoir exemplar.Reservoir
 }
 
-// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
-// that are allowed by the filter.
-func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
+// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which
+// only offers values that are allowed by the filter. If the provided filter is
+// nil, all measurements are dropped.
+func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
 	return &filteredExemplarReservoir[N]{
 		filter:    f,
 		reservoir: r,
@@ -41,7 +28,7 @@ func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exempl
 }
 
 func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
-	if f.filter(ctx) {
+	if f.filter != nil && f.filter(ctx) {
 		// only record the current time if we are sampling this measurement.
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr) } diff --git a/sdk/metric/internal/aggregate/histogram.go b/sdk/metric/internal/aggregate/histogram.go index d577ae2c198..f0ce537fdfb 100644 --- a/sdk/metric/internal/aggregate/histogram.go +++ b/sdk/metric/internal/aggregate/histogram.go @@ -16,7 +16,7 @@ import ( type buckets[N int64 | float64] struct { attrs attribute.Set - res FilteredExemplarReservoir[N] + res *filteredExemplarReservoir[N] counts []uint64 count uint64 @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct { noSum bool bounds []float64 - newRes func(attribute.Set) FilteredExemplarReservoir[N] + newRes func(attribute.Set) *filteredExemplarReservoir[N] limit limiter[*buckets[N]] values map[attribute.Distinct]*buckets[N] valuesMu sync.Mutex } -func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] { +func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histValues[N] { // The responsibility of keeping all buckets correctly associated with the // passed boundaries is ultimately this type's responsibility. Make a copy // here so we can always guarantee this. Or, in the case of failure, have @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute // newHistogram returns an Aggregator that summarizes a set of measurements as // an histogram. -func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] { +func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histogram[N] { return &histogram[N]{ histValues: newHistValues[N](boundaries, noSum, limit, r), noMinMax: noMinMax, diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index d3a93f085c9..9a3656f96c2 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -16,10 +16,10 @@ import ( type datapoint[N int64 | float64] struct { attrs attribute.Set value N - res FilteredExemplarReservoir[N] + res *filteredExemplarReservoir[N] } -func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *lastValue[N] { +func newLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *lastValue[N] { return &lastValue[N]{ newRes: r, limit: newLimiter[datapoint[N]](limit), @@ -32,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredEx type lastValue[N int64 | float64] struct { sync.Mutex - newRes func(attribute.Set) FilteredExemplarReservoir[N] + newRes func(attribute.Set) *filteredExemplarReservoir[N] limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] start time.Time @@ -114,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in // newPrecomputedLastValue returns an aggregator that summarizes a set of // observations as the last one made. 
-func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedLastValue[N] { +func newPrecomputedLastValue[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedLastValue[N] { return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} } diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 8e132ad6181..462942a6387 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -14,19 +14,19 @@ import ( type sumValue[N int64 | float64] struct { n N - res FilteredExemplarReservoir[N] + res *filteredExemplarReservoir[N] attrs attribute.Set } // valueMap is the storage for sums. type valueMap[N int64 | float64] struct { sync.Mutex - newRes func(attribute.Set) FilteredExemplarReservoir[N] + newRes func(attribute.Set) *filteredExemplarReservoir[N] limit limiter[sumValue[N]] values map[attribute.Distinct]sumValue[N] } -func newValueMap[N int64 | float64](limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *valueMap[N] { +func newValueMap[N int64 | float64](limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *valueMap[N] { return &valueMap[N]{ newRes: r, limit: newLimiter[sumValue[N]](limit), @@ -54,7 +54,7 @@ func (s *valueMap[N]) measure(ctx context.Context, value N, fltrAttr attribute.S // newSum returns an aggregator that summarizes a set of measurements as their // arithmetic sum. Each sum is scoped by attributes and the aggregation cycle // the measurements were made in. -func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *sum[N] { +func newSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *sum[N] { return &sum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, @@ -143,7 +143,7 @@ func (s *sum[N]) cumulative(dest *metricdata.Aggregation) int { // newPrecomputedSum returns an aggregator that summarizes a set of // observations as their arithmetic sum. Each sum is scoped by attributes and // the aggregation cycle the measurements were made in. 
-func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *precomputedSum[N] { +func newPrecomputedSum[N int64 | float64](monotonic bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *precomputedSum[N] { return &precomputedSum[N]{ valueMap: newValueMap[N](limit, r), monotonic: monotonic, From c968dc4eca221899640d990f967b5c1e5bd016ff Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 19:29:05 +0000 Subject: [PATCH 4/7] use exemplar.AlwaysOffFilter instead of nil --- .../internal/aggregate/aggregate_test.go | 4 +-- .../aggregate/exponential_histogram_test.go | 15 +++++++--- .../internal/aggregate/filtered_reservoir.go | 2 +- .../internal/aggregate/histogram_test.go | 15 +++++++--- .../internal/aggregate/lastvalue_test.go | 11 +++++-- sdk/metric/internal/aggregate/sum_test.go | 29 ++++++++++++++----- 6 files changed, 55 insertions(+), 21 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index c426f1caa74..59d72d37e08 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -100,8 +100,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) { } } - t.Run("NoFilter", run(Builder[N]{}, attr, nil)) - t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) + t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter}, attr, nil)) + t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) } } diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index 9ede71ac88c..deb66ccacf1 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -682,22 +683,26 @@ func BenchmarkExponentialHistogram(b *testing.B) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) 
} @@ -747,6 +752,7 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 2, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -874,6 +880,7 @@ func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 2, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ diff --git a/sdk/metric/internal/aggregate/filtered_reservoir.go b/sdk/metric/internal/aggregate/filtered_reservoir.go index 91035c9d100..a600eb1e413 100644 --- a/sdk/metric/internal/aggregate/filtered_reservoir.go +++ b/sdk/metric/internal/aggregate/filtered_reservoir.go @@ -28,7 +28,7 @@ func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exempl } func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) { - if f.filter != nil && f.filter(ctx) { + if f.filter(ctx) { // only record the current time if we are sampling this measurement. f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr) } diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 4484a33fa45..769fc07fa11 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) @@ -54,6 +55,7 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -141,6 +143,7 @@ func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -375,22 +378,26 @@ func TestDeltaHistogramReset(t *testing.T) { func BenchmarkHistogram(b *testing.B) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, 
noMinMax, false) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) } diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 77e0d283ba0..349ed337c50 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -7,6 +7,7 @@ import ( "context" "testing" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -39,6 +40,7 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.LastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -143,6 +145,7 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.LastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -265,6 +268,7 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedLastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -369,6 +373,7 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedLastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -469,6 +474,8 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { } func BenchmarkLastValue(b *testing.B) { - b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) - b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) + b.Run("Int64", benchmarkAggregate(Builder[int64]{ + ExemplarFilter: exemplar.AlwaysOffFilter}.PrecomputedLastValue)) + b.Run("Float64", benchmarkAggregate(Builder[float64]{ + ExemplarFilter: exemplar.AlwaysOffFilter}.PrecomputedLastValue)) } diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index bb825e18375..524af5d0407 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -7,6 +7,7 @@ import ( "context" "testing" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -44,6 +45,7 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -172,6 +174,7 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -286,6 +289,7 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { Temporality: 
metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -415,6 +419,7 @@ func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { Temporality: metricdata.CumulativeTemporality, Filter: attrFltr, AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -544,43 +549,51 @@ func BenchmarkSum(b *testing.B) { // performance, therefore, only monotonic=false is benchmarked here. b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(false) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(false) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(false) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.Sum(false) })) b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(false) })) b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(false) })) b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(false) })) b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, }.PrecomputedSum(false) })) } From 48931d93962c81131515daabf4ef02dbfe0a1e78 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 19:38:16 +0000 Subject: [PATCH 5/7] use newNoopReservoir instead of nil --- sdk/metric/internal/aggregate/aggregate.go | 3 - .../internal/aggregate/aggregate_test.go | 10 ++- .../aggregate/exponential_histogram_test.go | 38 ++++++---- .../internal/aggregate/histogram_test.go | 38 ++++++---- .../internal/aggregate/lastvalue_test.go | 44 ++++++----- sdk/metric/internal/aggregate/sum_test.go | 76 +++++++++++-------- 6 files changed, 121 insertions(+), 88 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go 
b/sdk/metric/internal/aggregate/aggregate.go index f821e0dcad5..bc7fdbdafeb 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -52,9 +52,6 @@ type Builder[N int64 | float64] struct { func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] { return func(attrs attribute.Set) *filteredExemplarReservoir[N] { - if b.ExemplarReservoirProvider == nil { - return newFilteredExemplarReservoir[N](b.ExemplarFilter, exemplar.NewFixedSizeReservoir(0)) - } return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs)) } } diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index 59d72d37e08..135994065d0 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -73,8 +73,12 @@ func (c *clock) Register() (unregister func()) { return func() { now = orig } } +func newNoopReservoir(attribute.Set) exemplar.Reservoir { + return exemplar.NewFixedSizeReservoir(0) +} + func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] { - return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, exemplar.NewFixedSizeReservoir(0)) + return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, newNoopReservoir(attr)) } func TestBuilderFilter(t *testing.T) { @@ -100,8 +104,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) { } } - t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter}, attr, nil)) - t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) + t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir}, attr, nil)) + t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue})) } } diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go index deb66ccacf1..5a8f17bdbc1 100644 --- a/sdk/metric/internal/aggregate/exponential_histogram_test.go +++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go @@ -683,26 +683,30 @@ func BenchmarkExponentialHistogram(b *testing.B) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + 
ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum) })) } @@ -749,10 +753,11 @@ func TestExponentialHistogramAggregation(t *testing.T) { func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 2, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 2, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -877,10 +882,11 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) { func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 2, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 2, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExponentialBucketHistogram(4, 20, false, false) ctx := context.Background() return test[N](in, out, []teststep[N]{ diff --git a/sdk/metric/internal/aggregate/histogram_test.go b/sdk/metric/internal/aggregate/histogram_test.go index 769fc07fa11..158c1b005dc 100644 --- a/sdk/metric/internal/aggregate/histogram_test.go +++ b/sdk/metric/internal/aggregate/histogram_test.go @@ -52,10 +52,11 @@ type conf[N int64 | float64] struct { func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -140,10 +141,11 @@ func testDeltaHist[N int64 | float64](c conf[N]) func(t *testing.T) { func testCumulativeHist[N int64 | float64](c conf[N]) func(t *testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, c.noSum) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -378,26 +380,30 @@ func TestDeltaHistogramReset(t *testing.T) { func BenchmarkHistogram(b *testing.B) { b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: 
metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.ExplicitBucketHistogram(bounds, noMinMax, false) })) } diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 349ed337c50..7bba4666f0e 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -37,10 +37,11 @@ func TestLastValue(t *testing.T) { func testDeltaLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.LastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -142,10 +143,11 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.LastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -265,10 +267,11 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedLastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -370,10 +373,11 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ - Temporality: 
metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedLastValue() ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -475,7 +479,11 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { func BenchmarkLastValue(b *testing.B) { b.Run("Int64", benchmarkAggregate(Builder[int64]{ - ExemplarFilter: exemplar.AlwaysOffFilter}.PrecomputedLastValue)) + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, + }.PrecomputedLastValue)) b.Run("Float64", benchmarkAggregate(Builder[float64]{ - ExemplarFilter: exemplar.AlwaysOffFilter}.PrecomputedLastValue)) + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, + }.PrecomputedLastValue)) } diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index 524af5d0407..fcfd5c539ce 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -42,10 +42,11 @@ func TestSum(t *testing.T) { func testDeltaSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -171,10 +172,11 @@ func testDeltaSum[N int64 | float64]() func(t *testing.T) { func testCumulativeSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -286,10 +288,11 @@ func testCumulativeSum[N int64 | float64]() func(t *testing.T) { func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.DeltaTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, out, []teststep[N]{ @@ -416,10 +419,11 @@ func testDeltaPrecomputedSum[N int64 | float64]() func(t *testing.T) { func testCumulativePrecomputedSum[N int64 | float64]() func(t *testing.T) { mono := false in, out := Builder[N]{ - Temporality: metricdata.CumulativeTemporality, - Filter: attrFltr, - AggregationLimit: 3, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(mono) ctx := context.Background() return test[N](in, 
out, []teststep[N]{ @@ -549,51 +553,59 @@ func BenchmarkSum(b *testing.B) { // performance, therefore, only monotonic=false is benchmarked here. b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(false) })) b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(false) })) b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(false) })) b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.Sum(false) })) b.Run("Precomputed/Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(false) })) b.Run("Precomputed/Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { return Builder[int64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(false) })) b.Run("Precomputed/Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.CumulativeTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.CumulativeTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(false) })) b.Run("Precomputed/Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { return Builder[float64]{ - Temporality: metricdata.DeltaTemporality, - ExemplarFilter: exemplar.AlwaysOffFilter, + Temporality: metricdata.DeltaTemporality, + ExemplarFilter: exemplar.AlwaysOffFilter, + ExemplarReservoirProvider: newNoopReservoir, }.PrecomputedSum(false) })) } From 44343af5530c0d5957159d1947e488ab0fc6594a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 19:47:57 +0000 Subject: [PATCH 6/7] use non-nil --- sdk/metric/instrument_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/metric/instrument_test.go b/sdk/metric/instrument_test.go index 3a8c2d26c3a..a3e625087ca 100644 --- a/sdk/metric/instrument_test.go +++ b/sdk/metric/instrument_test.go 
@@ -8,6 +8,7 @@ import ( "testing" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -22,7 +23,7 @@ func BenchmarkInstrument(b *testing.B) { } b.Run("instrumentImpl/aggregate", func(b *testing.B) { - build := aggregate.Builder[int64]{} + build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)} var meas []aggregate.Measure[int64] build.Temporality = metricdata.CumulativeTemporality @@ -52,7 +53,7 @@ func BenchmarkInstrument(b *testing.B) { }) b.Run("observable/observe", func(b *testing.B) { - build := aggregate.Builder[int64]{} + build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)} var meas []aggregate.Measure[int64] in, _ := build.PrecomputedLastValue() From deb9234042cd14e64306d9758d29dba59080f9b2 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 18 Oct 2024 19:55:22 +0000 Subject: [PATCH 7/7] comment style --- sdk/metric/internal/aggregate/aggregate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index bc7fdbdafeb..afe7dfa3aa7 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -35,7 +35,7 @@ type Builder[N int64 | float64] struct { // input of measurements. Filter attribute.Filter // ExemplarFilter is the filter applied to measurements before offering - // them to the exemplar Reservoir + // them to the exemplar Reservoir. ExemplarFilter exemplar.Filter // ExemplarReservoirProvider is the factory function used to create a new // exemplar Reservoir for a given attribute set.
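Note (illustrative, not part of the patch series): the sketch below shows how the reworked Builder is wired end to end once PATCH 7/7 is applied, written as an in-package test in the style of the *_test.go files above. Every identifier it uses (Builder, ExemplarFilter, ExemplarReservoirProvider, exemplar.AlwaysOffFilter, exemplar.FixedSizeReservoirProvider) appears in the diffs; the test itself, its file name, and the expectation of a single cumulative data point are assumptions for illustration only.

// aggregate_builder_example_test.go (hypothetical file, package aggregate)
package aggregate

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/exemplar"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestBuilderExemplarWiring(t *testing.T) {
	// Exemplar behavior is now configured through two Builder fields instead
	// of a pre-built ReservoirFunc: a Filter applied before values are
	// offered, and a ReservoirProvider that creates one reservoir per
	// attribute set.
	in, out := Builder[int64]{
		Temporality:               metricdata.CumulativeTemporality,
		ExemplarFilter:            exemplar.AlwaysOffFilter,
		ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0),
	}.Sum(false)

	// Record one measurement; with AlwaysOffFilter nothing is offered to the
	// reservoir, but the sum itself is still aggregated.
	in(context.Background(), 1, *attribute.EmptySet())

	var got metricdata.Aggregation
	assert.Equal(t, 1, out(&got), "expected a single cumulative data point")
}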