Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move exemplar types to non-internal package #5747

Merged
merged 14 commits into from
Sep 26, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747)

### Changed

- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)
Expand Down
15 changes: 8 additions & 7 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"runtime"
"slices"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
Expand All @@ -18,7 +19,7 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

Expand All @@ -28,7 +29,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
return aggregate.DropReservoir
case "trace_based":
fallthrough
default:
Expand All @@ -41,9 +42,9 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
}

Expand Down Expand Up @@ -71,7 +72,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
}
}

return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
}
3 changes: 3 additions & 0 deletions sdk/metric/exemplar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Metric SDK Exemplars

[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/metric/exemplar)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/exemplar)
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand All @@ -12,15 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
return newRandRes(newStorage(k))
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurement with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
}

type randRes struct {
var _ Reservoir = &FixedSizeReservoir{}

// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
// there are k or less measurements made, the Reservoir will sample each one.
// If there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
type FixedSizeReservoir struct {
*storage

// count is the number of measurement seen.
Expand All @@ -39,8 +45,8 @@ type randRes struct {
rng *rand.Rand
}

func newRandRes(s *storage) *randRes {
r := &randRes{
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
Expand All @@ -50,7 +56,7 @@ func newRandRes(s *storage) *randRes {

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
func (r *FixedSizeReservoir) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
Expand All @@ -75,7 +81,18 @@ func (r *randRes) randomFloat64() float64 {
return f
}

func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
Expand Down Expand Up @@ -131,7 +148,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
}

// reset resets r to the initial state.
func (r *randRes) reset() {
func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
Expand All @@ -153,7 +170,7 @@ func (r *randRes) reset() {

// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *randRes) advance() {
func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
Expand All @@ -180,7 +197,10 @@ func (r *randRes) advance() {
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}

func (r *randRes) Collect(dest *[]Exemplar) {
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFixedSize(t *testing.T) {
func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))
}

func TestFixedSizeSamplingCorrectness(t *testing.T) {
func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000

Expand All @@ -38,13 +38,13 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// Sort to test position bias.
slices.Sort(data)

r := FixedSize(sampleSize)
r := NewFixedSizeReservoir(sampleSize)
for _, value := range data {
r.Offer(context.Background(), staticTime, NewValue(value), nil)
}

var sum float64
for _, m := range r.(*randRes).store {
for _, m := range r.store {
sum += m.Value.Float64()
}
mean := sum / float64(sampleSize)
Expand Down
62 changes: 62 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"slices"
"sort"
"time"

"go.opentelemetry.io/otel/attribute"
)

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}

var _ Reservoir = &HistogramReservoir{}

// HistogramReservoir is a [Reservoir] that samples the last measurement that
// falls within a histogram bucket. The histogram bucket upper-boundaries are
// define by bounds.
type HistogramReservoir struct {
*storage

// bounds are bucket bounds in ascending order.
bounds []float64
}

// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import "testing"
func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))

t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down Expand Up @@ -35,7 +35,7 @@ func (r *storage) Collect(dest *[]Exemplar) {
continue
}

m.Exemplar(&(*dest)[n])
m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
Expand Down Expand Up @@ -66,8 +66,8 @@ func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []at
}
}

// Exemplar returns m as an [Exemplar].
func (m measurement) Exemplar(dest *Exemplar) {
// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import "math"

Expand Down
Loading
Loading