Skip to content

Commit

Permalink
Merge branch 'main' into fix-prometheus-gauge-exemplars
Browse files Browse the repository at this point in the history
  • Loading branch information
trthomps authored Oct 23, 2024
2 parents ab6a89e + 3429e15 commit 4caf61e
Show file tree
Hide file tree
Showing 20 changed files with 214 additions and 167 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
# So we can fetch the exact benchmark data from the previous commit.
key: ${{ runner.os }}-benchmark-${{ github.event.before }}
- name: Store benchmarks result
uses: benchmark-action/github-action-benchmark@v1.20.3
uses: benchmark-action/github-action-benchmark@v1.20.4
with:
name: Benchmarks
tool: 'go'
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Changed

- `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc` now keeps the metadata already present in the context when `WithHeaders` is used. (#5892)
- `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc` now keeps the metadata already present in the context when `WithHeaders` is used. (#5911)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
7 changes: 6 additions & 1 deletion exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,12 @@ func (c *client) exportContext(parent context.Context) (context.Context, context
}

if c.metadata.Len() > 0 {
ctx = metadata.NewOutgoingContext(ctx, c.metadata)
md := c.metadata
if outMD, ok := metadata.FromOutgoingContext(ctx); ok {
md = metadata.Join(md, outMD)
}

ctx = metadata.NewOutgoingContext(ctx, md)
}

return ctx, cancel
Expand Down
37 changes: 37 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/log"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
cpb "go.opentelemetry.io/proto/otlp/common/v1"
Expand Down Expand Up @@ -561,3 +562,39 @@ func TestClient(t *testing.T) {
assert.ErrorContains(t, errs[0], want)
})
}

func TestConfig(t *testing.T) {
factoryFunc := func(rCh <-chan exportResult, o ...Option) (log.Exporter, *grpcCollector) {
coll, err := newGRPCCollector("", rCh)
require.NoError(t, err)

ctx := context.Background()
opts := append([]Option{
WithEndpoint(coll.listener.Addr().String()),
WithInsecure(),
}, o...)
exp, err := New(ctx, opts...)
require.NoError(t, err)
return exp, coll
}

t.Run("WithHeaders", func(t *testing.T) {
key := "my-custom-header"
headers := map[string]string{key: "custom-value"}
exp, coll := factoryFunc(nil, WithHeaders(headers))
t.Cleanup(coll.srv.Stop)

ctx := context.Background()
additionalKey := "additional-custom-header"
ctx = metadata.AppendToOutgoingContext(ctx, additionalKey, "additional-value")
require.NoError(t, exp.Export(ctx, make([]log.Record, 1)))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

got := metadata.Join(coll.headers)
require.Regexp(t, "OTel Go OTLP over gRPC logs exporter/[01]\\..*", got)
require.Contains(t, got, key)
require.Contains(t, got, additionalKey)
assert.Equal(t, []string{headers[key]}, got[key])
})
}
10 changes: 10 additions & 0 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,24 @@ package metric // import "go.opentelemetry.io/otel/sdk/metric"
import (
"runtime"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// ExemplarReservoirProviderSelector selects the
// [exemplar.ReservoirProvider] to use
// based on the [Aggregation] of the metric.
type ExemplarReservoirProviderSelector func(Aggregation) exemplar.ReservoirProvider

// reservoirFunc returns the appropriately configured exemplar reservoir
// creation func based on the passed InstrumentKind and filter configuration.
func reservoirFunc[N int64 | float64](provider exemplar.ReservoirProvider, filter exemplar.Filter) func(attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return func(attrs attribute.Set) aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, provider(attrs))
}
}

// DefaultExemplarReservoirProviderSelector returns the default
// [exemplar.ReservoirProvider] for the
// provided [Aggregation].
Expand Down
5 changes: 2 additions & 3 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"testing"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand All @@ -23,7 +22,7 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

build.Temporality = metricdata.CumulativeTemporality
Expand Down Expand Up @@ -53,7 +52,7 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
build := aggregate.Builder[int64]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: exemplar.FixedSizeReservoirProvider(0)}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.PrecomputedLastValue()
Expand Down
21 changes: 11 additions & 10 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand All @@ -34,12 +33,12 @@ type Builder[N int64 | float64] struct {
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
// ExemplarFilter is the filter applied to measurements before offering
// them to the exemplar Reservoir.
ExemplarFilter exemplar.Filter
// ExemplarReservoirProvider is the factory function used to create a new
// exemplar Reservoir for a given attribute set.
ExemplarReservoirProvider exemplar.ReservoirProvider
// ReservoirFunc is the factory function used by aggregate functions to
// create new exemplar reservoirs for a new seen attribute set.
//
// If this is not provided a default factory function that returns an
// dropReservoir reservoir will be used.
ReservoirFunc func(attribute.Set) FilteredExemplarReservoir[N]
// AggregationLimit is the cardinality limit of measurement attributes. Any
// measurement for new attributes once the limit has been reached will be
// aggregated into a single aggregate for the "otel.metric.overflow"
Expand All @@ -50,10 +49,12 @@ type Builder[N int64 | float64] struct {
AggregationLimit int
}

func (b Builder[N]) resFunc() func(attribute.Set) *filteredExemplarReservoir[N] {
return func(attrs attribute.Set) *filteredExemplarReservoir[N] {
return newFilteredExemplarReservoir[N](b.ExemplarFilter, b.ExemplarReservoirProvider(attrs))
func (b Builder[N]) resFunc() func(attribute.Set) FilteredExemplarReservoir[N] {
if b.ReservoirFunc != nil {
return b.ReservoirFunc
}

return dropReservoir
}

type fltrMeasure[N int64 | float64] func(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue)
Expand Down
13 changes: 4 additions & 9 deletions sdk/metric/internal/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
Expand Down Expand Up @@ -73,12 +72,8 @@ func (c *clock) Register() (unregister func()) {
return func() { now = orig }
}

func newNoopReservoir(attribute.Set) exemplar.Reservoir {
return exemplar.NewFixedSizeReservoir(0)
}

func dropExemplars[N int64 | float64](attr attribute.Set) *filteredExemplarReservoir[N] {
return newFilteredExemplarReservoir[N](exemplar.AlwaysOffFilter, newNoopReservoir(attr))
func dropExemplars[N int64 | float64](attr attribute.Set) FilteredExemplarReservoir[N] {
return dropReservoir[N](attr)
}

func TestBuilderFilter(t *testing.T) {
Expand All @@ -104,8 +99,8 @@ func testBuilderFilter[N int64 | float64]() func(t *testing.T) {
}
}

t.Run("NoFilter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir}, attr, nil))
t.Run("Filter", run(Builder[N]{ExemplarFilter: exemplar.AlwaysOffFilter, ExemplarReservoirProvider: newNoopReservoir, Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
t.Run("NoFilter", run(Builder[N]{}, attr, nil))
t.Run("Filter", run(Builder[N]{Filter: attrFltr}, fltrAlice, []attribute.KeyValue{adminTrue}))
}
}

Expand Down
26 changes: 26 additions & 0 deletions sdk/metric/internal/aggregate/drop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// dropReservoir returns a [FilteredReservoir] that drops all measurements it is offered.
func dropReservoir[N int64 | float64](attribute.Set) FilteredExemplarReservoir[N] {
return &dropRes[N]{}
}

type dropRes[N int64 | float64] struct{}

// Offer does nothing, all measurements offered will be dropped.
func (r *dropRes[N]) Offer(context.Context, N, []attribute.KeyValue) {}

// Collect resets dest. No exemplars will ever be returned.
func (r *dropRes[N]) Collect(dest *[]exemplar.Exemplar) {
*dest = (*dest)[:0]
}
27 changes: 27 additions & 0 deletions sdk/metric/internal/aggregate/drop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package aggregate

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

func TestDrop(t *testing.T) {
t.Run("Int64", testDropFiltered[int64])
t.Run("Float64", testDropFiltered[float64])
}

func testDropFiltered[N int64 | float64](t *testing.T) {
r := dropReservoir[N](*attribute.EmptySet())

var dest []exemplar.Exemplar
r.Collect(&dest)

assert.Empty(t, dest, "non-sampled context should not be offered")
}
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/exponential_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
// expoHistogramDataPoint is a single data point in an exponential histogram.
type expoHistogramDataPoint[N int64 | float64] struct {
attrs attribute.Set
res *filteredExemplarReservoir[N]
res FilteredExemplarReservoir[N]

count uint64
min N
Expand Down Expand Up @@ -283,7 +283,7 @@ func (b *expoBuckets) downscale(delta int32) {
// newExponentialHistogram returns an Aggregator that summarizes a set of
// measurements as an exponential histogram. Each histogram is scoped by attributes
// and the aggregation cycle the measurements were made in.
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) *filteredExemplarReservoir[N]) *expoHistogram[N] {
func newExponentialHistogram[N int64 | float64](maxSize, maxScale int32, noMinMax, noSum bool, limit int, r func(attribute.Set) FilteredExemplarReservoir[N]) *expoHistogram[N] {
return &expoHistogram[N]{
noSum: noSum,
noMinMax: noMinMax,
Expand All @@ -306,7 +306,7 @@ type expoHistogram[N int64 | float64] struct {
maxSize int
maxScale int32

newRes func(attribute.Set) *filteredExemplarReservoir[N]
newRes func(attribute.Set) FilteredExemplarReservoir[N]
limit limiter[*expoHistogramDataPoint[N]]
values map[attribute.Distinct]*expoHistogramDataPoint[N]
valuesMu sync.Mutex
Expand Down
33 changes: 10 additions & 23 deletions sdk/metric/internal/aggregate/exponential_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

Expand Down Expand Up @@ -683,30 +682,22 @@ func BenchmarkExponentialHistogram(b *testing.B) {

b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.CumulativeTemporality,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
return Builder[int64]{
Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.DeltaTemporality,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.CumulativeTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.CumulativeTemporality,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) {
return Builder[float64]{
Temporality: metricdata.DeltaTemporality,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.DeltaTemporality,
}.ExponentialBucketHistogram(maxSize, maxScale, noMinMax, noSum)
}))
}
Expand Down Expand Up @@ -753,11 +744,9 @@ func TestExponentialHistogramAggregation(t *testing.T) {

func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.DeltaTemporality,
Filter: attrFltr,
AggregationLimit: 2,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down Expand Up @@ -882,11 +871,9 @@ func testDeltaExpoHist[N int64 | float64]() func(t *testing.T) {

func testCumulativeExpoHist[N int64 | float64]() func(t *testing.T) {
in, out := Builder[N]{
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 2,
ExemplarFilter: exemplar.AlwaysOffFilter,
ExemplarReservoirProvider: newNoopReservoir,
Temporality: metricdata.CumulativeTemporality,
Filter: attrFltr,
AggregationLimit: 2,
}.ExponentialBucketHistogram(4, 20, false, false)
ctx := context.Background()
return test[N](in, out, []teststep[N]{
Expand Down
21 changes: 17 additions & 4 deletions sdk/metric/internal/aggregate/filtered_reservoir.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,29 @@ import (
"go.opentelemetry.io/otel/sdk/metric/exemplar"
)

// FilteredExemplarReservoir wraps a [exemplar.Reservoir] with a filter.
type FilteredExemplarReservoir[N int64 | float64] interface {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the filter decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
Offer(ctx context.Context, val N, attr []attribute.KeyValue)
// Collect returns all the held exemplars in the reservoir.
Collect(dest *[]exemplar.Exemplar)
}

// filteredExemplarReservoir handles the pre-sampled exemplar of measurements made.
type filteredExemplarReservoir[N int64 | float64] struct {
filter exemplar.Filter
reservoir exemplar.Reservoir
}

// newFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which
// only offers values that are allowed by the filter. If the provided filter is
// nil, all measurements are dropped..
func newFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) *filteredExemplarReservoir[N] {
// NewFilteredExemplarReservoir creates a [FilteredExemplarReservoir] which only offers values
// that are allowed by the filter.
func NewFilteredExemplarReservoir[N int64 | float64](f exemplar.Filter, r exemplar.Reservoir) FilteredExemplarReservoir[N] {
return &filteredExemplarReservoir[N]{
filter: f,
reservoir: r,
Expand Down
Loading

0 comments on commit 4caf61e

Please sign in to comment.