From f28ea8ca91f44da56b7ce90e7de0e9f4b45adad4 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Fri, 20 Sep 2024 11:21:11 -0400 Subject: [PATCH] store minimal data in normalization cache (#890) --- .../datapointstorage/datapointcache.go | 87 ++++++++++++++++++- .../datapointstorage/datapointcache_test.go | 2 +- .../normalization/standard_normalizer.go | 4 +- 3 files changed, 86 insertions(+), 7 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index 7b01c578..16114408 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -93,7 +93,14 @@ func (c *Cache) GetNumberDataPoint(identifier uint64) (pmetric.NumberDataPoint, func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) { c.numberLock.Lock() defer c.numberLock.Unlock() - c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)} + if existing, ok := c.numberCache[identifier]; ok { + existing.used.Store(true) + minimalNumberDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewNumberDataPoint() + minimalNumberDataPointCopyTo(point, newPoint) + c.numberCache[identifier] = usedNumberPoint{newPoint, atomic.NewBool(true)} } // GetSummaryDataPoint retrieves the point associated with the identifier, and whether @@ -112,7 +119,14 @@ func (c *Cache) GetSummaryDataPoint(identifier uint64) (pmetric.SummaryDataPoint func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) { c.summaryLock.Lock() defer c.summaryLock.Unlock() - c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)} + if existing, ok := c.summaryCache[identifier]; ok { + existing.used.Store(true) + minimalSummaryDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewSummaryDataPoint() + minimalSummaryDataPointCopyTo(point, newPoint) 
+ c.summaryCache[identifier] = usedSummaryPoint{newPoint, atomic.NewBool(true)} } // GetHistogramDataPoint retrieves the point associated with the identifier, and whether @@ -131,7 +145,14 @@ func (c *Cache) GetHistogramDataPoint(identifier uint64) (pmetric.HistogramDataP func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) { c.histogramLock.Lock() defer c.histogramLock.Unlock() - c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)} + if existing, ok := c.histogramCache[identifier]; ok { + existing.used.Store(true) + minimalHistogramDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewHistogramDataPoint() + minimalHistogramDataPointCopyTo(point, newPoint) + c.histogramCache[identifier] = usedHistogramPoint{newPoint, atomic.NewBool(true)} } // GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether @@ -150,7 +171,14 @@ func (c *Cache) GetExponentialHistogramDataPoint(identifier uint64) (pmetric.Exp func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) { c.exponentialHistogramLock.Lock() defer c.exponentialHistogramLock.Unlock() - c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)} + if existing, ok := c.exponentialHistogramCache[identifier]; ok { + existing.used.Store(true) + minimalExponentialHistogramDataPointCopyTo(point, existing.point) + return + } + newPoint := pmetric.NewExponentialHistogramDataPoint() + minimalExponentialHistogramDataPointCopyTo(point, newPoint) + c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{newPoint, atomic.NewBool(true)} } // gc garbage collects the cache after the ticker ticks. 
@@ -251,3 +279,54 @@ func hashOfMap(h hash.Hash64, m map[string]string) { h.Write(kvSep) } } + +// minimalSummaryDataPointCopyTo is the same as CopyTo for SummaryDataPoint, +// but only copies values required for normalization. +func minimalSummaryDataPointCopyTo(src, dest pmetric.SummaryDataPoint) { + // We do not copy attributes, start timestamp, flags, quantiles + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + dest.SetSum(src.Sum()) +} + +// minimalHistogramDataPointCopyTo is the same as CopyTo for +// HistogramDataPoint, but only copies values required for normalization. +func minimalHistogramDataPointCopyTo(src, dest pmetric.HistogramDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars, min, max + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + src.BucketCounts().CopyTo(dest.BucketCounts()) + src.ExplicitBounds().CopyTo(dest.ExplicitBounds()) + if src.HasSum() { + dest.SetSum(src.Sum()) + } +} + +// minimalExponentialHistogramDataPointCopyTo is the same as CopyTo for +// ExponentialHistogramDataPoint, but only copies values required for normalization. +func minimalExponentialHistogramDataPointCopyTo(src, dest pmetric.ExponentialHistogramDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars, min, max + dest.SetTimestamp(src.Timestamp()) + dest.SetCount(src.Count()) + dest.SetZeroCount(src.ZeroCount()) + dest.SetScale(src.Scale()) + src.Positive().CopyTo(dest.Positive()) + src.Negative().CopyTo(dest.Negative()) + if src.HasSum() { + dest.SetSum(src.Sum()) + } + dest.SetZeroThreshold(src.ZeroThreshold()) +} + +// minimalNumberDataPointCopyTo is the same as CopyTo for NumberDataPoint, +// but only copies values required for normalization. 
+func minimalNumberDataPointCopyTo(src, dest pmetric.NumberDataPoint) { + // We do not copy attributes, start timestamp, flags, exemplars + dest.SetTimestamp(src.Timestamp()) + switch src.ValueType() { + case pmetric.NumberDataPointValueTypeDouble: + dest.SetDoubleValue(src.DoubleValue()) + case pmetric.NumberDataPointValueTypeInt: + dest.SetIntValue(src.IntValue()) + } +} diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go index 22415da3..2edf223a 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache_test.go +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -67,7 +67,7 @@ func TestGC(t *testing.T) { fakeTicker := make(chan time.Time) id := uint64(12345) - c.SetNumberDataPoint(id, pmetric.NumberDataPoint{}) + c.SetNumberDataPoint(id, pmetric.NewNumberDataPoint()) // bar exists since we just set it usedPoint, found := c.numberCache[id] diff --git a/exporter/collector/internal/normalization/standard_normalizer.go b/exporter/collector/internal/normalization/standard_normalizer.go index e249aa9c..9db7d330 100644 --- a/exporter/collector/internal/normalization/standard_normalizer.go +++ b/exporter/collector/internal/normalization/standard_normalizer.go @@ -129,7 +129,7 @@ func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramData a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative())) } -// subtractExponentialBuckets returns a - b. +// subtractExponentialBuckets subtracts b from a. func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBuckets) []uint64 { newBuckets := make([]uint64, a.BucketCounts().Len()) offsetDiff := int(a.Offset() - b.Offset()) @@ -213,7 +213,7 @@ func lessThanHistogramDataPoint(a, b pmetric.HistogramDataPoint) bool { return a.Count() < b.Count() || a.Sum() < b.Sum() } -// subtractHistogramDataPoint returns a - b. 
+// subtractHistogramDataPoint subtracts b from a. func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) { // Use the timestamp from the normalization point a.SetStartTimestamp(b.Timestamp())