Skip to content

Commit

Permalink
[processor/deltatocumulative]: bugfix - permit advancing start-time (#…
Browse files Browse the repository at this point in the history
…31365)

**Description:**

The
[spec](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative)
allows the `StartTimestamp` of a delta stream to advance over time,
without affecting the aggregation.

Previously, the running counter was reset with each start-time change.
This was violating the spec and also effectively prevented meaningful
aggregation as series were reset all the time.

**Testing:**

A test-case was modified to explicitly check this behavior is permitted
and handled correctly.
  • Loading branch information
sh0rez authored Feb 28, 2024
1 parent ee7b512 commit 298dda4
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 42 deletions.
27 changes: 27 additions & 0 deletions .chloggen/deltatocumulative-advancing-starttime.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: deltatocumulativeprocessor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: permits advancing delta start timestamps, as required by spec.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31365]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
28 changes: 10 additions & 18 deletions processor/deltatocumulativeprocessor/internal/delta/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,21 @@ type Accumulator[D data.Point[D]] struct {
// Aggregate implements delta-to-cumulative aggregation as per spec:
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#sums-delta-to-cumulative
func (a *Accumulator[D]) Aggregate(id streams.Ident, dp D) (D, error) {
// make the accumulator start with the current sample, discarding any
// earlier data. return after use
reset := func() (D, error) {
a.dps[id] = dp.Clone()
return a.dps[id], nil
}

aggr, ok := a.dps[id]

// new series: reset
// new series: initialize with current sample
if !ok {
return reset()
a.dps[id] = dp.Clone()
return a.dps[id], nil
}
// belongs to older series: drop
if dp.StartTimestamp() < aggr.StartTimestamp() {

// drop bad samples
switch {
case dp.StartTimestamp() < aggr.StartTimestamp():
// belongs to older series
return aggr, ErrOlderStart{Start: aggr.StartTimestamp(), Sample: dp.StartTimestamp()}
}
// belongs to later series: reset
if dp.StartTimestamp() > aggr.StartTimestamp() {
return reset()
}
// out of order: drop
if dp.Timestamp() <= aggr.Timestamp() {
case dp.Timestamp() <= aggr.Timestamp():
// out of order
return aggr, ErrOutOfOrder{Last: aggr.Timestamp(), Sample: dp.Timestamp()}
}

Expand Down
63 changes: 39 additions & 24 deletions processor/deltatocumulativeprocessor/internal/delta/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,30 +105,45 @@ func TestAddition(t *testing.T) {
// verify that start + last times are updated
func TestTimes(t *testing.T) {
acc := delta.Numbers()
id, data := random.Sum().Stream()

start := pcommon.Timestamp(1234)
ts1, ts2 := pcommon.Timestamp(1234), pcommon.Timestamp(1235)

// first sample: take timestamps of point
first := data.Clone()
first.SetStartTimestamp(start)
first.SetTimestamp(ts1)

r1, err := acc.Aggregate(id, first)
require.NoError(t, err)
require.Equal(t, start, r1.StartTimestamp())
require.Equal(t, ts1, r1.Timestamp())

// second sample: take last of point, keep start
second := data.Clone()
second.SetStartTimestamp(start)
second.SetTimestamp(ts2)

r2, err := acc.Aggregate(id, second)
require.NoError(t, err)
require.Equal(t, start, r2.StartTimestamp())
require.Equal(t, ts2, r2.Timestamp())
id, base := random.Sum().Stream()
point := func(start, last pcommon.Timestamp) data.Number {
dp := base.Clone()
dp.SetStartTimestamp(start)
dp.SetTimestamp(last)
return dp
}

// first sample: it's the first ever, so take it as-is
{
dp := point(1000, 1000)
res, err := acc.Aggregate(id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1000), res.Timestamp())
}

// second sample: it's subsequent, so keep the original startTime, but update lastSeen
{
dp := point(1000, 1100)
res, err := acc.Aggregate(id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1100), res.Timestamp())
}

// third sample: it's subsequent, but has a more recent startTime, which is
// PERMITTED by the spec.
// Still keep the original startTime, but update lastSeen.
{
dp := point(1100, 1200)
res, err := acc.Aggregate(id, dp)

require.NoError(t, err)
require.Equal(t, time(1000), res.StartTimestamp())
require.Equal(t, time(1200), res.Timestamp())
}
}

func TestErrs(t *testing.T) {
Expand Down

0 comments on commit 298dda4

Please sign in to comment.