From 1633c74aea5f5b9f05083d255d3c37e2b1412e79 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Sat, 8 Jul 2023 08:01:47 -0700 Subject: [PATCH] Replace `Stream.AttributeFilter` with `AllowAttributeKeys` (#4288) * Replace Stream AttributeFilter with AttributeKeys * Rename Stream field AttributeKeys to AllowAttributeKeys Ensure forward compatibility if a deny-list of attribute keys is ever added. * Add change to changelog * Update PR number in changelog * Update CHANGELOG.md Co-authored-by: Damien Mathieu <42@dmathieu.com> --------- Co-authored-by: Damien Mathieu <42@dmathieu.com> --- CHANGELOG.md | 3 +++ sdk/metric/benchmark_test.go | 4 +--- sdk/metric/instrument.go | 27 +++++++++++++++++++++++++-- sdk/metric/meter_test.go | 11 +++-------- sdk/metric/pipeline.go | 4 ++-- sdk/metric/view.go | 10 +++++----- sdk/metric/view_test.go | 29 ++++++++++++----------------- 7 files changed, 51 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 258c09bda9f..27a44e96f77 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Count the Collect time in the PeriodicReader timeout. (#4221) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) - `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272) +- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field. + The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. + This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) ### Fixed diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index d7f8798ba46..a243738cfb7 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -43,9 +43,7 @@ var viewBenchmarks = []struct { "AttrFilterView", []View{NewView( Instrument{Name: "*"}, - Stream{AttributeFilter: func(kv attribute.KeyValue) bool { - return kv.Key == attribute.Key("K") - }}, + Stream{AllowAttributeKeys: []attribute.Key{"K"}}, )}, }, } diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index e18313c96b1..6de8fe86904 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -145,8 +145,31 @@ type Stream struct { Unit string // Aggregation the stream uses for an instrument. Aggregation aggregation.Aggregation - // AttributeFilter applied to all attributes recorded for an instrument. - AttributeFilter attribute.Filter + // AllowAttributeKeys are an allow-list of attribute keys that will be + // preserved for the stream. Any attribute recorded for the stream with a + // key not in this slice will be dropped. + // + // If this slice is empty, all attributes will be kept. + AllowAttributeKeys []attribute.Key +} + +// attributeFilter returns an attribute.Filter that only allows attributes +// with keys in s.AttributeKeys. +// +// If s.AttributeKeys is empty an accept-all filter is returned. +func (s Stream) attributeFilter() attribute.Filter { + if len(s.AllowAttributeKeys) <= 0 { + return func(kv attribute.KeyValue) bool { return true } + } + + allowed := make(map[attribute.Key]struct{}) + for _, k := range s.AllowAttributeKeys { + allowed[k] = struct{}{} + } + return func(kv attribute.KeyValue) bool { + _, ok := allowed[kv.Key] + return ok + } } // streamID are the identifying properties of a stream. diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 4fea8fccf90..7a3d5c0b8dd 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -1516,9 +1516,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) { WithReader(rdr), WithView(NewView( Instrument{Name: "*"}, - Stream{AttributeFilter: func(kv attribute.KeyValue) bool { - return kv.Key == attribute.Key("foo") - }}, + Stream{AllowAttributeKeys: []attribute.Key{"foo"}}, )), ).Meter("TestAttributeFilter") require.NoError(t, tt.register(t, mtr)) @@ -1565,11 +1563,8 @@ func TestObservableExample(t *testing.T) { selector := func(InstrumentKind) metricdata.Temporality { return temp } reader := NewManualReader(WithTemporalitySelector(selector)) - noopFilter := func(kv attribute.KeyValue) bool { return true } - noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter}) - - filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" } - filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter}) + noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName}) + filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AllowAttributeKeys: []attribute.Key{"pid"}}) mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered)) meter := mp.Meter(scopeName) diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index b6b15e1cf05..3fa3960b825 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -332,8 +332,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum if agg == nil { // Drop aggregator. return aggVal[N]{nil, nil} } - if stream.AttributeFilter != nil { - agg = aggregate.NewFilter(agg, stream.AttributeFilter) + if len(stream.AllowAttributeKeys) > 0 { + agg = aggregate.NewFilter(agg, stream.attributeFilter()) } i.pipeline.addSync(scope, instrumentSync{ diff --git a/sdk/metric/view.go b/sdk/metric/view.go index b8fc363a0a7..9dab839f003 100644 --- a/sdk/metric/view.go +++ b/sdk/metric/view.go @@ -103,11 +103,11 @@ func NewView(criteria Instrument, mask Stream) View { return func(i Instrument) (Stream, bool) { if matchFunc(i) { return Stream{ - Name: nonZero(mask.Name, i.Name), - Description: nonZero(mask.Description, i.Description), - Unit: nonZero(mask.Unit, i.Unit), - Aggregation: agg, - AttributeFilter: mask.AttributeFilter, + Name: nonZero(mask.Name, i.Name), + Description: nonZero(mask.Description, i.Description), + Unit: nonZero(mask.Unit, i.Unit), + Aggregation: agg, + AllowAttributeKeys: mask.AllowAttributeKeys, }, true } return Stream{}, false diff --git a/sdk/metric/view_test.go b/sdk/metric/view_test.go index d74d1e5b43a..03ae5e7e7eb 100644 --- a/sdk/metric/view_test.go +++ b/sdk/metric/view_test.go @@ -404,6 +404,18 @@ func TestNewViewReplace(t *testing.T) { } }, }, + { + name: "AttributeKeys", + mask: Stream{AllowAttributeKeys: []attribute.Key{"test"}}, + want: func(i Instrument) Stream { + return Stream{ + Name: i.Name, + Description: i.Description, + Unit: i.Unit, + AllowAttributeKeys: []attribute.Key{"test"}, + } + }, + }, { name: "Complete", mask: Stream{ @@ -430,23 +442,6 @@ func TestNewViewReplace(t *testing.T) { assert.Equal(t, test.want(completeIP), got) }) } - - // Go does not allow for the comparison of function values, even their - // addresses. Therefore, the AttributeFilter field needs an alternative - // testing strategy. - t.Run("AttributeFilter", func(t *testing.T) { - allowed := attribute.String("key", "val") - filter := func(kv attribute.KeyValue) bool { - return kv == allowed - } - mask := Stream{AttributeFilter: filter} - got, match := NewView(completeIP, mask)(completeIP) - require.True(t, match, "view did not match exact criteria") - require.NotNil(t, got.AttributeFilter, "AttributeFilter not set") - assert.True(t, got.AttributeFilter(allowed), "wrong AttributeFilter") - other := attribute.String("key", "other val") - assert.False(t, got.AttributeFilter(other), "wrong AttributeFilter") - }) } type badAgg struct {