Skip to content

Commit

Permalink
Histogram aggregator functional options (open-telemetry#1434)
Browse files Browse the repository at this point in the history
* Add a Config/Option for histogram

* Just one option here

* Test fixes

* Support and test int64 histograms

* Changelog

* Lint

* Un-export three things.
  • Loading branch information
jmacd authored Jan 15, 2021
1 parent 0df8cd6 commit c562277
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Unify endpoint API that related to OTel exporter. (#1401)
- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435)
- Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430)
- Histogram aggregator accepts functional options, uses default boundaries if none given. (#1434)
- `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432)
- Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420)
- The `TraceContext` propagator now correctly propagates `TraceState` through the `SpanContext`. (#1447)
Expand Down
9 changes: 6 additions & 3 deletions example/prom-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
Expand Down Expand Up @@ -59,9 +60,11 @@ func initMeter() {

cont := controller.New(
processor.New(
simple.NewWithHistogramDistribution([]float64{
0.001, 0.01, 0.1, 1, 10, 100, 1000,
}),
simple.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries([]float64{
0.001, 0.01, 0.1, 1, 10, 100, 1000,
}),
),
otlpExporter, // otlpExporter is an ExportKindSelector
processor.WithMemory(true),
),
Expand Down
7 changes: 5 additions & 2 deletions exporters/metric/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ import (
"go.opentelemetry.io/otel/metric/number"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/export/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"
)

// Exporter supports Prometheus pulls. It does not implement the
Expand Down Expand Up @@ -150,7 +151,9 @@ func InstallNewPipeline(config Config, options ...controller.Option) (*Exporter,
func defaultController(config Config, options ...controller.Option) *controller.Controller {
return controller.New(
processor.New(
simple.NewWithHistogramDistribution(config.DefaultHistogramBoundaries),
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(config.DefaultHistogramBoundaries),
),
export.CumulativeExportKindSelector(),
processor.WithMemory(true),
),
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record,
if r.iKind.Adding() {
agg, ckpt = metrictest.Unslice2(sum.New(2))
} else {
agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, testHistogramBoundaries))
agg, ckpt = metrictest.Unslice2(histogram.New(2, &desc, histogram.WithExplicitBoundaries(testHistogramBoundaries)))
}

ctx := context.Background()
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/aggregator/histogram/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func benchmarkHistogramSearchFloat64(b *testing.B, size int) {
values[i] = rand.Float64() * inputRange
}
desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Float64Kind)
agg := &histogram.New(1, desc, boundaries)[0]
agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0]
ctx := context.Background()

b.ReportAllocs()
Expand Down Expand Up @@ -90,7 +90,7 @@ func benchmarkHistogramSearchInt64(b *testing.B, size int) {
values[i] = int64(rand.Float64() * inputRange)
}
desc := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, number.Int64Kind)
agg := &histogram.New(1, desc, boundaries)[0]
agg := &histogram.New(1, desc, histogram.WithExplicitBoundaries(boundaries))[0]
ctx := context.Background()

b.ReportAllocs()
Expand Down
64 changes: 61 additions & 3 deletions sdk/metric/aggregator/histogram/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ type (
state *state
}

// config describes how the histogram is aggregated.
config struct {
// explicitBoundaries support arbitrary bucketing schemes. This
// is the general case.
explicitBoundaries []float64
}

// Option configures a histogram config.
Option interface {
// apply sets one or more config fields.
apply(*config)
}

// state represents the state of a histogram, consisting of
// the sum and counts for all observed values and
// the less than equal bucket count for the pre-determined boundaries.
Expand All @@ -51,6 +64,39 @@ type (
}
)

// WithExplicitBoundaries sets the ExplicitBoundaries configuration option of a config.
func WithExplicitBoundaries(explicitBoundaries []float64) Option {
return explicitBoundariesOption{explicitBoundaries}
}

type explicitBoundariesOption struct {
boundaries []float64
}

func (o explicitBoundariesOption) apply(config *config) {
config.explicitBoundaries = o.boundaries
}

// defaultExplicitBoundaries have been copied from prometheus.DefBuckets.
//
// Note we anticipate the use of a high-precision histogram sketch as
// the standard histogram aggregator for OTLP export.
// (https://github.com/open-telemetry/opentelemetry-specification/issues/982).
var defaultFloat64ExplicitBoundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}

// defaultInt64ExplicitBoundaryMultiplier determines the default
// integer histogram boundaries.
const defaultInt64ExplicitBoundaryMultiplier = 1e6

// defaultInt64ExplicitBoundaries applies a multiplier to the default
// float64 boundaries: [ 5K, 10K, 25K, ..., 2.5M, 5M, 10M ]
var defaultInt64ExplicitBoundaries = func(bounds []float64) (asint []float64) {
for _, f := range bounds {
asint = append(asint, defaultInt64ExplicitBoundaryMultiplier*f)
}
return
}(defaultFloat64ExplicitBoundaries)

var _ export.Aggregator = &Aggregator{}
var _ aggregation.Sum = &Aggregator{}
var _ aggregation.Count = &Aggregator{}
Expand All @@ -64,14 +110,26 @@ var _ aggregation.Histogram = &Aggregator{}
// Note that this aggregator maintains each value using independent
// atomic operations, which introduces the possibility that
// checkpoints are inconsistent.
func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator {
func New(cnt int, desc *metric.Descriptor, opts ...Option) []Aggregator {
var cfg config

if desc.NumberKind() == number.Int64Kind {
cfg.explicitBoundaries = defaultInt64ExplicitBoundaries
} else {
cfg.explicitBoundaries = defaultFloat64ExplicitBoundaries
}

for _, opt := range opts {
opt.apply(&cfg)
}

aggs := make([]Aggregator, cnt)

// Boundaries MUST be ordered otherwise the histogram could not
// be properly computed.
sortedBoundaries := make([]float64, len(boundaries))
sortedBoundaries := make([]float64, len(cfg.explicitBoundaries))

copy(sortedBoundaries, boundaries)
copy(sortedBoundaries, cfg.explicitBoundaries)
sort.Float64s(sortedBoundaries)

for i := range aggs {
Expand Down
84 changes: 67 additions & 17 deletions sdk/metric/aggregator/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package histogram_test

import (
"context"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -59,16 +60,16 @@ var (
},
}

boundaries = []float64{500, 250, 750}
testBoundaries = []float64{500, 250, 750}
)

func new2(desc *metric.Descriptor) (_, _ *histogram.Aggregator) {
alloc := histogram.New(2, desc, boundaries)
func new2(desc *metric.Descriptor, options ...histogram.Option) (_, _ *histogram.Aggregator) {
alloc := histogram.New(2, desc, options...)
return &alloc[0], &alloc[1]
}

func new4(desc *metric.Descriptor) (_, _, _, _ *histogram.Aggregator) {
alloc := histogram.New(4, desc, boundaries)
func new4(desc *metric.Descriptor, options ...histogram.Option) (_, _, _, _ *histogram.Aggregator) {
alloc := histogram.New(4, desc, options...)
return &alloc[0], &alloc[1], &alloc[2], &alloc[3]
}

Expand All @@ -84,11 +85,10 @@ func checkZero(t *testing.T, agg *histogram.Aggregator, desc *metric.Descriptor)
buckets, err := agg.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1, "There should be b + 1 counts, where b is the number of boundaries")
for i, bCount := range buckets.Counts {
require.Equal(t, uint64(0), uint64(bCount), "Bucket #%d must have 0 observed values", i)
}

}

func TestHistogramAbsolute(t *testing.T) {
Expand All @@ -113,7 +113,7 @@ func TestHistogramPositiveAndNegative(t *testing.T) {
func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) {
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)

agg, ckpt := new2(descriptor)
agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries))

// This needs to repeat at least 3 times to uncover a failure to reset
// for the overall sum and count fields, since the third time through
Expand All @@ -139,20 +139,20 @@ func TestHistogramInitial(t *testing.T) {
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)

agg := &histogram.New(1, descriptor, boundaries)[0]
agg := &histogram.New(1, descriptor, histogram.WithExplicitBoundaries(testBoundaries))[0]
buckets, err := agg.Histogram()

require.NoError(t, err)
require.Equal(t, len(buckets.Counts), len(boundaries)+1)
require.Equal(t, len(buckets.Boundaries), len(boundaries))
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1)
require.Equal(t, len(buckets.Boundaries), len(testBoundaries))
})
}

func TestHistogramMerge(t *testing.T) {
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)

agg1, agg2, ckpt1, ckpt2 := new4(descriptor)
agg1, agg2, ckpt1, ckpt2 := new4(descriptor, histogram.WithExplicitBoundaries(testBoundaries))

all := aggregatortest.NewNumbers(profile.NumberKind)

Expand Down Expand Up @@ -180,7 +180,7 @@ func TestHistogramNotSet(t *testing.T) {
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)

agg, ckpt := new2(descriptor)
agg, ckpt := new2(descriptor, histogram.WithExplicitBoundaries(testBoundaries))

err := agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)
Expand Down Expand Up @@ -212,11 +212,12 @@ func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregator
buckets, err := agg.Histogram()
require.NoError(t, err)

require.Equal(t, len(buckets.Counts), len(boundaries)+1,
require.Equal(t, len(buckets.Counts), len(testBoundaries)+1,
"There should be b + 1 counts, where b is the number of boundaries")

sortedBoundaries := make([]float64, len(boundaries))
copy(sortedBoundaries, boundaries)
sortedBoundaries := make([]float64, len(testBoundaries))
copy(sortedBoundaries, testBoundaries)

sort.Float64s(sortedBoundaries)

require.EqualValues(t, sortedBoundaries, buckets.Boundaries)
Expand All @@ -240,7 +241,56 @@ func TestSynchronizedMoveReset(t *testing.T) {
t,
metric.ValueRecorderInstrumentKind,
func(desc *metric.Descriptor) export.Aggregator {
return &histogram.New(1, desc, boundaries)[0]
return &histogram.New(1, desc, histogram.WithExplicitBoundaries(testBoundaries))[0]
},
)
}

func TestHistogramDefaultBoundaries(t *testing.T) {
aggregatortest.RunProfiles(t, func(t *testing.T, profile aggregatortest.Profile) {
ctx := context.Background()
descriptor := aggregatortest.NewAggregatorTest(metric.ValueRecorderInstrumentKind, profile.NumberKind)

agg, ckpt := new2(descriptor)

bounds := []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} // len 11
values := append(bounds, 100) // len 12
expect := []uint64{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1} // len 12

for _, value := range values {
var num number.Number

value -= .001 // Avoid exact boundaries

if descriptor.NumberKind() == number.Int64Kind {
value *= 1e6
num = number.NewInt64Number(int64(value))
} else {
num = number.NewFloat64Number(value)
}

require.NoError(t, agg.Update(ctx, num, descriptor))
}

bucks, err := agg.Histogram()
require.NoError(t, err)

// Check for proper lengths, 1 count in each bucket.
require.Equal(t, len(values), len(bucks.Counts))
require.Equal(t, len(bounds), len(bucks.Boundaries))
require.EqualValues(t, expect, bucks.Counts)

require.Equal(t, expect, bucks.Counts)

// Move and repeat the test on `ckpt`.
err = agg.SynchronizedMove(ckpt, descriptor)
require.NoError(t, err)

bucks, err = ckpt.Histogram()
require.NoError(t, err)

require.Equal(t, len(values), len(bucks.Counts))
require.Equal(t, len(bounds), len(bucks.Boundaries))
require.EqualValues(t, expect, bucks.Counts)
})
}
2 changes: 1 addition & 1 deletion sdk/metric/histogram_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func TestStressInt64Histogram(t *testing.T) {
desc := metric.NewDescriptor("some_metric", metric.ValueRecorderInstrumentKind, number.Int64Kind)

alloc := histogram.New(2, &desc, []float64{25, 50, 75})
alloc := histogram.New(2, &desc, histogram.WithExplicitBoundaries([]float64{25, 50, 75}))
h, ckpt := &alloc[0], &alloc[1]

ctx, cancelFunc := context.WithCancel(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/processor/processortest/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (testAggregatorSelector) AggregatorFor(desc *metric.Descriptor, aggPtrs ...
*aggPtrs[i] = &aggs[i]
}
case strings.HasSuffix(desc.Name(), ".histogram"):
aggs := histogram.New(len(aggPtrs), desc, nil)
aggs := histogram.New(len(aggPtrs), desc)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
Expand Down
8 changes: 4 additions & 4 deletions sdk/metric/selector/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type (
selectorInexpensive struct{}
selectorExact struct{}
selectorHistogram struct {
boundaries []float64
options []histogram.Option
}
)

Expand Down Expand Up @@ -59,8 +59,8 @@ func NewWithExactDistribution() export.AggregatorSelector {
// NewWithHistogramDistribution returns a simple aggregator selector
// that uses histogram aggregators for `ValueRecorder` instruments.
// This selector is a good default choice for most metric exporters.
func NewWithHistogramDistribution(boundaries []float64) export.AggregatorSelector {
return selectorHistogram{boundaries: boundaries}
func NewWithHistogramDistribution(options ...histogram.Option) export.AggregatorSelector {
return selectorHistogram{options: options}
}

func sumAggs(aggPtrs []*export.Aggregator) {
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s selectorHistogram) AggregatorFor(descriptor *metric.Descriptor, aggPtrs
case metric.ValueObserverInstrumentKind:
lastValueAggs(aggPtrs)
case metric.ValueRecorderInstrumentKind:
aggs := histogram.New(len(aggPtrs), descriptor, s.boundaries)
aggs := histogram.New(len(aggPtrs), descriptor, s.options...)
for i := range aggPtrs {
*aggPtrs[i] = &aggs[i]
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/metric/selector/simple/simple_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestExactDistribution(t *testing.T) {
}

func TestHistogramDistribution(t *testing.T) {
hist := simple.NewWithHistogramDistribution(nil)
hist := simple.NewWithHistogramDistribution()
require.IsType(t, (*histogram.Aggregator)(nil), oneAgg(hist, &testValueRecorderDesc))
testFixedSelectors(t, hist)
}

0 comments on commit c562277

Please sign in to comment.