Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize normalization caching and copying #890

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 83 additions & 4 deletions exporter/collector/internal/datapointstorage/datapointcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@
func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) {
c.numberLock.Lock()
defer c.numberLock.Unlock()
c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)}
if existing, ok := c.numberCache[identifier]; ok {
existing.used.Store(true)
minimalNumberDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewNumberDataPoint()
minimalNumberDataPointCopyTo(point, newPoint)
c.numberCache[identifier] = usedNumberPoint{newPoint, atomic.NewBool(true)}
}

// GetSummaryDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -112,7 +119,14 @@
func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) {
c.summaryLock.Lock()
defer c.summaryLock.Unlock()
c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)}
if existing, ok := c.summaryCache[identifier]; ok {
existing.used.Store(true)
minimalSummaryDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewSummaryDataPoint()
minimalSummaryDataPointCopyTo(point, newPoint)
c.summaryCache[identifier] = usedSummaryPoint{newPoint, atomic.NewBool(true)}
}

// GetHistogramDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -131,7 +145,14 @@
func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) {
c.histogramLock.Lock()
defer c.histogramLock.Unlock()
c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)}
if existing, ok := c.histogramCache[identifier]; ok {
existing.used.Store(true)
minimalHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewHistogramDataPoint()
minimalHistogramDataPointCopyTo(point, newPoint)
c.histogramCache[identifier] = usedHistogramPoint{newPoint, atomic.NewBool(true)}
}

// GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether
Expand All @@ -150,7 +171,14 @@
func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) {
c.exponentialHistogramLock.Lock()
defer c.exponentialHistogramLock.Unlock()
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)}
if existing, ok := c.exponentialHistogramCache[identifier]; ok {
existing.used.Store(true)
minimalExponentialHistogramDataPointCopyTo(point, existing.point)
return
}
newPoint := pmetric.NewExponentialHistogramDataPoint()
minimalExponentialHistogramDataPointCopyTo(point, newPoint)
c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{newPoint, atomic.NewBool(true)}
}

// gc garbage collects the cache after the ticker ticks.
Expand Down Expand Up @@ -251,3 +279,54 @@
h.Write(kvSep)
}
}

// minimalSummaryDataPointCopyTo is the same as CopyTo for SummaryDataPoint,
// but only copies values required for normalization.
func minimalSummaryDataPointCopyTo(src, dest pmetric.SummaryDataPoint) {
// We do not copy attributes, start timestamp, flags, quantiles
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetSum(src.Sum())
}

// minimalHistogramDataPointCopyTo is the same as CopyTo for
// HistogramDataPoint, but only copies values required for normalization.
func minimalHistogramDataPointCopyTo(src, dest pmetric.HistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
src.BucketCounts().CopyTo(dest.BucketCounts())
src.ExplicitBounds().CopyTo(dest.ExplicitBounds())
if src.HasSum() {
dest.SetSum(src.Sum())
}
}

// minimalExponentialHistogramDataPointCopyTo is the same as CopyTo for
// ExponentialHistogramDataPoint, but only copies values required for normalization.
func minimalExponentialHistogramDataPointCopyTo(src, dest pmetric.ExponentialHistogramDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars, min, max, zero threshold
dest.SetTimestamp(src.Timestamp())
dest.SetCount(src.Count())
dest.SetZeroCount(src.ZeroCount())
dest.SetScale(src.Scale())
src.Positive().CopyTo(dest.Positive())
src.Negative().CopyTo(dest.Negative())
if src.HasSum() {
dest.SetSum(src.Sum())
}
dest.SetZeroThreshold(src.ZeroThreshold())
}

// minimalNumberDataPointCopyTo is the same as CopyTo for NumberDataPoint,
// but only copies values required for normalization.
func minimalNumberDataPointCopyTo(src, dest pmetric.NumberDataPoint) {
// We do not copy attributes, start timestamp, flags, exemplars
dest.SetTimestamp(src.Timestamp())
switch src.ValueType() {
case pmetric.NumberDataPointValueTypeDouble:
dest.SetDoubleValue(src.DoubleValue())
case pmetric.NumberDataPointValueTypeInt:
dest.SetIntValue(src.IntValue())

Check warning on line 330 in exporter/collector/internal/datapointstorage/datapointcache.go

View check run for this annotation

Codecov / codecov/patch

exporter/collector/internal/datapointstorage/datapointcache.go#L329-L330

Added lines #L329 - L330 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestGC(t *testing.T) {
fakeTicker := make(chan time.Time)
id := uint64(12345)

c.SetNumberDataPoint(id, pmetric.NumberDataPoint{})
c.SetNumberDataPoint(id, pmetric.NewNumberDataPoint())

// bar exists since we just set it
usedPoint, found := c.numberCache[id]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func subtractExponentialHistogramDataPoint(a, b pmetric.ExponentialHistogramData
a.Negative().BucketCounts().FromRaw(subtractExponentialBuckets(a.Negative(), b.Negative()))
}

// subtractExponentialBuckets returns a - b.
// subtractExponentialBuckets subtracts b from a.
func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBuckets) []uint64 {
newBuckets := make([]uint64, a.BucketCounts().Len())
offsetDiff := int(a.Offset() - b.Offset())
Expand Down Expand Up @@ -213,7 +213,7 @@ func lessThanHistogramDataPoint(a, b pmetric.HistogramDataPoint) bool {
return a.Count() < b.Count() || a.Sum() < b.Sum()
}

// subtractHistogramDataPoint returns a - b.
// subtractHistogramDataPoint subtracts b from a.
func subtractHistogramDataPoint(a, b pmetric.HistogramDataPoint) {
// Use the timestamp from the normalization point
a.SetStartTimestamp(b.Timestamp())
Expand Down
Loading