diff --git a/CHANGELOG.md b/CHANGELOG.md index 50269e3aa81..c624b9fe251 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Unify endpoint API that related to OTel exporter. (#1401) - Optimize metric histogram aggregator to re-use its slice of buckets. (#1435) - Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430) +- Histogram aggregator accepts functional options, uses default boundaries if none given. (#1434) - `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432) - Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420) - The `TraceContext` propagator now correctly propagates `TraceState` through the `SpanContext`. (#1447) diff --git a/example/prom-collector/main.go b/example/prom-collector/main.go index e4efbfcb5d2..946fdccf762 100644 --- a/example/prom-collector/main.go +++ b/example/prom-collector/main.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" "go.opentelemetry.io/otel/sdk/metric/selector/simple" @@ -59,9 +60,11 @@ func initMeter() { cont := controller.New( processor.New( - simple.NewWithHistogramDistribution([]float64{ - 0.001, 0.01, 0.1, 1, 10, 100, 1000, - }), + simple.NewWithHistogramDistribution( + histogram.WithExplicitBoundaries([]float64{ + 0.001, 0.01, 0.1, 1, 10, 100, 1000, + }), + ), otlpExporter, // otlpExporter is an ExportKindSelector processor.WithMemory(true), ), diff --git a/exporters/metric/prometheus/prometheus.go b/exporters/metric/prometheus/prometheus.go index 7924c8c1d40..e74f6524737 100644 --- a/exporters/metric/prometheus/prometheus.go +++ b/exporters/metric/prometheus/prometheus.go @@ -32,9 +32,10 @@ import ( "go.opentelemetry.io/otel/metric/number" export "go.opentelemetry.io/otel/sdk/export/metric" "go.opentelemetry.io/otel/sdk/export/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" + selector "go.opentelemetry.io/otel/sdk/metric/selector/simple" ) // Exporter supports Prometheus pulls. It does not implement the @@ -150,7 +151,9 @@ func InstallNewPipeline(config Config, options ...controller.Option) (*Exporter, func defaultController(config Config, options ...controller.Option) *controller.Controller { return controller.New( processor.New( - simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries), + selector.NewWithHistogramDistribution( + histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries), + ), export.CumulativeExportKindSelector(), processor.WithMemory(true), ), diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index 9d804b84df4..3da92f098ab 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -756,7 +756,7 @@ func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, if r.iKind.Adding() { agg, ckpt = metrictest.Unslice2(sum.New(2)) } else { - agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, testHistogramBoundaries)) + agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, histogram.WithExplicitBoundaries(testHistogramBoundaries))) } ctx := context.Background() diff --git a/sdk/metric/aggregator/histogram/benchmark_test.go b/sdk/metric/aggregator/histogram/benchmark_test.go index 91fab848c92..dfc7b092daf 100644 --- a/sdk/metric/aggregator/histogram/benchmark_test.go +++ b/sdk/metric/aggregator/histogram/benchmark_test.go @@ -39,7 +39,7 @@ func benchmarkHistogramSearchFloat64(b *testing.B, size int) { values[i] = rand.Float64() * inputRange } desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Float64Kind) - agg := &histogram.New(1, desc, boundaries)[0] + agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0] ctx := context.Background() b.ReportAllocs() @@ -90,7 +90,7 @@ func benchmarkHistogramSearchInt64(b *testing.B, size int) { values[i] = int64(rand.Float64() * inputRange) } desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Int64Kind) - agg := &histogram.New(1, desc, boundaries)[0] + agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0] ctx := context.Background() b.ReportAllocs() diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index b83af270941..ea3ecdbb5b2 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -41,6 +41,19 @@ type ( state *state } + // config describes how the histogram is aggregated. + config struct { + // explicitBoundaries support arbitrary bucketing schemes. This + // is the general case. + explicitBoundaries []float64 + } + + // Option configures a histogram config. + Option interface { + // apply sets one or more config fields. + apply(*config) + } + // state represents the state of a histogram, consisting of // the sum and counts for all observed values and // the less than equal bucket count for the pre-determined boundaries. @@ -51,6 +64,39 @@ type ( } ) +// WithExplicitBoundaries sets the ExplicitBoundaries configuration option of a config. +func WithExplicitBoundaries(explicitBoundaries []float64) Option { + return explicitBoundariesOption{explicitBoundaries} +} + +type explicitBoundariesOption struct { + boundaries []float64 +} + +func (o explicitBoundariesOption) apply(config *config) { + config.explicitBoundaries = o.boundaries +} + +// defaultExplicitBoundaries have been copied from prometheus.DefBuckets. +// +// Note we anticipate the use of a high-precision histogram sketch as +// the standard histogram aggregator for OTLP export. +// (https://github.com/open-telemetry/opentelemetry-specification/issues/982). +var defaultFloat64ExplicitBoundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} + +// defaultInt64ExplicitBoundaryMultiplier determines the default +// integer histogram boundaries. +const defaultInt64ExplicitBoundaryMultiplier = 1e6 + +// defaultInt64ExplicitBoundaries applies a multiplier to the default +// float64 boundaries: [ 5K, 10K, 25K, ..., 2.5M, 5M, 10M ] +var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) { + for _, f := range bounds { + asint = append(asint, defaultInt64ExplicitBoundaryMultiplier*f) + } + return +}(defaultFloat64ExplicitBoundaries) + var _ export.Aggregator = &Aggregator{} var _ aggregation.Sum = &Aggregator{} var _ aggregation.Count = &Aggregator{} @@ -64,14 +110,26 @@ var _ aggregation.Histogram = &Aggregator{} // Note that this aggregator maintains each value using independent // atomic operations, which introduces the possibility that // checkpoints are inconsistent. -func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator { +func New(cnt int, desc *metric.Descriptor, opts ...Option) []Aggregator { + var cfg config + + if desc.NumberKind() == number.Int64Kind { + cfg.explicitBoundaries = defaultInt64ExplicitBoundaries + } else { + cfg.explicitBoundaries = defaultFloat64ExplicitBoundaries + } + + for _, opt := range opts { + opt.apply(&cfg) + } + aggs := make([]Aggregator, cnt) // Boundaries MUST be ordered otherwise the histogram could not // be properly computed. - sortedBoundaries := make([]float64, len(boundaries)) + sortedBoundaries := make([]float64, len(cfg.explicitBoundaries)) - copy(sortedBoundaries, boundaries) + copy(sortedBoundaries, cfg.explicitBoundaries) sort.Float64s(sortedBoundaries) for i := range aggs { diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index 4e381659d44..ddc2195167e 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -15,6 +15,7 @@ package histogram_test import ( + "context" "math" "math/rand" "sort" @@ -59,16 +60,16 @@ var ( }, } - boundaries = []float64{500, 250, 750} + testBoundaries = []float64{500, 250, 750} ) -func new2(desc *metric.Descriptor) (_, _ *histogram.Aggregator) { - alloc := histogram.New(2, desc, boundaries) +func new2(desc *metric.Descriptor, options ...histogram.Option) (_, _ *histogram.Aggregator) { + alloc := histogram.New(2, desc, options...) return &alloc[0], &alloc[1] } -func new4(desc *metric.Descriptor) (_, _, _, _ *histogram.Aggregator) { - alloc := histogram.New(4, desc, boundaries) +func new4(desc *metric.Descriptor, options ...histogram.Option) (_, _, _, _ *histogram.Aggregator) { + alloc := histogram.New(4, desc, options...) return &alloc[0], &alloc[1], &alloc[2], &alloc[3] } @@ -84,11 +85,10 @@ func checkZero(t *testing.T, agg *histogram.Aggregator, desc *metric.Descriptor) buckets, err := agg.Histogram() require.NoError(t, err) - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + require.Equal(t, len(buckets.Counts), len(testBoundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") for i, bCount := range buckets.Counts { require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i) } - } func TestHistogramAbsolute(t *testing.T) { @@ -113,7 +113,7 @@ func TestHistogramPositiveAndNegative(t *testing.T) { func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) { descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind) - agg, ckpt := new2(descriptor) + agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries)) // This needs to repeat at least 3 times to uncover a failure to reset // for the overall sum and count fields, since the third time through @@ -139,12 +139,12 @@ func TestHistogramInitial(t *testing.T) { aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind) - agg := &histogram.New(1, descriptor, boundaries)[0] + agg := &histogram.New(1, descriptor, histogram.WithExplicitBoundaries(testBoundaries))[0] buckets, err := agg.Histogram() require.NoError(t, err) - require.Equal(t, len(buckets.Counts), len(boundaries)+1) - require.Equal(t, len(buckets.Boundaries), len(boundaries)) + require.Equal(t, len(buckets.Counts), len(testBoundaries)+1) + require.Equal(t, len(buckets.Boundaries), len(testBoundaries)) }) } @@ -152,7 +152,7 @@ func TestHistogramMerge(t *testing.T) { aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind) - agg1, agg2, ckpt1, ckpt2 := new4(descriptor) + agg1, agg2, ckpt1, ckpt2 := new4(descriptor, histogram.WithExplicitBoundaries(testBoundaries)) all := aggregatortest.NewNumbers(profile.NumberKind) @@ -180,7 +180,7 @@ func TestHistogramNotSet(t *testing.T) { aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind) - agg, ckpt := new2(descriptor) + agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries)) err := agg.SynchronizedMove(ckpt, descriptor) require.NoError(t, err) @@ -212,11 +212,12 @@ func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregator buckets, err := agg.Histogram() require.NoError(t, err) - require.Equal(t, len(buckets.Counts), len(boundaries)+1, + require.Equal(t, len(buckets.Counts), len(testBoundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") - sortedBoundaries := make([]float64, len(boundaries)) - copy(sortedBoundaries, boundaries) + sortedBoundaries := make([]float64, len(testBoundaries)) + copy(sortedBoundaries, testBoundaries) + sort.Float64s(sortedBoundaries) require.EqualValues(t, sortedBoundaries, buckets.Boundaries) @@ -240,7 +241,56 @@ func TestSynchronizedMoveReset(t *testing.T) { t, metric.ValueRecorderInstrumentKind, func(desc *metric.Descriptor) export.Aggregator { - return &histogram.New(1, desc, boundaries)[0] + return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0] }, ) } + +func TestHistogramDefaultBoundaries(t *testing.T) { + aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) { + ctx := context.Background() + descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind) + + agg, ckpt := new2(descriptor) + + bounds := []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} // len 11 + values := append(bounds, 100) // len 12 + expect := []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} // len 12 + + for _, value := range values { + var num number.Number + + value -= .001 // Avoid exact boundaries + + if descriptor.NumberKind() == number.Int64Kind { + value *= 1e6 + num = number.NewInt64Number(int64(value)) + } else { + num = number.NewFloat64Number(value) + } + + require.NoError(t, agg.Update(ctx, num, descriptor)) + } + + bucks, err := agg.Histogram() + require.NoError(t, err) + + // Check for proper lengths, 1 count in each bucket. + require.Equal(t, len(values), len(bucks.Counts)) + require.Equal(t, len(bounds), len(bucks.Boundaries)) + require.EqualValues(t, expect, bucks.Counts) + + require.Equal(t, expect, bucks.Counts) + + // Move and repeat the test on `ckpt`. + err = agg.SynchronizedMove(ckpt, descriptor) + require.NoError(t, err) + + bucks, err = ckpt.Histogram() + require.NoError(t, err) + + require.Equal(t, len(values), len(bucks.Counts)) + require.Equal(t, len(bounds), len(bucks.Boundaries)) + require.EqualValues(t, expect, bucks.Counts) + }) +} diff --git a/sdk/metric/histogram_stress_test.go b/sdk/metric/histogram_stress_test.go index 10aa7e1a86f..f88cc3d360b 100644 --- a/sdk/metric/histogram_stress_test.go +++ b/sdk/metric/histogram_stress_test.go @@ -30,7 +30,7 @@ import ( func TestStressInt64Histogram(t *testing.T) { desc := metric.NewDescriptor("some_metric", metric.ValueRecorderInstrumentKind, number.Int64Kind) - alloc := histogram.New(2, &desc, []float64{25, 50, 75}) + alloc := histogram.New(2, &desc, histogram.WithExplicitBoundaries([]float64{25, 50, 75})) h, ckpt := &alloc[0], &alloc[1] ctx, cancelFunc := context.WithCancel(context.Background()) diff --git a/sdk/metric/processor/processortest/test.go b/sdk/metric/processor/processortest/test.go index 17141a8de9b..9b767291b58 100644 --- a/sdk/metric/processor/processortest/test.go +++ b/sdk/metric/processor/processortest/test.go @@ -184,7 +184,7 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ... *aggPtrs[i] = &aggs[i] } case strings.HasSuffix(desc.Name(), ".histogram"): - aggs := histogram.New(len(aggPtrs), desc, nil) + aggs := histogram.New(len(aggPtrs), desc) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } diff --git a/sdk/metric/selector/simple/simple.go b/sdk/metric/selector/simple/simple.go index 3d1d055a7f5..bb5760994ac 100644 --- a/sdk/metric/selector/simple/simple.go +++ b/sdk/metric/selector/simple/simple.go @@ -28,7 +28,7 @@ type ( selectorInexpensive struct{} selectorExact struct{} selectorHistogram struct { - boundaries []float64 + options []histogram.Option } ) @@ -59,8 +59,8 @@ func NewWithExactDistribution() export.AggregatorSelector { // NewWithHistogramDistribution returns a simple aggregator selector // that uses histogram aggregators for `ValueRecorder` instruments. // This selector is a good default choice for most metric exporters. -func NewWithHistogramDistribution(boundaries []float64) export.AggregatorSelector { - return selectorHistogram{boundaries: boundaries} +func NewWithHistogramDistribution(options ...histogram.Option) export.AggregatorSelector { + return selectorHistogram{options: options} } func sumAggs(aggPtrs []*export.Aggregator) { @@ -110,7 +110,7 @@ func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs case metric.ValueObserverInstrumentKind: lastValueAggs(aggPtrs) case metric.ValueRecorderInstrumentKind: - aggs := histogram.New(len(aggPtrs), descriptor, s.boundaries) + aggs := histogram.New(len(aggPtrs), descriptor, s.options...) for i := range aggPtrs { *aggPtrs[i] = &aggs[i] } diff --git a/sdk/metric/selector/simple/simple_test.go b/sdk/metric/selector/simple/simple_test.go index 6bec06f3481..cd7dab46357 100644 --- a/sdk/metric/selector/simple/simple_test.go +++ b/sdk/metric/selector/simple/simple_test.go @@ -66,7 +66,7 @@ func TestExactDistribution(t *testing.T) { } func TestHistogramDistribution(t *testing.T) { - hist := simple.NewWithHistogramDistribution(nil) + hist := simple.NewWithHistogramDistribution() require.IsType(t, (*histogram.Aggregator)(nil), oneAgg(hist, &testValueRecorderDesc)) testFixedSelectors(t, hist) }