Skip to content

Commit

Permalink
Allow configuring the exemplar filter on the metrics SDK (#5850)
Browse files Browse the repository at this point in the history
Part of #5249

### Spec

https://opentelemetry.io/docs/specs/otel/metrics/sdk/#exemplarfilter

> The ExemplarFilter configuration MUST allow users to select between
one of the built-in ExemplarFilters. While ExemplarFilter determines
which measurements are eligible for becoming an Exemplar, the
ExemplarReservoir makes the final decision if a measurement becomes an
exemplar and is stored.

> The ExemplarFilter SHOULD be a configuration parameter of a
MeterProvider for an SDK. The default value SHOULD be TraceBased. The
filter configuration SHOULD follow the [environment variable
specification](https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/#exemplar).

> An OpenTelemetry SDK MUST support the following filters:

> *
[AlwaysOn](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwayson)
> *
[AlwaysOff](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff)
> *
[TraceBased](https://opentelemetry.io/docs/specs/otel/metrics/sdk/#tracebased)

### Changes

* adds exemplar.AlwaysOffFilter, which is one of the required filters
from the SDK:
https://opentelemetry.io/docs/specs/otel/metrics/sdk/#alwaysoff
* adds `metric.WithExemplarFilter` as an option for the metrics SDK.
* moves handling of `OTEL_METRICS_EXEMPLAR_FILTER` to the same location
as config handling to make code easier to navigate.



dropReservoir can actually be removed, but I plan to do that in a
follow-up refactor, since it will be a large diff.

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
Co-authored-by: Chester Cheung <cheung.zhy.csu@gmail.com>
  • Loading branch information
4 people authored Oct 11, 2024
1 parent bc2fe88 commit 6b251b8
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 57 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar.AlwaysOffFilter`, which can be used to disable exemplar recording. (#5850)
- Add `go.opentelemetry.io/otel/sdk/metric.WithExemplarFilter`, which can be used to configure the exemplar filter used by the metrics SDK. (#5850)

<!-- Released section -->
<!-- Don't change this section unless doing release -->

Expand Down
50 changes: 46 additions & 4 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,21 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"context"
"fmt"
"os"
"strings"
"sync"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/resource"
)

// config contains configuration options for a MeterProvider.
type config struct {
res *resource.Resource
readers []Reader
views []View
res *resource.Resource
readers []Reader
views []View
exemplarFilter exemplar.Filter
}

// readerSignals returns a force-flush and shutdown function for a
Expand Down Expand Up @@ -76,7 +80,13 @@ func unifyShutdown(funcs []func(context.Context) error) func(context.Context) er

// newConfig returns a config configured with options.
func newConfig(options []Option) config {
conf := config{res: resource.Default()}
conf := config{
res: resource.Default(),
exemplarFilter: exemplar.TraceBasedFilter,
}
for _, o := range meterProviderOptionsFromEnv() {
conf = o.apply(conf)
}
for _, o := range options {
conf = o.apply(conf)
}
Expand Down Expand Up @@ -140,3 +150,35 @@ func WithView(views ...View) Option {
return cfg
})
}

// WithExemplarFilter configures the exemplar filter.
//
// The exemplar filter determines which measurements are offered to the
// exemplar reservoir, but the exemplar reservoir makes the final decision of
// whether to store an exemplar.
//
// By default, the [exemplar.SampledFilter]
// is used. Exemplars can be entirely disabled by providing the
// [exemplar.AlwaysOffFilter].
func WithExemplarFilter(filter exemplar.Filter) Option {
return optionFunc(func(cfg config) config {
cfg.exemplarFilter = filter
return cfg
})
}

func meterProviderOptionsFromEnv() []Option {
var opts []Option
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

switch strings.ToLower(strings.TrimSpace(os.Getenv(filterEnvKey))) {
case "always_on":
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOnFilter))
case "always_off":
opts = append(opts, WithExemplarFilter(exemplar.AlwaysOffFilter))
case "trace_based":
opts = append(opts, WithExemplarFilter(exemplar.TraceBasedFilter))
}
return opts
}
101 changes: 101 additions & 0 deletions sdk/metric/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (

"go.opentelemetry.io/otel/attribute"
ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
)

type reader struct {
Expand Down Expand Up @@ -192,3 +194,102 @@ func TestWithView(t *testing.T) {
)})
assert.Len(t, c.views, 2)
}

func TestWithExemplarFilterOff(t *testing.T) {
for _, tc := range []struct {
desc string
opts []Option
env string
expectFilterSampled bool
expectFilterNotSampled bool
}{
{
desc: "default",
expectFilterSampled: true,
expectFilterNotSampled: false,
},
{
desc: "always on option",
opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)},
expectFilterSampled: true,
expectFilterNotSampled: true,
},
{
desc: "always off option",
opts: []Option{WithExemplarFilter(exemplar.AlwaysOffFilter)},
expectFilterSampled: false,
expectFilterNotSampled: false,
},
{
desc: "trace based option",
opts: []Option{WithExemplarFilter(exemplar.TraceBasedFilter)},
expectFilterSampled: true,
expectFilterNotSampled: false,
},
{
desc: "last option takes precedence",
opts: []Option{
WithExemplarFilter(exemplar.AlwaysOffFilter),
WithExemplarFilter(exemplar.AlwaysOnFilter),
},
expectFilterSampled: true,
expectFilterNotSampled: true,
},
{
desc: "always_off env",
env: "always_off",
expectFilterSampled: false,
expectFilterNotSampled: false,
},
{
desc: "always_on env",
env: "always_on",
expectFilterSampled: true,
expectFilterNotSampled: true,
},
{
desc: "always_on case insensitiveenv",
env: "ALWAYS_ON",
expectFilterSampled: true,
expectFilterNotSampled: true,
},
{
desc: "trace_based env",
env: "trace_based",
expectFilterSampled: true,
expectFilterNotSampled: false,
},
{
desc: "wrong env",
env: "foo_bar",
expectFilterSampled: true,
expectFilterNotSampled: false,
},
{
desc: "option takes precedence over env var",
env: "always_off",
opts: []Option{WithExemplarFilter(exemplar.AlwaysOnFilter)},
expectFilterSampled: true,
expectFilterNotSampled: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
if tc.env != "" {
t.Setenv("OTEL_METRICS_EXEMPLAR_FILTER", tc.env)
}
c := newConfig(tc.opts)
assert.NotNil(t, c.exemplarFilter)
assert.Equal(t, tc.expectFilterNotSampled, c.exemplarFilter(context.Background()))
assert.Equal(t, tc.expectFilterSampled, c.exemplarFilter(sample(context.Background())))
})
}
}

func sample(parent context.Context) context.Context {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID{0x01},
SpanID: trace.SpanID{0x01},
TraceFlags: trace.FlagsSampled,
})
return trace.ContextWithSpanContext(parent, sc)
}
19 changes: 19 additions & 0 deletions sdk/metric/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
Expand Down Expand Up @@ -240,3 +241,21 @@ func ExampleNewView_exponentialHistogram() {
metric.WithView(view),
)
}

func ExampleWithExemplarFilter_disabled() {
// Use exemplar.AlwaysOffFilter to disable exemplar collection.
_ = metric.NewMeterProvider(
metric.WithExemplarFilter(exemplar.AlwaysOffFilter),
)
}

func ExampleWithExemplarFilter_custom() {
// Create a custom filter function that only offers measurements if the
// context has an error.
customFilter := func(ctx context.Context) bool {
return ctx.Err() != nil
}
_ = metric.NewMeterProvider(
metric.WithExemplarFilter(customFilter),
)
}
26 changes: 2 additions & 24 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"os"
"runtime"
"slices"

Expand All @@ -13,29 +12,8 @@ import (
)

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and user defined
// environment variables.
//
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

var filter exemplar.Filter

switch os.Getenv(filterEnvKey) {
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
return aggregate.DropReservoir
case "trace_based":
fallthrough
default:
filter = exemplar.TraceBasedFilter
}

// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](agg Aggregation, filter exemplar.Filter) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/metrics/sdk.md#exemplar-defaults
// Explicit bucket histogram aggregation with more than 1 bucket will
// use AlignedHistogramBucketExemplarReservoir.
Expand Down
5 changes: 5 additions & 0 deletions sdk/metric/exemplar/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,8 @@ func TraceBasedFilter(ctx context.Context) bool {
func AlwaysOnFilter(ctx context.Context) bool {
return true
}

// AlwaysOffFilter is a [Filter] that never offers measurements.
func AlwaysOffFilter(ctx context.Context) bool {
return false
}
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Builder[N int64 | float64] struct {
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// DropReservoir reservoir will be used.
// dropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
Expand All @@ -54,7 +54,7 @@ func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
return b.ReservoirFunc
}

return DropReservoir
return dropReservoir
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *clock) Register() (unregister func()) {
}

func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return DropReservoir[N]()
return dropReservoir[N]()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/aggregate/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }
// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }

type dropRes[N int64 | float64] struct{}

Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/internal/aggregate/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := DropReservoir[N]()
r := dropReservoir[N]()

var dest []exemplar.Exemplar
r.Collect(&dest)
Expand Down
51 changes: 51 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -2462,3 +2463,53 @@ func TestMeterProviderDelegation(t *testing.T) {
otel.SetMeterProvider(provider)
})
}

func TestExemplarFilter(t *testing.T) {
rdr := NewManualReader()
mp := NewMeterProvider(
WithReader(rdr),
// Passing AlwaysOnFilter causes collection of the exemplar for the
// counter increment below.
WithExemplarFilter(exemplar.AlwaysOnFilter),
)

m1 := mp.Meter("scope")
ctr1, err := m1.Float64Counter("ctr")
assert.NoError(t, err)
ctr1.Add(context.Background(), 1.0)

want := metricdata.ResourceMetrics{
Resource: resource.Default(),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Name: "scope",
},
Metrics: []metricdata.Metrics{
{
Name: "ctr",
Data: metricdata.Sum[float64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[float64]{
{
Value: 1.0,
Exemplars: []metricdata.Exemplar[float64]{
{
Value: 1.0,
},
},
},
},
},
},
},
},
},
}

got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}
Loading

0 comments on commit 6b251b8

Please sign in to comment.