From f52f07d1a327d4cd3557eed38941e412e13474f3 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 6 May 2024 14:17:46 -0700 Subject: [PATCH 1/4] Add delta/cumulative/precomputed LastValue agg --- sdk/metric/instrument_test.go | 4 +- sdk/metric/internal/aggregate/aggregate.go | 29 +++--- sdk/metric/internal/aggregate/lastvalue.go | 88 ++++++++++++++++++- .../internal/aggregate/lastvalue_test.go | 18 ++-- sdk/metric/pipeline.go | 5 +- 5 files changed, 121 insertions(+), 23 deletions(-) diff --git a/sdk/metric/instrument_test.go b/sdk/metric/instrument_test.go index 712fddc4558..60066f425aa 100644 --- a/sdk/metric/instrument_test.go +++ b/sdk/metric/instrument_test.go @@ -25,7 +25,7 @@ func BenchmarkInstrument(b *testing.B) { build := aggregate.Builder[int64]{} var meas []aggregate.Measure[int64] - in, _ := build.LastValue() + in, _ := build.PrecomputedLastValue() meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality @@ -50,7 +50,7 @@ func BenchmarkInstrument(b *testing.B) { build := aggregate.Builder[int64]{} var meas []aggregate.Measure[int64] - in, _ := build.LastValue() + in, _ := build.PrecomputedLastValue() meas = append(meas, in) build.Temporality = metricdata.CumulativeTemporality diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go index 0a97444a4be..c9976de6c78 100644 --- a/sdk/metric/internal/aggregate/aggregate.go +++ b/sdk/metric/internal/aggregate/aggregate.go @@ -74,21 +74,26 @@ func (b Builder[N]) filter(f fltrMeasure[N]) Measure[N] { } // LastValue returns a last-value aggregate function input and output. -// -// The Builder.Temporality is ignored and delta is use always. func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) { - // Delta temporality is the only temporality that makes semantic sense for - // a last-value aggregate. lv := newLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative + } +} - return b.filter(lv.measure), func(dest *metricdata.Aggregation) int { - // Ignore if dest is not a metricdata.Gauge. The chance for memory - // reuse of the DataPoints is missed (better luck next time). - gData, _ := (*dest).(metricdata.Gauge[N]) - lv.computeAggregation(&gData.DataPoints) - *dest = gData - - return len(gData.DataPoints) +// PrecomputedLastValue returns a last-value aggregate function input and +// output. The aggregation returned from the returned ComputeAggregation +// function will always only return values from the previous collection cycle. +func (b Builder[N]) PrecomputedLastValue() (Measure[N], ComputeAggregation) { + lv := newPrecomputedLastValue[N](b.AggregationLimit, b.resFunc()) + switch b.Temporality { + case metricdata.DeltaTemporality: + return b.filter(lv.measure), lv.delta + default: + return b.filter(lv.measure), lv.cumulative } } diff --git a/sdk/metric/internal/aggregate/lastvalue.go b/sdk/metric/internal/aggregate/lastvalue.go index f3238974c6a..8f406dd2bcb 100644 --- a/sdk/metric/internal/aggregate/lastvalue.go +++ b/sdk/metric/internal/aggregate/lastvalue.go @@ -26,6 +26,7 @@ func newLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *la newRes: r, limit: newLimiter[datapoint[N]](limit), values: make(map[attribute.Distinct]datapoint[N]), + start: now(), } } @@ -36,6 +37,7 @@ type lastValue[N int64 | float64] struct { newRes func() exemplar.Reservoir limit limiter[datapoint[N]] values map[attribute.Distinct]datapoint[N] + start time.Time } func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute.Set, droppedAttr []attribute.KeyValue) { @@ -58,23 +60,103 @@ func (s *lastValue[N]) measure(ctx context.Context, value N, fltrAttr attribute. s.values[attr.Equivalent()] = d } -func (s *lastValue[N]) computeAggregation(dest *[]metricdata.DataPoint[N]) { +func (s *lastValue[N]) delta(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) + // Do not report stale values. + clear(s.values) + // Update start time for delta temporality. + s.start = now() + + *dest = gData + + return n +} + +func (s *lastValue[N]) cumulative(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + s.Lock() defer s.Unlock() + n := s.copyDpts(&gData.DataPoints) + // TODO (#3006): This will use an unbounded amount of memory if there + // are unbounded number of attribute sets being aggregated. Attribute + // sets that become "stale" need to be forgotten so this will not + // overload the system. + *dest = gData + + return n +} + +// copyDpts copies the datapoints held by s into dest. The number of datapoints +// copied is returned. +func (s *lastValue[N]) copyDpts(dest *[]metricdata.DataPoint[N]) int { n := len(s.values) *dest = reset(*dest, n, n) var i int for _, v := range s.values { (*dest)[i].Attributes = v.attrs - // The event time is the only meaningful timestamp, StartTime is - // ignored. + (*dest)[i].StartTime = s.start (*dest)[i].Time = v.timestamp (*dest)[i].Value = v.value collectExemplars(&(*dest)[i].Exemplars, v.res.Collect) i++ } + return n +} + +// newPrecomputedLastValue returns an aggregator that summarizes a set of +// observations as the last one made. +func newPrecomputedLastValue[N int64 | float64](limit int, r func() exemplar.Reservoir) *precomputedLastValue[N] { + return &precomputedLastValue[N]{lastValue: newLastValue[N](limit, r)} +} + +// precomputedLastValue summarizes a set of observations as the last one made. +type precomputedLastValue[N int64 | float64] struct { + *lastValue[N] +} + +func (s *precomputedLastValue[N]) delta(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) // Do not report stale values. clear(s.values) + // Update start time for delta temporality. + s.start = now() + + *dest = gData + + return n +} + +func (s *precomputedLastValue[N]) cumulative(dest *metricdata.Aggregation) int { + // Ignore if dest is not a metricdata.Gauge. The chance for memory reuse of + // the DataPoints is missed (better luck next time). + gData, _ := (*dest).(metricdata.Gauge[N]) + + s.Lock() + defer s.Unlock() + + n := s.copyDpts(&gData.DataPoints) + // Do not report stale values. + clear(s.values) + *dest = gData + + return n } diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 66ef3e785ab..b53960d7bd1 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -13,12 +13,13 @@ import ( func TestLastValue(t *testing.T) { t.Cleanup(mockTime(now)) - t.Run("Int64", testLastValue[int64]()) - t.Run("Float64", testLastValue[float64]()) + t.Run("Int64/DeltaLastValue", testDeltaLastValue[int64]()) + t.Run("Float64/DeltaLastValue", testDeltaLastValue[float64]()) } -func testLastValue[N int64 | float64]() func(*testing.T) { +func testDeltaLastValue[N int64 | float64]() func(*testing.T) { in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, Filter: attrFltr, AggregationLimit: 3, }.LastValue() @@ -42,11 +43,13 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, + StartTime: staticTime, Time: staticTime, Value: 2, }, { Attributes: fltrBob, + StartTime: staticTime, Time: staticTime, Value: -10, }, @@ -68,11 +71,13 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, + StartTime: staticTime, Time: staticTime, Value: 10, }, { Attributes: fltrBob, + StartTime: staticTime, Time: staticTime, Value: 3, }, @@ -93,16 +98,19 @@ func testLastValue[N int64 | float64]() func(*testing.T) { DataPoints: []metricdata.DataPoint[N]{ { Attributes: fltrAlice, + StartTime: staticTime, Time: staticTime, Value: 1, }, { Attributes: fltrBob, + StartTime: staticTime, Time: staticTime, Value: 1, }, { Attributes: overflowSet, + StartTime: staticTime, Time: staticTime, Value: 1, }, @@ -114,6 +122,6 @@ func testLastValue[N int64 | float64]() func(*testing.T) { } func BenchmarkLastValue(b *testing.B) { - b.Run("Int64", benchmarkAggregate(Builder[int64]{}.LastValue)) - b.Run("Float64", benchmarkAggregate(Builder[float64]{}.LastValue)) + b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) + b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index f2167974689..45dab6619f2 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -447,7 +447,10 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kin case AggregationDrop: // Return nil in and out to signify the drop aggregator. case AggregationLastValue: - meas, comp = b.LastValue() + if kind == InstrumentKindObservableGauge { + meas, comp = b.PrecomputedLastValue() + } + // TODO (#5304): Support synchronous gauges. case AggregationSum: switch kind { case InstrumentKindObservableCounter: From 055e00e0666f376884734e4df86755cc14ca93d9 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 7 May 2024 09:25:51 -0700 Subject: [PATCH 2/4] Add cumulative testing --- .../internal/aggregate/lastvalue_test.go | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index b53960d7bd1..17d755d4e8b 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -15,6 +15,9 @@ func TestLastValue(t *testing.T) { t.Run("Int64/DeltaLastValue", testDeltaLastValue[int64]()) t.Run("Float64/DeltaLastValue", testDeltaLastValue[float64]()) + + t.Run("Int64/CumulativeLastValue", testCumulativeLastValue[int64]()) + t.Run("Float64/CumulativeLastValue", testCumulativeLastValue[float64]()) } func testDeltaLastValue[N int64 | float64]() func(*testing.T) { @@ -121,6 +124,128 @@ func testDeltaLastValue[N int64 | float64]() func(*testing.T) { }) } +func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.LastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -10, + }, + }, + }, + }, + }, { + // Cumulative temporality means no resets. + input: []arg[N]{}, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -10, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + }, + }, + }, + }, + }) +} + func BenchmarkLastValue(b *testing.B) { b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) From 50af174e6f99af95b6176ad0a41f33a2f4d0eb7a Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 7 May 2024 10:28:41 -0700 Subject: [PATCH 3/4] Add precomputed testing --- .../internal/aggregate/lastvalue_test.go | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) diff --git a/sdk/metric/internal/aggregate/lastvalue_test.go b/sdk/metric/internal/aggregate/lastvalue_test.go index 17d755d4e8b..d25a73e01ad 100644 --- a/sdk/metric/internal/aggregate/lastvalue_test.go +++ b/sdk/metric/internal/aggregate/lastvalue_test.go @@ -18,6 +18,12 @@ func TestLastValue(t *testing.T) { t.Run("Int64/CumulativeLastValue", testCumulativeLastValue[int64]()) t.Run("Float64/CumulativeLastValue", testCumulativeLastValue[float64]()) + + t.Run("Int64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValue[int64]()) + t.Run("Float64/DeltaPrecomputedLastValue", testDeltaPrecomputedLastValue[float64]()) + + t.Run("Int64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValue[int64]()) + t.Run("Float64/CumulativePrecomputedLastValue", testCumulativePrecomputedLastValue[float64]()) } func testDeltaLastValue[N int64 | float64]() func(*testing.T) { @@ -246,6 +252,214 @@ func testCumulativeLastValue[N int64 | float64]() func(*testing.T) { }) } +func testDeltaPrecomputedLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.DeltaTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -10, + }, + }, + }, + }, + }, { + // Everything resets, do not report old measurements. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + }, + }, + }, + }, + }) +} + +func testCumulativePrecomputedLastValue[N int64 | float64]() func(*testing.T) { + in, out := Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + Filter: attrFltr, + AggregationLimit: 3, + }.PrecomputedLastValue() + ctx := context.Background() + return test[N](in, out, []teststep[N]{ + { + // Empty output if nothing is measured. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, -1, bob}, + {ctx, 1, fltrAlice}, + {ctx, 2, alice}, + {ctx, -10, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 2, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: -10, + }, + }, + }, + }, + }, { + // Everything resets, do not report old measurements. + input: []arg[N]{}, + expect: output{n: 0, agg: metricdata.Gauge[N]{}}, + }, { + input: []arg[N]{ + {ctx, 10, alice}, + {ctx, 3, bob}, + }, + expect: output{ + n: 2, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 10, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 3, + }, + }, + }, + }, + }, { + input: []arg[N]{ + {ctx, 1, alice}, + {ctx, 1, bob}, + // These will exceed cardinality limit. + {ctx, 1, carol}, + {ctx, 1, dave}, + }, + expect: output{ + n: 3, + agg: metricdata.Gauge[N]{ + DataPoints: []metricdata.DataPoint[N]{ + { + Attributes: fltrAlice, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: fltrBob, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + { + Attributes: overflowSet, + StartTime: staticTime, + Time: staticTime, + Value: 1, + }, + }, + }, + }, + }, + }) +} + func BenchmarkLastValue(b *testing.B) { b.Run("Int64", benchmarkAggregate(Builder[int64]{}.PrecomputedLastValue)) b.Run("Float64", benchmarkAggregate(Builder[float64]{}.PrecomputedLastValue)) From e94d85970120585a2f1af7c9e156c110a951c834 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Tue, 7 May 2024 12:44:13 -0700 Subject: [PATCH 4/4] Add changelog entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 00235a1e28f..409c5e42c75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - De-duplicate map attributes added to a `Record` in `go.opentelemetry.io/otel/sdk/log`. (#5230) - The `go.opentelemetry.io/otel/exporters/stdout/stdoutlog` exporter won't print `AttributeValueLengthLimit` and `AttributeCountLimit` fields now, instead it prints the `DroppedAttributes` field. (#5272) - Improved performance in the `Stringer` implementation of `go.opentelemetry.io/otel/baggage.Member` by reducing the number of allocations. (#5286) +- Set the start time for last-value aggregates in `go.opentelemetry.io/otel/sdk/metric`. (#5305) ## [1.26.0/0.48.0/0.2.0-alpha] 2024-04-24