Skip to content

Commit

Permalink
[processor/deltatocumulative]: explicit-bounds histograms (#33983)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->

adds aggregation support for fixed-bucket / explicit bounds histograms


**Link to tracking Issue:**
#30705

**Testing:** unit tests added

**Documentation:** none
  • Loading branch information
sh0rez authored Aug 14, 2024
1 parent 01258c5 commit 3190917
Show file tree
Hide file tree
Showing 19 changed files with 451 additions and 81 deletions.
28 changes: 28 additions & 0 deletions .chloggen/deltatocumulative-histograms.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: explicit-bounds histograms

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
implements aggregation of explicit-bounds (traditional) histograms.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/stretchr/testify v1.9.0
Expand Down
40 changes: 38 additions & 2 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice"
)

func (dp Number) Add(in Number) Number {
Expand All @@ -24,9 +25,44 @@ func (dp Number) Add(in Number) Number {
return dp
}

// nolint
func (dp Histogram) Add(in Histogram) Histogram {
panic("todo")
// bounds different: no way to merge, so reset observation to new boundaries
if !pslice.Equal(dp.ExplicitBounds(), in.ExplicitBounds()) {
in.MoveTo(dp.HistogramDataPoint)
return dp
}

// spec requires len(BucketCounts) == len(ExplicitBounds)+1.
// given we have limited error handling at this stage (and already verified boundaries are correct),
// doing a best-effort add of whatever we have appears reasonable.
n := min(dp.BucketCounts().Len(), in.BucketCounts().Len())
for i := 0; i < n; i++ {
sum := dp.BucketCounts().At(i) + in.BucketCounts().At(i)
dp.BucketCounts().SetAt(i, sum)
}

dp.SetTimestamp(in.Timestamp())
dp.SetCount(dp.Count() + in.Count())

if dp.HasSum() && in.HasSum() {
dp.SetSum(dp.Sum() + in.Sum())
} else {
dp.RemoveSum()
}

if dp.HasMin() && in.HasMin() {
dp.SetMin(math.Min(dp.Min(), in.Min()))
} else {
dp.RemoveMin()
}

if dp.HasMax() && in.HasMax() {
dp.SetMax(math.Max(dp.Max(), in.Max()))
} else {
dp.RemoveMax()
}

return dp
}

func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
Expand Down
18 changes: 18 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,28 @@ type Point[Self any] interface {
Add(Self) Self
}

type Typed[Self any] interface {
Point[Self]
Number | Histogram | ExpHistogram
}

type Number struct {
pmetric.NumberDataPoint
}

func Zero[P Typed[P]]() P {
var point P
switch ty := any(&point).(type) {
case *Number:
ty.NumberDataPoint = pmetric.NewNumberDataPoint()
case *Histogram:
ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
case *ExpHistogram:
ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
}
return point
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"

import (
"reflect"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

var Opts = []cmp.Option{
cmpopts.EquateApprox(0, 1e-9),
cmp.Exporter(func(ty reflect.Type) bool {
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata")
}),
}

func Equal[T any](a, b T) bool {
return cmp.Equal(a, b, Opts...)
}

func Diff[T any](a, b T) string {
return cmp.Diff(a, b, Opts...)
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
package datatest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"

import (
"reflect"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

Expand All @@ -20,7 +20,7 @@ type T struct {
testing.TB
}

func Is(t testing.TB) T {
func New(t testing.TB) T {
return T{TB: t}
}

Expand Down Expand Up @@ -58,7 +58,7 @@ func equal(t testing.TB, want, got any, name string) bool {
vg := reflect.ValueOf(got)

if vw.Kind() != reflect.Struct {
ok := reflect.DeepEqual(want, got)
ok := compare.Equal(want, got)
if !ok {
t.Errorf("%s: %+v != %+v", name, want, got)
}
Expand All @@ -79,7 +79,7 @@ func equal(t testing.TB, want, got any, name string) bool {
continue
}
// Append(Empty) fails above heuristic, exclude it
if strings.HasPrefix(mname, "Append") {
if strings.HasPrefix(mname, "Append") || strings.HasPrefix(mname, "Clone") {
continue
}

Expand Down Expand Up @@ -111,5 +111,10 @@ func equal(t testing.TB, want, got any, name string) bool {
}

// fallback to a full deep-equal for rare cases (unexported fields, etc)
return assert.Equal(t, want, got)
if diff := compare.Diff(want, got); diff != "" {
t.Error(diff)
return false
}

return true
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expotest
package datatest

import (
"fmt"
Expand All @@ -12,39 +12,34 @@ import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

var t testing.TB = fakeT{}

var expotest = struct {
Is func(t testing.TB) T
Observe func(expo.Scale, ...float64) expo.Buckets
}{
Is: Is,
Observe: Observe,
}
var datatest = struct{ New func(t testing.TB) T }{New: New}

func ExampleT_Equal() {
is := expotest.Is(t)
is := datatest.New(t)

want := Histogram{
want := expotest.Histogram{
PosNeg: expotest.Observe(expo.Scale(0), 1, 2, 3, 4),
Scale: 0,
}.Into()

got := Histogram{
got := expotest.Histogram{
PosNeg: expotest.Observe(expo.Scale(1), 1, 1, 1, 1),
Scale: 1,
}.Into()

is.Equal(want, got)

// Output:
// equal_test.go:40: Negative().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:40: Negative().BucketCounts().Len(): 3 != 1
// equal_test.go:40: Positive().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:40: Positive().BucketCounts().Len(): 3 != 1
// equal_test.go:40: Scale(): 0 != 1
// equal_test.go:35: Negative().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:35: Negative().BucketCounts().Len(): 3 != 1
// equal_test.go:35: Positive().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:35: Positive().BucketCounts().Len(): 3 != 1
// equal_test.go:35: Scale(): 0 != 1
}

func TestNone(*testing.T) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func TestAbsolute(t *testing.T) {
is := expotest.Is(t)
is := datatest.New(t)

bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into()
abs := expo.Abs(bs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestMerge(t *testing.T) {
name := fmt.Sprintf("(%+d,%d)+(%+d,%d)=(%+d,%d)", a.Offset(), a.BucketCounts().Len(), b.Offset(), b.BucketCounts().Len(), want.Offset(), want.BucketCounts().Len())
t.Run(name, func(t *testing.T) {
expo.Merge(a, b)
is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, a)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func TestDownscale(t *testing.T) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestDownscale(t *testing.T) {
buckets[i] = Repr[B]{scale: r.scale, bkt: bkt}
}

is := expotest.Is(t)
is := datatest.New(t)
for i := 0; i < len(buckets)-1; i++ {
expo.Downscale(buckets[i].bkt, buckets[i].scale, buckets[i+1].scale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestWidenZero(t *testing.T) {
}
expo.WidenZero(hist, zt)

is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, hist)
})
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestSlice(t *testing.T) {

expo.Abs(bins).Slice(from, to)

is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, bins)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

// represents none/absent/unset in several tests
const ø = math.MaxUint64

func TestAdd(t *testing.T) {
func TestExpoAdd(t *testing.T) {
type expdp = expotest.Histogram
type bins = expotest.Bins
var obs0 = expotest.Observe0
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestAdd(t *testing.T) {
for _, cs := range cases {
run := func(dp, in expdp) func(t *testing.T) {
return func(t *testing.T) {
is := expotest.Is(t)
is := datatest.New(t)

var (
dp = ExpHistogram{dp.Into()}
Expand Down
Loading

0 comments on commit 3190917

Please sign in to comment.