From b5b438492ba34aaa9aaf817ae106bdf753f94326 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 18 Jun 2024 01:28:06 +0000 Subject: [PATCH 1/2] fix timestamp handling for the lastvalue aggregation --- CHANGELOG.md | 1 + sdk/metric/internal/aggregate/lastvalue.go | 28 +++++---- .../internal/aggregate/lastvalue_test.go | 60 +++++++++---------- 3 files changed, 46 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88cd72c418f..f9fd9ce9d5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Prevent random number generation data-race for experimental rand exemplars in `go.opentelemetry.io/otel/sdk/metric`. (#5456) - Fix counting number of dropped attributes of `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5464) - Fix panic in baggage creation when a member contains 0x80 char in key or value. (#5494) +- Fix stale timestamps reported by the lastvalue aggregation. (#TODO) ## [1.27.0/0.49.0/0.3.0] 2024-05-21 diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index 8f406dd2bcb..3b65e761e86 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -15,10 +15,9 @@ import ( // datapoint is timestamped measurement data. type datapoint[N int64 | float64] struct { - attrs attribute.Set - timestamp time.Time - value N - res exemplar.Reservoir + attrs attribute.Set + value N + res exemplar.Reservoir } func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *lastValue[N] { @@ -53,7 +52,6 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. } d.attrs = attr - d.timestamp = t d.value = value d.res.Offer(ctx, t, exemplar.NewValue(value), droppedAttr) @@ -61,6 +59,7 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. } func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -68,11 +67,11 @@ func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. - s.start = now() + s.start = t *dest = gData @@ -80,6 +79,7 @@ func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { } func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -87,7 +87,7 @@ func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // TODO (#3006): This will use an unbounded amount of memory if there // are unbounded number of attribute sets being aggregated. Attribute // sets that become "stale" need to be forgotten so this will not @@ -99,7 +99,7 @@ func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { // copyDpts copies the datapoints held by s into dest. The number of datapoints // copied is returned. -func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { +func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N], t time.Time) int { n := len(s.values) *dest = reset(*dest, n, n) @@ -107,7 +107,7 @@ func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { for _, v := range s.values { (*dest)[i].Attributes = v.attrs (*dest)[i].StartTime = s.start - (*dest)[i].Time = v.timestamp + (*dest)[i].Time = t (*dest)[i].Value = v.value collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ @@ -127,6 +127,7 @@ type precomputedLastValue[N int64 | float64] struct { } func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -134,11 +135,11 @@ func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) // Update start time for delta temporality. - s.start = now() + s.start = t *dest = gData @@ -146,6 +147,7 @@ func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { } func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { + t := now() // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of // the DataPoints is missed (better luck next time). gData, _ := (*dest).(metricdata.Gauge[N]) @@ -153,7 +155,7 @@ func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { s.Lock() defer s.Unlock() - n := s.copyDpts(&gData.DataPoints) + n := s.copyDpts(&gData.DataPoints, t) // Do not report stale values. clear(s.values) *dest = gData diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 8504e3b192e..1e4ca21c96a 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -61,13 +61,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(6), + Time: y2kPlus(7), Value: -10, }, }, @@ -89,13 +89,13 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(8), - Time: y2kPlus(9), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(8), - Time: y2kPlus(10), + Time: y2kPlus(11), Value: 3, }, }, @@ -116,19 +116,19 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(11), - Time: y2kPlus(12), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(11), - Time: y2kPlus(13), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(11), - Time: y2kPlus(15), + Time: y2kPlus(16), Value: 1, }, }, @@ -165,13 +165,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: -10, }, }, @@ -187,13 +187,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(8), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(8), Value: -10, }, }, @@ -211,13 +211,13 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(6), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(11), Value: 3, }, }, @@ -238,19 +238,19 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(9), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(16), Value: 1, }, }, @@ -287,13 +287,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(1), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(1), - Time: y2kPlus(6), + Time: y2kPlus(7), Value: -10, }, }, @@ -315,13 +315,13 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(8), - Time: y2kPlus(9), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(8), - Time: y2kPlus(10), + Time: y2kPlus(11), Value: 3, }, }, @@ -342,19 +342,19 @@ func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(11), - Time: y2kPlus(12), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(11), - Time: y2kPlus(13), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(11), - Time: y2kPlus(15), + Time: y2kPlus(16), Value: 1, }, }, @@ -391,13 +391,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(4), + Time: y2kPlus(7), Value: 2, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(5), + Time: y2kPlus(7), Value: -10, }, }, @@ -419,13 +419,13 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(6), + Time: y2kPlus(11), Value: 10, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(7), + Time: y2kPlus(11), Value: 3, }, }, @@ -446,19 +446,19 @@ func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { { Attributes: fltrAlice, StartTime: y2kPlus(0), - Time: y2kPlus(8), + Time: y2kPlus(16), Value: 1, }, { Attributes: fltrBob, StartTime: y2kPlus(0), - Time: y2kPlus(9), + Time: y2kPlus(16), Value: 1, }, { Attributes: overflowSet, StartTime: y2kPlus(0), - Time: y2kPlus(11), + Time: y2kPlus(16), Value: 1, }, }, From c6b1ae96cf4297b746a59bd52cb71fb82e51ef99 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 18 Jun 2024 10:14:27 -0400 Subject: [PATCH 2/2] Update CHANGELOG.md Co-authored-by: Damien Mathieu <42@dmathieu.com> --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f9fd9ce9d5e..150d73757b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,7 +39,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Prevent random number generation data-race for experimental rand exemplars in `go.opentelemetry.io/otel/sdk/metric`. (#5456) - Fix counting number of dropped attributes of `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5464) - Fix panic in baggage creation when a member contains 0x80 char in key or value. (#5494) -- Fix stale timestamps reported by the lastvalue aggregation. (#TODO) +- Fix stale timestamps reported by the lastvalue aggregation. (#5517) ## [1.27.0/0.49.0/0.3.0] 2024-05-21