diff --git a/.chloggen/deltatocumulative-advancing-starttime.yaml b/.chloggen/deltatocumulative-advancing-starttime.yaml new file mode 100644 index 000000000000..b8138d3059b3 --- /dev/null +++ b/.chloggen/deltatocumulative-advancing-starttime.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: permits advancing delta start timestamps, as required by spec. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31365] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [user] diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta.go b/processor/deltatocumulativeprocessor/internal/delta/delta.go index 8246bf8e09d1..fda1e8149a84 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta.go @@ -34,29 +34,21 @@ type Accumulator[D data.Point[D]] struct { // Aggregate implements delta-to-cumulative aggregation as per spec: // https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) { - // make the accumulator to start with the current sample, discarding any - // earlier data. return after use - reset := func() (D, error) { - a.dps[id] = dp.Clone() - return a.dps[id], nil - } - aggr, ok := a.dps[id] - // new series: reset + // new series: initialize with current sample if !ok { - return reset() + a.dps[id] = dp.Clone() + return a.dps[id], nil } - // belongs to older series: drop - if dp.StartTimestamp() < aggr.StartTimestamp() { + + // drop bad samples + switch { + case dp.StartTimestamp() < aggr.StartTimestamp(): + // belongs to older series return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()} - } - // belongs to later series: reset - if dp.StartTimestamp() > aggr.StartTimestamp() { - return reset() - } - // out of order: drop - if dp.Timestamp() <= aggr.Timestamp() { + case dp.Timestamp() <= aggr.Timestamp(): + // out of order return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()} } diff --git a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go index ae863697339f..f298d174977c 100644 --- a/processor/deltatocumulativeprocessor/internal/delta/delta_test.go +++ b/processor/deltatocumulativeprocessor/internal/delta/delta_test.go @@ -105,30 +105,45 @@ func TestAddition(t 
*testing.T) { // verify that start + last times are updated func TestTimes(t *testing.T) { acc := delta.Numbers() - id, data := random.Sum().Stream() - - start := pcommon.Timestamp(1234) - ts1, ts2 := pcommon.Timestamp(1234), pcommon.Timestamp(1235) - - // first sample: take timestamps of point - first := data.Clone() - first.SetStartTimestamp(start) - first.SetTimestamp(ts1) - - r1, err := acc.Aggregate(id, first) - require.NoError(t, err) - require.Equal(t, start, r1.StartTimestamp()) - require.Equal(t, ts1, r1.Timestamp()) - - // second sample: take last of point, keep start - second := data.Clone() - second.SetStartTimestamp(start) - second.SetTimestamp(ts2) - - r2, err := acc.Aggregate(id, second) - require.NoError(t, err) - require.Equal(t, start, r2.StartTimestamp()) - require.Equal(t, ts2, r2.Timestamp()) + id, base := random.Sum().Stream() + point := func(start, last pcommon.Timestamp) data.Number { + dp := base.Clone() + dp.SetStartTimestamp(start) + dp.SetTimestamp(last) + return dp + } + + // first sample: it's the first ever, so take it as-is + { + dp := point(1000, 1000) + res, err := acc.Aggregate(id, dp) + + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1000), res.Timestamp()) + } + + // second sample: it's subsequent, so keep original startTime, but update lastSeen + { + dp := point(1000, 1100) + res, err := acc.Aggregate(id, dp) + + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1100), res.Timestamp()) + } + + // third sample: it's subsequent, but has a more recent startTime, which is + // PERMITTED by the spec. + // still keep original startTime, but update lastSeen. + { + dp := point(1100, 1200) + res, err := acc.Aggregate(id, dp) + + require.NoError(t, err) + require.Equal(t, time(1000), res.StartTimestamp()) + require.Equal(t, time(1200), res.Timestamp()) + } } func TestErrs(t *testing.T) {