diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index dae1870cbfd6..eb52e6861825 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -102,3 +102,33 @@ func (s *Staleness[T]) Evict() (identity.Stream, bool) { func (s *Staleness[T]) Clear() { s.items.Clear() } + +type Tracker struct { + pq PriorityQueue +} + +func NewTracker() Tracker { + return Tracker{pq: NewPriorityQueue()} +} + +func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream) { + for _, id := range ids { + stale.pq.Update(id, ts) + } +} + +func (stale Tracker) Collect(max time.Duration) []identity.Stream { + now := NowFunc() + + var ids []identity.Stream + for stale.pq.Len() > 0 { + _, ts := stale.pq.Peek() + if now.Sub(ts) < max { + break + } + id, _ := stale.pq.Pop() + ids = append(ids, id) + } + + return ids +} diff --git a/processor/deltatocumulativeprocessor/chain.go b/processor/deltatocumulativeprocessor/chain.go new file mode 100644 index 000000000000..0a39ea8939c7 --- /dev/null +++ b/processor/deltatocumulativeprocessor/chain.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" +) + +var _ processor.Metrics = Chain(nil) + +// Chain calls processors in series. +// They must be manually setup so that their ConsumeMetrics() invoke each other +type Chain []processor.Metrics + +func (c Chain) Capabilities() consumer.Capabilities { + if len(c) == 0 { + return consumer.Capabilities{} + } + return c[0].Capabilities() +} + +func (c Chain) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + if len(c) == 0 { + return nil + } + return c[0].ConsumeMetrics(ctx, md) +} + +func (c Chain) Shutdown(ctx context.Context) error { + for _, proc := range c { + if err := proc.Shutdown(ctx); err != nil { + return err + } + } + return nil +} + +func (c Chain) Start(ctx context.Context, host component.Host) error { + for _, proc := range c { + if err := proc.Start(ctx, host); err != nil { + return err + } + } + return nil +} diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b97793d0b6d8..2d6e1f5ae343 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -4,10 +4,14 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" import ( + "context" "fmt" + "math" "time" "go.opentelemetry.io/collector/component" + + telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" ) var _ component.ConfigValidator = (*Config)(nil) @@ -33,6 +37,12 @@ func createDefaultConfig() component.Config { // disable. TODO: find good default // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603 - MaxStreams: 0, + MaxStreams: math.MaxInt, } } + +func (c Config) Metrics(tel telemetry.Metrics) { + ctx := context.Background() + tel.DeltatocumulativeStreamsMaxStale.Record(ctx, int64(c.MaxStale.Seconds())) + tel.DeltatocumulativeStreamsLimit.Record(ctx, int64(c.MaxStreams)) +} diff --git a/processor/deltatocumulativeprocessor/config_test.go b/processor/deltatocumulativeprocessor/config_test.go index cbda97e2672d..371afcc05c0d 100644 --- a/processor/deltatocumulativeprocessor/config_test.go +++ b/processor/deltatocumulativeprocessor/config_test.go @@ -4,6 +4,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" import ( + "math" "path/filepath" "testing" "time" @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "set-valid-max_stale"), expected: &Config{ MaxStale: 2 * time.Minute, - MaxStreams: 0, + MaxStreams: math.MaxInt, }, }, { diff --git a/processor/deltatocumulativeprocessor/documentation.md b/processor/deltatocumulativeprocessor/documentation.md index 55d85f06c764..f9c560a8703e 100644 --- a/processor/deltatocumulativeprocessor/documentation.md +++ b/processor/deltatocumulativeprocessor/documentation.md @@ -14,6 +14,14 @@ number of datapoints dropped due to given 'reason' | ---- | ----------- | ---------- | --------- | | {datapoint} | Sum | Int | true | +### otelcol_deltatocumulative.datapoints.linear + +total number of datapoints processed. may have 'error' attribute, if processing failed + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoint} | Sum | Int | true | + ### otelcol_deltatocumulative.datapoints.processed number of datapoints processed @@ -61,3 +69,11 @@ number of streams tracked | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | | {dps} | Sum | Int | false | + +### otelcol_deltatocumulative.streams.tracked.linear + +number of streams tracked + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {dps} | Sum | Int | false | diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index 8a6a394083d6..5fbbacddc903 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" + ltel "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" ) @@ -32,6 +33,13 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo if err != nil { return nil, err } + proc := newProcessor(pcfg, set.Logger, telb, next) - return newProcessor(pcfg, set.Logger, telb, next), nil + ltel, err := ltel.New(set.TelemetrySettings) + if err != nil { + return nil, err + } + linear := newLinear(pcfg, ltel, proc) + + return Chain{linear, proc}, nil } diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go index 597f918243d9..33c2f283c840 100644 --- a/processor/deltatocumulativeprocessor/internal/data/add.go +++ b/processor/deltatocumulativeprocessor/internal/data/add.go @@ -108,3 +108,7 @@ func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram { return dp } + +func (dp Summary) Add(Summary) Summary { + panic("todo") +} diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go index e6f7551fd1c2..2460af09c1b8 100644 --- a/processor/deltatocumulativeprocessor/internal/data/data.go +++ b/processor/deltatocumulativeprocessor/internal/data/data.go @@ -10,6 +10,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo" ) +var ( + _ Point[Number] = Number{} + _ Point[Histogram] = Histogram{} + _ Point[ExpHistogram] = ExpHistogram{} + _ Point[Summary] = Summary{} +) + type Point[Self any] interface { StartTimestamp() pcommon.Timestamp Timestamp() pcommon.Timestamp @@ -23,7 +30,7 @@ type Point[Self any] interface { type Typed[Self any] interface { Point[Self] - Number | Histogram | ExpHistogram + Number | Histogram | ExpHistogram | Summary } type Number struct { @@ -94,3 +101,19 @@ var ( _ = mustPoint[Histogram]{} _ = mustPoint[ExpHistogram]{} ) + +type Summary struct { + pmetric.SummaryDataPoint +} + +func (dp Summary) Clone() Summary { + clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()} + if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) { + dp.CopyTo(clone) + } + return clone +} + +func (dp Summary) CopyTo(dst Summary) { + dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint) +} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index 5539eb8c8e49..e8d71d669f12 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -82,3 +82,20 @@ type ErrGap struct { func (e ErrGap) Error() string { return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) } + +// AccumulateInto adds state and dp, storing the result in state +// +// state = state + dp +func AccumulateInto[P data.Point[P]](state P, dp P) error { + switch { + case dp.StartTimestamp() < state.StartTimestamp(): + // belongs to older series + return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()} + case dp.Timestamp() <= state.Timestamp(): + // out of order + return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} + } + + state.Add(dp) + return nil +} diff --git a/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go new file mode 100644 index 000000000000..c81068d75c79 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/lineartelemetry/metrics.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" + +import ( + "context" + "errors" + "reflect" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" +) + +func New(set component.TelemetrySettings) (Metrics, error) { + m := Metrics{ + tracked: func() int { return 0 }, + } + + trackedCb := metadata.WithDeltatocumulativeStreamsTrackedLinearCallback(func() int64 { + return int64(m.tracked()) + }) + + telb, err := metadata.NewTelemetryBuilder(set, trackedCb) + if err != nil { + return Metrics{}, err + } + m.TelemetryBuilder = *telb + + return m, nil +} + +type Metrics struct { + metadata.TelemetryBuilder + + tracked func() int +} + +func (m Metrics) Datapoints() Counter { + return Counter{Int64Counter: m.DeltatocumulativeDatapointsLinear} +} + +func (m *Metrics) WithTracked(streams func() int) { + m.tracked = streams +} + +func Error(msg string) attribute.KeyValue { + return attribute.String("error", msg) +} + +func Cause(err error) attribute.KeyValue { + for { + uw := errors.Unwrap(err) + if uw == nil { + break + } + err = uw + } + + return Error(reflect.TypeOf(err).String()) +} + +type Counter struct{ metric.Int64Counter } + +func (c Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) { + c.Add(ctx, 1, metric.WithAttributes(attrs...)) +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index caf9cd16d9fd..ea7b2d964bb6 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -3,6 +3,7 @@ package metadata import ( + "context" "errors" "go.opentelemetry.io/otel/metric" @@ -28,15 +29,18 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - DeltatocumulativeDatapointsDropped metric.Int64Counter - DeltatocumulativeDatapointsProcessed metric.Int64Counter - DeltatocumulativeGapsLength metric.Int64Counter - DeltatocumulativeStreamsEvicted metric.Int64Counter - DeltatocumulativeStreamsLimit metric.Int64Gauge - DeltatocumulativeStreamsMaxStale metric.Int64Gauge - DeltatocumulativeStreamsTracked metric.Int64UpDownCounter - meters map[configtelemetry.Level]metric.Meter + meter metric.Meter + DeltatocumulativeDatapointsDropped metric.Int64Counter + DeltatocumulativeDatapointsLinear metric.Int64Counter + DeltatocumulativeDatapointsProcessed metric.Int64Counter + DeltatocumulativeGapsLength metric.Int64Counter + DeltatocumulativeStreamsEvicted metric.Int64Counter + DeltatocumulativeStreamsLimit metric.Int64Gauge + DeltatocumulativeStreamsMaxStale metric.Int64Gauge + DeltatocumulativeStreamsTracked metric.Int64UpDownCounter + DeltatocumulativeStreamsTrackedLinear metric.Int64ObservableUpDownCounter + observeDeltatocumulativeStreamsTrackedLinear func(context.Context, metric.Observer) error + meters map[configtelemetry.Level]metric.Meter } // TelemetryBuilderOption applies changes to default builder. @@ -50,6 +54,16 @@ func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { tbof(mb) } +// WithDeltatocumulativeStreamsTrackedLinearCallback sets callback for observable DeltatocumulativeStreamsTrackedLinear metric. +func WithDeltatocumulativeStreamsTrackedLinearCallback(cb func() int64, opts ...metric.ObserveOption) TelemetryBuilderOption { + return telemetryBuilderOptionFunc(func(builder *TelemetryBuilder) { + builder.observeDeltatocumulativeStreamsTrackedLinear = func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(builder.DeltatocumulativeStreamsTrackedLinear, cb(), opts...) + return nil + } + }) +} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { @@ -65,6 +79,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{datapoint}"), ) errs = errors.Join(errs, err) + builder.DeltatocumulativeDatapointsLinear, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( + "otelcol_deltatocumulative.datapoints.linear", + metric.WithDescription("total number of datapoints processed. may have 'error' attribute, if processing failed"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) builder.DeltatocumulativeDatapointsProcessed, err = builder.meters[configtelemetry.LevelBasic].Int64Counter( "otelcol_deltatocumulative.datapoints.processed", metric.WithDescription("number of datapoints processed"), @@ -101,5 +121,13 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{dps}"), ) errs = errors.Join(errs, err) + builder.DeltatocumulativeStreamsTrackedLinear, err = builder.meters[configtelemetry.LevelBasic].Int64ObservableUpDownCounter( + "otelcol_deltatocumulative.streams.tracked.linear", + metric.WithDescription("number of streams tracked"), + metric.WithUnit("{dps}"), + ) + errs = errors.Join(errs, err) + _, err = builder.meters[configtelemetry.LevelBasic].RegisterCallback(builder.observeDeltatocumulativeStreamsTrackedLinear, builder.DeltatocumulativeStreamsTrackedLinear) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index 0475ba2d4ed1..9fa1df07eb1d 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -15,6 +15,11 @@ type Data[D data.Point[D]] interface { Ident() Ident } +type Filterable[D data.Point[D]] interface { + Data[D] + Filter(func(D) bool) +} + type Sum Metric func (s Sum) At(i int) data.Number { @@ -36,6 +41,10 @@ func (s Sum) Filter(expr func(data.Number) bool) { }) } +func (s Sum) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.Sum().SetAggregationTemporality(at) +} + type Histogram Metric func (s Histogram) At(i int) data.Histogram { @@ -57,6 +66,10 @@ func (s Histogram) Filter(expr func(data.Histogram) bool) { }) } +func (s Histogram) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.Histogram().SetAggregationTemporality(at) +} + type ExpHistogram Metric func (s ExpHistogram) At(i int) data.ExpHistogram { @@ -77,3 +90,49 @@ func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) { return !expr(data.ExpHistogram{DataPoint: dp}) }) } + +func (s ExpHistogram) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.ExponentialHistogram().SetAggregationTemporality(at) +} + +type Gauge Metric + +func (s Gauge) At(i int) data.Number { + dp := Metric(s).Gauge().DataPoints().At(i) + return data.Number{NumberDataPoint: dp} +} + +func (s Gauge) Len() int { + return Metric(s).Gauge().DataPoints().Len() +} + +func (s Gauge) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s Gauge) Filter(expr func(data.Number) bool) { + s.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + return !expr(data.Number{NumberDataPoint: dp}) + }) +} + +type Summary Metric + +func (s Summary) At(i int) data.Summary { + dp := Metric(s).Summary().DataPoints().At(i) + return data.Summary{SummaryDataPoint: dp} +} + +func (s Summary) Len() int { + return Metric(s).Summary().DataPoints().Len() +} + +func (s Summary) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s Summary) Filter(expr func(data.Summary) bool) { + s.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool { + return !expr(data.Summary{SummaryDataPoint: dp}) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 50c802c70e1d..98388dbf5eb6 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -33,3 +33,33 @@ func (m *Metric) Scope() pcommon.InstrumentationScope { func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { return Metric{res: res, scope: scope, Metric: metric} } + +func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { + switch m.Type() { + case pmetric.MetricTypeSum: + return m.Sum().AggregationTemporality() + case pmetric.MetricTypeHistogram: + return m.Histogram().AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + return m.ExponentialHistogram().AggregationTemporality() + } + + return pmetric.AggregationTemporalityUnspecified +} + +func (m Metric) Typed() any { + //exhaustive:enforce + switch m.Type() { + case pmetric.MetricTypeSum: + return Sum(m) + case pmetric.MetricTypeGauge: + return Gauge(m) + case pmetric.MetricTypeExponentialHistogram: + return ExpHistogram(m) + case pmetric.MetricTypeHistogram: + return Histogram(m) + case pmetric.MetricTypeSummary: + return Summary(m) + } + panic("unreachable") +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go index 532b4b8289e1..201dae8d884e 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -39,6 +39,7 @@ func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, next, err := fn(id, dp) if err != nil { if !errors.Is(err, Drop) { + err = Error(id, err) errs = errors.Join(errs, err) } return false diff --git a/processor/deltatocumulativeprocessor/linear.go b/processor/deltatocumulativeprocessor/linear.go new file mode 100644 index 000000000000..b333ab851627 --- /dev/null +++ b/processor/deltatocumulativeprocessor/linear.go @@ -0,0 +1,212 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + +import ( + "context" + "errors" + "sync" + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + telemetry "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/lineartelemetry" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" +) + +var _ processor.Metrics = (*Linear)(nil) + +type Linear struct { + next processor.Metrics + cfg Config + + state state + mtx sync.Mutex + + ctx context.Context + cancel context.CancelFunc + + stale staleness.Tracker + tel telemetry.Metrics +} + +func newLinear(cfg *Config, tel telemetry.Metrics, next processor.Metrics) *Linear { + ctx, cancel := context.WithCancel(context.Background()) + + proc := Linear{ + next: next, + cfg: *cfg, + state: state{ + nums: make(exp.HashMap[data.Number]), + hist: make(exp.HashMap[data.Histogram]), + expo: make(exp.HashMap[data.ExpHistogram]), + }, + ctx: ctx, + cancel: cancel, + + stale: staleness.NewTracker(), + tel: tel, + } + + tel.WithTracked(proc.state.Len) + cfg.Metrics(tel) + + return &proc +} + +func (p *Linear) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + p.mtx.Lock() + defer p.mtx.Unlock() + + now := time.Now() + + const ( + keep = true + drop = false + ) + + // possible errors encountered while aggregating. + // errors.Join-ed []streams.Error + var errs error + + metrics.Filter(md, func(m metrics.Metric) bool { + if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { + return keep + } + + // NOTE: to make review and migration easier, below only does sums for now. + // all other datatypes are handled by older code, which is called after this. + // + // TODO: implement other datatypes here + if m.Type() != pmetric.MetricTypeSum { + return keep + } + + sum := metrics.Sum(m) + state := p.state.nums + + // apply fn to each dp in stream. if fn's err != nil, dp is removed from stream + err := streams.Apply(sum, func(id identity.Stream, dp data.Number) (data.Number, error) { + acc, ok := state.Load(id) + // if at stream limit and stream not seen before, reject + if !ok && p.state.Len() >= p.cfg.MaxStreams { + p.tel.Datapoints().Inc(ctx, telemetry.Error("limit")) + return dp, streams.Drop + } + + // stream is alive, update stale tracker + p.stale.Refresh(now, id) + + acc, err := func() (data.Number, error) { + if !ok { + // new stream: there is no existing aggregation, so start new with current dp + return dp, nil + } + // tracked stream: add incoming delta dp to existing cumulative aggregation + return acc, delta.AccumulateInto(acc, dp) + }() + + // aggregation failed, record as metric and drop datapoint + if err != nil { + p.tel.Datapoints().Inc(ctx, telemetry.Cause(err)) + return acc, streams.Drop + } + + // store aggregated result in state and return + p.tel.Datapoints().Inc(ctx) + _ = state.Store(id, acc) + return acc, nil + }) + errs = errors.Join(errs, err) + + // all remaining datapoints are cumulative + sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + // if no datapoints remain, drop now-empty metric + return sum.Len() > 0 + }) + if errs != nil { + return errs + } + + // no need to continue pipeline if we dropped all metrics + if md.MetricCount() == 0 { + return nil + } + return p.next.ConsumeMetrics(ctx, md) +} + +func (p *Linear) Start(_ context.Context, _ component.Host) error { + if p.cfg.MaxStale != 0 { + // delete stale streams once per minute + go func() { + tick := time.NewTicker(time.Minute) + defer tick.Stop() + for { + select { + case <-p.ctx.Done(): + return + case <-tick.C: + p.mtx.Lock() + stale := p.stale.Collect(p.cfg.MaxStale) + for _, id := range stale { + p.state.Delete(id) + } + p.mtx.Unlock() + } + } + }() + } + + return nil +} + +func (p *Linear) Shutdown(_ context.Context) error { + p.cancel() + return nil +} + +func (p *Linear) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: true} +} + +type Metric[T data.Point[T]] interface { + metrics.Filterable[T] + SetAggregationTemporality(pmetric.AggregationTemporality) +} + +// state keeps a cumulative value, aggregated over time, per stream +type state struct { + nums streams.Map[data.Number] + + // future use + hist streams.Map[data.Histogram] + expo streams.Map[data.ExpHistogram] +} + +func (m state) Len() int { + return m.nums.Len() + m.hist.Len() + m.expo.Len() +} + +func (m state) Has(id identity.Stream) bool { + _, nok := m.nums.Load(id) + _, hok := m.hist.Load(id) + _, eok := m.expo.Load(id) + return nok || hok || eok +} + +func (m state) Delete(id identity.Stream) { + m.nums.Delete(id) + m.hist.Delete(id) + m.expo.Delete(id) +} diff --git a/processor/deltatocumulativeprocessor/metadata.yaml b/processor/deltatocumulativeprocessor/metadata.yaml index 552c812e1f15..5c9149ede108 100644 --- a/processor/deltatocumulativeprocessor/metadata.yaml +++ b/processor/deltatocumulativeprocessor/metadata.yaml @@ -9,7 +9,6 @@ status: codeowners: active: [sh0rez, RichieSams, jpkrohling] - telemetry: metrics: # streams @@ -20,6 +19,14 @@ telemetry: value_type: int monotonic: false enabled: true + deltatocumulative.streams.tracked.linear: + description: number of streams tracked + unit: "{dps}" + sum: + value_type: int + monotonic: false + async: true + enabled: true deltatocumulative.streams.limit: description: upper limit of tracked streams unit: "{stream}" @@ -54,6 +61,14 @@ telemetry: value_type: int monotonic: true enabled: true + + deltatocumulative.datapoints.linear: + description: total number of datapoints processed. may have 'error' attribute, if processing failed + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + enabled: true deltatocumulative.gaps.length: description: total duration where data was expected but not received unit: "s"