From 4f03a0bb356f8a23ee64129b9f2599f442915d02 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 27 Sep 2024 19:10:05 +0000 Subject: [PATCH 01/13] add exemplar.AlwaysOffFilter --- sdk/metric/exemplar.go | 2 +- sdk/metric/exemplar/filter.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 4beaa9ea00a..2b672e6799a 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -29,7 +29,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.Filtered case "always_on": filter = exemplar.AlwaysOnFilter case "always_off": - return aggregate.DropReservoir + filter = exemplar.AlwaysOffFilter case "trace_based": fallthrough default: diff --git a/sdk/metric/exemplar/filter.go b/sdk/metric/exemplar/filter.go index 4d485200f56..b595e2acef3 100644 --- a/sdk/metric/exemplar/filter.go +++ b/sdk/metric/exemplar/filter.go @@ -27,3 +27,8 @@ func TraceBasedFilter(ctx context.Context) bool { func AlwaysOnFilter(ctx context.Context) bool { return true } + +// AlwaysOffFilter is a [Filter] that never offers measurements. +func AlwaysOffFilter(ctx context.Context) bool { + return false +} From f196e0e86fd051316a1f47421b2ec65e486e84f7 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 27 Sep 2024 19:15:34 +0000 Subject: [PATCH 02/13] unexport aggregate.DropReservoir --- sdk/metric/internal/aggregate/aggregate.go | 4 ++-- sdk/metric/internal/aggregate/aggregate_test.go | 2 +- sdk/metric/internal/aggregate/drop.go | 4 ++-- sdk/metric/internal/aggregate/drop_test.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index f1f3ab67314..25de3b05086 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -37,7 +37,7 @@ type Builder[N int64 | float64] struct { // create new exemplar reservoirs for a new seen attribute set. // // If this is not provided a default factory function that returns an - // DropReservoir reservoir will be used. + // dropReservoir reservoir will be used. ReservoirFunc func() FilteredExemplarReservoir[N] // AggregationLimit is the cardinality limit of measurement attributes. Any // measurement for new attributes once the limit has been reached will be @@ -54,7 +54,7 @@ func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] { return b.ReservoirFunc } - return DropReservoir + return dropReservoir } type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) diff --git a/sdk/metric/internal/aggregate/aggregate_test.go b/sdk/metric/internal/aggregate/aggregate_test.go index fec39f5f919..c7e242c041f 100644 --- a/sdk/metric/internal/aggregate/aggregate_test.go +++ b/sdk/metric/internal/aggregate/aggregate_test.go @@ -73,7 +73,7 @@ func (c *clock) Register() (unregister func()) { } func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] { - return DropReservoir[N]() + return dropReservoir[N]() } func TestBuilderFilter(t *testing.T) { diff --git a/sdk/metric/internal/aggregate/drop.go b/sdk/metric/internal/aggregate/drop.go index 4a3d4cc2218..dfc5a033395 100644 --- a/sdk/metric/internal/aggregate/drop.go +++ b/sdk/metric/internal/aggregate/drop.go @@ -10,8 +10,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/exemplar" ) -// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. -func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} } +// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered. +func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} } type dropRes[N int64 | float64] struct{} diff --git a/sdk/metric/internal/aggregate/drop_test.go b/sdk/metric/internal/aggregate/drop_test.go index 7b3a0f9c3ef..fee90adc777 100644 --- a/sdk/metric/internal/aggregate/drop_test.go +++ b/sdk/metric/internal/aggregate/drop_test.go @@ -17,7 +17,7 @@ func TestDrop(t *testing.T) { } func testDropFiltered[N int64 | float64](t *testing.T) { - r := DropReservoir[N]() + r := dropReservoir[N]() var dest []exemplar.Exemplar r.Collect(&dest) From 72617ab2c69257eb01400d2a29ee8b2a65c37998 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 27 Sep 2024 20:02:54 +0000 Subject: [PATCH 03/13] add WithExemplarFilter as an option on the SDK's MeterProvider --- sdk/metric/config.go | 24 +++++++++++++++++++++--- sdk/metric/config_test.go | 9 +++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 544275a1146..76dc5871daa 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -9,14 +9,16 @@ import ( "sync" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/resource" ) // config contains configuration options for a MeterProvider. type config struct { - res *resource.Resource - readers []Reader - views []View + res *resource.Resource + readers []Reader + views []View + exemplarFilter exemplar.Filter } // readerSignals returns a force-flush and shutdown function for a @@ -140,3 +142,19 @@ func WithView(views ...View) Option { return cfg }) } + +// WithExemplarFilter configures the exemplar filter. +// +// The exemplar filter determines which measurements are be offered to the +// exemplar reservoir, but the exemplar reservoir makes the final decision of +// whether to store an exemplar. +// +// By default, the [go.opentelemetry.io/otel/sdk/metric/exemplar.SampledFilter] +// is used. Exemplars can be disabled by providing the +// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter] +func WithExemplarFilter(filter exemplar.Filter) Option { + return optionFunc(func(cfg config) config { + cfg.exemplarFilter = filter + return cfg + }) +} diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index d3c1341bf51..bb09e8dc8ee 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/attribute" ottest "go.opentelemetry.io/otel/sdk/internal/internaltest" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -192,3 +193,11 @@ func TestWithView(t *testing.T) { )}) assert.Len(t, c.views, 2) } + +func TestWithExemplarFilter(t *testing.T) { + c := newConfig([]Option{WithExemplarFilter( + exemplar.AlwaysOffFilter, + )}) + assert.NotNil(t, c.exemplarFilter) + assert.False(t, c.exemplarFilter(context.Background())) +} From fdd506563551afff2e3452d6fa42a7e5bb48fe4d Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 27 Sep 2024 20:36:36 +0000 Subject: [PATCH 04/13] plumb exemplarFilter configuration --- CHANGELOG.md | 4 +++- sdk/metric/config.go | 29 +++++++++++++++++++++++++--- sdk/metric/exemplar.go | 26 ++----------------------- sdk/metric/pipeline.go | 17 +++++++++------- sdk/metric/pipeline_registry_test.go | 15 +++++++------- sdk/metric/pipeline_test.go | 17 ++++++++-------- sdk/metric/provider.go | 2 +- 7 files changed, 59 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 56f4b33e670..73dfbcd910c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Added -- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862) +- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747) +- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850) +- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850) ### Changed diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 76dc5871daa..6f38f05480f 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -6,6 +6,7 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( "context" "fmt" + "os" "sync" "go.opentelemetry.io/otel" @@ -78,10 +79,16 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er // newConfig returns a config configured with options. func newConfig(options []Option) config { - conf := config{res: resource.Default()} + conf := config{ + res: resource.Default(), + exemplarFilter: exemplar.SampledFilter, + } for _, o := range options { conf = o.apply(conf) } + for _, o := range optionsFromEnv() { + conf = o.apply(conf) + } return conf } @@ -150,11 +157,27 @@ func WithView(views ...View) Option { // whether to store an exemplar. // // By default, the [go.opentelemetry.io/otel/sdk/metric/exemplar.SampledFilter] -// is used. Exemplars can be disabled by providing the -// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter] +// is used. Exemplars can be entirely disabled by providing the +// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter]. func WithExemplarFilter(filter exemplar.Filter) Option { return optionFunc(func(cfg config) config { cfg.exemplarFilter = filter return cfg }) } + +func optionsFromEnv() []Option { + var opts []Option + // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar + const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" + + switch os.Getenv(filterEnvKey) { + case "always_on": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter)) + case "always_off": + opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter)) + case "trace_based": + opts = append(opts, WithExemplarFilter(exemplar.SampledFilter)) + } + return opts +} diff --git a/sdk/metric/exemplar.go b/sdk/metric/exemplar.go index 2b672e6799a..1f8652d6eca 100644 --- a/sdk/metric/exemplar.go +++ b/sdk/metric/exemplar.go @@ -4,7 +4,6 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric" import ( - "os" "runtime" "slices" @@ -13,29 +12,8 @@ import ( ) // reservoirFunc returns the appropriately configured exemplar reservoir -// creation func based on the passed InstrumentKind and user defined -// environment variables. -// -// Note: This will only return non-nil values when the experimental exemplar -// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable -// is not set to always_off. -func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] { - // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar - const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" - - var filter exemplar.Filter - - switch os.Getenv(filterEnvKey) { - case "always_on": - filter = exemplar.AlwaysOnFilter - case "always_off": - filter = exemplar.AlwaysOffFilter - case "trace_based": - fallthrough - default: - filter = exemplar.TraceBasedFilter - } - +// creation func based on the passed InstrumentKind and filter configuration. +func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] { // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults // Explicit bucket histogram aggregation with more than 1 bucket will // use AlignedHistogramBucketExemplarReservoir. diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index 823bf2fe3d2..fbc9b8649e6 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/internal/x" @@ -38,14 +39,15 @@ type instrumentSync struct { compAgg aggregate.ComputeAggregation } -func newPipeline(res *resource.Resource, reader Reader, views []View) *pipeline { +func newPipeline(res *resource.Resource, reader Reader, views []View, exemplarFilter exemplar.Filter) *pipeline { if res == nil { res = resource.Empty() } return &pipeline{ - resource: res, - reader: reader, - views: views, + resource: res, + reader: reader, + views: views, + exemplarFilter: exemplarFilter, // aggregations is lazy allocated when needed. } } @@ -66,6 +68,7 @@ type pipeline struct { aggregations map[instrumentation.Scope][]instrumentSync callbacks []func(context.Context) error multiCallbacks list.List + exemplarFilter exemplar.Filter } // addSync adds the instrumentSync to pipeline p with scope. This method is not @@ -349,7 +352,7 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum cv := i.aggregators.Lookup(normID, func() aggVal[N] { b := aggregate.Builder[N]{ Temporality: i.pipeline.reader.temporality(kind), - ReservoirFunc: reservoirFunc[N](stream.Aggregation), + ReservoirFunc: reservoirFunc[N](stream.Aggregation, i.pipeline.exemplarFilter), } b.Filter = stream.AttributeFilter // A value less than or equal to zero will disable the aggregation @@ -552,10 +555,10 @@ func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error { // measurement. type pipelines []*pipeline -func newPipelines(res *resource.Resource, readers []Reader, views []View) pipelines { +func newPipelines(res *resource.Resource, readers []Reader, views []View, exemplarFilter exemplar.Filter) pipelines { pipes := make([]*pipeline, 0, len(readers)) for _, r := range readers { - p := newPipeline(res, r, views) + p := newPipeline(res, r, views, exemplarFilter) r.register(p) pipes = append(pipes, p) } diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index fc632cbb101..d1fb71a3b13 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -357,7 +358,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { for _, tt := range testcases { t.Run(tt.name, func(t *testing.T) { var c cache[string, instID] - p := newPipeline(nil, tt.reader, tt.views) + p := newPipeline(nil, tt.reader, tt.views, exemplar.AlwaysOffFilter) i := newInserter[N](p, &c) readerAggregation := i.readerDefaultAggregation(tt.inst.Kind) input, err := i.Instrument(tt.inst, readerAggregation) @@ -379,7 +380,7 @@ func TestCreateAggregators(t *testing.T) { func testInvalidInstrumentShouldPanic[N int64 | float64]() { var c cache[string, instID] - i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c) + i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}, exemplar.AlwaysOffFilter), &c) inst := Instrument{ Name: "foo", Kind: InstrumentKind(255), @@ -395,7 +396,7 @@ func TestInvalidInstrumentShouldPanic(t *testing.T) { func TestPipelinesAggregatorForEachReader(t *testing.T) { r0, r1 := NewManualReader(), NewManualReader() - pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil) + pipes := newPipelines(resource.Empty(), []Reader{r0, r1}, nil, exemplar.AlwaysOffFilter) require.Len(t, pipes, 2, "created pipelines") inst := Instrument{Name: "foo", Kind: InstrumentKindCounter} @@ -467,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) { for _, tt := range testCases { t.Run(tt.name, func(t *testing.T) { - p := newPipelines(resource.Empty(), tt.readers, tt.views) + p := newPipelines(resource.Empty(), tt.readers, tt.views, exemplar.AlwaysOffFilter) testPipelineRegistryResolveIntAggregators(t, p, tt.wantCount) testPipelineRegistryResolveFloatAggregators(t, p, tt.wantCount) testPipelineRegistryResolveIntHistogramAggregators(t, p, tt.wantCount) @@ -521,7 +522,7 @@ func TestPipelineRegistryResource(t *testing.T) { readers := []Reader{NewManualReader()} views := []View{defaultView, v} res := resource.NewSchemaless(attribute.String("key", "val")) - pipes := newPipelines(res, readers, views) + pipes := newPipelines(res, readers, views, exemplar.AlwaysOffFilter) for _, p := range pipes { assert.True(t, res.Equal(p.resource), "resource not set") } @@ -532,7 +533,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { readers := []Reader{testRdrHistogram} views := []View{defaultView} - p := newPipelines(resource.Empty(), readers, views) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge} var vc cache[string, instID] @@ -592,7 +593,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) { fooInst := Instrument{Name: "foo", Kind: InstrumentKindCounter} barInst := Instrument{Name: "bar", Kind: InstrumentKindCounter} - p := newPipelines(resource.Empty(), readers, views) + p := newPipelines(resource.Empty(), readers, views, exemplar.AlwaysOffFilter) var vc cache[string, instID] ri := newResolver[int64](p, &vc) diff --git a/sdk/metric/pipeline_test.go b/sdk/metric/pipeline_test.go index 43f9499a09e..3df43827e11 100644 --- a/sdk/metric/pipeline_test.go +++ b/sdk/metric/pipeline_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" @@ -39,7 +40,7 @@ func testSumAggregateOutput(dest *metricdata.Aggregation) int { } func TestNewPipeline(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -65,7 +66,7 @@ func TestNewPipeline(t *testing.T) { func TestPipelineUsesResource(t *testing.T) { res := resource.NewWithAttributes("noSchema", attribute.String("test", "resource")) - pipe := newPipeline(res, nil, nil) + pipe := newPipeline(res, nil, nil, exemplar.AlwaysOffFilter) output := metricdata.ResourceMetrics{} err := pipe.produce(context.Background(), &output) @@ -74,7 +75,7 @@ func TestPipelineUsesResource(t *testing.T) { } func TestPipelineConcurrentSafe(t *testing.T) { - pipe := newPipeline(nil, nil, nil) + pipe := newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter) ctx := context.Background() var output metricdata.ResourceMetrics @@ -124,13 +125,13 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) { }{ { name: "NoView", - pipe: newPipeline(nil, reader, nil), + pipe: newPipeline(nil, reader, nil, exemplar.AlwaysOffFilter), }, { name: "NoMatchingView", pipe: newPipeline(nil, reader, []View{ NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}), - }), + }, exemplar.AlwaysOffFilter), }, } @@ -215,7 +216,7 @@ func TestLogConflictName(t *testing.T) { return instID{Name: tc.existing} }) - i := newInserter[int64](newPipeline(nil, nil, nil), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) i.logConflict(instID{Name: tc.name}) if tc.conflict { @@ -257,7 +258,7 @@ func TestLogConflictSuggestView(t *testing.T) { var vc cache[string, instID] name := strings.ToLower(orig.Name) _ = vc.Lookup(name, func() instID { return orig }) - i := newInserter[int64](newPipeline(nil, nil, nil), &vc) + i := newInserter[int64](newPipeline(nil, nil, nil, exemplar.AlwaysOffFilter), &vc) viewSuggestion := func(inst instID, stream string) string { return `"NewView(Instrument{` + @@ -362,7 +363,7 @@ func TestInserterCachedAggregatorNameConflict(t *testing.T) { } var vc cache[string, instID] - pipe := newPipeline(nil, NewManualReader(), nil) + pipe := newPipeline(nil, NewManualReader(), nil, exemplar.AlwaysOffFilter) i := newInserter[int64](pipe, &vc) readerAggregation := i.readerDefaultAggregation(kind) diff --git a/sdk/metric/provider.go b/sdk/metric/provider.go index a82af538e67..7b0c0dbf714 100644 --- a/sdk/metric/provider.go +++ b/sdk/metric/provider.go @@ -42,7 +42,7 @@ func NewMeterProvider(options ...Option) *MeterProvider { flush, sdown := conf.readerSignals() mp := &MeterProvider{ - pipes: newPipelines(conf.res, conf.readers, conf.views), + pipes: newPipelines(conf.res, conf.readers, conf.views, conf.exemplarFilter), forceFlush: flush, shutdown: sdown, } From 216ca5769a64406e6601363ac310fb501447a39b Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 30 Sep 2024 19:44:45 +0000 Subject: [PATCH 05/13] add testable example for exemplar filter --- sdk/metric/example_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index faca4731818..3fa7e969e4a 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" ) @@ -240,3 +241,21 @@ func ExampleNewView_exponentialHistogram() { metric.WithView(view), ) } + +func ExampleWithExemplarFilter_disabled() { + // Use exemplar.AlwaysOffFilter to disable exemplar collection. + _ = metric.NewMeterProvider( + metric.WithExemplarFilter(exemplar.AlwaysOffFilter), + ) +} + +func ExampleWithExemplarFilter_custom() { + // Create a custom filter function that only offers measurements if the + // context has an error. + customFilter := func(ctx context.Context) bool { + return ctx.Err() != nil + } + _ = metric.NewMeterProvider( + metric.WithExemplarFilter(customFilter), + ) +} From 1ca96715aa33be0e44de3406ed937a5e8fb36932 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Mon, 30 Sep 2024 20:07:17 +0000 Subject: [PATCH 06/13] add unit test --- sdk/metric/meter_test.go | 51 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 906182ad13d..189d66de3e0 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" @@ -2462,3 +2463,53 @@ func TestMeterProviderDelegation(t *testing.T) { otel.SetMeterProvider(provider) }) } + +func TestExemplarFilter(t *testing.T) { + rdr := NewManualReader() + mp := NewMeterProvider( + WithReader(rdr), + // Passing AlwaysOnFilter causes collection of the exemplar for the + // counter increment below. + WithExemplarFilter(exemplar.AlwaysOnFilter), + ) + + m1 := mp.Meter("scope") + ctr1, err := m1.Float64Counter("ctr") + assert.NoError(t, err) + ctr1.Add(context.Background(), 1.0) + + want := metricdata.ResourceMetrics{ + Resource: resource.Default(), + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: "scope", + }, + Metrics: []metricdata.Metrics{ + { + Name: "ctr", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[float64]{ + { + Value: 1.0, + Exemplars: []metricdata.Exemplar[float64]{ + { + Value: 1.0, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + got := metricdata.ResourceMetrics{} + err = rdr.Collect(context.Background(), &got) + assert.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp()) +} From d69ad9d695ac24cbb81708c97747f78ffc8ccb7a Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 1 Oct 2024 09:57:03 -0400 Subject: [PATCH 07/13] Update sdk/metric/config.go Co-authored-by: Damien Mathieu <42@dmathieu.com> --- sdk/metric/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 6f38f05480f..0091e3dc1e8 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -152,7 +152,7 @@ func WithView(views ...View) Option { // WithExemplarFilter configures the exemplar filter. // -// The exemplar filter determines which measurements are be offered to the +// The exemplar filter determines which measurements are offered to the // exemplar reservoir, but the exemplar reservoir makes the final decision of // whether to store an exemplar. // From 82b4af5ede1499bc774b1e7060143cf3fd4b19eb Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 3 Oct 2024 10:36:32 -0400 Subject: [PATCH 08/13] Update sdk/metric/config.go --- sdk/metric/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 0091e3dc1e8..e4bd9b7da7a 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -177,7 +177,7 @@ func optionsFromEnv() []Option { case "always_off": opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter)) case "trace_based": - opts = append(opts, WithExemplarFilter(exemplar.SampledFilter)) + opts = append(opts, WithExemplarFilter(exemplar.TraceBasedFilter)) } return opts } From 2b53e85b7f5a9e6800209ea1b1d6f8a88cf738a9 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 3 Oct 2024 10:38:21 -0400 Subject: [PATCH 09/13] Update sdk/metric/config.go --- sdk/metric/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index e4bd9b7da7a..42211f50329 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -81,7 +81,7 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er func newConfig(options []Option) config { conf := config{ res: resource.Default(), - exemplarFilter: exemplar.SampledFilter, + exemplarFilter: exemplar.TraceBasedFilter, } for _, o := range options { conf = o.apply(conf) From b7a6979cdce7bf0b45cb61eefd88f52c0318d6d7 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 3 Oct 2024 12:02:23 -0400 Subject: [PATCH 10/13] Update sdk/metric/config.go Co-authored-by: Tyler Yahn --- sdk/metric/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 42211f50329..ce325f615d6 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -156,9 +156,9 @@ func WithView(views ...View) Option { // exemplar reservoir, but the exemplar reservoir makes the final decision of // whether to store an exemplar. // -// By default, the [go.opentelemetry.io/otel/sdk/metric/exemplar.SampledFilter] +// By default, the [exemplar.SampledFilter] // is used. Exemplars can be entirely disabled by providing the -// [go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter]. +// [exemplar.AlwaysOffFilter]. func WithExemplarFilter(filter exemplar.Filter) Option { return optionFunc(func(cfg config) config { cfg.exemplarFilter = filter From 8a10f732fefe0200902105bccac04f67579f9636 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 4 Oct 2024 13:33:11 +0000 Subject: [PATCH 11/13] options take precedence over env vars --- sdk/metric/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index ce325f615d6..442c97b45cf 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -83,10 +83,10 @@ func newConfig(options []Option) config { res: resource.Default(), exemplarFilter: exemplar.TraceBasedFilter, } - for _, o := range options { + for _, o := range meterProviderOptionsFromEnv() { conf = o.apply(conf) } - for _, o := range optionsFromEnv() { + for _, o := range options { conf = o.apply(conf) } return conf @@ -166,7 +166,7 @@ func WithExemplarFilter(filter exemplar.Filter) Option { }) } -func optionsFromEnv() []Option { +func meterProviderOptionsFromEnv() []Option { var opts []Option // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" From 020cf0fbe0c03e320e7cc4df0b4a406df2a0c5cf Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 4 Oct 2024 13:55:47 +0000 Subject: [PATCH 12/13] expand unit testing for exemplar filter --- sdk/metric/config_test.go | 98 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index bb09e8dc8ee..c6b13d68021 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -17,6 +17,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/exemplar" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/trace" ) type reader struct { @@ -194,10 +195,95 @@ func TestWithView(t *testing.T) { assert.Len(t, c.views, 2) } -func TestWithExemplarFilter(t *testing.T) { - c := newConfig([]Option{WithExemplarFilter( - exemplar.AlwaysOffFilter, - )}) - assert.NotNil(t, c.exemplarFilter) - assert.False(t, c.exemplarFilter(context.Background())) +func TestWithExemplarFilterOff(t *testing.T) { + for _, tc := range []struct { + desc string + opts []Option + env string + expectFilterSampled bool + expectFilterNotSampled bool + }{ + { + desc: "default", + expectFilterSampled: true, + expectFilterNotSampled: false, + }, + { + desc: "always on option", + opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)}, + expectFilterSampled: true, + expectFilterNotSampled: true, + }, + { + desc: "always off option", + opts: []Option{WithExemplarFilter(exemplar.AlwaysOffFilter)}, + expectFilterSampled: false, + expectFilterNotSampled: false, + }, + { + desc: "trace based option", + opts: []Option{WithExemplarFilter(exemplar.TraceBasedFilter)}, + expectFilterSampled: true, + expectFilterNotSampled: false, + }, + { + desc: "last option takes precedence", + opts: []Option{ + WithExemplarFilter(exemplar.AlwaysOffFilter), + WithExemplarFilter(exemplar.AlwaysOnFilter), + }, + expectFilterSampled: true, + expectFilterNotSampled: true, + }, + { + desc: "always_off env", + env: "always_off", + expectFilterSampled: false, + expectFilterNotSampled: false, + }, + { + desc: "always_on env", + env: "always_on", + expectFilterSampled: true, + expectFilterNotSampled: true, + }, + { + desc: "trace_based env", + env: "trace_based", + expectFilterSampled: true, + expectFilterNotSampled: false, + }, + { + desc: "wrong env", + env: "foo_bar", + expectFilterSampled: true, + expectFilterNotSampled: false, + }, + { + desc: "option takes precedence over env var", + env: "always_off", + opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)}, + expectFilterSampled: true, + expectFilterNotSampled: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if tc.env != "" { + t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", tc.env) + } + c := newConfig(tc.opts) + assert.NotNil(t, c.exemplarFilter) + assert.Equal(t, tc.expectFilterNotSampled, c.exemplarFilter(context.Background())) + assert.Equal(t, tc.expectFilterSampled, c.exemplarFilter(sample(context.Background()))) + }) + } +} + +func sample(parent context.Context) context.Context { + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: trace.TraceID{0x01}, + SpanID: trace.SpanID{0x01}, + TraceFlags: trace.FlagsSampled, + }) + return trace.ContextWithSpanContext(parent, sc) } From 92f4cd46f36e1f5159736d6d9af92124647d1230 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 4 Oct 2024 13:58:29 +0000 Subject: [PATCH 13/13] make env value case insensitive --- sdk/metric/config.go | 3 ++- sdk/metric/config_test.go | 6 ++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/sdk/metric/config.go b/sdk/metric/config.go index 442c97b45cf..8e1fd5d4565 100644 --- a/sdk/metric/config.go +++ b/sdk/metric/config.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "os" + "strings" "sync" "go.opentelemetry.io/otel" @@ -171,7 +172,7 @@ func meterProviderOptionsFromEnv() []Option { // https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER" - switch os.Getenv(filterEnvKey) { + switch strings.ToLower(strings.TrimSpace(os.Getenv(filterEnvKey))) { case "always_on": opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter)) case "always_off": diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index c6b13d68021..644bad30082 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -247,6 +247,12 @@ func TestWithExemplarFilterOff(t *testing.T) { expectFilterSampled: true, expectFilterNotSampled: true, }, + { + desc: "always_on case insensitiveenv", + env: "ALWAYS_ON", + expectFilterSampled: true, + expectFilterNotSampled: true, + }, { desc: "trace_based env", env: "trace_based",