Skip to content

Commit

Permalink
[processor/deltatocumulative] partial linear pipeline (#35048)
Browse files Browse the repository at this point in the history
**Description:** 
Partially introduces a highly decoupled, linear processing pipeline.
Implemented as a standalone struct to make review easier, will refactor
this later.
Instead of overloading `Map.Store()` to do aggregation, staleness and
limiting, this functionality is now explicitly handled in
`ConsumeMetrics`.

This greatly improves readability and makes the processor easier to
understand, as less mental context needs to be kept while reading.

*Notes to reviewer*:
See
[`68dc901`](68dc901)
for the main added logic.
Compare `processor.go` (old, nested) to `linear.go` (new, linear)

Replaces #34757 

**Link to tracking Issue:** none

**Testing:** This is a refactor. Existing tests were not modified and
still pass

**Documentation:** not needed
  • Loading branch information
sh0rez authored Sep 27, 2024
1 parent 3033832 commit 2613b89
Show file tree
Hide file tree
Showing 16 changed files with 589 additions and 14 deletions.
30 changes: 30 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,33 @@ func (s *Staleness[T]) Evict() (identity.Stream, bool) {
// Clear removes all tracked items, resetting the staleness state to empty.
func (s *Staleness[T]) Clear() {
	s.items.Clear()
}

// Tracker remembers when streams were last refreshed and reports those
// that have exceeded a staleness threshold.
type Tracker struct {
	pq PriorityQueue
}

// NewTracker returns a Tracker backed by a fresh priority queue.
func NewTracker() Tracker {
	return Tracker{pq: NewPriorityQueue()}
}

// Refresh marks each given stream as last seen at ts.
func (t Tracker) Refresh(ts time.Time, ids ...identity.Stream) {
	for _, id := range ids {
		t.pq.Update(id, ts)
	}
}

// Collect removes and returns all streams whose last refresh is at least
// max ago (relative to NowFunc). Returns nil when nothing is stale.
func (t Tracker) Collect(max time.Duration) []identity.Stream {
	now := NowFunc()

	var stale []identity.Stream
	for t.pq.Len() > 0 {
		// the queue is ordered by timestamp, so the first fresh entry
		// means everything behind it is fresh too
		if _, ts := t.pq.Peek(); now.Sub(ts) < max {
			break
		}
		id, _ := t.pq.Pop()
		stale = append(stale, id)
	}

	return stale
}
51 changes: 51 additions & 0 deletions processor/deltatocumulativeprocessor/chain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/processor"
)

// Chain calls processors in series.
// They must be manually set up so that their ConsumeMetrics() invoke each other.
type Chain []processor.Metrics

// compile-time check: a Chain is itself a processor.Metrics
var _ processor.Metrics = Chain(nil)

// Capabilities reports the capabilities of the first element, which is the
// one data is handed to. An empty chain reports zero capabilities.
func (c Chain) Capabilities() consumer.Capabilities {
	if len(c) > 0 {
		return c[0].Capabilities()
	}
	return consumer.Capabilities{}
}

// ConsumeMetrics passes md to the first element only; the remaining
// elements are expected to be invoked by their predecessors (see type doc).
func (c Chain) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
	if len(c) > 0 {
		return c[0].ConsumeMetrics(ctx, md)
	}
	return nil
}

// Start starts every element in order, returning the first error encountered.
func (c Chain) Start(ctx context.Context, host component.Host) error {
	for i := range c {
		if err := c[i].Start(ctx, host); err != nil {
			return err
		}
	}
	return nil
}

// Shutdown shuts down every element in order, returning the first error
// encountered.
func (c Chain) Shutdown(ctx context.Context) error {
	for i := range c {
		if err := c[i].Shutdown(ctx); err != nil {
			return err
		}
	}
	return nil
}
12 changes: 11 additions & 1 deletion processor/deltatocumulativeprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"context"
"fmt"
"math"
"time"

"go.opentelemetry.io/collector/component"

telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
)

var _ component.ConfigValidator = (*Config)(nil)
Expand All @@ -33,6 +37,12 @@ func createDefaultConfig() component.Config {

// disable. TODO: find good default
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603
MaxStreams: 0,
MaxStreams: math.MaxInt,
}
}

// Metrics records the configured limits on the processor's self-telemetry:
// max_stale (in seconds) and the stream limit.
// NOTE(review): looks like a one-shot recording at construction time — the
// background context suggests no caller deadline applies; confirm with caller.
func (c Config) Metrics(tel telemetry.Metrics) {
	ctx := context.Background()
	tel.DeltatocumulativeStreamsMaxStale.Record(ctx, int64(c.MaxStale.Seconds()))
	tel.DeltatocumulativeStreamsLimit.Record(ctx, int64(c.MaxStreams))
}
3 changes: 2 additions & 1 deletion processor/deltatocumulativeprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"

import (
"math"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) {
id: component.NewIDWithName(metadata.Type, "set-valid-max_stale"),
expected: &Config{
MaxStale: 2 * time.Minute,
MaxStreams: 0,
MaxStreams: math.MaxInt,
},
},
{
Expand Down
16 changes: 16 additions & 0 deletions processor/deltatocumulativeprocessor/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ number of datapoints dropped due to given 'reason'
| ---- | ----------- | ---------- | --------- |
| {datapoint} | Sum | Int | true |

### otelcol_deltatocumulative.datapoints.linear

total number of datapoints processed. may have 'error' attribute, if processing failed

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {datapoint} | Sum | Int | true |

### otelcol_deltatocumulative.datapoints.processed

number of datapoints processed
Expand Down Expand Up @@ -61,3 +69,11 @@ number of streams tracked
| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {dps} | Sum | Int | false |

### otelcol_deltatocumulative.streams.tracked.linear

number of streams tracked

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {dps} | Sum | Int | false |
10 changes: 9 additions & 1 deletion processor/deltatocumulativeprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
)

Expand All @@ -32,6 +33,13 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo
if err != nil {
return nil, err
}
proc := newProcessor(pcfg, set.Logger, telb, next)

return newProcessor(pcfg, set.Logger, telb, next), nil
ltel, err := ltel.New(set.TelemetrySettings)
if err != nil {
return nil, err
}
linear := newLinear(pcfg, ltel, proc)

return Chain{linear, proc}, nil
}
4 changes: 4 additions & 0 deletions processor/deltatocumulativeprocessor/internal/data/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {

return dp
}

// Add implements [Point] for Summary.
// Summary aggregation is intentionally not implemented yet; reaching this
// panics. NOTE(review): presumably summaries are passed through rather than
// accumulated, so this is never called — confirm against the pipeline code.
func (dp Summary) Add(Summary) Summary {
	panic("todo")
}
25 changes: 24 additions & 1 deletion processor/deltatocumulativeprocessor/internal/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
)

// compile-time assertions that every datapoint type implements Point
var (
	_ Point[Number] = Number{}
	_ Point[Histogram] = Histogram{}
	_ Point[ExpHistogram] = ExpHistogram{}
	_ Point[Summary] = Summary{}
)

type Point[Self any] interface {
StartTimestamp() pcommon.Timestamp
Timestamp() pcommon.Timestamp
Expand All @@ -23,7 +30,7 @@ type Point[Self any] interface {

type Typed[Self any] interface {
Point[Self]
Number | Histogram | ExpHistogram
Number | Histogram | ExpHistogram | Summary
}

type Number struct {
Expand Down Expand Up @@ -94,3 +101,19 @@ var (
_ = mustPoint[Histogram]{}
_ = mustPoint[ExpHistogram]{}
)

// Summary wraps a pmetric.SummaryDataPoint so it satisfies [Point].
type Summary struct {
	pmetric.SummaryDataPoint
}

// Clone returns an independent deep copy of dp.
// A zero-value Summary (no backing data) yields a freshly allocated,
// empty datapoint rather than copying from the nil wrapper.
func (dp Summary) Clone() Summary {
	c := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
	if (pmetric.SummaryDataPoint{}) != dp.SummaryDataPoint {
		dp.CopyTo(c)
	}
	return c
}

// CopyTo deep-copies dp into dst.
func (dp Summary) CopyTo(dst Summary) {
	dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
}
17 changes: 17 additions & 0 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,20 @@ type ErrGap struct {
func (e ErrGap) Error() string {
return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To)
}

// AccumulateInto adds dp onto state in place:
//
//	state = state + dp
//
// Samples that would move time backwards are rejected with a typed error.
func AccumulateInto[P data.Point[P]](state P, dp P) error {
	if dp.StartTimestamp() < state.StartTimestamp() {
		// belongs to an older incarnation of the stream
		return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()}
	}
	if dp.Timestamp() <= state.Timestamp() {
		// arrived out of order
		return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()}
	}

	state.Add(dp)
	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry"

import (
"context"
"errors"
"reflect"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
)

// New wires up self-observability for the linear pipeline.
//
// The streams-tracked gauge is driven by the callback registered here. That
// callback must observe whatever function a later WithTracked call installs,
// even though Metrics is passed around by value. The original field
// `tracked func() int` broke this: the callback closed over New's local m,
// while WithTracked (pointer receiver) mutated the caller's *copy* — so the
// gauge reported 0 forever. Holding the function behind a shared pointer
// makes every copy, and the callback, see the same updatable value.
func New(set component.TelemetrySettings) (Metrics, error) {
	// default until WithTracked is called: no streams tracked
	zero := func() int { return 0 }
	m := Metrics{tracked: &zero}

	trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 {
		return int64((*m.tracked)())
	})

	telb, err := metadata.NewTelemetryBuilder(set, trackedCb)
	if err != nil {
		return Metrics{}, err
	}
	m.TelemetryBuilder = *telb

	return m, nil
}

// Metrics bundles the generated TelemetryBuilder with helpers for the
// linear pipeline. Copying is safe: the tracked callback is shared via
// pointer, so WithTracked on any copy takes effect everywhere.
type Metrics struct {
	metadata.TelemetryBuilder

	// tracked reports the current number of tracked streams.
	// pointer-shared so the observable callback registered in New sees
	// updates made through WithTracked on value copies of Metrics.
	tracked *func() int
}

// Datapoints returns a Counter over the linear datapoints metric.
func (m Metrics) Datapoints() Counter {
	return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear}
}

// WithTracked installs the function used to report the number of tracked
// streams. It also takes effect for the callback registered in New.
func (m *Metrics) WithTracked(streams func() int) {
	*m.tracked = streams
}

// Error returns a KeyValue carrying msg under the "error" attribute key,
// for tagging metric samples that failed processing.
func Error(msg string) attribute.KeyValue {
	return attribute.String("error", msg)
}

// Cause returns an "error" attribute identifying the root cause of err:
// the innermost error of its Unwrap chain, named by its dynamic type.
func Cause(err error) attribute.KeyValue {
	// guard: reflect.TypeOf(nil) is nil, so .String() on it would panic
	if err == nil {
		return Error("none")
	}

	// walk to the innermost error
	for {
		uw := errors.Unwrap(err)
		if uw == nil {
			break
		}
		err = uw
	}

	return Error(reflect.TypeOf(err).String())
}

// Counter extends Int64Counter with a convenience single-step increment.
type Counter struct{ metric.Int64Counter }

// Inc adds 1 to the counter, tagged with the given attributes.
func (c Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) {
	c.Add(ctx, 1, metric.WithAttributes(attrs...))
}
Loading

0 comments on commit 2613b89

Please sign in to comment.