Skip to content

Commit

Permalink
store minimal data in normalization cache (#890)
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Sep 20, 2024
1 parent ac83b74 commit f28ea8c
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 7 deletions.
87 changes: 83 additions & 4 deletions exporter/collector/internal/datapointstorage/datapointcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ func (c *Cache) GetNumberDataPoint(identifier uint64) (pmetric.NumberDataPoint,
func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) {
c.numberLock.Lock()
defer c.numberLock.Unlock()
c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)}
if existing, ok := c.numberCache[identifier]; ok {
existing.used.Store(true)
minimalNumberDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewNumberDataPoint()
minimalNumberDataPointCopyTo(point, newPoint)
c.numberCache[identifier] = usedNumberPoint{newPoint, atomic.NewBool(true)}
}

// GetSummaryDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -112,7 +119,14 @@ func (c *Cache) GetSummaryDataPoint(identifier uint64) (pmetric.SummaryDataPoint
func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) {
c.summaryLock.Lock()
defer c.summaryLock.Unlock()
c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)}
if existing, ok := c.summaryCache[identifier]; ok {
existing.used.Store(true)
minimalSummaryDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewSummaryDataPoint()
minimalSummaryDataPointCopyTo(point, newPoint)
c.summaryCache[identifier] = usedSummaryPoint{newPoint, atomic.NewBool(true)}
}

// GetHistogramDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -131,7 +145,14 @@ func (c *Cache) GetHistogramDataPoint(identifier uint64) (pmetric.HistogramDataP
func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) {
c.histogramLock.Lock()
defer c.histogramLock.Unlock()
c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)}
if existing, ok := c.histogramCache[identifier]; ok {
existing.used.Store(true)
minimalHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewHistogramDataPoint()
minimalHistogramDataPointCopyTo(point, newPoint)
c.histogramCache[identifier] = usedHistogramPoint{newPoint, atomic.NewBool(true)}
}

// GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -150,7 +171,14 @@ func (c *Cache) GetExponentialHistogramDataPoint(identifier uint64) (pmetric.Exp
func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) {
c.exponentialHistogramLock.Lock()
defer c.exponentialHistogramLock.Unlock()
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)}
if existing, ok := c.exponentialHistogramCache[identifier]; ok {
existing.used.Store(true)
minimalExponentialHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewExponentialHistogramDataPoint()
minimalExponentialHistogramDataPointCopyTo(point, newPoint)
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{newPoint, atomic.NewBool(true)}
}

// gc garbage collects the cache after the ticker ticks.
Expand Down Expand Up @@ -251,3 +279,54 @@ func hashOfMap(h hash.Hash64, m map[string]string) {
h.Write(kvSep)
}
}

// minimalSummaryDataPointCopyTo is the same as CopyTo for SummaryDataPoint,
// but only copies values required for normalization.
func minimalSummaryDataPointCopyTo(src, dest pmetric.SummaryDataPoint) {
// We do not copy attributes, start timestamp, flags, quantiles
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetSum(src.Sum())
}

// minimalHistogramDataPointCopyTo is the same as CopyTo for
// HistogramDataPoint, but only copies values required for normalization.
func minimalHistogramDataPointCopyTo(src, dest pmetric.HistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
src.BucketCounts().CopyTo(dest.BucketCounts())
src.ExplicitBounds().CopyTo(dest.ExplicitBounds())
if src.HasSum() {
dest.SetSum(src.Sum())
}
}

// minimalExponentialHistogramDataPointCopyTo is the same as CopyTo for
// ExponentialHistogramDataPoint, but only copies values required for normalization.
func minimalExponentialHistogramDataPointCopyTo(src, dest pmetric.ExponentialHistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max, zero threshold
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetZeroCount(src.ZeroCount())
dest.SetScale(src.Scale())
src.Positive().CopyTo(dest.Positive())
src.Negative().CopyTo(dest.Negative())
if src.HasSum() {
dest.SetSum(src.Sum())
}
dest.SetZeroThreshold(src.ZeroThreshold())
}

// minimalNumberDataPointCopyTo is the same as CopyTo for NumberDataPoint,
// but only copies values required for normalization.
func minimalNumberDataPointCopyTo(src, dest pmetric.NumberDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars
dest.SetTimestamp(src.Timestamp())
switch src.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
dest.SetDoubleValue(src.DoubleValue())
case pmetric.NumberDataPointValueTypeInt:
dest.SetIntValue(src.IntValue())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGC(t *testing.T) {
fakeTicker := make(chan time.Time)
id := uint64(12345)

c.SetNumberDataPoint(id, pmetric.NumberDataPoint{})
c.SetNumberDataPoint(id, pmetric.NewNumberDataPoint())

// bar exists since we just set it
usedPoint, found := c.numberCache[id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramData
a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative()))
}

// subtractExponentialBuckets returns a - b.
// subtractExponentialBuckets subtracts b from a.
func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBuckets) []uint64 {
newBuckets := make([]uint64, a.BucketCounts().Len())
offsetDiff := int(a.Offset() - b.Offset())
Expand Down Expand Up @@ -213,7 +213,7 @@ func lessThanHistogramDataPoint(a, b pmetric.HistogramDataPoint) bool {
return a.Count() < b.Count() || a.Sum() < b.Sum()
}

// subtractHistogramDataPoint returns a - b.
// subtractHistogramDataPoint subtracts b from a.
func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) {
// Use the timestamp from the normalization point
a.SetStartTimestamp(b.Timestamp())
Expand Down

0 comments on commit f28ea8c

Please sign in to comment.