Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup interaction of exemplar and aggregation #5899

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,14 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"runtime"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [exemplar.ReservoirProvider] for the
// provided [Aggregation].
Expand Down
5 changes: 3 additions & 2 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"testing"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand All @@ -22,7 +23,7 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
build := aggregate.Builder[int64]{}
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
var meas []aggregate.Measure[int64]

build.Temporality = metricdata.CumulativeTemporality
Expand Down Expand Up @@ -52,7 +53,7 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
build := aggregate.Builder[int64]{}
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
var meas []aggregate.Measure[int64]

in, _ := build.PrecomputedLastValue()
Expand Down
21 changes: 10 additions & 11 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -33,12 +34,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// dropReservoir reservoir will be used.
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// ExemplarFilter is the filter applied to measurements before offering
// them to the exemplar Reservoir.
ExemplarFilter exemplar.Filter
// ExemplarReservoirProvider is the factory function used to create a new
// exemplar Reservoir for a given attribute set.
ExemplarReservoirProvider exemplar.ReservoirProvider
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -49,12 +50,10 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] {
return func(attrs attribute.Set) *filteredExemplarReservoir[N] {
return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
}

return dropReservoir
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
13 changes: 9 additions & 4 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
Expand Down Expand Up @@ -72,8 +73,12 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
func newNoopReservoir(attribute.Set) exemplar.Reservoir {
return exemplar.NewFixedSizeReservoir(0)
}

func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, newNoopReservoir(attr))
}

func TestBuilderFilter(t *testing.T) {
Expand All @@ -99,8 +104,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
}
}

t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir}, attr, nil))
t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
}
}

Expand Down
26 changes: 0 additions & 26 deletions sdk/metric/internal/aggregate/drop.go

This file was deleted.

27 changes: 0 additions & 27 deletions sdk/metric/internal/aggregate/drop_test.go

This file was deleted.

6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
res *filteredExemplarReservoir[N]

count uint64
min N
Expand Down Expand Up @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func(attribute.Set) FilteredExemplarReservoir[N]
newRes func(attribute.Set) *filteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down
33 changes: 23 additions & 10 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -682,22 +683,30 @@ func BenchmarkExponentialHistogram(b *testing.B) {

b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.DeltaTemporality,
Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.CumulativeTemporality,
Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.DeltaTemporality,
Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
}
Expand Down Expand Up @@ -744,9 +753,11 @@ func TestExponentialHistogramAggregation(t *testing.T) {

func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 2,
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down Expand Up @@ -871,9 +882,11 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {

func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 2,
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down
21 changes: 4 additions & 17 deletions sdk/metric/internal/aggregate/filtered_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,16 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}

// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}

// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which
// only offers values that are allowed by the filter. If the provided filter is
// nil, all measurements are dropped..
func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

type buckets[N int64 | float64] struct {
attrs attribute.Set
res FilteredExemplarReservoir[N]
res *filteredExemplarReservoir[N]

counts []uint64
count uint64
Expand Down Expand Up @@ -47,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func(attribute.Set) FilteredExemplarReservoir[N]
newRes func(attribute.Set) *filteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down
Loading
Loading