Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative]: explicit-bounds histograms #33983

Merged
merged 7 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/deltatocumulative-histograms.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulative

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: explicit-bounds histograms

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30705]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
implements aggregation of explicit-bounds (traditional) histograms.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/delta
go 1.22.0

require (
github.com/google/go-cmp v0.6.0
github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.107.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.107.0
github.com/stretchr/testify v1.9.0
Expand Down
40 changes: 38 additions & 2 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice"
)

func (dp Number) Add(in Number) Number {
Expand All @@ -24,9 +25,44 @@ func (dp Number) Add(in Number) Number {
return dp
}

// nolint
func (dp Histogram) Add(in Histogram) Histogram {
panic("todo")
// bounds different: no way to merge, so reset observation to new boundaries
if !pslice.Equal(dp.ExplicitBounds(), in.ExplicitBounds()) {
in.MoveTo(dp.HistogramDataPoint)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a metric for how often this occurs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm, but will do later as we don't have a good way of accessing a Meter here without refactoring the telemetry package

return dp
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to do the StartTimeUnixNano checks like we do for Sums?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahhh. that's why I missed it :)

// spec requires len(BucketCounts) == len(ExplicitBounds)+1.
// given we have limited error handling at this stage (and already verified boundaries are correct),
// doing a best-effort add of whatever we have appears reasonable.
n := min(dp.BucketCounts().Len(), in.BucketCounts().Len())
for i := 0; i < n; i++ {
sum := dp.BucketCounts().At(i) + in.BucketCounts().At(i)
dp.BucketCounts().SetAt(i, sum)
}

dp.SetTimestamp(in.Timestamp())
dp.SetCount(dp.Count() + in.Count())

if dp.HasSum() && in.HasSum() {
dp.SetSum(dp.Sum() + in.Sum())
} else {
dp.RemoveSum()
}

if dp.HasMin() && in.HasMin() {
dp.SetMin(math.Min(dp.Min(), in.Min()))
} else {
dp.RemoveMin()
}

if dp.HasMax() && in.HasMax() {
dp.SetMax(math.Max(dp.Max(), in.Max()))
} else {
dp.RemoveMax()
}

return dp
}

func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
Expand Down
18 changes: 18 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,28 @@ type Point[Self any] interface {
Add(Self) Self
}

type Typed[Self any] interface {
Point[Self]
Number | Histogram | ExpHistogram
}

type Number struct {
pmetric.NumberDataPoint
}

func Zero[P Typed[P]]() P {
var point P
switch ty := any(&point).(type) {
case *Number:
ty.NumberDataPoint = pmetric.NewNumberDataPoint()
case *Histogram:
ty.HistogramDataPoint = pmetric.NewHistogramDataPoint()
case *ExpHistogram:
ty.DataPoint = pmetric.NewExponentialHistogramDataPoint()
}
return point
}

func (dp Number) Clone() Number {
clone := Number{NumberDataPoint: pmetric.NewNumberDataPoint()}
if dp.NumberDataPoint != (pmetric.NumberDataPoint{}) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package compare // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"

import (
"reflect"
"strings"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)

var Opts = []cmp.Option{
cmpopts.EquateApprox(0, 1e-9),
cmp.Exporter(func(ty reflect.Type) bool {
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata")
}),
}

func Equal[T any](a, b T) bool {
return cmp.Equal(a, b, Opts...)
}

func Diff[T any](a, b T) string {
return cmp.Diff(a, b, Opts...)
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
package datatest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"

import (
"reflect"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

Expand All @@ -20,7 +20,7 @@ type T struct {
testing.TB
}

func Is(t testing.TB) T {
func New(t testing.TB) T {
return T{TB: t}
}

Expand Down Expand Up @@ -58,7 +58,7 @@ func equal(t testing.TB, want, got any, name string) bool {
vg := reflect.ValueOf(got)

if vw.Kind() != reflect.Struct {
ok := reflect.DeepEqual(want, got)
ok := compare.Equal(want, got)
if !ok {
t.Errorf("%s: %+v != %+v", name, want, got)
}
Expand All @@ -79,7 +79,7 @@ func equal(t testing.TB, want, got any, name string) bool {
continue
}
// Append(Empty) fails above heuristic, exclude it
if strings.HasPrefix(mname, "Append") {
if strings.HasPrefix(mname, "Append") || strings.HasPrefix(mname, "Clone") {
continue
}

Expand Down Expand Up @@ -111,5 +111,10 @@ func equal(t testing.TB, want, got any, name string) bool {
}

// fallback to a full deep-equal for rare cases (unexported fields, etc)
return assert.Equal(t, want, got)
if diff := compare.Diff(want, got); diff != "" {
t.Error(diff)
return false
}

return true
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expotest
package datatest

import (
"fmt"
Expand All @@ -12,39 +12,34 @@ import (
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

var t testing.TB = fakeT{}

var expotest = struct {
Is func(t testing.TB) T
Observe func(expo.Scale, ...float64) expo.Buckets
}{
Is: Is,
Observe: Observe,
}
var datatest = struct{ New func(t testing.TB) T }{New: New}

func ExampleT_Equal() {
is := expotest.Is(t)
is := datatest.New(t)

want := Histogram{
want := expotest.Histogram{
PosNeg: expotest.Observe(expo.Scale(0), 1, 2, 3, 4),
Scale: 0,
}.Into()

got := Histogram{
got := expotest.Histogram{
PosNeg: expotest.Observe(expo.Scale(1), 1, 1, 1, 1),
Scale: 1,
}.Into()

is.Equal(want, got)

// Output:
// equal_test.go:40: Negative().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:40: Negative().BucketCounts().Len(): 3 != 1
// equal_test.go:40: Positive().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:40: Positive().BucketCounts().Len(): 3 != 1
// equal_test.go:40: Scale(): 0 != 1
// equal_test.go:35: Negative().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:35: Negative().BucketCounts().Len(): 3 != 1
// equal_test.go:35: Positive().BucketCounts().AsRaw(): [1 1 2] != [4]
// equal_test.go:35: Positive().BucketCounts().Len(): 3 != 1
// equal_test.go:35: Scale(): 0 != 1
}

func TestNone(*testing.T) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func TestAbsolute(t *testing.T) {
is := expotest.Is(t)
is := datatest.New(t)

bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into()
abs := expo.Abs(bs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"testing"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestMerge(t *testing.T) {
name := fmt.Sprintf("(%+d,%d)+(%+d,%d)=(%+d,%d)", a.Offset(), a.BucketCounts().Len(), b.Offset(), b.BucketCounts().Len(), want.Offset(), want.BucketCounts().Len())
t.Run(name, func(t *testing.T) {
expo.Merge(a, b)
is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, a)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func TestDownscale(t *testing.T) {
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestDownscale(t *testing.T) {
buckets[i] = Repr[B]{scale: r.scale, bkt: bkt}
}

is := expotest.Is(t)
is := datatest.New(t)
for i := 0; i < len(buckets)-1; i++ {
expo.Downscale(buckets[i].bkt, buckets[i].scale, buckets[i+1].scale)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestWidenZero(t *testing.T) {
}
expo.WidenZero(hist, zt)

is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, hist)
})
}
Expand Down Expand Up @@ -108,7 +109,7 @@ func TestSlice(t *testing.T) {

expo.Abs(bins).Slice(from, to)

is := expotest.Is(t)
is := datatest.New(t)
is.Equal(want, bins)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ import (

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

// represents none/absent/unset in several tests
const ø = math.MaxUint64

func TestAdd(t *testing.T) {
func TestExpoAdd(t *testing.T) {
type expdp = expotest.Histogram
type bins = expotest.Bins
var obs0 = expotest.Observe0
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestAdd(t *testing.T) {
for _, cs := range cases {
run := func(dp, in expdp) func(t *testing.T) {
return func(t *testing.T) {
is := expotest.Is(t)
is := datatest.New(t)

var (
dp = ExpHistogram{dp.Into()}
Expand Down
Loading