Skip to content

Commit

Permalink
move drop and filtered reservoir types to internal/aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Aug 27, 2024
1 parent 44fbe6b commit d7cc77b
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 90 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes
`Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `Histogram`,
`FixedSize`, `Reservoir`, `Value` and `ValueType` types. These will be used
for configuring the exemplar reservoir for the metrics sdk. (#5747)

### Removed

- Drop support for [Go 1.21]. (#5736)
Expand Down
13 changes: 7 additions & 6 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"slices"

"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/internal/x"
)

Expand All @@ -19,7 +20,7 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
if !x.Exemplars.Enabled() {
return nil
}
Expand All @@ -32,7 +33,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
return aggregate.DropReservoir
case "trace_based":
fallthrough
default:
Expand All @@ -45,9 +46,9 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.Histogram(bounds))
}
}

Expand Down Expand Up @@ -75,7 +76,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
}
}

return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.FixedSize(n))
}
}
49 changes: 0 additions & 49 deletions sdk/metric/exemplar/filtered_reservoir.go

This file was deleted.

9 changes: 4 additions & 5 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -38,8 +37,8 @@ type Builder[N int64 | float64] struct {
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// exemplar.Drop reservoir will be used.
ReservoirFunc func() exemplar.FilteredReservoir[N]
// DropReservoir reservoir will be used.
ReservoirFunc func() FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,12 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func() exemplar.FilteredReservoir[N] {
func (b Builder[N]) resFunc() func() FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return exemplar.Drop
return DropReservoir
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
5 changes: 2 additions & 3 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
Expand Down Expand Up @@ -73,8 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func dropExemplars[N int64 | float64]() exemplar.FilteredReservoir[N] {
return exemplar.Drop[N]()
func dropExemplars[N int64 | float64]() FilteredExemplarReservoir[N] {
return DropReservoir[N]()
}

func TestBuilderFilter(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// Drop returns a [FilteredReservoir] that drops all measurements it is offered.
func Drop[N int64 | float64]() FilteredReservoir[N] { return &dropRes[N]{} }
// DropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func DropReservoir[N int64 | float64]() FilteredExemplarReservoir[N] { return &dropRes[N]{} }

type dropRes[N int64 | float64] struct{}

// Offer does nothing, all measurements offered will be dropped.
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}

// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Collect(dest *[]Exemplar) {
func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
*dest = (*dest)[:0]
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar
package aggregate

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

func TestDrop(t *testing.T) {
Expand All @@ -15,9 +17,9 @@ func TestDrop(t *testing.T) {
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := Drop[N]()
r := DropReservoir[N]()

var dest []Exemplar
var dest []exemplar.Exemplar
r.Collect(&dest)

assert.Len(t, dest, 0, "non-sampled context should not be offered")
Expand Down
7 changes: 3 additions & 4 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -31,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]

count uint64
min N
Expand Down Expand Up @@ -284,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -307,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down
50 changes: 50 additions & 0 deletions sdk/metric/internal/aggregate/filtered_reservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}

// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}

// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
}
}

func (f *filteredExemplarReservoir[N]) Offer(ctx context.Context, val N, attr []attribute.KeyValue) {
if f.filter(ctx) {
// only record the current time if we are sampling this measurment.
f.reservoir.Offer(ctx, time.Now(), exemplar.NewValue(val), attr)
}
}

func (f *filteredExemplarReservoir[N]) Collect(dest *[]exemplar.Exemplar) { f.reservoir.Collect(dest) }
9 changes: 4 additions & 5 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type buckets[N int64 | float64] struct {
attrs attribute.Set
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]

counts []uint64
count uint64
Expand Down Expand Up @@ -48,13 +47,13 @@ type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[*buckets[N]]
values map[attribute.Distinct]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand Down Expand Up @@ -109,7 +108,7 @@ func (s *histValues[N]) measure(ctx context.Context, value N, fltrAttr attribute

// newHistogram returns an Aggregator that summarizes a set of measurements as
// an histogram.
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() exemplar.FilteredReservoir[N]) *histogram[N] {
func newHistogram[N int64 | float64](boundaries []float64, noMinMax, noSum bool, limit int, r func() FilteredExemplarReservoir[N]) *histogram[N] {
return &histogram[N]{
histValues: newHistValues[N](boundaries, noSum, limit, r),
noMinMax: noMinMax,
Expand Down
9 changes: 4 additions & 5 deletions sdk/metric/internal/aggregate/lastvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// datapoint is timestamped measurement data.
type datapoint[N int64 | float64] struct {
attrs attribute.Set
value N
res exemplar.FilteredReservoir[N]
res FilteredExemplarReservoir[N]
}

func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *lastValue[N] {
func newLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *lastValue[N] {
return &lastValue[N]{
newRes: r,
limit: newLimiter[datapoint[N]](limit),
Expand All @@ -33,7 +32,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReserv
type lastValue[N int64 | float64] struct {
sync.Mutex

newRes func() exemplar.FilteredReservoir[N]
newRes func() FilteredExemplarReservoir[N]
limit limiter[datapoint[N]]
values map[attribute.Distinct]datapoint[N]
start time.Time
Expand Down Expand Up @@ -115,7 +114,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) in

// newPrecomputedLastValue returns an aggregator that summarizes a set of
// observations as the last one made.
func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.FilteredReservoir[N]) *precomputedLastValue[N] {
func newPrecomputedLastValue[N int64 | float64](limit int, r func() FilteredExemplarReservoir[N]) *precomputedLastValue[N] {
return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)}
}

Expand Down
Loading

0 comments on commit d7cc77b

Please sign in to comment.