Skip to content

Commit

Permalink
Replace Stream.AttributeFilter with AllowAttributeKeys (open-tele…
Browse files Browse the repository at this point in the history
…metry#4288)

* Replace Stream AttributeFilter with AttributeKeys

* Rename Stream field AttributeKeys to AllowAttributeKeys

Ensure forward compatibility if a deny-list of attribute keys is ever
added.

* Add change to changelog

* Update PR number in changelog

* Update CHANGELOG.md

Co-authored-by: Damien Mathieu <42@dmathieu.com>

---------

Co-authored-by: Damien Mathieu <42@dmathieu.com>
  • Loading branch information
MrAlias and dmathieu authored Jul 8, 2023
1 parent c404a30 commit 1633c74
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 37 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Count the Collect time in the PeriodicReader timeout. (#4221)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- `New` in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp` returns `*Exporter` instead of `"go.opentelemetry.io/otel/sdk/metric".Exporter`. (#4272)
- ⚠️ Metrics SDK Breaking ⚠️ : the `AttributeFilter` fields of the `Stream` from `go.opentelemetry.io/otel/sdk/metric` is replaced by the `AttributeKeys` field.
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)

### Fixed

Expand Down
4 changes: 1 addition & 3 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,7 @@ var viewBenchmarks = []struct {
"AttrFilterView",
[]View{NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("K")
}},
Stream{AllowAttributeKeys: []attribute.Key{"K"}},
)},
},
}
Expand Down
27 changes: 25 additions & 2 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,31 @@ type Stream struct {
Unit string
// Aggregation the stream uses for an instrument.
Aggregation aggregation.Aggregation
// AttributeFilter applied to all attributes recorded for an instrument.
AttributeFilter attribute.Filter
// AllowAttributeKeys are an allow-list of attribute keys that will be
// preserved for the stream. Any attribute recorded for the stream with a
// key not in this slice will be dropped.
//
// If this slice is empty, all attributes will be kept.
AllowAttributeKeys []attribute.Key
}

// attributeFilter returns an attribute.Filter that only allows attributes
// with keys in s.AttributeKeys.
//
// If s.AttributeKeys is empty an accept-all filter is returned.
func (s Stream) attributeFilter() attribute.Filter {
if len(s.AllowAttributeKeys) <= 0 {
return func(kv attribute.KeyValue) bool { return true }
}

allowed := make(map[attribute.Key]struct{})
for _, k := range s.AllowAttributeKeys {
allowed[k] = struct{}{}
}
return func(kv attribute.KeyValue) bool {
_, ok := allowed[kv.Key]
return ok
}
}

// streamID are the identifying properties of a stream.
Expand Down
11 changes: 3 additions & 8 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1516,9 +1516,7 @@ func testAttributeFilter(temporality metricdata.Temporality) func(*testing.T) {
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == attribute.Key("foo")
}},
Stream{AllowAttributeKeys: []attribute.Key{"foo"}},
)),
).Meter("TestAttributeFilter")
require.NoError(t, tt.register(t, mtr))
Expand Down Expand Up @@ -1565,11 +1563,8 @@ func TestObservableExample(t *testing.T) {
selector := func(InstrumentKind) metricdata.Temporality { return temp }
reader := NewManualReader(WithTemporalitySelector(selector))

noopFilter := func(kv attribute.KeyValue) bool { return true }
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName, AttributeFilter: noopFilter})

filter := func(kv attribute.KeyValue) bool { return kv.Key != "tid" }
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AttributeFilter: filter})
noFiltered := NewView(Instrument{Name: instName}, Stream{Name: instName})
filtered := NewView(Instrument{Name: instName}, Stream{Name: filteredStream, AllowAttributeKeys: []attribute.Key{"pid"}})

mp := NewMeterProvider(WithReader(reader), WithView(noFiltered, filtered))
meter := mp.Meter(scopeName)
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
if agg == nil { // Drop aggregator.
return aggVal[N]{nil, nil}
}
if stream.AttributeFilter != nil {
agg = aggregate.NewFilter(agg, stream.AttributeFilter)
if len(stream.AllowAttributeKeys) > 0 {
agg = aggregate.NewFilter(agg, stream.attributeFilter())
}

i.pipeline.addSync(scope, instrumentSync{
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ func NewView(criteria Instrument, mask Stream) View {
return func(i Instrument) (Stream, bool) {
if matchFunc(i) {
return Stream{
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AttributeFilter: mask.AttributeFilter,
Name: nonZero(mask.Name, i.Name),
Description: nonZero(mask.Description, i.Description),
Unit: nonZero(mask.Unit, i.Unit),
Aggregation: agg,
AllowAttributeKeys: mask.AllowAttributeKeys,
}, true
}
return Stream{}, false
Expand Down
29 changes: 12 additions & 17 deletions sdk/metric/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,18 @@ func TestNewViewReplace(t *testing.T) {
}
},
},
{
name: "AttributeKeys",
mask: Stream{AllowAttributeKeys: []attribute.Key{"test"}},
want: func(i Instrument) Stream {
return Stream{
Name: i.Name,
Description: i.Description,
Unit: i.Unit,
AllowAttributeKeys: []attribute.Key{"test"},
}
},
},
{
name: "Complete",
mask: Stream{
Expand All @@ -430,23 +442,6 @@ func TestNewViewReplace(t *testing.T) {
assert.Equal(t, test.want(completeIP), got)
})
}

// Go does not allow for the comparison of function values, even their
// addresses. Therefore, the AttributeFilter field needs an alternative
// testing strategy.
t.Run("AttributeFilter", func(t *testing.T) {
allowed := attribute.String("key", "val")
filter := func(kv attribute.KeyValue) bool {
return kv == allowed
}
mask := Stream{AttributeFilter: filter}
got, match := NewView(completeIP, mask)(completeIP)
require.True(t, match, "view did not match exact criteria")
require.NotNil(t, got.AttributeFilter, "AttributeFilter not set")
assert.True(t, got.AttributeFilter(allowed), "wrong AttributeFilter")
other := attribute.String("key", "other val")
assert.False(t, got.AttributeFilter(other), "wrong AttributeFilter")
})
}

type badAgg struct {
Expand Down

0 comments on commit 1633c74

Please sign in to comment.