From fca1bb94e664f29710969de454fcbf0e9c9dd099 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 15 Aug 2024 11:06:32 +0200 Subject: [PATCH 1/7] deltatocumulative: linear, type-agnostic pipeline Transforms the processors main logic from a deeply-nested, map structure that overloads the Store operation to a strongly decoupled, linear processing flow that nearly entirely happens within `ConsumeMetrics`, greatly improving understandability of the code --- internal/exp/metrics/staleness/staleness.go | 30 +++ .../deltatocumulativeprocessor/factory.go | 2 +- .../generated_component_telemetry_test.go | 7 +- .../internal/delta/delta.go | 17 ++ .../internal/metadata/generated_telemetry.go | 5 +- .../metadata/generated_telemetry_test.go | 3 +- .../internal/metrics/data.go | 38 ++++ .../internal/metrics/metrics.go | 28 +++ .../internal/metrics/util.go | 22 +- .../internal/putil/pslice/pslice.go | 10 + .../internal/streams/data.go | 7 +- .../deltatocumulativeprocessor/processor.go | 209 ++++++++++-------- 12 files changed, 273 insertions(+), 105 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index dae1870cbfd6..eb52e6861825 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -102,3 +102,33 @@ func (s *Staleness[T]) Evict() (identity.Stream, bool) { func (s *Staleness[T]) Clear() { s.items.Clear() } + +type Tracker struct { + pq PriorityQueue +} + +func NewTracker() Tracker { + return Tracker{pq: NewPriorityQueue()} +} + +func (stale Tracker) Refresh(ts time.Time, ids ...identity.Stream) { + for _, id := range ids { + stale.pq.Update(id, ts) + } +} + +func (stale Tracker) Collect(max time.Duration) []identity.Stream { + now := NowFunc() + + var ids []identity.Stream + for stale.pq.Len() > 0 { + _, ts := stale.pq.Peek() + if now.Sub(ts) < max { + break + } + id, _ := stale.pq.Pop() + ids = append(ids, id) + } + + return ids +} diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index 8a6a394083d6..119180dc6373 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -33,5 +33,5 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo return nil, err } - return newProcessor(pcfg, set.Logger, telb, next), nil + return newProcessor(pcfg, telb, next), nil } diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go index 52ad9e905c16..1ef864ce6d1a 100644 --- a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -7,13 +7,12 @@ import ( "testing" "github.com/stretchr/testify/require" - sdkmetric "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/metric/metricdata" - "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processortest" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" ) type componentTestTelemetry struct { diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index 
5539eb8c8e49..e8d71d669f12 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -82,3 +82,20 @@ type ErrGap struct { func (e ErrGap) Error() string { return fmt.Sprintf("gap in stream from %s to %s. samples were likely lost in transit", e.From, e.To) } + +// AccumulateInto adds state and dp, storing the result in state +// +// state = state + dp +func AccumulateInto[P data.Point[P]](state P, dp P) error { + switch { + case dp.StartTimestamp() < state.StartTimestamp(): + // belongs to older series + return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()} + case dp.Timestamp() <= state.Timestamp(): + // out of order + return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} + } + + state.Add(dp) + return nil +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index 75f63aa49698..a0f4d22f25d4 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -5,12 +5,11 @@ package metadata import ( "errors" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index 50d7558d885a..e6d03363adba 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -6,14 +6,13 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" - - "go.opentelemetry.io/collector/component" ) type mockMeter struct { diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go index 0475ba2d4ed1..3c417ec06418 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/data.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go @@ -15,6 +15,11 @@ type Data[D data.Point[D]] interface { Ident() Ident } +type Filterable[D data.Point[D]] interface { + Data[D] + Filter(func(D) bool) +} + type Sum Metric func (s Sum) At(i int) data.Number { @@ -36,6 +41,10 @@ func (s Sum) Filter(expr func(data.Number) bool) { }) } +func (s Sum) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.Sum().SetAggregationTemporality(at) +} + type Histogram Metric func (s Histogram) At(i int) data.Histogram { @@ -57,6 +66,10 @@ func (s Histogram) Filter(expr func(data.Histogram) bool) { }) } +func (s Histogram) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.Histogram().SetAggregationTemporality(at) +} + type ExpHistogram 
Metric func (s ExpHistogram) At(i int) data.ExpHistogram { @@ -77,3 +90,28 @@ func (s ExpHistogram) Filter(expr func(data.ExpHistogram) bool) { return !expr(data.ExpHistogram{DataPoint: dp}) }) } + +func (s ExpHistogram) SetAggregationTemporality(at pmetric.AggregationTemporality) { + s.ExponentialHistogram().SetAggregationTemporality(at) +} + +type Gauge Metric + +func (s Gauge) At(i int) data.Number { + dp := Metric(s).Gauge().DataPoints().At(i) + return data.Number{NumberDataPoint: dp} +} + +func (s Gauge) Len() int { + return Metric(s).Gauge().DataPoints().Len() +} + +func (s Gauge) Ident() Ident { + return (*Metric)(&s).Ident() +} + +func (s Gauge) Filter(expr func(data.Number) bool) { + s.Gauge().DataPoints().RemoveIf(func(dp pmetric.NumberDataPoint) bool { + return !expr(data.Number{NumberDataPoint: dp}) + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 6b705f5a7d24..74be6667524c 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -25,3 +25,31 @@ func (m *Metric) Ident() Ident { func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { return Metric{res: res, scope: scope, Metric: metric} } + +func (m Metric) AggregationTemporality() pmetric.AggregationTemporality { + switch m.Type() { + case pmetric.MetricTypeSum: + return m.Sum().AggregationTemporality() + case pmetric.MetricTypeHistogram: + return m.Histogram().AggregationTemporality() + case pmetric.MetricTypeExponentialHistogram: + return m.ExponentialHistogram().AggregationTemporality() + } + + return pmetric.AggregationTemporalityUnspecified +} + +func (m Metric) Typed() any { + //exhaustive:enforce + switch m.Type() { + case pmetric.MetricTypeSum: + return Sum(m) + case pmetric.MetricTypeGauge: + return Gauge(m) + case pmetric.MetricTypeExponentialHistogram: + return ExpHistogram(m) + case pmetric.MetricTypeHistogram: + return Histogram(m) + } + panic("unreachable") +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/util.go b/processor/deltatocumulativeprocessor/internal/metrics/util.go index 985716b3cc0f..139986e1d536 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/util.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/util.go @@ -3,7 +3,11 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" -import "go.opentelemetry.io/collector/pdata/pmetric" +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" +) func Filter(metrics pmetric.Metrics, fn func(m Metric) bool) { metrics.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { @@ -23,3 +27,19 @@ func Each(metrics pmetric.Metrics, fn func(m Metric)) { return true }) } + +func All(md pmetric.Metrics) func(func(Metric) bool) { + return func(yield func(Metric) bool) { + var ok bool + pslice.All(md.ResourceMetrics())(func(rm pmetric.ResourceMetrics) bool { + pslice.All(rm.ScopeMetrics())(func(sm pmetric.ScopeMetrics) bool { + pslice.All(sm.Metrics())(func(m pmetric.Metric) bool { + ok = yield(From(rm.Resource(), sm.Scope(), m)) + return ok + }) + return ok + }) + return ok + }) + } +} diff --git 
a/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go index 5a0c2b64d863..6cc97af04132 100644 --- a/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go +++ b/processor/deltatocumulativeprocessor/internal/putil/pslice/pslice.go @@ -19,3 +19,13 @@ func Equal[E comparable, S Slice[E]](a, b S) bool { } return true } + +func All[E any, S Slice[E]](slice S) func(func(E) bool) { + return func(yield func(E) bool) { + for i := 0; i < slice.Len(); i++ { + if !yield(slice.At(i)) { + break + } + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go index 0c54be543c45..8a98fa97f919 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -27,15 +27,10 @@ func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] { } } -type filterable[D data.Point[D]] interface { - metrics.Data[D] - Filter(func(D) bool) -} - // Apply does dps[i] = fn(dps[i]) for each item in dps. // If fn returns [streams.Drop], the datapoint is removed from dps instead. // If fn returns another error, the datapoint is also removed and the error returned eventually -func Apply[P data.Point[P], List filterable[P]](dps List, fn func(Ident, P) (P, error)) error { +func Apply[P data.Point[P], List metrics.Filterable[P]](dps List, fn func(Ident, P) (P, error)) error { var errs error mid := dps.Ident() diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index cc63f2c90e40..804b8812a2c0 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" +package deltatocumulativeprocessor import ( "context" @@ -13,108 +13,154 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/processor" - "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" ) var _ processor.Metrics = (*Processor)(nil) type Processor struct { next consumer.Metrics + cfg Config + + state Map + mtx sync.Mutex - log *zap.Logger ctx context.Context cancel 
context.CancelFunc - sums Pipeline[data.Number] - expo Pipeline[data.ExpHistogram] - hist Pipeline[data.Histogram] - - mtx sync.Mutex + stale staleness.Tracker + tel metadata.TelemetryBuilder } -func newProcessor(cfg *Config, log *zap.Logger, telb *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, tel *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) - tel := telemetry.New(telb) proc := Processor{ - log: log, + next: next, + cfg: *cfg, + state: Map{ + nums: make(exp.HashMap[data.Number]), + hist: make(exp.HashMap[data.Histogram]), + expo: make(exp.HashMap[data.ExpHistogram]), + }, ctx: ctx, cancel: cancel, - next: next, - sums: pipeline[data.Number](cfg, &tel), - expo: pipeline[data.ExpHistogram](cfg, &tel), - hist: pipeline[data.Histogram](cfg, &tel), + stale: staleness.NewTracker(), + tel: *tel, } - return &proc } -type Pipeline[D data.Point[D]] struct { - aggr streams.Aggregator[D] - stale maybe.Ptr[staleness.Staleness[D]] -} +func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { + p.mtx.Lock() + defer p.mtx.Unlock() -func pipeline[D data.Point[D]](cfg *Config, tel *telemetry.Telemetry) Pipeline[D] { - var pipe Pipeline[D] + now := time.Now() - var dps streams.Map[D] - dps = delta.New[D]() - dps = telemetry.ObserveItems(dps, &tel.Metrics) + var errs error + metrics.All(md)(func(m metrics.Metric) bool { + if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { + return true + } - if cfg.MaxStale > 0 { - tel.WithStale(cfg.MaxStale) - stale := maybe.Some(staleness.NewStaleness(cfg.MaxStale, dps)) - pipe.stale = stale - dps, _ = stale.Try() - } - if cfg.MaxStreams > 0 { - tel.WithLimit(int64(cfg.MaxStreams)) - lim := streams.Limit(dps, cfg.MaxStreams) - if stale, ok := pipe.stale.Try(); ok { - lim.Evictor = stale + var each AggrFunc + switch m := m.Typed().(type) { + case metrics.Sum: + each = use(m, p.state.nums) + case metrics.Histogram: + each = use(m, p.state.hist) + case metrics.ExpHistogram: + each = use(m, p.state.expo) } - dps = lim - } - dps = telemetry.ObserveNonFatal(dps, &tel.Metrics) + err := each(func(id identity.Stream, aggr func() error) error { + // if stream not seen before and stream limit is reached, reject + if !p.state.Has(id) && p.state.Len() >= p.cfg.MaxStreams { + // TODO: record metric + return streams.Drop + } - pipe.aggr = streams.IntoAggregator(dps) - return pipe + // stream is alive, refresh it + p.stale.Refresh(now, id) + + // accumulate current dp into state + return aggr() + }) + errs = errors.Join(errs, err) + + return true + }) + + if errs != nil { + return errs + } + return p.next.ConsumeMetrics(ctx, md) } -func (p *Processor) Start(_ context.Context, _ component.Host) error { - sums, sok := p.sums.stale.Try() - expo, eok := p.expo.stale.Try() - hist, hok := p.hist.stale.Try() - if !(sok && eok && hok) { - return nil +// AggrFunc calls `do` for datapoint of a metric, giving the caller the +// opportunity to decide whether to perform aggregation in a type-agnostic way, +// while keeping underlying strong typing. +// +// if `aggr` is called, the current datapoint is accumulated into the streams +// cumulative state. +// +// if any error is returned, the current datapoint is dropped, the error +// collected and eventually returned. 
[streams.Drop] silently drops a datapoint +type AggrFunc func(do func(id identity.Stream, aggr func() error) error) error + +// use returns a AggrFn for the given metric, accumulating into the given state, +// given `do` calls its `aggr` +func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { + return func(do func(id identity.Stream, aggr func() error) error) error { + return streams.Apply(m, func(id identity.Stream, dp T) (T, error) { + acc, ok := state.Load(id) + aggr := func() error { + m.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + // no previous state: take dp as-is + if !ok { + acc = dp + return nil + } + + // accumulate into previous state + return delta.AccumulateInto(acc, dp) + } + + err := do(id, aggr) + return acc, err + }) } +} +func (p *Processor) Start(_ context.Context, _ component.Host) error { go func() { tick := time.NewTicker(time.Minute) + defer tick.Stop() for { select { case <-p.ctx.Done(): return case <-tick.C: p.mtx.Lock() - sums.ExpireOldEntries() - expo.ExpireOldEntries() - hist.ExpireOldEntries() + stale := p.stale.Collect(p.cfg.MaxStale) + for _, id := range stale { + p.state.Delete(id) + } p.mtx.Unlock() } } }() + return nil } @@ -127,43 +173,30 @@ func (p *Processor) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } -func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { - if err := context.Cause(p.ctx); err != nil { - return err - } +type Metric[T data.Point[T]] interface { + metrics.Filterable[T] + SetAggregationTemporality(pmetric.AggregationTemporality) +} - p.mtx.Lock() - defer p.mtx.Unlock() +type Map struct { + nums streams.Map[data.Number] + hist streams.Map[data.Histogram] + expo streams.Map[data.ExpHistogram] +} - var errs error - metrics.Each(md, func(m metrics.Metric) { - switch m.Type() { - case pmetric.MetricTypeSum: - sum := m.Sum() - if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Apply(metrics.Sum(m), p.sums.aggr.Aggregate) - errs = errors.Join(errs, err) - sum.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } - case pmetric.MetricTypeHistogram: - hist := m.Histogram() - if hist.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Apply(metrics.Histogram(m), p.hist.aggr.Aggregate) - errs = errors.Join(errs, err) - hist.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } - case pmetric.MetricTypeExponentialHistogram: - expo := m.ExponentialHistogram() - if expo.AggregationTemporality() == pmetric.AggregationTemporalityDelta { - err := streams.Apply(metrics.ExpHistogram(m), p.expo.aggr.Aggregate) - errs = errors.Join(errs, err) - expo.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) - } - } - }) - if errs != nil { - return errs - } +func (m Map) Len() int { + return m.nums.Len() + m.hist.Len() + m.expo.Len() +} - return p.next.ConsumeMetrics(ctx, md) +func (m Map) Has(id identity.Stream) bool { + _, nok := m.nums.Load(id) + _, hok := m.hist.Load(id) + _, eok := m.expo.Load(id) + return nok || hok || eok +} + +func (m Map) Delete(id identity.Stream) { + m.nums.Delete(id) + m.hist.Delete(id) + m.expo.Delete(id) } From 9474b117b759d869f1a349778a4bf410fa7ab446 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Thu, 15 Aug 2024 14:58:01 +0200 Subject: [PATCH 2/7] processor: testing moves most tests from `internal/delta` to the processor itself, excercising the public api. 
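For orientation, a minimal sketch of the kind of public-API test this change enables follows. It is not part of the patch: the newDeltaSum helper, the metric name, and the concrete values are illustrative assumptions; the factory, Config fields, and consumertest/processortest helpers are the ones the patch's own processor_test.go (below) uses.

// sketch only: drive the reworked processor purely through its public API,
// feeding two delta sum samples of one stream and expecting a cumulative total.
package deltatocumulativeprocessor_test

import (
	"context"
	"math"
	"testing"

	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/consumer/consumertest"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/processor/processortest"

	self "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor"
)

// newDeltaSum is a hypothetical helper: one delta sum datapoint per payload.
func newDeltaSum(ts pcommon.Timestamp, v int64) pmetric.Metrics {
	md := pmetric.NewMetrics()
	m := md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty()
	m.SetName("requests")
	sum := m.SetEmptySum()
	sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
	dp := sum.DataPoints().AppendEmpty()
	dp.SetStartTimestamp(1000)
	dp.SetTimestamp(ts)
	dp.SetIntValue(v)
	return md
}

func TestPublicAPISketch(t *testing.T) {
	sink := &consumertest.MetricsSink{}
	cfg := &self.Config{MaxStale: 0, MaxStreams: math.MaxInt}
	proc, err := self.NewFactory().CreateMetricsProcessor(
		context.Background(), processortest.NewNopSettings(), cfg, sink)
	require.NoError(t, err)

	// two delta samples of the same stream accumulate into one cumulative value
	require.NoError(t, proc.ConsumeMetrics(context.Background(), newDeltaSum(1100, 3)))
	require.NoError(t, proc.ConsumeMetrics(context.Background(), newDeltaSum(1200, 4)))

	out := sink.AllMetrics()
	last := out[len(out)-1].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum()
	require.Equal(t, pmetric.AggregationTemporalityCumulative, last.AggregationTemporality())
	require.Equal(t, int64(7), last.DataPoints().At(0).IntValue())
}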
--- internal/exp/metrics/identity/stream.go | 4 + .../deltatocumulativeprocessor/config.go | 3 +- .../deltatocumulativeprocessor/config_test.go | 3 +- processor/deltatocumulativeprocessor/go.mod | 2 +- .../internal/data/datatest/compare/compare.go | 10 +- .../internal/delta/delta.go | 74 +---- .../internal/delta/delta_test.go | 230 -------------- .../internal/maybe/ptr.go | 52 ---- .../internal/maybe/ptr_test.go | 64 ---- .../internal/metrics/{util.go => iter.go} | 31 +- .../internal/metrics/metrics.go | 8 + .../internal/streams/data.go | 29 +- .../internal/streams/data_test.go | 105 +++---- .../internal/streams/errors.go | 4 + .../internal/streams/limit.go | 78 ----- .../internal/streams/limit_test.go | 111 ------- .../internal/streams/streams.go | 27 +- .../deltatocumulativeprocessor/processor.go | 47 +-- .../processor_test.go | 285 ++++++++++++++++++ 19 files changed, 414 insertions(+), 753 deletions(-) delete mode 100644 processor/deltatocumulativeprocessor/internal/delta/delta_test.go delete mode 100644 processor/deltatocumulativeprocessor/internal/maybe/ptr.go delete mode 100644 processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go rename processor/deltatocumulativeprocessor/internal/metrics/{util.go => iter.go} (73%) delete mode 100644 processor/deltatocumulativeprocessor/internal/streams/limit.go delete mode 100644 processor/deltatocumulativeprocessor/internal/streams/limit_test.go create mode 100644 processor/deltatocumulativeprocessor/processor_test.go diff --git a/internal/exp/metrics/identity/stream.go b/internal/exp/metrics/identity/stream.go index 19988f7730dc..117b58c90643 100644 --- a/internal/exp/metrics/identity/stream.go +++ b/internal/exp/metrics/identity/stream.go @@ -33,3 +33,7 @@ func OfStream[DataPoint attrPoint](m Metric, dp DataPoint) Stream { type attrPoint interface { Attributes() pcommon.Map } + +func Compare[T interface{ Hash() hash.Hash64 }](a, b T) int { + return int(a.Hash().Sum64() - b.Hash().Sum64()) +} diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b97793d0b6d8..b8789f290665 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -5,6 +5,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "fmt" + "math" "time" "go.opentelemetry.io/collector/component" @@ -33,6 +34,6 @@ func createDefaultConfig() component.Config { // disable. 
TODO: find good default // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603 - MaxStreams: 0, + MaxStreams: math.MaxInt, } } diff --git a/processor/deltatocumulativeprocessor/config_test.go b/processor/deltatocumulativeprocessor/config_test.go index cbda97e2672d..371afcc05c0d 100644 --- a/processor/deltatocumulativeprocessor/config_test.go +++ b/processor/deltatocumulativeprocessor/config_test.go @@ -4,6 +4,7 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" import ( + "math" "path/filepath" "testing" "time" @@ -37,7 +38,7 @@ func TestLoadConfig(t *testing.T) { id: component.NewIDWithName(metadata.Type, "set-valid-max_stale"), expected: &Config{ MaxStale: 2 * time.Minute, - MaxStreams: 0, + MaxStreams: math.MaxInt, }, }, { diff --git a/processor/deltatocumulativeprocessor/go.mod b/processor/deltatocumulativeprocessor/go.mod index 6d2aee64e94d..41b8c1aa64c1 100644 --- a/processor/deltatocumulativeprocessor/go.mod +++ b/processor/deltatocumulativeprocessor/go.mod @@ -19,7 +19,6 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.28.0 go.opentelemetry.io/otel/trace v1.28.0 go.uber.org/goleak v1.3.0 - go.uber.org/zap v1.27.0 ) require ( @@ -54,6 +53,7 @@ require ( go.opentelemetry.io/otel/exporters/prometheus v0.50.0 // indirect go.opentelemetry.io/otel/sdk v1.28.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go index 91f58ff8b0f0..eb8c0f11174a 100644 --- a/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go +++ b/processor/deltatocumulativeprocessor/internal/data/datatest/compare/compare.go @@ -14,14 +14,14 @@ import ( var Opts = []cmp.Option{ cmpopts.EquateApprox(0, 1e-9), cmp.Exporter(func(ty reflect.Type) bool { - return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") + return strings.HasPrefix(ty.PkgPath(), "go.opentelemetry.io/collector/pdata") || strings.HasPrefix(ty.PkgPath(), "github.com/open-telemetry/opentelemetry-collector-contrib") }), } -func Equal[T any](a, b T) bool { - return cmp.Equal(a, b, Opts...) +func Equal[T any](a, b T, opts ...cmp.Option) bool { + return cmp.Equal(a, b, append(Opts, opts...)...) } -func Diff[T any](a, b T) string { - return cmp.Diff(a, b, Opts...) +func Diff[T any](a, b T, opts ...cmp.Option) string { + return cmp.Diff(a, b, append(Opts, opts...)...) 
} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index e8d71d669f12..d8174db9c2b2 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -8,53 +8,24 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) -func New[D data.Point[D]]() Accumulator[D] { - return Accumulator[D]{ - Map: make(exp.HashMap[D]), - } -} - -var _ streams.Map[data.Number] = (*Accumulator[data.Number])(nil) - -type Accumulator[D data.Point[D]] struct { - streams.Map[D] -} - -func (a Accumulator[D]) Store(id streams.Ident, dp D) error { - aggr, ok := a.Map.Load(id) - - // new series: initialize with current sample - if !ok { - clone := dp.Clone() - return a.Map.Store(id, clone) - } - - // drop bad samples +// AccumulateInto adds state and dp, storing the result in state +// +// state = state + dp +func AccumulateInto[P data.Point[P]](state P, dp P) error { switch { - case dp.StartTimestamp() < aggr.StartTimestamp(): + case dp.StartTimestamp() < state.StartTimestamp(): // belongs to older series - return ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} - case dp.Timestamp() <= aggr.Timestamp(): + return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()} + case dp.Timestamp() <= state.Timestamp(): // out of order - return ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} - } - - // detect gaps - var gap error - if dp.StartTimestamp() > aggr.Timestamp() { - gap = ErrGap{From: aggr.Timestamp(), To: dp.StartTimestamp()} + return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} } - res := aggr.Add(dp) - if err := a.Map.Store(id, res); err != nil { - return err - } - return gap + state.Add(dp) + return nil } type ErrOlderStart struct { @@ -74,28 +45,3 @@ type ErrOutOfOrder struct { func (e ErrOutOfOrder) Error() string { return fmt.Sprintf("out of order: dropped sample from time=%s, because series is already at time=%s", e.Sample, e.Last) } - -type ErrGap struct { - From, To pcommon.Timestamp -} - -func (e ErrGap) Error() string { - return fmt.Sprintf("gap in stream from %s to %s. 
samples were likely lost in transit", e.From, e.To) -} - -// AccumulateInto adds state and dp, storing the result in state -// -// state = state + dp -func AccumulateInto[P data.Point[P]](state P, dp P) error { - switch { - case dp.StartTimestamp() < state.StartTimestamp(): - // belongs to older series - return ErrOlderStart{Start: state.StartTimestamp(), Sample: dp.StartTimestamp()} - case dp.Timestamp() <= state.Timestamp(): - // out of order - return ErrOutOfOrder{Last: state.Timestamp(), Sample: dp.Timestamp()} - } - - state.Add(dp) - return nil -} diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go deleted file mode 100644 index 4b0be3be724d..000000000000 --- a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go +++ /dev/null @@ -1,230 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package delta_test - -import ( - "fmt" - "math/rand" - "strconv" - "sync" - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/pcommon" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" -) - -var result any - -func aggr[P point[P]]() streams.Aggregator[P] { - return streams.IntoAggregator(delta.New[P]()) -} - -func BenchmarkAccumulator(b *testing.B) { - acc := aggr[data.Number]() - sum := random.Sum() - - bench := func(b *testing.B, nstreams int) { - nsamples := b.N / nstreams - - ids := make([]streams.Ident, nstreams) - dps := make([]data.Number, nstreams) - for i := 0; i < nstreams; i++ { - ids[i], dps[i] = sum.Stream() - } - - b.ResetTimer() - - var wg sync.WaitGroup - for i := 0; i < nstreams; i++ { - wg.Add(1) - go func(id streams.Ident, num data.Number) { - for n := 0; n < nsamples; n++ { - num.SetTimestamp(num.Timestamp() + 1) - val, err := acc.Aggregate(id, num) - if err != nil { - panic(err) - } - result = val - } - wg.Done() - }(ids[i], dps[i]) - } - - wg.Wait() - } - - nstreams := []int{1, 2, 10, 100, 1000} - for _, n := range nstreams { - b.Run(strconv.Itoa(n), func(b *testing.B) { - bench(b, n) - }) - } -} - -// verify the distinction between streams and the accumulated value -func TestAddition(t *testing.T) { - acc := aggr[data.Number]() - sum := random.Sum() - - type Idx int - type Stream struct { - idx Idx - id streams.Ident - dp data.Number - } - - streams := make([]Stream, 10) - for i := range streams { - id, dp := sum.Stream() - streams[i] = Stream{ - idx: Idx(i), - id: id, - dp: dp, - } - } - - want := make(map[Idx]int64) - for i := 0; i < 100; i++ { - stream := streams[rand.Intn(10)] - dp := stream.dp.Clone() - dp.SetTimestamp(dp.Timestamp() + pcommon.Timestamp(i)) - - val := int64(rand.Intn(255)) - dp.SetIntValue(val) - want[stream.idx] += val - - got, err := acc.Aggregate(stream.id, dp) - require.NoError(t, err) - - require.Equal(t, want[stream.idx], got.IntValue()) - } -} - -// verify that start + last times are updated -func TestTimes(t *testing.T) { - t.Run("sum", testTimes(random.Sum())) - t.Run("histogram", testTimes(random.Histogram())) - t.Run("exponential", 
testTimes(random.Exponential())) -} - -func testTimes[P point[P]](metric random.Metric[P]) func(t *testing.T) { - return func(t *testing.T) { - acc := aggr[P]() - id, base := metric.Stream() - point := func(start, last pcommon.Timestamp) P { - dp := base.Clone() - dp.SetStartTimestamp(start) - dp.SetTimestamp(last) - return dp - } - - // first sample: its the first ever, so take it as-is - { - dp := point(1000, 1000) - res, err := acc.Aggregate(id, dp) - - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1000), res.Timestamp()) - } - - // second sample: its subsequent, so keep original startTime, but update lastSeen - { - dp := point(1000, 1100) - res, err := acc.Aggregate(id, dp) - - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1100), res.Timestamp()) - } - - // third sample: its subsequent, but has a more recent startTime, which is - // PERMITTED by the spec. - // still keep original startTime, but update lastSeen. - { - dp := point(1100, 1200) - res, err := acc.Aggregate(id, dp) - - require.NoError(t, err) - require.Equal(t, time(1000), res.StartTimestamp()) - require.Equal(t, time(1200), res.Timestamp()) - } - } -} - -type point[Self any] interface { - random.Point[Self] - - SetTimestamp(pcommon.Timestamp) - SetStartTimestamp(pcommon.Timestamp) -} - -func TestErrs(t *testing.T) { - type Point struct { - Start int - Time int - Value int - } - type Case struct { - Good Point - Bad Point - Err error - } - - cases := []Case{ - { - Good: Point{Start: 1234, Time: 1337, Value: 42}, - Bad: Point{Start: 1000, Time: 2000, Value: 24}, - Err: delta.ErrOlderStart{Start: time(1234), Sample: time(1000)}, - }, - { - Good: Point{Start: 1234, Time: 1337, Value: 42}, - Bad: Point{Start: 1234, Time: 1336, Value: 24}, - Err: delta.ErrOutOfOrder{Last: time(1337), Sample: time(1336)}, - }, - } - - for _, c := range cases { - c := c - t.Run(fmt.Sprintf("%T", c.Err), func(t *testing.T) { - acc := aggr[data.Number]() - id, data := random.Sum().Stream() - - good := data.Clone() - good.SetStartTimestamp(pcommon.Timestamp(c.Good.Start)) - good.SetTimestamp(pcommon.Timestamp(c.Good.Time)) - good.SetIntValue(int64(c.Good.Value)) - - r1, err := acc.Aggregate(id, good) - require.NoError(t, err) - - require.Equal(t, good.StartTimestamp(), r1.StartTimestamp()) - require.Equal(t, good.Timestamp(), r1.Timestamp()) - require.Equal(t, good.IntValue(), r1.IntValue()) - - bad := data.Clone() - bad.SetStartTimestamp(pcommon.Timestamp(c.Bad.Start)) - bad.SetTimestamp(pcommon.Timestamp(c.Bad.Time)) - bad.SetIntValue(int64(c.Bad.Value)) - - r2, err := acc.Aggregate(id, bad) - require.ErrorIs(t, err, c.Err) - - // sample must be dropped => no change - require.Equal(t, r1.StartTimestamp(), r2.StartTimestamp()) - require.Equal(t, r1.Timestamp(), r2.Timestamp()) - require.Equal(t, r1.IntValue(), r2.IntValue()) - }) - } - -} - -func time(ts int) pcommon.Timestamp { - return pcommon.Timestamp(ts) -} diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr.go deleted file mode 100644 index 8f40b8d277b3..000000000000 --- a/processor/deltatocumulativeprocessor/internal/maybe/ptr.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -// maybe provides utilities for representing data may or may not exist at -// runtime in a safe way. 
-// -// A typical approach to this are pointers, but they suffer from two issues: -// - Unsafety: permitting nil pointers must require careful checking on each use, -// which is easily forgotten -// - Blindness: nil itself does cannot differentiate between "set to nil" and -// "not set all", leading to unexepcted edge cases -// -// The [Ptr] type of this package provides a safe alternative with a clear -// distinction between "not set" and "set to nil". -package maybe // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" - -// Ptr references some value of type T that is not guaranteed to exist. -// Callers must use [Ptr.Try] to access the underlying value, checking the -// ok return value too. -// This provides a clear distinction between "not set" and "set to nil". -// -// Use [Some] and [None] to create Ptrs. -type Ptr[T any] struct { - to *T - ok bool -} - -// None returns a Ptr that represents "not-set". -// This is equal to a zero-value Ptr. -func None[T any]() Ptr[T] { - return Ptr[T]{to: nil, ok: false} -} - -// Some returns a pointer to the passed T. -// -// The ptr argument may be nil, in which case this represents "explicitly set to -// nil". -func Some[T any](ptr *T) Ptr[T] { - return Ptr[T]{to: ptr, ok: true} -} - -// Try attempts to de-reference the Ptr, giving one of three results: -// -// - nil, false: not-set -// - nil, true: explicitly set to nil -// - non-nil, true: set to some value -// -// This provides extra safety over bare pointers, because callers are forced by -// the compiler to either check or explicitly ignore the ok value. -func (ptr Ptr[T]) Try() (_ *T, ok bool) { - return ptr.to, ptr.ok -} diff --git a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go b/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go deleted file mode 100644 index c32c34e7e505..000000000000 --- a/processor/deltatocumulativeprocessor/internal/maybe/ptr_test.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package maybe_test - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/maybe" -) - -func TestMaybe(t *testing.T) { - t.Run("zero-not-ok", func(t *testing.T) { - var ptr maybe.Ptr[int] - _, ok := ptr.Try() - require.False(t, ok) - }) - t.Run("none-not-ok", func(t *testing.T) { - ptr := maybe.None[int]() - _, ok := ptr.Try() - require.False(t, ok) - }) - t.Run("explicit-nil", func(t *testing.T) { - ptr := maybe.Some[int](nil) - v, ok := ptr.Try() - require.Nil(t, v) - require.True(t, ok) - }) - t.Run("value", func(t *testing.T) { - num := 42 - ptr := maybe.Some(&num) - v, ok := ptr.Try() - require.True(t, ok) - require.Equal(t, num, *v) - }) -} - -func ExamplePtr() { - var unset maybe.Ptr[int] // = maybe.None() - if v, ok := unset.Try(); ok { - fmt.Println("unset:", v) - } else { - fmt.Println("unset: !ok") - } - - var xnil maybe.Ptr[int] = maybe.Some[int](nil) - if v, ok := xnil.Try(); ok { - fmt.Println("explicit nil:", v) - } - - num := 42 - var set maybe.Ptr[int] = maybe.Some(&num) - if v, ok := set.Try(); ok { - fmt.Println("set:", *v) - } - - // Output: - // unset: !ok - // explicit nil: - // set: 42 -} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/util.go b/processor/deltatocumulativeprocessor/internal/metrics/iter.go similarity index 73% rename from 
processor/deltatocumulativeprocessor/internal/metrics/util.go rename to processor/deltatocumulativeprocessor/internal/metrics/iter.go index 139986e1d536..9902d22a2eec 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/util.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/iter.go @@ -9,25 +9,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" ) -func Filter(metrics pmetric.Metrics, fn func(m Metric) bool) { - metrics.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { - rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { - sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { - return !fn(From(rm.Resource(), sm.Scope(), m)) - }) - return false - }) - return false - }) -} - -func Each(metrics pmetric.Metrics, fn func(m Metric)) { - Filter(metrics, func(m Metric) bool { - fn(m) - return true - }) -} - func All(md pmetric.Metrics) func(func(Metric) bool) { return func(yield func(Metric) bool) { var ok bool @@ -43,3 +24,15 @@ func All(md pmetric.Metrics) func(func(Metric) bool) { }) } } + +func Filter(md pmetric.Metrics, keep func(Metric) bool) { + md.ResourceMetrics().RemoveIf(func(rm pmetric.ResourceMetrics) bool { + rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool { + sm.Metrics().RemoveIf(func(m pmetric.Metric) bool { + return !keep(From(rm.Resource(), sm.Scope(), m)) + }) + return sm.Metrics().Len() == 0 + }) + return rm.ScopeMetrics().Len() == 0 + }) +} diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go index 74be6667524c..be5f52f86bc7 100644 --- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go @@ -22,6 +22,14 @@ func (m *Metric) Ident() Ident { return identity.OfResourceMetric(m.res, m.scope, m.Metric) } +func (m *Metric) Resource() pcommon.Resource { + return m.res +} + +func (m *Metric) Scope() pcommon.InstrumentationScope { + return m.scope +} + func From(res pcommon.Resource, scope pcommon.InstrumentationScope, metric pmetric.Metric) Metric { return Metric{res: res, scope: scope, Metric: metric} } diff --git a/processor/deltatocumulativeprocessor/internal/streams/data.go b/processor/deltatocumulativeprocessor/internal/streams/data.go index 8a98fa97f919..1f70a2a1ec5d 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data.go @@ -9,21 +9,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/putil/pslice" ) -// Samples returns an Iterator over each sample of all streams in the metric -func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] { - mid := m.Ident() - - return func(yield func(Ident, D) bool) bool { - for i := 0; i < m.Len(); i++ { - dp := m.At(i) +func Datapoints[P data.Point[P], List metrics.Data[P]](dps List) func(func(identity.Stream, P) bool) { + return func(yield func(identity.Stream, P) bool) { + mid := dps.Ident() + pslice.All(dps)(func(dp P) bool { id := identity.OfStream(mid, dp) - 
if !yield(id, dp) { - break - } - } - return false + return yield(id, dp) + }) } } @@ -33,19 +28,25 @@ func Samples[D data.Point[D]](m metrics.Data[D]) Seq[D] { func Apply[P data.Point[P], List metrics.Filterable[P]](dps List, fn func(Ident, P) (P, error)) error { var errs error + const ( + keep = true + drop = false + ) + mid := dps.Ident() dps.Filter(func(dp P) bool { id := identity.OfStream(mid, dp) next, err := fn(id, dp) if err != nil { if !errors.Is(err, Drop) { + err = Error(id, err) errs = errors.Join(errs, err) } - return false + return drop } next.CopyTo(dp) - return true + return keep }) return errs diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go index f8180713f86f..a7d221557205 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/data_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/data_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" @@ -19,77 +20,55 @@ import ( var rdp data.Number var rid streams.Ident -func BenchmarkSamples(b *testing.B) { - b.Run("iterfn", func(b *testing.B) { +func BenchmarkApply(b *testing.B) { + b.Run("fn", func(b *testing.B) { dps := generate(b.N) b.ResetTimer() - streams.Samples(dps)(func(id streams.Ident, dp data.Number) bool { - rdp = dp + i := 0 + streams.Apply(dps, func(id identity.Stream, dp data.Number) (data.Number, error) { + i++ + dp.Add(dp) rid = id - return true - }) - }) - - b.Run("iface", func(b *testing.B) { - dps := generate(b.N) - mid := dps.id.Metric() - b.ResetTimer() - - for i := 0; i < dps.Len(); i++ { - dp := dps.At(i) - rid = identity.OfStream(mid, dp) rdp = dp - } + if i%2 != 0 { + return dp, streams.Drop + } + return dp, nil + }) }) - b.Run("loop", func(b *testing.B) { + b.Run("remove-if", func(b *testing.B) { dps := generate(b.N) mid := dps.id.Metric() b.ResetTimer() - for i := range dps.dps { - dp := dps.dps[i] - rid = identity.OfStream(mid, dp) - rdp = dp - } - }) -} - -func TestAggregate(t *testing.T) { - const total = 1000 - dps := generate(total) - - // inv aggregator inverts each sample - inv := aggr(func(_ streams.Ident, n data.Number) (data.Number, error) { - dp := n.Clone() - dp.SetIntValue(-dp.IntValue()) - return dp, nil + i := 0 + dps.dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool { + i++ + ndp := data.Number{NumberDataPoint: dp} + ndp.Add(ndp) + rid = identity.OfStream(mid, ndp) + rdp = ndp + return i%2 == 0 + }) }) - - err := streams.Apply(dps, inv.Aggregate) - require.NoError(t, err) - - // check that all samples are inverted - for i := 0; i < total; i++ { - require.Equal(t, int64(-i), dps.dps[i].IntValue()) - } } func TestDrop(t *testing.T) { const total = 1000 dps := generate(total) - var want []data.Number - maybe := aggr(func(_ streams.Ident, dp data.Number) (data.Number, error) { + want := pmetric.NewNumberDataPointSlice() + maybe := func(_ streams.Ident, dp data.Number) (data.Number, error) { if rand.Intn(2) == 1 { - want = append(want, dp) + dp.NumberDataPoint.CopyTo(want.AppendEmpty()) return dp, nil } return dp, streams.Drop - }) + } - err := streams.Apply(dps, maybe.Aggregate) + err := streams.Apply(dps, maybe) require.NoError(t, err) require.Equal(t, want, dps.dps) @@ -97,26 +76,26 @@ func 
TestDrop(t *testing.T) { func generate(n int) *Data { id, ndp := random.Sum().Stream() - dps := Data{id: id, dps: make([]data.Number, n)} - for i := range dps.dps { - dp := ndp.Clone() + dps := Data{id: id, dps: pmetric.NewNumberDataPointSlice()} + for i := 0; i < n; i++ { + dp := dps.dps.AppendEmpty() + ndp.NumberDataPoint.CopyTo(dp) dp.SetIntValue(int64(i)) - dps.dps[i] = dp } return &dps } type Data struct { id streams.Ident - dps []data.Number + dps pmetric.NumberDataPointSlice } func (l Data) At(i int) data.Number { - return l.dps[i] + return data.Number{NumberDataPoint: l.dps.At(i)} } func (l Data) Len() int { - return len(l.dps) + return l.dps.Len() } func (l Data) Ident() metrics.Ident { @@ -124,17 +103,7 @@ func (l Data) Ident() metrics.Ident { } func (l *Data) Filter(expr func(data.Number) bool) { - var next []data.Number - for _, dp := range l.dps { - if expr(dp) { - next = append(next, dp) - } - } - l.dps = next -} - -type aggr func(streams.Ident, data.Number) (data.Number, error) - -func (a aggr) Aggregate(id streams.Ident, dp data.Number) (data.Number, error) { - return a(id, dp) + l.dps.RemoveIf(func(dp pmetric.NumberDataPoint) bool { + return !expr(data.Number{NumberDataPoint: dp}) + }) } diff --git a/processor/deltatocumulativeprocessor/internal/streams/errors.go b/processor/deltatocumulativeprocessor/internal/streams/errors.go index e69827a6212c..c0638e091502 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/errors.go +++ b/processor/deltatocumulativeprocessor/internal/streams/errors.go @@ -19,3 +19,7 @@ type StreamErr struct { func (e StreamErr) Error() string { return fmt.Sprintf("%s: %s", e.Ident, e.Err) } + +func (e StreamErr) Unwrap() error { + return e.Err +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go deleted file mode 100644 index dd1d927687c9..000000000000 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - -import ( - "errors" - "fmt" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" -) - -func Limit[T any](m Map[T], max int) LimitMap[T] { - return LimitMap[T]{ - Map: m, Max: max, - Evictor: EvictorFunc(func() (identity.Stream, bool) { - return identity.Stream{}, false - }), - } -} - -type LimitMap[T any] struct { - Max int - - Evictor streams.Evictor - streams.Map[T] -} - -func (m LimitMap[T]) Store(id identity.Stream, v T) error { - _, exist := m.Map.Load(id) - - var errEv error - // if not already tracked and no space: try to evict - if !exist && m.Map.Len() >= m.Max { - errl := ErrLimit(m.Max) - gone, ok := m.Evictor.Evict() - if !ok { - // if no eviction possible, fail as there is no space - return errl - } - errEv = ErrEvicted{ErrLimit: errl, Ident: gone} - } - - // there was space, or we made space: store it - if err := m.Map.Store(id, v); err != nil { - return err - } - - // we may have evicted something, let the caller know - return errEv -} - -type ErrLimit int - -func (e ErrLimit) Error() string { - return fmt.Sprintf("stream limit of %d reached", e) -} - -func AtLimit(err error) bool { - var errLimit ErrLimit - return errors.As(err, 
&errLimit) -} - -type ErrEvicted struct { - ErrLimit - Ident Ident -} - -func (e ErrEvicted) Error() string { - return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident) -} - -type EvictorFunc func() (identity.Stream, bool) - -func (ev EvictorFunc) Evict() (identity.Stream, bool) { - return ev() -} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go deleted file mode 100644 index 380f657eb227..000000000000 --- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package streams_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" -) - -func TestLimit(t *testing.T) { - sum := random.Sum() - - items := make(exp.HashMap[data.Number]) - lim := streams.Limit(items, 10) - - ids := make([]identity.Stream, 10) - dps := make([]data.Number, 10) - - // write until limit must work - for i := 0; i < 10; i++ { - id, dp := sum.Stream() - ids[i] = id - dps[i] = dp - err := lim.Store(id, dp) - require.NoError(t, err) - } - - // one over limit must be rejected - { - id, dp := sum.Stream() - err := lim.Store(id, dp) - want := streams.ErrLimit(10) - require.ErrorAs(t, err, &want) - require.True(t, streams.AtLimit(err)) - } - - // write to existing must work - { - err := lim.Store(ids[3], dps[3]) - require.NoError(t, err) - } - - // after removing one, must be accepted again - { - lim.Delete(ids[0]) - - id, dp := sum.Stream() - err := lim.Store(id, dp) - require.NoError(t, err) - } -} - -func TestLimitEvict(t *testing.T) { - sum := random.Sum() - evictable := make(map[identity.Stream]struct{}) - - items := make(exp.HashMap[data.Number]) - lim := streams.Limit(items, 5) - - ids := make([]identity.Stream, 10) - lim.Evictor = streams.EvictorFunc(func() (identity.Stream, bool) { - for _, id := range ids { - if _, ok := evictable[id]; ok { - delete(evictable, id) - return id, true - } - } - return identity.Stream{}, false - }) - - dps := make([]data.Number, 10) - for i := 0; i < 10; i++ { - id, dp := sum.Stream() - ids[i] = id - dps[i] = dp - } - - // store up to limit must work - for i := 0; i < 5; i++ { - err := lim.Store(ids[i], dps[i]) - require.NoError(t, err) - } - - // store beyond limit must fail - for i := 5; i < 10; i++ { - err := lim.Store(ids[i], dps[i]) - require.Equal(t, streams.ErrLimit(5), err) - } - - // put two streams up for eviction - evictable[ids[2]] = struct{}{} - evictable[ids[3]] = struct{}{} - - // while evictable do so, fail again afterwards - for i := 5; i < 10; i++ { - err := lim.Store(ids[i], dps[i]) - if i < 7 { - require.Equal(t, streams.ErrEvicted{ErrLimit: streams.ErrLimit(5), Ident: ids[i-3]}, err) - } else { - require.Equal(t, streams.ErrLimit(5), err) - } - } -} diff --git a/processor/deltatocumulativeprocessor/internal/streams/streams.go 
b/processor/deltatocumulativeprocessor/internal/streams/streams.go index 1b34f806b272..db3789add388 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/streams.go +++ b/processor/deltatocumulativeprocessor/internal/streams/streams.go @@ -6,32 +6,7 @@ package streams // import "github.com/open-telemetry/opentelemetry-collector-con import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" ) type Ident = identity.Stream - -type ( - Seq[T any] streams.Seq[T] - Map[T any] streams.Map[T] -) - -type Aggregator[D data.Point[D]] interface { - Aggregate(Ident, D) (D, error) -} - -func IntoAggregator[D data.Point[D]](m Map[D]) MapAggr[D] { - return MapAggr[D]{Map: m} -} - -type MapAggr[D data.Point[D]] struct { - Map[D] -} - -func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) { - err := a.Map.Store(id, dp) - v, _ := a.Map.Load(id) - return v, err -} - -type Evictor = streams.Evictor +type Map[T any] streams.Map[T] diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 804b8812a2c0..b9b658303d99 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -67,7 +67,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro now := time.Now() var errs error - metrics.All(md)(func(m metrics.Metric) bool { + metrics.Filter(md, func(m metrics.Metric) bool { if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { return true } @@ -82,12 +82,14 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro each = use(m, p.state.expo) } + var ok bool err := each(func(id identity.Stream, aggr func() error) error { // if stream not seen before and stream limit is reached, reject if !p.state.Has(id) && p.state.Len() >= p.cfg.MaxStreams { // TODO: record metric return streams.Drop } + ok = true // stream is alive, refresh it p.stale.Refresh(now, id) @@ -97,12 +99,17 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro }) errs = errors.Join(errs, err) - return true + return ok }) if errs != nil { return errs } + + // no need to continue pipeline if we dropped all data + if md.MetricCount() == 0 { + return nil + } return p.next.ConsumeMetrics(ctx, md) } @@ -117,7 +124,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro // collected and eventually returned. 
[streams.Drop] silently drops a datapoint type AggrFunc func(do func(id identity.Stream, aggr func() error) error) error -// use returns a AggrFn for the given metric, accumulating into the given state, +// use returns an AggrFunc for the given metric, accumulating into the given state, // given `do` calls its `aggr` func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { return func(do func(id identity.Stream, aggr func() error) error) error { @@ -135,31 +142,33 @@ func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { // accumulate into previous state return delta.AccumulateInto(acc, dp) } - err := do(id, aggr) + state.Store(id, acc) return acc, err }) } } func (p *Processor) Start(_ context.Context, _ component.Host) error { - go func() { - tick := time.NewTicker(time.Minute) - defer tick.Stop() - for { - select { - case <-p.ctx.Done(): - return - case <-tick.C: - p.mtx.Lock() - stale := p.stale.Collect(p.cfg.MaxStale) - for _, id := range stale { - p.state.Delete(id) + if p.cfg.MaxStale != 0 { + go func() { + tick := time.NewTicker(time.Minute) + defer tick.Stop() + for { + select { + case <-p.ctx.Done(): + return + case <-tick.C: + p.mtx.Lock() + stale := p.stale.Collect(p.cfg.MaxStale) + for _, id := range stale { + p.state.Delete(id) + } + p.mtx.Unlock() } - p.mtx.Unlock() } - } - }() + }() + } return nil } diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go new file mode 100644 index 000000000000..ca6f1959158f --- /dev/null +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -0,0 +1,285 @@ +package deltatocumulativeprocessor_test + +import ( + "context" + "math" + "math/rand" + "strconv" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + self "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/datatest/compare" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" +) + +func setup(t *testing.T, cfg *self.Config) (processor.Metrics, *consumertest.MetricsSink) { + t.Helper() + + next := &consumertest.MetricsSink{} + if cfg == nil { + cfg = &self.Config{MaxStale: 0, MaxStreams: math.MaxInt} + } + + proc, err := self.NewFactory().CreateMetricsProcessor( + context.Background(), + processortest.NewNopSettings(), + cfg, + next, + ) + require.NoError(t, err) + + return proc, next +} + +// TestAccumulation verifies stream identification works correctly by writing +// 100 random dps spread across 10 
different streams. +// Processor output is compared against a manual aggregation on a per-stream basis. +// +// Uses Sum datatype for testing, as we are not testing actual aggregation (see +// internal/data for tests), but proper stream separation +func TestAccumulation(t *testing.T) { + proc, sink := setup(t, nil) + + sum := random.Sum() + + // create 10 distinct streams + const N = 10 + sbs := make([]SumBuilder, N) + for i := range sbs { + _, base := sum.Stream() + sbs[i] = SumBuilder{Metric: sum, base: base} + } + + // init manual aggregation state + want := make(map[identity.Stream]data.Number) + for _, s := range sbs { + id := s.id(pmetric.AggregationTemporalityCumulative) + want[id] = s.point(0, 0, 0) + } + + for i := 0; i < 100; i++ { + s := sbs[rand.Intn(N)] + + v := int64(rand.Intn(255)) + ts := pcommon.Timestamp(i) + + // write to processor + in := s.delta(s.point(0, ts, v)) + rms := s.resourceMetrics(in) + err := proc.ConsumeMetrics(context.Background(), rms) + require.NoError(t, err) + + // aggregate manually + wantv := want[s.id(pmetric.AggregationTemporalityCumulative)] + wantv.SetIntValue(wantv.IntValue() + v) + wantv.SetTimestamp(ts) + } + + // get the final processor output for each stream + got := make(map[identity.Stream]data.Number) + for _, md := range sink.AllMetrics() { + metrics.All(md)(func(m metrics.Metric) bool { + sum := metrics.Sum(m) + streams.Datapoints(sum)(func(id identity.Stream, dp data.Number) bool { + got[id] = dp + return true + }) + return true + }) + } + + sort := cmpopts.SortMaps(func(a, b identity.Stream) bool { + return a.Hash().Sum64() < b.Hash().Sum64() + }) + if diff := compare.Diff(want, got, sort); diff != "" { + t.Fatal(diff) + } +} + +// TestTimestamp verifies timestamp handling, most notably: +// - Timestamp() keeps getting advanced +// - StartTimestamp() stays the same +func TestTimestamps(t *testing.T) { + proc, sink := setup(t, nil) + + sb := stream() + point := func(start, last pcommon.Timestamp) data.Number { + return sb.point(start, last, 0) + } + + cases := []struct { + in data.Number + out data.Number + err error + }{{ + // first: take as-is + in: point(1000, 1100), + out: point(1000, 1100), + }, { + // subsequent: take, but keep start-ts + in: point(1100, 1200), + out: point(1000, 1200), + }, { + // gap: take + in: point(1300, 1400), + out: point(1000, 1400), + }, { + // out of order + in: point(1200, 1300), + err: delta.ErrOutOfOrder{Last: 1400, Sample: 1300}, + }, { + // older start + in: point(500, 550), + err: delta.ErrOlderStart{Start: 1000, Sample: 500}, + }} + + for i, cs := range cases { + t.Run(strconv.Itoa(i), func(t *testing.T) { + sink.Reset() + + in := sb.resourceMetrics(sb.delta(cs.in)) + var want []pmetric.Metrics + if cs.out != (data.Number{}) { + want = []pmetric.Metrics{sb.resourceMetrics(sb.cumul(cs.out))} + } + + err := proc.ConsumeMetrics(context.Background(), in) + if cs.err != nil { + require.ErrorIs(t, err, cs.err) + return + } + require.NoError(t, err) + + out := sink.AllMetrics() + if diff := compare.Diff(want, out); diff != "" { + t.Fatal(diff) + } + }) + } +} + +func TestStreamLimit(t *testing.T) { + proc, sink := setup(t, &self.Config{MaxStale: 5 * time.Minute, MaxStreams: 10}) + + good := make([]SumBuilder, 10) + for i := range good { + good[i] = stream() + } + bad := stream() + _ = bad + + diff := func(want, got []pmetric.Metrics) { + t.Helper() + if diff := compare.Diff(want, got); diff != "" { + t.Fatal(diff) + } + } + + writeGood := func(ts pcommon.Timestamp) { + for i, sb := range good { + in := 
sb.resourceMetrics(sb.delta(sb.point(0, ts+pcommon.Timestamp(i), 0))) + want := sb.resourceMetrics(sb.cumul(sb.point(0, ts+pcommon.Timestamp(i), 0))) + + err := proc.ConsumeMetrics(context.Background(), in) + require.NoError(t, err) + + diff([]pmetric.Metrics{want}, sink.AllMetrics()) + sink.Reset() + } + } + + // write up to limit must work + writeGood(0) + + // extra stream must be dropped, nothing written + in := bad.resourceMetrics(bad.delta(bad.point(0, 0, 0))) + err := proc.ConsumeMetrics(context.Background(), in) + require.NoError(t, err) + diff([]pmetric.Metrics{}, sink.AllMetrics()) + sink.Reset() + + // writing existing streams must still work + writeGood(100) +} + +type copy interface { + CopyTo(pmetric.Metric) +} + +func (s SumBuilder) resourceMetrics(metrics ...copy) pmetric.Metrics { + md := pmetric.NewMetrics() + + rm := md.ResourceMetrics().AppendEmpty() + s.Resource().CopyTo(rm.Resource()) + + sm := rm.ScopeMetrics().AppendEmpty() + s.Scope().CopyTo(sm.Scope()) + + for _, m := range metrics { + m.CopyTo(sm.Metrics().AppendEmpty()) + } + return md +} + +type SumBuilder struct { + random.Metric[data.Number] + base data.Number +} + +func (s SumBuilder) with(dps ...data.Number) pmetric.Metric { + m := pmetric.NewMetric() + s.Metric.CopyTo(m) + + for _, dp := range dps { + dp.NumberDataPoint.CopyTo(m.Sum().DataPoints().AppendEmpty()) + } + + return m +} + +func (s SumBuilder) delta(dps ...data.Number) pmetric.Metric { + m := s.with(dps...) + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) + return m +} + +func (s SumBuilder) cumul(dps ...data.Number) pmetric.Metric { + m := s.with(dps...) + m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + return m +} + +func (s SumBuilder) id(temp pmetric.AggregationTemporality) identity.Stream { + m := s.with(s.base) + m.Sum().SetAggregationTemporality(temp) + + mid := identity.OfMetric(s.Ident().Scope(), m) + return identity.OfStream(mid, s.base) +} + +func (s SumBuilder) point(start, ts pcommon.Timestamp, value int64) data.Number { + dp := s.base.Clone() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(value) + return dp +} + +func stream() SumBuilder { + sum := random.Sum() + _, base := sum.Stream() + return SumBuilder{Metric: sum, base: base} +} From d08fcadf325253df9515dc4be3bf4093fbc8065c Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 20 Aug 2024 15:45:13 +0200 Subject: [PATCH 3/7] processor: public api cleanup hides types from public api that don't need to be public --- .../deltatocumulativeprocessor/processor.go | 46 ++++++++++++------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index b9b658303d99..afeebfb280ca 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -30,7 +30,7 @@ type Processor struct { next consumer.Metrics cfg Config - state Map + state state mtx sync.Mutex ctx context.Context @@ -46,7 +46,7 @@ func newProcessor(cfg *Config, tel *metadata.TelemetryBuilder, next consumer.Met proc := Processor{ next: next, cfg: *cfg, - state: Map{ + state: state{ nums: make(exp.HashMap[data.Number]), hist: make(exp.HashMap[data.Histogram]), expo: make(exp.HashMap[data.ExpHistogram]), @@ -66,13 +66,18 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro now := time.Now() + const ( + keep = true + drop = false + ) + var errs error metrics.Filter(md, 
func(m metrics.Metric) bool { if m.AggregationTemporality() != pmetric.AggregationTemporalityDelta { - return true + return keep } - var each AggrFunc + var each aggrFunc switch m := m.Typed().(type) { case metrics.Sum: each = use(m, p.state.nums) @@ -82,14 +87,14 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro each = use(m, p.state.expo) } - var ok bool + var do = drop err := each(func(id identity.Stream, aggr func() error) error { // if stream not seen before and stream limit is reached, reject if !p.state.Has(id) && p.state.Len() >= p.cfg.MaxStreams { // TODO: record metric return streams.Drop } - ok = true + do = keep // stream is alive, refresh it p.stale.Refresh(now, id) @@ -98,10 +103,8 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro return aggr() }) errs = errors.Join(errs, err) - - return ok + return do }) - if errs != nil { return errs } @@ -113,7 +116,7 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro return p.next.ConsumeMetrics(ctx, md) } -// AggrFunc calls `do` for datapoint of a metric, giving the caller the +// aggrFunc calls `do` for datapoint of a metric, giving the caller the // opportunity to decide whether to perform aggregation in a type-agnostic way, // while keeping underlying strong typing. // @@ -122,14 +125,17 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro // // if any error is returned, the current datapoint is dropped, the error // collected and eventually returned. [streams.Drop] silently drops a datapoint -type AggrFunc func(do func(id identity.Stream, aggr func() error) error) error +type aggrFunc func(do func(id identity.Stream, aggr func() error) error) error // use returns an AggrFunc for the given metric, accumulating into the given state, // given `do` calls its `aggr` -func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { +func use[T data.Typed[T], M metric[T]](m M, state streams.Map[T]) aggrFunc { return func(do func(id identity.Stream, aggr func() error) error) error { return streams.Apply(m, func(id identity.Stream, dp T) (T, error) { + // load previously aggregated state for this stream acc, ok := state.Load(id) + + // to be invoked by caller if accumulation is desired aggr := func() error { m.SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) @@ -143,7 +149,11 @@ func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { return delta.AccumulateInto(acc, dp) } err := do(id, aggr) + + // update state to possibly changed value state.Store(id, acc) + + // store new value in output metrics slice return acc, err }) } @@ -151,6 +161,7 @@ func use[T data.Typed[T], M Metric[T]](m M, state streams.Map[T]) AggrFunc { func (p *Processor) Start(_ context.Context, _ component.Host) error { if p.cfg.MaxStale != 0 { + // delete stale streams once per minute go func() { tick := time.NewTicker(time.Minute) defer tick.Stop() @@ -182,29 +193,30 @@ func (p *Processor) Capabilities() consumer.Capabilities { return consumer.Capabilities{MutatesData: true} } -type Metric[T data.Point[T]] interface { +type metric[T data.Point[T]] interface { metrics.Filterable[T] SetAggregationTemporality(pmetric.AggregationTemporality) } -type Map struct { +// state keeps a cumulative value, aggregated over time, per stream +type state struct { nums streams.Map[data.Number] hist streams.Map[data.Histogram] expo streams.Map[data.ExpHistogram] } -func (m Map) Len() int { +func (m state) 
Len() int { return m.nums.Len() + m.hist.Len() + m.expo.Len() } -func (m Map) Has(id identity.Stream) bool { +func (m state) Has(id identity.Stream) bool { _, nok := m.nums.Load(id) _, hok := m.hist.Load(id) _, eok := m.expo.Load(id) return nok || hok || eok } -func (m Map) Delete(id identity.Stream) { +func (m state) Delete(id identity.Stream) { m.nums.Delete(id) m.hist.Delete(id) m.expo.Delete(id) From cb7fdd5d10283d465bb3fb2a91364fe0b80714c5 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 20 Aug 2024 21:59:12 +0200 Subject: [PATCH 4/7] telemetry: simplify --- .../deltatocumulativeprocessor/config.go | 8 + .../documentation.md | 28 +-- .../deltatocumulativeprocessor/factory.go | 5 +- .../generated_component_telemetry_test.go | 7 +- .../generated_package_test.go | 3 +- .../internal/metadata/generated_config.go | 50 +++++ .../metadata/generated_config_test.go | 59 ++++++ .../internal/metadata/generated_metrics.go | 184 ++++++++++++++++++ .../metadata/generated_metrics_test.go | 106 ++++++++++ .../internal/metadata/generated_telemetry.go | 60 +++--- .../metadata/generated_telemetry_test.go | 3 +- .../internal/metadata/testdata/config.yaml | 9 + .../internal/telemetry/faults_test.go | 157 --------------- .../internal/telemetry/metrics.go | 165 ++++------------ .../deltatocumulativeprocessor/metadata.yaml | 28 +-- .../deltatocumulativeprocessor/processor.go | 27 ++- .../processor_test.go | 13 +- 17 files changed, 522 insertions(+), 390 deletions(-) create mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_config.go create mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go create mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go create mode 100644 processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml delete mode 100644 processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b8789f290665..b4db0a7cfa1b 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -4,10 +4,12 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" import ( + "context" "fmt" "math" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" "go.opentelemetry.io/collector/component" ) @@ -37,3 +39,9 @@ func createDefaultConfig() component.Config { MaxStreams: math.MaxInt, } } + +func (c Config) Metrics(tel telemetry.Metrics) { + ctx := context.Background() + tel.DeltatocumulativeStreamsMaxStale.Record(ctx, int64(c.MaxStale.Seconds())) + tel.DeltatocumulativeStreamsLimit.Record(ctx, int64(c.MaxStreams)) +} diff --git a/processor/deltatocumulativeprocessor/documentation.md b/processor/deltatocumulativeprocessor/documentation.md index 55d85f06c764..ad507b5d9471 100644 --- a/processor/deltatocumulativeprocessor/documentation.md +++ b/processor/deltatocumulativeprocessor/documentation.md @@ -6,38 +6,14 @@ The following telemetry is emitted by this component. 
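
For illustration only, and not part of the diff itself: the documentation change below folds the previous processed/dropped counters into a single otelcol_deltatocumulative.datapoints counter, with failures distinguished by an "error" attribute. A minimal sketch of the intended recording pattern, based on the processor.go and internal/telemetry changes later in this series — the helper name countDatapoint is hypothetical; telemetry.Metrics, Datapoints, Cause and Error are the helpers this series adds:

    // assumes: import "context" and this processor's internal telemetry package
    func countDatapoint(ctx context.Context, tel telemetry.Metrics, err error) {
        if err != nil {
            // failed datapoints carry an "error" attribute naming the root error type
            tel.Datapoints().Inc(ctx, telemetry.Cause(err))
            return
        }
        tel.Datapoints().Inc(ctx) // successful datapoints are counted without attributes
    }

With this shape, the total still covers every datapoint seen, while filtering on the "error" attribute isolates failures — replacing the separate .processed and .dropped series removed below.
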
-### otelcol_deltatocumulative.datapoints.dropped +### otelcol_deltatocumulative.datapoints -number of datapoints dropped due to given 'reason' +total number of datapoints processed | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | | {datapoint} | Sum | Int | true | -### otelcol_deltatocumulative.datapoints.processed - -number of datapoints processed - -| Unit | Metric Type | Value Type | Monotonic | -| ---- | ----------- | ---------- | --------- | -| {datapoint} | Sum | Int | true | - -### otelcol_deltatocumulative.gaps.length - -total duration where data was expected but not received - -| Unit | Metric Type | Value Type | Monotonic | -| ---- | ----------- | ---------- | --------- | -| s | Sum | Int | true | - -### otelcol_deltatocumulative.streams.evicted - -number of streams evicted - -| Unit | Metric Type | Value Type | Monotonic | -| ---- | ----------- | ---------- | --------- | -| {stream} | Sum | Int | true | - ### otelcol_deltatocumulative.streams.limit upper limit of tracked streams diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index 119180dc6373..904ae1ee6827 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/processor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" ) func NewFactory() processor.Factory { @@ -28,10 +29,10 @@ func createMetricsProcessor(_ context.Context, set processor.Settings, cfg compo return nil, fmt.Errorf("configuration parsing error") } - telb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings) + tel, err := telemetry.New(set.TelemetrySettings) if err != nil { return nil, err } - return newProcessor(pcfg, telb, next), nil + return newProcessor(pcfg, tel, next), nil } diff --git a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go index 1ef864ce6d1a..52ad9e905c16 100644 --- a/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/generated_component_telemetry_test.go @@ -7,12 +7,13 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processortest" sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" ) type componentTestTelemetry struct { diff --git a/processor/deltatocumulativeprocessor/generated_package_test.go b/processor/deltatocumulativeprocessor/generated_package_test.go index d2832d4e5268..d8f0fa33d92d 100644 --- a/processor/deltatocumulativeprocessor/generated_package_test.go +++ b/processor/deltatocumulativeprocessor/generated_package_test.go @@ -3,9 +3,8 @@ package deltatocumulativeprocessor import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go 
b/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go new file mode 100644 index 000000000000..f9341dcfd397 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go @@ -0,0 +1,50 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "go.opentelemetry.io/collector/confmap" +) + +// MetricConfig provides common config for a particular metric. +type MetricConfig struct { + Enabled bool `mapstructure:"enabled"` + + enabledSetByUser bool +} + +func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + err := parser.Unmarshal(ms) + if err != nil { + return err + } + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// MetricsConfig provides config for deltatocumulative metrics. +type MetricsConfig struct { + Fixme MetricConfig `mapstructure:"fixme"` +} + +func DefaultMetricsConfig() MetricsConfig { + return MetricsConfig{ + Fixme: MetricConfig{ + Enabled: false, + }, + } +} + +// MetricsBuilderConfig is a configuration for deltatocumulative metrics builder. +type MetricsBuilderConfig struct { + Metrics MetricsConfig `mapstructure:"metrics"` +} + +func DefaultMetricsBuilderConfig() MetricsBuilderConfig { + return MetricsBuilderConfig{ + Metrics: DefaultMetricsConfig(), + } +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go new file mode 100644 index 000000000000..345dea748baf --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go @@ -0,0 +1,59 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestMetricsBuilderConfig(t *testing.T) { + tests := []struct { + name string + want MetricsBuilderConfig + }{ + { + name: "default", + want: DefaultMetricsBuilderConfig(), + }, + { + name: "all_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + Fixme: MetricConfig{Enabled: true}, + }, + }, + }, + { + name: "none_set", + want: MetricsBuilderConfig{ + Metrics: MetricsConfig{ + Fixme: MetricConfig{Enabled: false}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg := loadMetricsBuilderConfig(t, tt.name) + if diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{})); diff != "" { + t.Errorf("Config mismatch (-expected +actual):\n%s", diff) + } + }) + } +} + +func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + sub, err := cm.Sub(name) + require.NoError(t, err) + cfg := DefaultMetricsBuilderConfig() + require.NoError(t, sub.Unmarshal(&cfg)) + return cfg +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go new file mode 100644 index 000000000000..9e17cbbd847a --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go @@ -0,0 +1,184 @@ +// Code generated by mdatagen. DO NOT EDIT. 
+ +package metadata + +import ( + "time" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver" +) + +type metricFixme struct { + data pmetric.Metric // data buffer for generated metric. + config MetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. +} + +// init fills fixme metric with initial data. +func (m *metricFixme) init() { + m.data.SetName("fixme") + m.data.SetDescription("bug") + m.data.SetUnit("none") + m.data.SetEmptyGauge() + m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) +} + +func (m *metricFixme) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, typeAttributeValue string) { + if !m.config.Enabled { + return + } + dp := m.data.Gauge().DataPoints().AppendEmpty() + dp.SetStartTimestamp(start) + dp.SetTimestamp(ts) + dp.SetIntValue(val) + dp.Attributes().PutStr("type", typeAttributeValue) +} + +// updateCapacity saves max length of data point slices that will be used for the slice capacity. +func (m *metricFixme) updateCapacity() { + if m.data.Gauge().DataPoints().Len() > m.capacity { + m.capacity = m.data.Gauge().DataPoints().Len() + } +} + +// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. +func (m *metricFixme) emit(metrics pmetric.MetricSlice) { + if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + m.updateCapacity() + m.data.MoveTo(metrics.AppendEmpty()) + m.init() + } +} + +func newMetricFixme(cfg MetricConfig) metricFixme { + m := metricFixme{config: cfg} + if cfg.Enabled { + m.data = pmetric.NewMetric() + m.init() + } + return m +} + +// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations +// required to produce metric representation defined in metadata and user config. +type MetricsBuilder struct { + config MetricsBuilderConfig // config of the metrics builder. + startTime pcommon.Timestamp // start time that will be applied to all recorded data points. + metricsCapacity int // maximum observed number of metrics per resource. + metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. + buildInfo component.BuildInfo // contains version information. + metricFixme metricFixme +} + +// metricBuilderOption applies changes to default metrics builder. +type metricBuilderOption func(*MetricsBuilder) + +// WithStartTime sets startTime on the metrics builder. +func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { + return func(mb *MetricsBuilder) { + mb.startTime = startTime + } +} + +func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...metricBuilderOption) *MetricsBuilder { + mb := &MetricsBuilder{ + config: mbc, + startTime: pcommon.NewTimestampFromTime(time.Now()), + metricsBuffer: pmetric.NewMetrics(), + buildInfo: settings.BuildInfo, + metricFixme: newMetricFixme(mbc.Metrics.Fixme), + } + + for _, op := range options { + op(mb) + } + return mb +} + +// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. +func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { + if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { + mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() + } +} + +// ResourceMetricsOption applies changes to provided resource metrics. 
+type ResourceMetricsOption func(pmetric.ResourceMetrics) + +// WithResource sets the provided resource on the emitted ResourceMetrics. +// It's recommended to use ResourceBuilder to create the resource. +func WithResource(res pcommon.Resource) ResourceMetricsOption { + return func(rm pmetric.ResourceMetrics) { + res.CopyTo(rm.Resource()) + } +} + +// WithStartTimeOverride overrides start time for all the resource metrics data points. +// This option should be only used if different start time has to be set on metrics coming from different resources. +func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { + return func(rm pmetric.ResourceMetrics) { + var dps pmetric.NumberDataPointSlice + metrics := rm.ScopeMetrics().At(0).Metrics() + for i := 0; i < metrics.Len(); i++ { + switch metrics.At(i).Type() { + case pmetric.MetricTypeGauge: + dps = metrics.At(i).Gauge().DataPoints() + case pmetric.MetricTypeSum: + dps = metrics.At(i).Sum().DataPoints() + } + for j := 0; j < dps.Len(); j++ { + dps.At(j).SetStartTimestamp(start) + } + } + } +} + +// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for +// recording another set of data points as part of another resource. This function can be helpful when one scraper +// needs to emit metrics from several resources. Otherwise calling this function is not required, +// just `Emit` function can be called instead. +// Resource attributes should be provided as ResourceMetricsOption arguments. +func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { + rm := pmetric.NewResourceMetrics() + ils := rm.ScopeMetrics().AppendEmpty() + ils.Scope().SetName("github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor") + ils.Scope().SetVersion(mb.buildInfo.Version) + ils.Metrics().EnsureCapacity(mb.metricsCapacity) + mb.metricFixme.emit(ils.Metrics()) + + for _, op := range rmo { + op(rm) + } + + if ils.Metrics().Len() > 0 { + mb.updateCapacity(rm) + rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) + } +} + +// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for +// recording another set of metrics. This function will be responsible for applying all the transformations required to +// produce metric representation defined in metadata and user config, e.g. delta or cumulative. +func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { + mb.EmitForResource(rmo...) + metrics := mb.metricsBuffer + mb.metricsBuffer = pmetric.NewMetrics() + return metrics +} + +// RecordFixmeDataPoint adds a data point to fixme metric. +func (mb *MetricsBuilder) RecordFixmeDataPoint(ts pcommon.Timestamp, val int64, typeAttributeValue string) { + mb.metricFixme.recordDataPoint(mb.startTime, ts, val, typeAttributeValue) +} + +// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, +// and metrics builder should update its startTime and reset it's internal state accordingly. 
+func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { + mb.startTime = pcommon.NewTimestampFromTime(time.Now()) + for _, op := range options { + op(mb) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go new file mode 100644 index 000000000000..9a424ca8b89b --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go @@ -0,0 +1,106 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" +) + +type testDataSet int + +const ( + testDataSetDefault testDataSet = iota + testDataSetAll + testDataSetNone +) + +func TestMetricsBuilder(t *testing.T) { + tests := []struct { + name string + metricsSet testDataSet + resAttrsSet testDataSet + expectEmpty bool + }{ + { + name: "default", + }, + { + name: "all_set", + metricsSet: testDataSetAll, + resAttrsSet: testDataSetAll, + }, + { + name: "none_set", + metricsSet: testDataSetNone, + resAttrsSet: testDataSetNone, + expectEmpty: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + start := pcommon.Timestamp(1_000_000_000) + ts := pcommon.Timestamp(1_000_001_000) + observedZapCore, observedLogs := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() + settings.Logger = zap.New(observedZapCore) + mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, test.name), settings, WithStartTime(start)) + + expectedWarnings := 0 + + assert.Equal(t, expectedWarnings, observedLogs.Len()) + + defaultMetricsCount := 0 + allMetricsCount := 0 + + allMetricsCount++ + mb.RecordFixmeDataPoint(ts, 1, "type-val") + + res := pcommon.NewResource() + metrics := mb.Emit(WithResource(res)) + + if test.expectEmpty { + assert.Equal(t, 0, metrics.ResourceMetrics().Len()) + return + } + + assert.Equal(t, 1, metrics.ResourceMetrics().Len()) + rm := metrics.ResourceMetrics().At(0) + assert.Equal(t, res, rm.Resource()) + assert.Equal(t, 1, rm.ScopeMetrics().Len()) + ms := rm.ScopeMetrics().At(0).Metrics() + if test.metricsSet == testDataSetDefault { + assert.Equal(t, defaultMetricsCount, ms.Len()) + } + if test.metricsSet == testDataSetAll { + assert.Equal(t, allMetricsCount, ms.Len()) + } + validatedMetrics := make(map[string]bool) + for i := 0; i < ms.Len(); i++ { + switch ms.At(i).Name() { + case "fixme": + assert.False(t, validatedMetrics["fixme"], "Found a duplicate in the metrics slice: fixme") + validatedMetrics["fixme"] = true + assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) + assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) + assert.Equal(t, "bug", ms.At(i).Description()) + assert.Equal(t, "none", ms.At(i).Unit()) + dp := ms.At(i).Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) + assert.Equal(t, int64(1), dp.IntValue()) + attrVal, ok := dp.Attributes().Get("type") + assert.True(t, ok) + assert.EqualValues(t, "type-val", attrVal.Str()) + } + } + }) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go 
b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index a0f4d22f25d4..b9f508b376fc 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -3,13 +3,15 @@ package metadata import ( + "context" "errors" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" ) func Meter(settings component.TelemetrySettings) metric.Meter { @@ -23,15 +25,13 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - DeltatocumulativeDatapointsDropped metric.Int64Counter - DeltatocumulativeDatapointsProcessed metric.Int64Counter - DeltatocumulativeGapsLength metric.Int64Counter - DeltatocumulativeStreamsEvicted metric.Int64Counter - DeltatocumulativeStreamsLimit metric.Int64Gauge - DeltatocumulativeStreamsMaxStale metric.Int64Gauge - DeltatocumulativeStreamsTracked metric.Int64UpDownCounter - level configtelemetry.Level + meter metric.Meter + DeltatocumulativeDatapoints metric.Int64Counter + DeltatocumulativeStreamsLimit metric.Int64Gauge + DeltatocumulativeStreamsMaxStale metric.Int64Gauge + DeltatocumulativeStreamsTracked metric.Int64ObservableUpDownCounter + observeDeltatocumulativeStreamsTracked func(context.Context, metric.Observer) error + level configtelemetry.Level } // telemetryBuilderOption applies changes to default builder. @@ -44,6 +44,16 @@ func WithLevel(lvl configtelemetry.Level) telemetryBuilderOption { } } +// WithDeltatocumulativeStreamsTrackedCallback sets callback for observable DeltatocumulativeStreamsTracked metric. +func WithDeltatocumulativeStreamsTrackedCallback(cb func() int64, opts ...metric.ObserveOption) telemetryBuilderOption { + return func(builder *TelemetryBuilder) { + builder.observeDeltatocumulativeStreamsTracked = func(_ context.Context, o metric.Observer) error { + o.ObserveInt64(builder.DeltatocumulativeStreamsTracked, cb(), opts...) 
+ return nil + } + } +} + // NewTelemetryBuilder provides a struct with methods to update all internal telemetry // for a component func NewTelemetryBuilder(settings component.TelemetrySettings, options ...telemetryBuilderOption) (*TelemetryBuilder, error) { @@ -57,30 +67,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme } else { builder.meter = noop.Meter{} } - builder.DeltatocumulativeDatapointsDropped, err = builder.meter.Int64Counter( - "otelcol_deltatocumulative.datapoints.dropped", - metric.WithDescription("number of datapoints dropped due to given 'reason'"), + builder.DeltatocumulativeDatapoints, err = builder.meter.Int64Counter( + "otelcol_deltatocumulative.datapoints", + metric.WithDescription("total number of datapoints processed"), metric.WithUnit("{datapoint}"), ) errs = errors.Join(errs, err) - builder.DeltatocumulativeDatapointsProcessed, err = builder.meter.Int64Counter( - "otelcol_deltatocumulative.datapoints.processed", - metric.WithDescription("number of datapoints processed"), - metric.WithUnit("{datapoint}"), - ) - errs = errors.Join(errs, err) - builder.DeltatocumulativeGapsLength, err = builder.meter.Int64Counter( - "otelcol_deltatocumulative.gaps.length", - metric.WithDescription("total duration where data was expected but not received"), - metric.WithUnit("s"), - ) - errs = errors.Join(errs, err) - builder.DeltatocumulativeStreamsEvicted, err = builder.meter.Int64Counter( - "otelcol_deltatocumulative.streams.evicted", - metric.WithDescription("number of streams evicted"), - metric.WithUnit("{stream}"), - ) - errs = errors.Join(errs, err) builder.DeltatocumulativeStreamsLimit, err = builder.meter.Int64Gauge( "otelcol_deltatocumulative.streams.limit", metric.WithDescription("upper limit of tracked streams"), @@ -93,11 +85,13 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme metric.WithUnit("s"), ) errs = errors.Join(errs, err) - builder.DeltatocumulativeStreamsTracked, err = builder.meter.Int64UpDownCounter( + builder.DeltatocumulativeStreamsTracked, err = builder.meter.Int64ObservableUpDownCounter( "otelcol_deltatocumulative.streams.tracked", metric.WithDescription("number of streams tracked"), metric.WithUnit("{dps}"), ) errs = errors.Join(errs, err) + _, err = builder.meter.RegisterCallback(builder.observeDeltatocumulativeStreamsTracked, builder.DeltatocumulativeStreamsTracked) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go index e6d03363adba..50d7558d885a 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry_test.go @@ -6,13 +6,14 @@ import ( "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/metric" embeddedmetric "go.opentelemetry.io/otel/metric/embedded" noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" embeddedtrace "go.opentelemetry.io/otel/trace/embedded" nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" ) type mockMeter struct { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml b/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml new file mode 100644 index 
000000000000..307cc8b96535 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml @@ -0,0 +1,9 @@ +default: +all_set: + metrics: + fixme: + enabled: true +none_set: + metrics: + fixme: + enabled: false diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go deleted file mode 100644 index b16d0e4183ef..000000000000 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package telemetry_test - -import ( - "testing" - - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/otel/metric/noop" - - "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" -) - -// TestFaults verifies certain non-fatal errors are actually caused and -// subsequently dropped. It does so by writing bad samples to the actual -// implementation instead of fabricating errors manually. 
-func TestFaults(t *testing.T) { - type Map = streams.Map[data.Number] - type Case struct { - Name string - Map Map - Pre func(Map, identity.Stream, data.Number) error - Bad func(Map, identity.Stream, data.Number) error - Err error - Want error - } - - sum := random.Sum() - evid, evdp := sum.Stream() - - cases := []Case{ - { - Name: "older-start", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(20)) - dp.SetTimestamp(ts(30)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(10)) - dp.SetTimestamp(ts(40)) - return dps.Store(id, dp) - }, - Err: delta.ErrOlderStart{Start: ts(20), Sample: ts(10)}, - }, - { - Name: "out-of-order", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(10)) - return dps.Store(id, dp) - }, - Err: delta.ErrOutOfOrder{Last: ts(20), Sample: ts(10)}, - }, - { - Name: "gap", - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(10)) - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetStartTimestamp(ts(30)) - dp.SetTimestamp(ts(40)) - return dps.Store(id, dp) - }, - Err: delta.ErrGap{From: ts(20), To: ts(30)}, - }, - { - Name: "limit", - Map: streams.Limit(delta.New[data.Number](), 1), - Pre: func(dps Map, id identity.Stream, dp data.Number) error { - dp.SetTimestamp(ts(10)) - return dps.Store(id, dp) - }, - Bad: func(dps Map, _ identity.Stream, _ data.Number) error { - id, dp := sum.Stream() - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Err: streams.ErrLimit(1), - Want: streams.Drop, // we can't ignore being at limit, we need to drop the entire stream for this request - }, - { - Name: "evict", - Map: func() Map { - ev := HeadEvictor[data.Number]{Map: delta.New[data.Number]()} - lim := streams.Limit(ev, 1) - lim.Evictor = ev - return lim - }(), - Pre: func(dps Map, _ identity.Stream, _ data.Number) error { - evdp.SetTimestamp(ts(10)) - return dps.Store(evid, evdp) - }, - Bad: func(dps Map, _ identity.Stream, _ data.Number) error { - id, dp := sum.Stream() - dp.SetTimestamp(ts(20)) - return dps.Store(id, dp) - }, - Err: streams.ErrEvicted{Ident: evid, ErrLimit: streams.ErrLimit(1)}, - }, - } - - telb, err := metadata.NewTelemetryBuilder(component.TelemetrySettings{MeterProvider: noop.NewMeterProvider()}) - require.NoError(t, err) - - for _, c := range cases { - t.Run(c.Name, func(t *testing.T) { - id, dp := sum.Stream() - tel := telemetry.New(telb) - - dps := c.Map - if dps == nil { - dps = delta.New[data.Number]() - } - onf := telemetry.ObserveNonFatal(dps, &tel.Metrics) - - if c.Pre != nil { - err := c.Pre(onf, id, dp.Clone()) - require.NoError(t, err) - } - - err := c.Bad(dps, id, dp.Clone()) - require.Equal(t, c.Err, err) - - err = c.Bad(onf, id, dp.Clone()) - require.Equal(t, c.Want, err) - }) - } -} - -type ts = pcommon.Timestamp - -// HeadEvictor drops the first stream on Evict() -type HeadEvictor[T any] struct{ streams.Map[T] } - -func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) { - e.Items()(func(id identity.Stream, _ T) bool { - e.Delete(id) - evicted = id - return false - }) - return evicted, true -} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go 
index cbf52c09ff94..d5a89da5899b 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go @@ -1,159 +1,66 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package telemetry // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" +package telemetry import ( "context" "errors" - "time" + "reflect" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" ) -type Telemetry struct { - Metrics -} +func New(set component.TelemetrySettings) (Metrics, error) { + m := Metrics{ + tracked: func() int { return 0 }, + } -func New(telb *metadata.TelemetryBuilder) Telemetry { - return Telemetry{Metrics: Metrics{ - streams: Streams{ - tracked: telb.DeltatocumulativeStreamsTracked, - limit: telb.DeltatocumulativeStreamsLimit, - evicted: telb.DeltatocumulativeStreamsEvicted, - stale: telb.DeltatocumulativeStreamsMaxStale, - }, - dps: Datapoints{ - total: telb.DeltatocumulativeDatapointsProcessed, - dropped: telb.DeltatocumulativeDatapointsDropped, - }, - gaps: telb.DeltatocumulativeGapsLength, - }} -} + trackedCb := metadata.WithDeltatocumulativeStreamsTrackedCallback(func() int64 { + return int64(m.tracked()) + }) -type Streams struct { - tracked metric.Int64UpDownCounter - limit metric.Int64Gauge - evicted metric.Int64Counter - stale metric.Int64Gauge -} + telb, err := metadata.NewTelemetryBuilder(set, trackedCb) + if err != nil { + return Metrics{}, err + } + m.TelemetryBuilder = *telb -type Datapoints struct { - total metric.Int64Counter - dropped metric.Int64Counter + return m, nil } type Metrics struct { - streams Streams - dps Datapoints - - gaps metric.Int64Counter -} - -func (tel Telemetry) WithLimit(max int64) { - tel.streams.limit.Record(context.Background(), max) -} - -func (tel Telemetry) WithStale(max time.Duration) { - tel.streams.stale.Record(context.Background(), int64(max.Seconds())) -} - -func ObserveItems[T any](items streams.Map[T], metrics *Metrics) Items[T] { - return Items[T]{ - Map: items, - Metrics: metrics, - } -} - -func ObserveNonFatal[T any](items streams.Map[T], metrics *Metrics) Faults[T] { - return Faults[T]{ - Map: items, - Metrics: metrics, - } -} + metadata.TelemetryBuilder -type Items[T any] struct { - streams.Map[T] - *Metrics + tracked func() int } -func (i Items[T]) Store(id streams.Ident, v T) error { - inc(i.dps.total) - - _, old := i.Map.Load(id) - err := i.Map.Store(id, v) - if err == nil && !old { - inc(i.streams.tracked) - } - - return err +func (m Metrics) Datapoints() Counter { + return Counter{Int64Counter: m.DeltatocumulativeDatapoints} } -func (i Items[T]) Delete(id streams.Ident) { - dec(i.streams.tracked) - i.Map.Delete(id) +func (m *Metrics) WithTracked(streams func() int) { + m.tracked = streams } -type Faults[T any] struct { - streams.Map[T] - *Metrics +func Error(msg string) attribute.KeyValue { + return attribute.String("error", msg) } 
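
The Error helper above and the Cause helper added just below replace the per-fault bookkeeping of the old Faults wrapper: instead of dedicated drop/evict/gap counters, an error is reduced to a single "error" attribute on the datapoints counter, with Cause unwrapping to the innermost error and reporting its Go type name. A rough sketch of the intended behaviour, for illustration only — exampleCause and the fmt.Errorf wrapping are made-up, not code from this patch:

    // assumes: import "fmt", "go.opentelemetry.io/otel/attribute",
    // plus this processor's internal delta and telemetry packages
    func exampleCause() attribute.KeyValue {
        err := fmt.Errorf("stream gone bad: %w", delta.ErrOutOfOrder{Last: 20, Sample: 10})
        // Cause calls errors.Unwrap until it returns nil, then formats the remaining
        // error's type via reflect, yielding roughly attribute.String("error", "delta.ErrOutOfOrder")
        return telemetry.Cause(err)
    }

The remaining async metric is wired the same way later in this patch: tel.WithTracked(proc.state.Len) feeds the observable streams.tracked counter from the size of the state map.
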
-func (f Faults[T]) Store(id streams.Ident, v T) error { - var ( - olderStart delta.ErrOlderStart - outOfOrder delta.ErrOutOfOrder - gap delta.ErrGap - limit streams.ErrLimit - evict streams.ErrEvicted - ) - - err := f.Map.Store(id, v) - switch { - default: - return err - case errors.As(err, &olderStart): - inc(f.dps.dropped, reason("older-start")) - case errors.As(err, &outOfOrder): - inc(f.dps.dropped, reason("out-of-order")) - case errors.As(err, &limit): - inc(f.dps.dropped, reason("stream-limit")) - // no space to store stream, drop it instead of failing silently - return streams.Drop - case errors.As(err, &evict): - inc(f.streams.evicted) - case errors.As(err, &gap): - from := gap.From.AsTime() - to := gap.To.AsTime() - lost := to.Sub(from).Seconds() - f.gaps.Add(context.TODO(), int64(lost)) +func Cause(err error) attribute.KeyValue { + for { + uw := errors.Unwrap(err) + if uw == nil { + break + } + err = uw } - return nil -} - -var ( - _ streams.Map[any] = (*Items[any])(nil) - _ streams.Map[any] = (*Faults[any])(nil) -) - -type addable[Opts any] interface { - Add(context.Context, int64, ...Opts) -} - -func inc[A addable[O], O any](a A, opts ...O) { - a.Add(context.Background(), 1, opts...) + return Error(reflect.TypeOf(err).String()) } -func dec[A addable[O], O any](a A, opts ...O) { - a.Add(context.Background(), -1, opts...) -} +type Counter struct{ metric.Int64Counter } -func reason(reason string) metric.AddOption { - return metric.WithAttributes(attribute.String("reason", reason)) +func (c Counter) Inc(ctx context.Context, attrs ...attribute.KeyValue) { + c.Add(ctx, 1, metric.WithAttributes(attrs...)) } diff --git a/processor/deltatocumulativeprocessor/metadata.yaml b/processor/deltatocumulativeprocessor/metadata.yaml index 552c812e1f15..f02d06064622 100644 --- a/processor/deltatocumulativeprocessor/metadata.yaml +++ b/processor/deltatocumulativeprocessor/metadata.yaml @@ -9,7 +9,6 @@ status: codeowners: active: [sh0rez, RichieSams, jpkrohling] - telemetry: metrics: # streams @@ -19,20 +18,15 @@ telemetry: sum: value_type: int monotonic: false + async: true enabled: true deltatocumulative.streams.limit: + attributes: [foo] description: upper limit of tracked streams unit: "{stream}" gauge: value_type: int enabled: true - deltatocumulative.streams.evicted: - description: number of streams evicted - unit: "{stream}" - sum: - value_type: int - monotonic: true - enabled: true deltatocumulative.streams.max_stale: description: duration after which streams inactive streams are dropped unit: "s" @@ -40,24 +34,10 @@ telemetry: value_type: int enabled: true # datapoints - deltatocumulative.datapoints.processed: - description: number of datapoints processed - unit: "{datapoint}" - sum: - value_type: int - monotonic: true - enabled: true - deltatocumulative.datapoints.dropped: - description: number of datapoints dropped due to given 'reason' + deltatocumulative.datapoints: + description: total number of datapoints processed. 
may have 'error' attribute, if processing failed unit: "{datapoint}" sum: value_type: int monotonic: true enabled: true - deltatocumulative.gaps.length: - description: total duration where data was expected but not received - unit: "s" - sum: - value_type: int - monotonic: true - enabled: true diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index afeebfb280ca..2dd935c2686f 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -19,9 +19,9 @@ import ( exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/delta" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" ) var _ processor.Metrics = (*Processor)(nil) @@ -37,10 +37,10 @@ type Processor struct { cancel context.CancelFunc stale staleness.Tracker - tel metadata.TelemetryBuilder + tel telemetry.Metrics } -func newProcessor(cfg *Config, tel *metadata.TelemetryBuilder, next consumer.Metrics) *Processor { +func newProcessor(cfg *Config, tel telemetry.Metrics, next consumer.Metrics) *Processor { ctx, cancel := context.WithCancel(context.Background()) proc := Processor{ @@ -55,8 +55,12 @@ func newProcessor(cfg *Config, tel *metadata.TelemetryBuilder, next consumer.Met cancel: cancel, stale: staleness.NewTracker(), - tel: *tel, + tel: tel, } + + tel.WithTracked(proc.state.Len) + cfg.Metrics(tel) + return &proc } @@ -87,20 +91,29 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro each = use(m, p.state.expo) } + // do signals whether to drop or keep the entire metric. + // if any single dp is accumulated, it must be kept. 
if not, it no + // longer has datapoints and can be dropped var do = drop err := each(func(id identity.Stream, aggr func() error) error { // if stream not seen before and stream limit is reached, reject if !p.state.Has(id) && p.state.Len() >= p.cfg.MaxStreams { - // TODO: record metric + p.tel.Datapoints().Inc(ctx, telemetry.Error("limit")) return streams.Drop } - do = keep // stream is alive, refresh it p.stale.Refresh(now, id) // accumulate current dp into state - return aggr() + if err := aggr(); err != nil { + p.tel.Datapoints().Inc(ctx, telemetry.Cause(err)) + return err + } + + do = keep + p.tel.Datapoints().Inc(ctx) + return nil }) errs = errors.Join(errs, err) return do diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go index ca6f1959158f..692e54977626 100644 --- a/processor/deltatocumulativeprocessor/processor_test.go +++ b/processor/deltatocumulativeprocessor/processor_test.go @@ -9,6 +9,13 @@ import ( "time" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processortest" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" self "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" @@ -17,12 +24,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metrics" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/consumer/consumertest" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/processor" - "go.opentelemetry.io/collector/processor/processortest" ) func setup(t *testing.T, cfg *self.Config) (processor.Metrics, *consumertest.MetricsSink) { From 86ef0568a0f71ebefcfeddc6b8c4e43cde56e62a Mon Sep 17 00:00:00 2001 From: sh0rez Date: Tue, 20 Aug 2024 22:05:25 +0200 Subject: [PATCH 5/7] internal/metadata: remove old generated files --- .../deltatocumulativeprocessor/config.go | 3 +- .../documentation.md | 2 +- .../generated_package_test.go | 3 +- .../internal/metadata/generated_config.go | 50 ----- .../metadata/generated_config_test.go | 59 ------ .../internal/metadata/generated_metrics.go | 184 ------------------ .../metadata/generated_metrics_test.go | 106 ---------- .../internal/metadata/generated_telemetry.go | 2 +- .../internal/metadata/testdata/config.yaml | 9 - .../internal/telemetry/metrics.go | 3 +- 10 files changed, 8 insertions(+), 413 deletions(-) delete mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_config.go delete mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go delete mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go delete mode 100644 processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go delete mode 100644 
processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b4db0a7cfa1b..0a1f91f5ae06 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -9,8 +9,9 @@ import ( "math" "time" - "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" "go.opentelemetry.io/collector/component" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/telemetry" ) var _ component.ConfigValidator = (*Config)(nil) diff --git a/processor/deltatocumulativeprocessor/documentation.md b/processor/deltatocumulativeprocessor/documentation.md index ad507b5d9471..325a5e61f077 100644 --- a/processor/deltatocumulativeprocessor/documentation.md +++ b/processor/deltatocumulativeprocessor/documentation.md @@ -8,7 +8,7 @@ The following telemetry is emitted by this component. ### otelcol_deltatocumulative.datapoints -total number of datapoints processed +total number of datapoints processed. may have 'error' attribute, if processing failed | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | diff --git a/processor/deltatocumulativeprocessor/generated_package_test.go b/processor/deltatocumulativeprocessor/generated_package_test.go index d8f0fa33d92d..d2832d4e5268 100644 --- a/processor/deltatocumulativeprocessor/generated_package_test.go +++ b/processor/deltatocumulativeprocessor/generated_package_test.go @@ -3,8 +3,9 @@ package deltatocumulativeprocessor import ( - "go.uber.org/goleak" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go deleted file mode 100644 index f9341dcfd397..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_config.go +++ /dev/null @@ -1,50 +0,0 @@ -// Code generated by mdatagen. DO NOT EDIT. - -package metadata - -import ( - "go.opentelemetry.io/collector/confmap" -) - -// MetricConfig provides common config for a particular metric. -type MetricConfig struct { - Enabled bool `mapstructure:"enabled"` - - enabledSetByUser bool -} - -func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { - if parser == nil { - return nil - } - err := parser.Unmarshal(ms) - if err != nil { - return err - } - ms.enabledSetByUser = parser.IsSet("enabled") - return nil -} - -// MetricsConfig provides config for deltatocumulative metrics. -type MetricsConfig struct { - Fixme MetricConfig `mapstructure:"fixme"` -} - -func DefaultMetricsConfig() MetricsConfig { - return MetricsConfig{ - Fixme: MetricConfig{ - Enabled: false, - }, - } -} - -// MetricsBuilderConfig is a configuration for deltatocumulative metrics builder. 
-type MetricsBuilderConfig struct { - Metrics MetricsConfig `mapstructure:"metrics"` -} - -func DefaultMetricsBuilderConfig() MetricsBuilderConfig { - return MetricsBuilderConfig{ - Metrics: DefaultMetricsConfig(), - } -} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go deleted file mode 100644 index 345dea748baf..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_config_test.go +++ /dev/null @@ -1,59 +0,0 @@ -// Code generated by mdatagen. DO NOT EDIT. - -package metadata - -import ( - "path/filepath" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/confmap/confmaptest" -) - -func TestMetricsBuilderConfig(t *testing.T) { - tests := []struct { - name string - want MetricsBuilderConfig - }{ - { - name: "default", - want: DefaultMetricsBuilderConfig(), - }, - { - name: "all_set", - want: MetricsBuilderConfig{ - Metrics: MetricsConfig{ - Fixme: MetricConfig{Enabled: true}, - }, - }, - }, - { - name: "none_set", - want: MetricsBuilderConfig{ - Metrics: MetricsConfig{ - Fixme: MetricConfig{Enabled: false}, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := loadMetricsBuilderConfig(t, tt.name) - if diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{})); diff != "" { - t.Errorf("Config mismatch (-expected +actual):\n%s", diff) - } - }) - } -} - -func loadMetricsBuilderConfig(t *testing.T, name string) MetricsBuilderConfig { - cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) - require.NoError(t, err) - sub, err := cm.Sub(name) - require.NoError(t, err) - cfg := DefaultMetricsBuilderConfig() - require.NoError(t, sub.Unmarshal(&cfg)) - return cfg -} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go deleted file mode 100644 index 9e17cbbd847a..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics.go +++ /dev/null @@ -1,184 +0,0 @@ -// Code generated by mdatagen. DO NOT EDIT. - -package metadata - -import ( - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver" -) - -type metricFixme struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. -} - -// init fills fixme metric with initial data. -func (m *metricFixme) init() { - m.data.SetName("fixme") - m.data.SetDescription("bug") - m.data.SetUnit("none") - m.data.SetEmptyGauge() - m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) -} - -func (m *metricFixme) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, typeAttributeValue string) { - if !m.config.Enabled { - return - } - dp := m.data.Gauge().DataPoints().AppendEmpty() - dp.SetStartTimestamp(start) - dp.SetTimestamp(ts) - dp.SetIntValue(val) - dp.Attributes().PutStr("type", typeAttributeValue) -} - -// updateCapacity saves max length of data point slices that will be used for the slice capacity. 
-func (m *metricFixme) updateCapacity() { - if m.data.Gauge().DataPoints().Len() > m.capacity { - m.capacity = m.data.Gauge().DataPoints().Len() - } -} - -// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. -func (m *metricFixme) emit(metrics pmetric.MetricSlice) { - if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { - m.updateCapacity() - m.data.MoveTo(metrics.AppendEmpty()) - m.init() - } -} - -func newMetricFixme(cfg MetricConfig) metricFixme { - m := metricFixme{config: cfg} - if cfg.Enabled { - m.data = pmetric.NewMetric() - m.init() - } - return m -} - -// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations -// required to produce metric representation defined in metadata and user config. -type MetricsBuilder struct { - config MetricsBuilderConfig // config of the metrics builder. - startTime pcommon.Timestamp // start time that will be applied to all recorded data points. - metricsCapacity int // maximum observed number of metrics per resource. - metricsBuffer pmetric.Metrics // accumulates metrics data before emitting. - buildInfo component.BuildInfo // contains version information. - metricFixme metricFixme -} - -// metricBuilderOption applies changes to default metrics builder. -type metricBuilderOption func(*MetricsBuilder) - -// WithStartTime sets startTime on the metrics builder. -func WithStartTime(startTime pcommon.Timestamp) metricBuilderOption { - return func(mb *MetricsBuilder) { - mb.startTime = startTime - } -} - -func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...metricBuilderOption) *MetricsBuilder { - mb := &MetricsBuilder{ - config: mbc, - startTime: pcommon.NewTimestampFromTime(time.Now()), - metricsBuffer: pmetric.NewMetrics(), - buildInfo: settings.BuildInfo, - metricFixme: newMetricFixme(mbc.Metrics.Fixme), - } - - for _, op := range options { - op(mb) - } - return mb -} - -// updateCapacity updates max length of metrics and resource attributes that will be used for the slice capacity. -func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { - if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() { - mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len() - } -} - -// ResourceMetricsOption applies changes to provided resource metrics. -type ResourceMetricsOption func(pmetric.ResourceMetrics) - -// WithResource sets the provided resource on the emitted ResourceMetrics. -// It's recommended to use ResourceBuilder to create the resource. -func WithResource(res pcommon.Resource) ResourceMetricsOption { - return func(rm pmetric.ResourceMetrics) { - res.CopyTo(rm.Resource()) - } -} - -// WithStartTimeOverride overrides start time for all the resource metrics data points. -// This option should be only used if different start time has to be set on metrics coming from different resources. 
-func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption { - return func(rm pmetric.ResourceMetrics) { - var dps pmetric.NumberDataPointSlice - metrics := rm.ScopeMetrics().At(0).Metrics() - for i := 0; i < metrics.Len(); i++ { - switch metrics.At(i).Type() { - case pmetric.MetricTypeGauge: - dps = metrics.At(i).Gauge().DataPoints() - case pmetric.MetricTypeSum: - dps = metrics.At(i).Sum().DataPoints() - } - for j := 0; j < dps.Len(); j++ { - dps.At(j).SetStartTimestamp(start) - } - } - } -} - -// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for -// recording another set of data points as part of another resource. This function can be helpful when one scraper -// needs to emit metrics from several resources. Otherwise calling this function is not required, -// just `Emit` function can be called instead. -// Resource attributes should be provided as ResourceMetricsOption arguments. -func (mb *MetricsBuilder) EmitForResource(rmo ...ResourceMetricsOption) { - rm := pmetric.NewResourceMetrics() - ils := rm.ScopeMetrics().AppendEmpty() - ils.Scope().SetName("github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor") - ils.Scope().SetVersion(mb.buildInfo.Version) - ils.Metrics().EnsureCapacity(mb.metricsCapacity) - mb.metricFixme.emit(ils.Metrics()) - - for _, op := range rmo { - op(rm) - } - - if ils.Metrics().Len() > 0 { - mb.updateCapacity(rm) - rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty()) - } -} - -// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for -// recording another set of metrics. This function will be responsible for applying all the transformations required to -// produce metric representation defined in metadata and user config, e.g. delta or cumulative. -func (mb *MetricsBuilder) Emit(rmo ...ResourceMetricsOption) pmetric.Metrics { - mb.EmitForResource(rmo...) - metrics := mb.metricsBuffer - mb.metricsBuffer = pmetric.NewMetrics() - return metrics -} - -// RecordFixmeDataPoint adds a data point to fixme metric. -func (mb *MetricsBuilder) RecordFixmeDataPoint(ts pcommon.Timestamp, val int64, typeAttributeValue string) { - mb.metricFixme.recordDataPoint(mb.startTime, ts, val, typeAttributeValue) -} - -// Reset resets metrics builder to its initial state. It should be used when external metrics source is restarted, -// and metrics builder should update its startTime and reset it's internal state accordingly. -func (mb *MetricsBuilder) Reset(options ...metricBuilderOption) { - mb.startTime = pcommon.NewTimestampFromTime(time.Now()) - for _, op := range options { - op(mb) - } -} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go deleted file mode 100644 index 9a424ca8b89b..000000000000 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_metrics_test.go +++ /dev/null @@ -1,106 +0,0 @@ -// Code generated by mdatagen. DO NOT EDIT. 
- -package metadata - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/pdata/pcommon" - "go.opentelemetry.io/collector/pdata/pmetric" - "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" -) - -type testDataSet int - -const ( - testDataSetDefault testDataSet = iota - testDataSetAll - testDataSetNone -) - -func TestMetricsBuilder(t *testing.T) { - tests := []struct { - name string - metricsSet testDataSet - resAttrsSet testDataSet - expectEmpty bool - }{ - { - name: "default", - }, - { - name: "all_set", - metricsSet: testDataSetAll, - resAttrsSet: testDataSetAll, - }, - { - name: "none_set", - metricsSet: testDataSetNone, - resAttrsSet: testDataSetNone, - expectEmpty: true, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - start := pcommon.Timestamp(1_000_000_000) - ts := pcommon.Timestamp(1_000_001_000) - observedZapCore, observedLogs := observer.New(zap.WarnLevel) - settings := receivertest.NewNopSettings() - settings.Logger = zap.New(observedZapCore) - mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, test.name), settings, WithStartTime(start)) - - expectedWarnings := 0 - - assert.Equal(t, expectedWarnings, observedLogs.Len()) - - defaultMetricsCount := 0 - allMetricsCount := 0 - - allMetricsCount++ - mb.RecordFixmeDataPoint(ts, 1, "type-val") - - res := pcommon.NewResource() - metrics := mb.Emit(WithResource(res)) - - if test.expectEmpty { - assert.Equal(t, 0, metrics.ResourceMetrics().Len()) - return - } - - assert.Equal(t, 1, metrics.ResourceMetrics().Len()) - rm := metrics.ResourceMetrics().At(0) - assert.Equal(t, res, rm.Resource()) - assert.Equal(t, 1, rm.ScopeMetrics().Len()) - ms := rm.ScopeMetrics().At(0).Metrics() - if test.metricsSet == testDataSetDefault { - assert.Equal(t, defaultMetricsCount, ms.Len()) - } - if test.metricsSet == testDataSetAll { - assert.Equal(t, allMetricsCount, ms.Len()) - } - validatedMetrics := make(map[string]bool) - for i := 0; i < ms.Len(); i++ { - switch ms.At(i).Name() { - case "fixme": - assert.False(t, validatedMetrics["fixme"], "Found a duplicate in the metrics slice: fixme") - validatedMetrics["fixme"] = true - assert.Equal(t, pmetric.MetricTypeGauge, ms.At(i).Type()) - assert.Equal(t, 1, ms.At(i).Gauge().DataPoints().Len()) - assert.Equal(t, "bug", ms.At(i).Description()) - assert.Equal(t, "none", ms.At(i).Unit()) - dp := ms.At(i).Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) - assert.Equal(t, int64(1), dp.IntValue()) - attrVal, ok := dp.Attributes().Get("type") - assert.True(t, ok) - assert.EqualValues(t, "type-val", attrVal.Str()) - } - } - }) - } -} diff --git a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go index b9f508b376fc..b96f8b14a420 100644 --- a/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go +++ b/processor/deltatocumulativeprocessor/internal/metadata/generated_telemetry.go @@ -69,7 +69,7 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...teleme } builder.DeltatocumulativeDatapoints, err = builder.meter.Int64Counter( "otelcol_deltatocumulative.datapoints", - metric.WithDescription("total number of datapoints processed"), + metric.WithDescription("total number of datapoints processed. 
may have 'error' attribute, if processing failed"),
 		metric.WithUnit("{datapoint}"),
 	)
 	errs = errors.Join(errs, err)
diff --git a/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml b/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml
deleted file mode 100644
index 307cc8b96535..000000000000
--- a/processor/deltatocumulativeprocessor/internal/metadata/testdata/config.yaml
+++ /dev/null
@@ -1,9 +0,0 @@
-default:
-all_set:
-  metrics:
-    fixme:
-      enabled: true
-none_set:
-  metrics:
-    fixme:
-      enabled: false
diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
index d5a89da5899b..701c9397cd22 100644
--- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
+++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
@@ -5,10 +5,11 @@ import (
 	"errors"
 	"reflect"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
 	"go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/metadata"
 )
 
 func New(set component.TelemetrySettings) (Metrics, error) {

From bc891fdc7e1b0bcf85f597dd8a15722f6fa6c610 Mon Sep 17 00:00:00 2001
From: sh0rez
Date: Fri, 23 Aug 2024 11:16:12 +0200
Subject: [PATCH 6/7] internal/metrics: Summary

adds summary datatype to make type switch exhaustive
---
 .../internal/data/add.go | 4 +++
 .../internal/data/data.go | 25 ++++++++++++++++++-
 .../internal/metrics/data.go | 21 ++++++++++++++++
 .../internal/metrics/metrics.go | 2 ++
 4 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/processor/deltatocumulativeprocessor/internal/data/add.go b/processor/deltatocumulativeprocessor/internal/data/add.go
index 597f918243d9..33c2f283c840 100644
--- a/processor/deltatocumulativeprocessor/internal/data/add.go
+++ b/processor/deltatocumulativeprocessor/internal/data/add.go
@@ -108,3 +108,7 @@ func (dp ExpHistogram) Add(in ExpHistogram) ExpHistogram {
 
 	return dp
 }
+
+func (dp Summary) Add(Summary) Summary {
+	panic("todo")
+}
diff --git a/processor/deltatocumulativeprocessor/internal/data/data.go b/processor/deltatocumulativeprocessor/internal/data/data.go
index e6f7551fd1c2..2460af09c1b8 100644
--- a/processor/deltatocumulativeprocessor/internal/data/data.go
+++ b/processor/deltatocumulativeprocessor/internal/data/data.go
@@ -10,6 +10,13 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
 )
 
+var (
+	_ Point[Number] = Number{}
+	_ Point[Histogram] = Histogram{}
+	_ Point[ExpHistogram] = ExpHistogram{}
+	_ Point[Summary] = Summary{}
+)
+
 type Point[Self any] interface {
 	StartTimestamp() pcommon.Timestamp
 	Timestamp() pcommon.Timestamp
@@ -23,7 +30,7 @@ type Point[Self any] interface {
 
 type Typed[Self any] interface {
 	Point[Self]
-	Number | Histogram | ExpHistogram
+	Number | Histogram | ExpHistogram | Summary
 }
 
 type Number struct {
@@ -94,3 +101,19 @@ var (
 	_ = mustPoint[Histogram]{}
 	_ = mustPoint[ExpHistogram]{}
 )
+
+type Summary struct {
+	pmetric.SummaryDataPoint
+}
+
+func (dp Summary) Clone() Summary {
+	clone := Summary{SummaryDataPoint: pmetric.NewSummaryDataPoint()}
+	if dp.SummaryDataPoint != (pmetric.SummaryDataPoint{}) {
+		dp.CopyTo(clone)
+	}
+	return clone
+}
+
+func (dp Summary) CopyTo(dst Summary) {
+	dp.SummaryDataPoint.CopyTo(dst.SummaryDataPoint)
+}
diff --git a/processor/deltatocumulativeprocessor/internal/metrics/data.go b/processor/deltatocumulativeprocessor/internal/metrics/data.go
index 3c417ec06418..9fa1df07eb1d 100644
--- a/processor/deltatocumulativeprocessor/internal/metrics/data.go
+++ b/processor/deltatocumulativeprocessor/internal/metrics/data.go
@@ -115,3 +115,24 @@ func (s Gauge) Filter(expr func(data.Number) bool) {
 		return !expr(data.Number{NumberDataPoint: dp})
 	})
 }
+
+type Summary Metric
+
+func (s Summary) At(i int) data.Summary {
+	dp := Metric(s).Summary().DataPoints().At(i)
+	return data.Summary{SummaryDataPoint: dp}
+}
+
+func (s Summary) Len() int {
+	return Metric(s).Summary().DataPoints().Len()
+}
+
+func (s Summary) Ident() Ident {
+	return (*Metric)(&s).Ident()
+}
+
+func (s Summary) Filter(expr func(data.Summary) bool) {
+	s.Summary().DataPoints().RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
+		return !expr(data.Summary{SummaryDataPoint: dp})
+	})
+}
diff --git a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
index be5f52f86bc7..98388dbf5eb6 100644
--- a/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
+++ b/processor/deltatocumulativeprocessor/internal/metrics/metrics.go
@@ -58,6 +58,8 @@ func (m Metric) Typed() any {
 		return ExpHistogram(m)
 	case pmetric.MetricTypeHistogram:
 		return Histogram(m)
+	case pmetric.MetricTypeSummary:
+		return Summary(m)
 	}
 	panic("unreachable")
 }

From d182712d94dc17e6bc0411e9176e1b74d84f0f47 Mon Sep 17 00:00:00 2001
From: sh0rez
Date: Fri, 23 Aug 2024 11:18:15 +0200
Subject: [PATCH 7/7] *: linter fixes

---
 .../internal/data/expo/expotest/bins.go | 1 +
 .../internal/data/expo/expotest/histogram.go | 1 +
 .../deltatocumulativeprocessor/internal/data/expo/merge.go | 1 +
 .../deltatocumulativeprocessor/internal/data/expo/scale.go | 2 +-
 .../deltatocumulativeprocessor/internal/data/expo/zero.go | 3 ++-
 .../internal/streams/data_test.go | 2 +-
 .../internal/telemetry/metrics.go | 3 +++
 processor/deltatocumulativeprocessor/processor.go | 3 ++-
 processor/deltatocumulativeprocessor/processor_test.go | 7 +++++--
 9 files changed, 17 insertions(+), 6 deletions(-)
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go
index 13b4ce74c928..2e5695e22a2a 100644
--- a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/bins.go
@@ -1,6 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
+//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
 package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
 
 import (
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go
index 141dad724d82..a8a0db4de54e 100644
--- a/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/expotest/histogram.go
@@ -1,6 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
+//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
 package expotest // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo/expotest"
 
 import (
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/merge.go b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go
index 150e29a65819..376f6718a97b 100644
--- a/processor/deltatocumulativeprocessor/internal/data/expo/merge.go
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/merge.go
@@ -1,6 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
+//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
 package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
 
 import (
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/scale.go b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go
index 5201806fb82f..51f2f7fbdc06 100644
--- a/processor/deltatocumulativeprocessor/internal/data/expo/scale.go
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/scale.go
@@ -29,7 +29,7 @@ func (scale Scale) Idx(v float64) int {
 // This means a value min < v <= max belongs to this bucket.
 //
 // NOTE: this is different from Go slice intervals, which are [a,b)
-func (scale Scale) Bounds(index int) (min, max float64) {
+func (scale Scale) Bounds(index int) (min, max float64) { //nolint: predeclared
 	// from: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#all-scales-use-the-logarithm-function
 	lower := func(index int) float64 {
 		inverseFactor := math.Ldexp(math.Ln2, int(-scale))
diff --git a/processor/deltatocumulativeprocessor/internal/data/expo/zero.go b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go
index 2d5401b39f5c..d1c7ebd00840 100644
--- a/processor/deltatocumulativeprocessor/internal/data/expo/zero.go
+++ b/processor/deltatocumulativeprocessor/internal/data/expo/zero.go
@@ -1,6 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
+//nolint:gosec // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/34828
 package expo // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data/expo"
 
 import (
@@ -37,7 +38,7 @@ func WidenZero(dp DataPoint, width float64) {
 	widen(dp.Positive())
 	widen(dp.Negative())
 
-	_, max := scale.Bounds(zero)
+	_, max := scale.Bounds(zero) //nolint: predeclared
 	dp.SetZeroThreshold(max)
 }
diff --git a/processor/deltatocumulativeprocessor/internal/streams/data_test.go b/processor/deltatocumulativeprocessor/internal/streams/data_test.go
index a7d221557205..a5e2ca46c490 100644
--- a/processor/deltatocumulativeprocessor/internal/streams/data_test.go
+++ b/processor/deltatocumulativeprocessor/internal/streams/data_test.go
@@ -26,7 +26,7 @@ func BenchmarkApply(b *testing.B) {
 	b.ResetTimer()
 
 	i := 0
-	streams.Apply(dps, func(id identity.Stream, dp data.Number) (data.Number, error) {
+	_ = streams.Apply(dps, func(id identity.Stream, dp data.Number) (data.Number, error) {
 		i++
 		dp.Add(dp)
 		rid = id
diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
index 701c9397cd22..2a58b2f3e33b 100644
--- a/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
+++ b/processor/deltatocumulativeprocessor/internal/telemetry/metrics.go
@@ -1,3 +1,6 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
 package telemetry
 
 import (
diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go
index 2dd935c2686f..c1a10507155e 100644
--- a/processor/deltatocumulativeprocessor/processor.go
+++ b/processor/deltatocumulativeprocessor/processor.go
@@ -164,7 +164,8 @@ func use[T data.Typed[T], M metric[T]](m M, state streams.Map[T]) aggrFunc {
 		err := do(id, aggr)
 
 		// update state to possibly changed value
-		state.Store(id, acc)
+		// TODO: make streams.Map.Store err free
+		_ = state.Store(id, acc)
 
 		// store new value in output metrics slice
 		return acc, err
diff --git a/processor/deltatocumulativeprocessor/processor_test.go b/processor/deltatocumulativeprocessor/processor_test.go
index 692e54977626..3ccd1961e9e6 100644
--- a/processor/deltatocumulativeprocessor/processor_test.go
+++ b/processor/deltatocumulativeprocessor/processor_test.go
@@ -1,3 +1,6 @@
+// Copyright The OpenTelemetry Authors
+// SPDX-License-Identifier: Apache-2.0
+
 package deltatocumulativeprocessor_test
 
 import (
@@ -216,11 +219,11 @@ func TestStreamLimit(t *testing.T) {
 	writeGood(100)
 }
 
-type copy interface {
+type copyable interface {
 	CopyTo(pmetric.Metric)
 }
 
-func (s SumBuilder) resourceMetrics(metrics ...copy) pmetric.Metrics {
+func (s SumBuilder) resourceMetrics(metrics ...copyable) pmetric.Metrics {
 	md := pmetric.NewMetrics()
 	rm := md.ResourceMetrics().AppendEmpty()