deltatocumulative: exponential histograms (#32030)

**Description:** Implements accumulation of exponential histograms by adding them bucket by bucket (a brief standalone sketch follows the checklist below).

- [x] Align bucket offsets to the smaller one
- [x] Merge buckets by adding up each bucket's count
- [x] Widen zero buckets so both use the same zero threshold
- [x] Adjust scale to the lowest one
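
For intuition, here is a minimal standalone sketch of the two core operations, scale alignment and bucket-wise addition. It is illustrative only and not the processor's API; `downscaleIdx` and `addBuckets` are made-up names, the actual implementation lives in `internal/data` and `internal/data/expo` below.

```go
package main

import "fmt"

// downscaleIdx maps a bucket index from scale `from` to the coarser scale `to`.
// Lowering the scale by n merges 2^n adjacent buckets, so the new index is
// floor(idx / 2^n); Go's arithmetic right shift does exactly that, including
// for negative indices.
func downscaleIdx(idx, from, to int) int {
	return idx >> (from - to)
}

// addBuckets adds the counts of `in` (first bucket at absolute index inOff)
// into `dst` (first bucket at dstOff), assuming both sides share a scale and
// dst already spans in's index range.
func addBuckets(dst []uint64, dstOff int, in []uint64, inOff int) {
	for i, c := range in {
		dst[inOff-dstOff+i] += c
	}
}

func main() {
	// scale 2 -> scale 1: buckets 4 and 5 collapse into bucket 2, -4 and -3 into -2
	fmt.Println(downscaleIdx(5, 2, 1), downscaleIdx(-3, 2, 1)) // 2 -2

	dst := []uint64{1, 0, 2, 0} // counts for absolute indices -2..1
	in := []uint64{3, 4}        // counts for absolute indices -1..0
	addBuckets(dst, -2, in, -1)
	fmt.Println(dst) // [1 3 6 0]
}
```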

**Link to tracking Issue:**
#30705

**Testing:** Extensive tests have been added to the `internal/data`
package

**Documentation:** not needed
sh0rez authored May 14, 2024
1 parent d07e7b9 commit c6a6bd4
Showing 22 changed files with 1,263 additions and 32 deletions.
29 changes: 29 additions & 0 deletions .chloggen/deltatocumulative-exphist.yaml
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: exponential histogram accumulation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31340]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  accumulates exponential histogram datapoints by adding respective bucket counts.
  also handles downscaling, changing zero-counts, offset adaptations and optional fields

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
5 changes: 5 additions & 0 deletions processor/deltatocumulativeprocessor/go.mod
@@ -4,6 +4,7 @@ go 1.21.0

require (
	github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.100.0
	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.100.0
	github.com/stretchr/testify v1.9.0
	go.opentelemetry.io/collector/component v0.100.0
	go.opentelemetry.io/collector/confmap v0.100.0
@@ -58,4 +59,8 @@ require (

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
51 changes: 48 additions & 3 deletions processor/deltatocumulativeprocessor/internal/data/add.go
@@ -3,7 +3,13 @@

package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"

import "go.opentelemetry.io/collector/pdata/pmetric"
import (
"math"

"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

func (dp Number) Add(in Number) Number {
	switch in.ValueType() {
@@ -23,7 +29,46 @@ func (dp Histogram) Add(in Histogram) Histogram {
panic("todo")
}

func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
	type H = ExpHistogram

	if dp.Scale() != in.Scale() {
		hi, lo := expo.HiLo(dp, in, H.Scale)
		from, to := expo.Scale(hi.Scale()), expo.Scale(lo.Scale())
		expo.Downscale(hi.Positive(), from, to)
		expo.Downscale(hi.Negative(), from, to)
		hi.SetScale(lo.Scale())
	}

	if dp.ZeroThreshold() != in.ZeroThreshold() {
		hi, lo := expo.HiLo(dp, in, H.ZeroThreshold)
		expo.WidenZero(lo.DataPoint, hi.ZeroThreshold())
	}

	expo.Merge(dp.Positive(), in.Positive())
	expo.Merge(dp.Negative(), in.Negative())

	dp.SetTimestamp(in.Timestamp())
	dp.SetCount(dp.Count() + in.Count())
	dp.SetZeroCount(dp.ZeroCount() + in.ZeroCount())

	if dp.HasSum() && in.HasSum() {
		dp.SetSum(dp.Sum() + in.Sum())
	} else {
		dp.RemoveSum()
	}

	if dp.HasMin() && in.HasMin() {
		dp.SetMin(math.Min(dp.Min(), in.Min()))
	} else {
		dp.RemoveMin()
	}

	if dp.HasMax() && in.HasMax() {
		dp.SetMax(math.Max(dp.Max(), in.Max()))
	} else {
		dp.RemoveMax()
	}

	return dp
}
10 changes: 6 additions & 4 deletions processor/deltatocumulativeprocessor/internal/data/data.go
@@ -6,6 +6,8 @@ package data // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

type Point[Self any] interface {
@@ -52,19 +54,19 @@ func (dp Histogram) CopyTo(dst Histogram) {
}

type ExpHistogram struct {
	expo.DataPoint
}

func (dp ExpHistogram) Clone() ExpHistogram {
	clone := ExpHistogram{DataPoint: pmetric.NewExponentialHistogramDataPoint()}
	if dp.DataPoint != (expo.DataPoint{}) {
		dp.CopyTo(clone)
	}
	return clone
}

func (dp ExpHistogram) CopyTo(dst ExpHistogram) {
	dp.DataPoint.CopyTo(dst.DataPoint)
}

type mustPoint[D Point[D]] struct{ _ D }
52 changes: 52 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/expo/expo.go
@@ -0,0 +1,52 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package expo implements various operations on exponential histograms and their bucket counts
package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"

import "go.opentelemetry.io/collector/pdata/pmetric"

type (
	DataPoint = pmetric.ExponentialHistogramDataPoint
	Buckets   = pmetric.ExponentialHistogramDataPointBuckets
)

// Abs returns a view into the buckets using an absolute scale
func Abs(bs Buckets) Absolute {
	return Absolute{buckets: bs}
}

type buckets = Buckets

// Absolute addresses bucket counts using an absolute scale, such that it is
// interoperable with [Scale].
//
// It spans from [[Absolute.Lower]:[Absolute.Upper]]
//
// NOTE: The zero-value is unusable, use [Abs] to construct
type Absolute struct {
	buckets
}

// Abs returns the value at absolute index 'at'
func (a Absolute) Abs(at int) uint64 {
	if i, ok := a.idx(at); ok {
		return a.BucketCounts().At(i)
	}
	return 0
}

// Upper returns the minimal index outside the set, such that every i < Upper
func (a Absolute) Upper() int {
	return a.BucketCounts().Len() + int(a.Offset())
}

// Lower returns the minimal index inside the set, such that every i >= Lower
func (a Absolute) Lower() int {
	return int(a.Offset())
}

func (a Absolute) idx(at int) (int, bool) {
	idx := at - a.Lower()
	return idx, idx >= 0 && idx < a.BucketCounts().Len()
}
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expo_test

import (
	"fmt"
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func TestAbsolute(t *testing.T) {
	is := expotest.Is(t)

	bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into()
	abs := expo.Abs(bs)

	lo, up := abs.Lower(), abs.Upper()
	is.Equalf(-2, lo, "lower-bound")
	is.Equalf(3, up, "upper-bound")

	for i := lo; i < up; i++ {
		got := abs.Abs(i)
		is.Equal(bs.BucketCounts().At(i+2), got)
	}
}

func ExampleAbsolute() {
	nums := []float64{0.4, 2.3, 2.4, 4.5}

	bs := expotest.Observe0(nums...)
	abs := expo.Abs(bs)

	s := expo.Scale(0)
	for _, n := range nums {
		fmt.Printf("%.1f belongs to bucket %+d\n", n, s.Idx(n))
	}

	fmt.Printf("\n index:")
	for i := 0; i < bs.BucketCounts().Len(); i++ {
		fmt.Printf(" %d", i)
	}
	fmt.Printf("\n abs:")
	for i := abs.Lower(); i < abs.Upper(); i++ {
		fmt.Printf(" %+d", i)
	}
	fmt.Printf("\ncounts:")
	for i := abs.Lower(); i < abs.Upper(); i++ {
		fmt.Printf(" %d", abs.Abs(i))
	}

	// Output:
	// 0.4 belongs to bucket -2
	// 2.3 belongs to bucket +1
	// 2.4 belongs to bucket +1
	// 4.5 belongs to bucket +2
	//
	// index: 0 1 2 3 4
	// abs: -2 -1 +0 +1 +2
	// counts: 1 0 0 2 1
}
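
The bucket assignments in the example output above follow the standard exponential-histogram index formula. `expo.Scale.Idx` itself is added elsewhere in this PR (not shown in this excerpt); the sketch below is an independent illustration with a made-up `idxAt` helper, ignoring the float-precision care a real implementation needs.

```go
package main

import (
	"fmt"
	"math"
)

// idxAt returns the bucket index for value v at the given scale: bucket i
// covers (base^i, base^(i+1)] with base = 2^(2^-scale), which rearranges to
// i = ceil(log2(v) * 2^scale) - 1.
func idxAt(v float64, scale int) int {
	return int(math.Ceil(math.Log2(v)*math.Exp2(float64(scale)))) - 1
}

func main() {
	for _, v := range []float64{0.4, 2.3, 2.4, 4.5} {
		fmt.Printf("%.1f -> bucket %+d at scale 0\n", v, idxAt(v, 0))
	}
	// prints -2, +1, +1, +2, matching ExampleAbsolute's output above
}
```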
@@ -0,0 +1,81 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"

import (
	"fmt"
	"math"

	"go.opentelemetry.io/collector/pdata/pmetric"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

const (
	Empty = math.MaxUint64
	ø     = Empty
)

// index: 0 1 2 3 4 5 6 7
// bucket: -3 -2 -1 0 1 2 3 4
// bounds: (0.125,0.25], (0.25,0.5], (0.5,1], (1,2], (2,4], (4,8], (8,16], (16,32]
type Bins [8]uint64

func (bins Bins) Into() expo.Buckets {
	start := 0
	for i := 0; i < len(bins); i++ {
		if bins[i] != ø {
			start = i
			break
		}
	}

	end := len(bins)
	for i := start; i < len(bins); i++ {
		if bins[i] == ø {
			end = i
			break
		}
	}

	counts := bins[start:end]

	buckets := pmetric.NewExponentialHistogramDataPointBuckets()
	buckets.SetOffset(int32(start - 3))
	buckets.BucketCounts().FromRaw(counts)
	return buckets
}

func ObserveInto(bs expo.Buckets, scale expo.Scale, pts ...float64) {
	counts := bs.BucketCounts()

	for _, pt := range pts {
		pt = math.Abs(pt)
		if pt <= 0.125 || pt > 32 {
			panic(fmt.Sprintf("out of bounds: 0.125 < %f <= 32", pt))
		}

		idx := scale.Idx(pt) - int(bs.Offset())
		switch {
		case idx < 0:
			bs.SetOffset(bs.Offset() + int32(idx))
			counts.FromRaw(append(make([]uint64, -idx), counts.AsRaw()...))
			idx = 0
		case idx >= counts.Len():
			counts.Append(make([]uint64, idx-counts.Len()+1)...)
		}

		counts.SetAt(idx, counts.At(idx)+1)
	}
}

func Observe(scale expo.Scale, pts ...float64) expo.Buckets {
	bs := pmetric.NewExponentialHistogramDataPointBuckets()
	ObserveInto(bs, scale, pts...)
	return bs
}

func Observe0(pts ...float64) expo.Buckets {
	return Observe(0, pts...)
}
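
To make the `Bins` layout concrete, here is a hypothetical example test (not part of this diff) showing how `Into` derives the offset and the retained counts from the fixed 8-slot layout documented above:

```go
package expotest_test

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
)

func ExampleBins_Into() {
	const ø = expotest.Empty // ø marks slots with no bucket, not a zero count

	// The first non-ø slot is index 1 and the last is index 5, so Into keeps
	// the five counts in between and sets the offset to 1-3 = -2.
	bs := expotest.Bins{ø, 1, 2, 3, 4, 5, ø, ø}.Into()
	fmt.Println(bs.Offset())
	fmt.Println(bs.BucketCounts().AsRaw())

	// Output:
	// -2
	// [1 2 3 4 5]
}
```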