Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[processor/deltatocumulative]: linear pipeline #34757

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/exp/metrics/identity/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream {
type attrPoint interface {
Attributes() pcommon.Map
}

// Compare returns an integer comparing the 64-bit hash sums of a and b:
// negative if a's sum is smaller, zero if equal, positive if larger.
//
// The sums must be compared explicitly: a plain int(a-b) conversion on
// uint64 values wraps around and does not yield valid comparator signs
// (e.g. Compare(0, MaxUint64) would report "greater").
func Compare[T interface{ Hash() hash.Hash64 }](a, b T) int {
	x, y := a.Hash().Sum64(), b.Hash().Sum64()
	switch {
	case x < y:
		return -1
	case x > y:
		return +1
	default:
		return 0
	}
}
30 changes: 30 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,33 @@ func (s *Staleness[T]) Evict() (identity.Stream, bool) {
func (s *Staleness[T]) Clear() {
s.items.Clear()
}

// Tracker records the last-seen timestamp of streams and reports those
// that have gone stale.
type Tracker struct {
	pq PriorityQueue
}

// NewTracker returns an empty Tracker.
func NewTracker() Tracker {
	return Tracker{pq: NewPriorityQueue()}
}

// Refresh marks each of the given streams as last seen at ts.
func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream) {
	for _, id := range ids {
		stale.pq.Update(id, ts)
	}
}

// Collect removes and returns the identities of all streams that have
// not been refreshed for longer than maxStale, relative to NowFunc().
// Streams still within the staleness window remain tracked.
//
// The parameter is named maxStale rather than max to avoid shadowing
// the predeclared max builtin (Go 1.21+).
func (stale Tracker) Collect(maxStale time.Duration) []identity.Stream {
	now := NowFunc()

	var ids []identity.Stream
	for stale.pq.Len() > 0 {
		// the queue is ordered by timestamp, so the first fresh entry
		// means everything behind it is fresh too.
		_, ts := stale.pq.Peek()
		if now.Sub(ts) < maxStale {
			break
		}
		id, _ := stale.pq.Pop()
		ids = append(ids, id)
	}

	return ids
}
12 changes: 11 additions & 1 deletion processor/deltatocumulativeprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"context"
"fmt"
"math"
"time"

"go.opentelemetry.io/collector/component"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

var _ component.ConfigValidator = (*Config)(nil)
Expand All @@ -33,6 +37,12 @@ func createDefaultConfig() component.Config {

// disable. TODO: find good default
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603
MaxStreams: 0,
MaxStreams: math.MaxInt,
}
}

// Metrics records the configured limits (max_stale, max_streams) onto
// the processor's self-observability gauges.
func (c Config) Metrics(tel telemetry.Metrics) {
	tel.DeltatocumulativeStreamsMaxStale.Record(context.Background(), int64(c.MaxStale.Seconds()))
	tel.DeltatocumulativeStreamsLimit.Record(context.Background(), int64(c.MaxStreams))
}
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"math"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "set-valid-max_stale"),
expected: &Config{
MaxStale: 2 * time.Minute,
MaxStreams: 0,
MaxStreams: math.MaxInt,
},
},
{
Expand Down
28 changes: 2 additions & 26 deletions processor/deltatocumulativeprocessor/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,14 @@

The following telemetry is emitted by this component.

### otelcol_deltatocumulative.datapoints.dropped
### otelcol_deltatocumulative.datapoints

number of datapoints dropped due to given 'reason'
Total number of datapoints processed. May carry an 'error' attribute if processing failed.

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoint} | Sum | Int | true |

### otelcol_deltatocumulative.datapoints.processed

number of datapoints processed

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoint} | Sum | Int | true |

### otelcol_deltatocumulative.gaps.length

total duration where data was expected but not received

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| s | Sum | Int | true |

### otelcol_deltatocumulative.streams.evicted

number of streams evicted

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {stream} | Sum | Int | true |

### otelcol_deltatocumulative.streams.limit

upper limit of tracked streams
Expand Down
5 changes: 3 additions & 2 deletions processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry"
)

func NewFactory() processor.Factory {
Expand All @@ -28,10 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
return nil, fmt.Errorf("configuration parsing error")
}

telb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
tel, err := telemetry.New(set.TelemetrySettings)
if err != nil {
return nil, err
}

return newProcessor(pcfg, set.Logger, telb, next), nil
return newProcessor(pcfg, tel, next), nil
}
2 changes: 1 addition & 1 deletion processor/deltatocumulativeprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ require (
go.opentelemetry.io/otel/sdk/metric v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)

require (
Expand Down Expand Up @@ -54,6 +53,7 @@ require (
go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect
go.opentelemetry.io/otel/sdk v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {

return dp
}

// Add implements the data.Point interface for Summary.
//
// Summary is a legacy datapoint type with no well-defined delta
// aggregation semantics (quantile values cannot be meaningfully
// summed), so accumulation is intentionally not implemented.
// NOTE(review): this panics if ever invoked — callers are presumably
// expected to pass summaries through untouched; verify at call sites.
func (dp Summary) Add(Summary) Summary {
	panic("todo")
}
25 changes: 24 additions & 1 deletion processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

// compile-time assertions that every concrete datapoint type
// implements Point over itself.
var (
	_ Point[Number]       = Number{}
	_ Point[Histogram]    = Histogram{}
	_ Point[ExpHistogram] = ExpHistogram{}
	_ Point[Summary]      = Summary{}
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Expand All @@ -23,7 +30,7 @@ type Point[Self any] interface {

// Typed is a Point constrained to the concrete datapoint types known to
// this package, allowing generic code to type-switch exhaustively over
// the union members.
type Typed[Self any] interface {
	Point[Self]
	Number | Histogram | ExpHistogram | Summary
}

type Number struct {
Expand Down Expand Up @@ -94,3 +101,19 @@ var (
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)

// Summary adapts a pmetric.SummaryDataPoint to this package's datapoint
// interface by embedding it.
type Summary struct {
	pmetric.SummaryDataPoint
}

// Clone returns an independent deep copy of dp. Cloning a zero-value
// Summary (no underlying pdata point) yields a fresh, empty datapoint.
func (dp Summary) Clone() Summary {
	out := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
	// only copy when dp actually wraps a pdata point; CopyTo on the
	// zero value would panic on its nil internals.
	if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
		dp.CopyTo(out)
	}
	return out
}

// CopyTo deep-copies the underlying pdata point of dp into dst.
func (dp Summary) CopyTo(dst Summary) {
	dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ import (
var Opts = []cmp.Option{
cmpopts.EquateApprox(0, 1e-9),
cmp.Exporter(func(ty reflect.Type) bool {
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata")
return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib")
}),
}

func Equal[T any](a, b T) bool {
return cmp.Equal(a, b, Opts...)
func Equal[T any](a, b T, opts ...cmp.Option) bool {
return cmp.Equal(a, b, append(Opts, opts...)...)
}

func Diff[T any](a, b T) string {
return cmp.Diff(a, b, Opts...)
func Diff[T any](a, b T, opts ...cmp.Option) string {
return cmp.Diff(a, b, append(Opts, opts...)...)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"

import (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (scale Scale) Idx(v float64) int {
// This means a value min < v <= max belongs to this bucket.
//
// NOTE: this is different from Go slice intervals, which are [a,b)
func (scale Scale) Bounds(index int) (min, max float64) {
func (scale Scale) Bounds(index int) (min, max float64) { //nolint: predeclared
// from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function
lower := func(index int) float64 {
inverseFactor := math.Ldexp(math.Ln2, int(-scale))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"

import (
Expand Down Expand Up @@ -37,7 +38,7 @@ func WidenZero(dp DataPoint, width float64) {
widen(dp.Positive())
widen(dp.Negative())

_, max := scale.Bounds(zero)
_, max := scale.Bounds(zero) //nolint: predeclared
dp.SetZeroThreshold(max)
}

Expand Down
57 changes: 10 additions & 47 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,53 +8,24 @@ import (

"go.opentelemetry.io/collector/pdata/pcommon"

exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
)

func New[D data.Point[D]]() Accumulator[D] {
return Accumulator[D]{
Map: make(exp.HashMap[D]),
}
}

var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil)

type Accumulator[D data.Point[D]] struct {
streams.Map[D]
}

func (a Accumulator[D]) Store(id streams.Ident, dp D) error {
aggr, ok := a.Map.Load(id)

// new series: initialize with current sample
if !ok {
clone := dp.Clone()
return a.Map.Store(id, clone)
}

// drop bad samples
// AccumulateInto adds state and dp, storing the result in state
//
// state = state + dp
func AccumulateInto[P data.Point[P]](state P, dp P) error {
switch {
case dp.StartTimestamp() < aggr.StartTimestamp():
case dp.StartTimestamp() < state.StartTimestamp():
// belongs to older series
return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
case dp.Timestamp() <= aggr.Timestamp():
return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()}
case dp.Timestamp() <= state.Timestamp():
// out of order
return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

// detect gaps
var gap error
if dp.StartTimestamp() > aggr.Timestamp() {
gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()}
return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()}
}

res := aggr.Add(dp)
if err := a.Map.Store(id, res); err != nil {
return err
}
return gap
state.Add(dp)
return nil
}

type ErrOlderStart struct {
Expand All @@ -74,11 +45,3 @@ type ErrOutOfOrder struct {
// Error reports the timestamp of the dropped sample and the timestamp
// the series had already advanced to.
func (e ErrOutOfOrder) Error() string {
	return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last)
}

type ErrGap struct {
From, To pcommon.Timestamp
}

func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}
Loading
Loading