Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow histogram for all instruments #4332

Merged
merged 5 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view.
This change is made to ensure compatibility with the OpenTelemetry specification. (#4288)
- If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290)
- Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332)

### Fixed

Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {

// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) {
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram, noSum bool) (Measure[N], ComputeAggregation) {
var h aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
h = newDeltaHistogram[N](cfg)
h = newDeltaHistogram[N](cfg, noSum)
default:
h = newCumulativeHistogram[N](cfg)
h = newCumulativeHistogram[N](cfg, noSum)
}
return b.input(h), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
Expand Down
28 changes: 19 additions & 9 deletions sdk/metric/internal/aggregate/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
type buckets[N int64 | float64] struct {
counts []uint64
count uint64
sum N
total N
min, max N
}

Expand All @@ -36,10 +36,11 @@ func newBuckets[N int64 | float64](n int) *buckets[N] {
return &buckets[N]{counts: make([]uint64, n)}
}

func (b *buckets[N]) sum(value N) { b.total += value }

func (b *buckets[N]) bin(idx int, value N) {
b.counts[idx]++
b.count++
b.sum += value
if value < b.min {
b.min = value
} else if value > b.max {
Expand All @@ -50,13 +51,14 @@ func (b *buckets[N]) bin(idx int, value N) {
// histValues summarizes a set of measurements as an histValues with
// explicitly defined buckets.
type histValues[N int64 | float64] struct {
noSum bool
bounds []float64

values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
func newHistValues[N int64 | float64](bounds []float64, noSum bool) *histValues[N] {
// The responsibility of keeping all buckets correctly associated with the
// passed boundaries is ultimately this type's responsibility. Make a copy
// here so we can always guarantee this. Or, in the case of failure, have
Expand All @@ -65,6 +67,7 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
copy(b, bounds)
sort.Float64s(b)
return &histValues[N]{
noSum: noSum,
bounds: b,
values: make(map[attribute.Set]*buckets[N]),
}
Expand Down Expand Up @@ -98,6 +101,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
s.values[attr] = b
}
b.bin(idx, value)
if !s.noSum {
b.sum(value)
}
}

// newDeltaHistogram returns an Aggregator that summarizes a set of
Expand All @@ -107,9 +113,9 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Each aggregation cycle is treated independently. When the returned
// Aggregator's Aggregations method is called it will reset all histogram
// counts to zero.
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
func newDeltaHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &deltaHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
Expand Down Expand Up @@ -148,7 +154,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
Count: b.count,
Bounds: bounds,
BucketCounts: b.counts,
Sum: b.sum,
}
if !s.noSum {
hdp.Sum = b.total
}
if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
Expand All @@ -170,9 +178,9 @@ func (s *deltaHistogram[N]) Aggregation() metricdata.Aggregation {
// Each aggregation cycle builds from the previous, the histogram counts are
// the bucketed counts of all values aggregated since the returned Aggregator
// was created.
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram) aggregator[N] {
func newCumulativeHistogram[N int64 | float64](cfg aggregation.ExplicitBucketHistogram, noSum bool) aggregator[N] {
return &cumulativeHistogram[N]{
histValues: newHistValues[N](cfg.Boundaries),
histValues: newHistValues[N](cfg.Boundaries, noSum),
noMinMax: cfg.NoMinMax,
start: now(),
}
Expand Down Expand Up @@ -219,7 +227,9 @@ func (s *cumulativeHistogram[N]) Aggregation() metricdata.Aggregation {
Count: b.count,
Bounds: bounds,
BucketCounts: counts,
Sum: b.sum,
}
if !s.noSum {
hdp.Sum = b.total
}
if !s.noMinMax {
hdp.Min = metricdata.NewExtrema(b.min)
Expand Down
110 changes: 89 additions & 21 deletions sdk/metric/internal/aggregate/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,25 @@ func testHistogram[N int64 | float64](t *testing.T) {
}

incr := monoIncr[N]()
eFunc := deltaHistExpecter[N](incr)
t.Run("Delta", tester.Run(newDeltaHistogram[N](histConf), incr, eFunc))
eFunc := deltaHistSummedExpecter[N](incr)
t.Run("Delta/Summed", tester.Run(newDeltaHistogram[N](histConf, false), incr, eFunc))
eFunc = deltaHistExpecter[N](incr)
t.Run("Delta/NoSum", tester.Run(newDeltaHistogram[N](histConf, true), incr, eFunc))
eFunc = cumuHistSummedExpecter[N](incr)
t.Run("Cumulative/Summed", tester.Run(newCumulativeHistogram[N](histConf, false), incr, eFunc))
eFunc = cumuHistExpecter[N](incr)
t.Run("Cumulative", tester.Run(newCumulativeHistogram[N](histConf), incr, eFunc))
t.Run("Cumulative/NoSum", tester.Run(newCumulativeHistogram[N](histConf, true), incr, eFunc))
}

func deltaHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc {
h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality}
return func(m int) metricdata.Aggregation {
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(m)))
}
return h
}
}

func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
Expand All @@ -66,6 +81,19 @@ func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
}
}

func cumuHistSummedExpecter[N int64 | float64](incr setMap[N]) expectFunc {
var cycle int
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
return func(m int) metricdata.Aggregation {
cycle++
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPointSummed[N](a, v, uint64(cycle*m)))
}
return h
}
}

func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
var cycle int
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
Expand All @@ -79,6 +107,25 @@ func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
}
}

// hPointSummed returns an HistogramDataPoint that started and ended now with
// multi number of measurements values v. It includes a min and max (set to v).
func hPointSummed[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] {
idx := sort.SearchFloat64s(bounds, float64(v))
counts := make([]uint64, len(bounds)+1)
counts[idx] += multi
return metricdata.HistogramDataPoint[N]{
Attributes: a,
StartTime: now(),
Time: now(),
Count: multi,
Bounds: bounds,
BucketCounts: counts,
Min: metricdata.NewExtrema(v),
Max: metricdata.NewExtrema(v),
Sum: v * N(multi),
}
}

// hPoint returns an HistogramDataPoint that started and ended now with multi
// number of measurements values v. It includes a min and max (set to v).
func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] {
Expand All @@ -94,7 +141,6 @@ func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.Hi
BucketCounts: counts,
Min: metricdata.NewExtrema(v),
Max: metricdata.NewExtrema(v),
Sum: v * N(multi),
}
}

Expand All @@ -106,28 +152,50 @@ func TestBucketsBin(t *testing.T) {
func testBucketsBin[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
b := newBuckets[N](3)
assertB := func(counts []uint64, count uint64, sum, min, max N) {
assertB := func(counts []uint64, count uint64, min, max N) {
t.Helper()
assert.Equal(t, counts, b.counts)
assert.Equal(t, count, b.count)
assert.Equal(t, sum, b.sum)
assert.Equal(t, min, b.min)
assert.Equal(t, max, b.max)
}

assertB([]uint64{0, 0, 0}, 0, 0, 0, 0)
assertB([]uint64{0, 0, 0}, 0, 0, 0)
b.bin(1, 2)
assertB([]uint64{0, 1, 0}, 1, 2, 0, 2)
assertB([]uint64{0, 1, 0}, 1, 0, 2)
b.bin(0, -1)
assertB([]uint64{1, 1, 0}, 2, 1, -1, 2)
assertB([]uint64{1, 1, 0}, 2, -1, 2)
}
}

func TestBucketsSum(t *testing.T) {
t.Run("Int64", testBucketsSum[int64]())
t.Run("Float64", testBucketsSum[float64]())
}

func testBucketsSum[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
b := newBuckets[N](3)

var want N
assert.Equal(t, want, b.total)

b.sum(2)
want = 2
assert.Equal(t, want, b.total)

b.sum(-1)
want = 1
assert.Equal(t, want, b.total)
}
}

func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) {
func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram, bool) aggregator[N], getBounds func(aggregator[N]) []float64) func(t *testing.T) {
b := []float64{0, 1, 2}
cpB := make([]float64, len(b))
copy(cpB, b)

a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b})
a := newA(aggregation.ExplicitBucketHistogram{Boundaries: b}, false)
return func(t *testing.T) {
require.Equal(t, cpB, getBounds(a))

Expand Down Expand Up @@ -160,7 +228,7 @@ func TestHistogramImmutableBounds(t *testing.T) {
}

func TestCumulativeHistogramImutableCounts(t *testing.T) {
a := newCumulativeHistogram[int64](histConf)
a := newCumulativeHistogram[int64](histConf, false)
a.Aggregate(5, alice)
hdp := a.Aggregation().(metricdata.Histogram[int64]).DataPoints[0]

Expand All @@ -176,12 +244,12 @@ func TestCumulativeHistogramImutableCounts(t *testing.T) {
func TestDeltaHistogramReset(t *testing.T) {
t.Cleanup(mockTime(now))

a := newDeltaHistogram[int64](histConf)
a := newDeltaHistogram[int64](histConf, false)
assert.Nil(t, a.Aggregation())

a.Aggregate(1, alice)
expect := metricdata.Histogram[int64]{Temporality: metricdata.DeltaTemporality}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](alice, 1, 1)}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](alice, 1, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())

// The attr set should be forgotten once Aggregations is called.
Expand All @@ -190,15 +258,15 @@ func TestDeltaHistogramReset(t *testing.T) {

// Aggregating another set should not affect the original (alice).
a.Aggregate(1, bob)
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPoint[int64](bob, 1, 1)}
expect.DataPoints = []metricdata.HistogramDataPoint[int64]{hPointSummed[int64](bob, 1, 1)}
metricdatatest.AssertAggregationsEqual(t, expect, a.Aggregation())
}

func TestEmptyHistogramNilAggregation(t *testing.T) {
assert.Nil(t, newCumulativeHistogram[int64](histConf).Aggregation())
assert.Nil(t, newCumulativeHistogram[float64](histConf).Aggregation())
assert.Nil(t, newDeltaHistogram[int64](histConf).Aggregation())
assert.Nil(t, newDeltaHistogram[float64](histConf).Aggregation())
assert.Nil(t, newCumulativeHistogram[int64](histConf, false).Aggregation())
assert.Nil(t, newCumulativeHistogram[float64](histConf, false).Aggregation())
assert.Nil(t, newDeltaHistogram[int64](histConf, false).Aggregation())
assert.Nil(t, newDeltaHistogram[float64](histConf, false).Aggregation())
}

func BenchmarkHistogram(b *testing.B) {
Expand All @@ -207,8 +275,8 @@ func BenchmarkHistogram(b *testing.B) {
}

func benchmarkHistogram[N int64 | float64](b *testing.B) {
factory := func() aggregator[N] { return newDeltaHistogram[N](histConf) }
factory := func() aggregator[N] { return newDeltaHistogram[N](histConf, false) }
b.Run("Delta", benchmarkAggregator(factory))
factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf) }
factory = func() aggregator[N] { return newCumulativeHistogram[N](histConf, false) }
b.Run("Cumulative", benchmarkAggregator(factory))
}
31 changes: 22 additions & 9 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,15 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
meas, comp = b.Sum(false)
}
case aggregation.ExplicitBucketHistogram:
meas, comp = b.ExplicitBucketHistogram(a)
var noSum bool
switch kind {
case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
// The sum should not be collected for any instrument that can make
// negative measurements:
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
noSum = true
}
meas, comp = b.ExplicitBucketHistogram(a, noSum)
default:
err = errUnknownAggregation
}
Expand All @@ -426,22 +434,27 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
// | Instrument Kind | Drop | LastValue | Sum | Histogram | Exponential Histogram |
// |--------------------------|------|-----------|-----|-----------|-----------------------|
// | Counter | ✓ | | ✓ | ✓ | ✓ |
// | UpDownCounter | ✓ | | ✓ | | |
// | UpDownCounter | ✓ | | ✓ | | |
// | Histogram | ✓ | | ✓ | ✓ | ✓ |
// | Observable Counter | ✓ | | ✓ | | |
// | Observable UpDownCounter | ✓ | | ✓ | | |
// | Observable Gauge | ✓ | ✓ | | | |.
// | Observable Counter | ✓ | | ✓ | | |
// | Observable UpDownCounter | ✓ | | ✓ | | |
// | Observable Gauge | ✓ | ✓ | | | |.
func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error {
switch agg.(type) {
case aggregation.Default:
return nil
case aggregation.ExplicitBucketHistogram:
if kind == InstrumentKindCounter || kind == InstrumentKindHistogram {
switch kind {
case InstrumentKindCounter,
InstrumentKindUpDownCounter,
InstrumentKindHistogram,
InstrumentKindObservableCounter,
InstrumentKindObservableUpDownCounter,
InstrumentKindObservableGauge:
return nil
default:
return errIncompatibleAggregation
}
// TODO: review need for aggregation check after
// https://github.com/open-telemetry/opentelemetry-specification/issues/2710
return errIncompatibleAggregation
case aggregation.Sum:
switch kind {
case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter:
Expand Down
Loading