-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathsplitmetrics.go
198 lines (178 loc) · 6.2 KB
/
splitmetrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package concurrentbatchprocessor // import "github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"
import (
"go.opentelemetry.io/collector/pdata/pmetric"
)
// splitMetrics removes metrics from the input data and returns a new data of the specified size.
func splitMetrics(size int, src pmetric.Metrics) pmetric.Metrics {
dataPoints := src.DataPointCount()
if dataPoints <= size {
return src
}
totalCopiedDataPoints := 0
dest := pmetric.NewMetrics()
src.ResourceMetrics().RemoveIf(func(srcRs pmetric.ResourceMetrics) bool {
// If we are done skip everything else.
if totalCopiedDataPoints == size {
return false
}
// If it fully fits
srcRsDataPointCount := resourceMetricsDPC(srcRs)
if (totalCopiedDataPoints + srcRsDataPointCount) <= size {
totalCopiedDataPoints += srcRsDataPointCount
srcRs.MoveTo(dest.ResourceMetrics().AppendEmpty())
return true
}
destRs := dest.ResourceMetrics().AppendEmpty()
srcRs.Resource().CopyTo(destRs.Resource())
srcRs.ScopeMetrics().RemoveIf(func(srcIlm pmetric.ScopeMetrics) bool {
// If we are done skip everything else.
if totalCopiedDataPoints == size {
return false
}
// If possible to move all metrics do that.
srcIlmDataPointCount := scopeMetricsDPC(srcIlm)
if srcIlmDataPointCount+totalCopiedDataPoints <= size {
totalCopiedDataPoints += srcIlmDataPointCount
srcIlm.MoveTo(destRs.ScopeMetrics().AppendEmpty())
return true
}
destIlm := destRs.ScopeMetrics().AppendEmpty()
srcIlm.Scope().CopyTo(destIlm.Scope())
srcIlm.Metrics().RemoveIf(func(srcMetric pmetric.Metric) bool {
// If we are done skip everything else.
if totalCopiedDataPoints == size {
return false
}
// If possible to move all points do that.
srcMetricPointCount := metricDPC(srcMetric)
if srcMetricPointCount+totalCopiedDataPoints <= size {
totalCopiedDataPoints += srcMetricPointCount
srcMetric.MoveTo(destIlm.Metrics().AppendEmpty())
return true
}
// If the metric has more data points than free slots we should split it.
copiedDataPoints, remove := splitMetric(srcMetric, destIlm.Metrics().AppendEmpty(), size-totalCopiedDataPoints)
totalCopiedDataPoints += copiedDataPoints
return remove
})
return false
})
return srcRs.ScopeMetrics().Len() == 0
})
return dest
}
// resourceMetricsDPC calculates the total number of data points in the pmetric.ResourceMetrics.
func resourceMetricsDPC(rs pmetric.ResourceMetrics) int {
dataPointCount := 0
ilms := rs.ScopeMetrics()
for k := 0; k < ilms.Len(); k++ {
dataPointCount += scopeMetricsDPC(ilms.At(k))
}
return dataPointCount
}
// scopeMetricsDPC calculates the total number of data points in the pmetric.ScopeMetrics.
func scopeMetricsDPC(ilm pmetric.ScopeMetrics) int {
dataPointCount := 0
ms := ilm.Metrics()
for k := 0; k < ms.Len(); k++ {
dataPointCount += metricDPC(ms.At(k))
}
return dataPointCount
}
// metricDPC calculates the total number of data points in the pmetric.Metric.
func metricDPC(ms pmetric.Metric) int {
switch ms.Type() {
case pmetric.MetricTypeGauge:
return ms.Gauge().DataPoints().Len()
case pmetric.MetricTypeSum:
return ms.Sum().DataPoints().Len()
case pmetric.MetricTypeHistogram:
return ms.Histogram().DataPoints().Len()
case pmetric.MetricTypeExponentialHistogram:
return ms.ExponentialHistogram().DataPoints().Len()
case pmetric.MetricTypeSummary:
return ms.Summary().DataPoints().Len()
}
return 0
}
// splitMetric removes metric points from the input data and moves data of the specified size to destination.
// Returns size of moved data and boolean describing, whether the metric should be removed from original slice.
func splitMetric(ms, dest pmetric.Metric, size int) (int, bool) {
dest.SetName(ms.Name())
dest.SetDescription(ms.Description())
dest.SetUnit(ms.Unit())
switch ms.Type() {
case pmetric.MetricTypeGauge:
return splitNumberDataPoints(ms.Gauge().DataPoints(), dest.SetEmptyGauge().DataPoints(), size)
case pmetric.MetricTypeSum:
destSum := dest.SetEmptySum()
destSum.SetAggregationTemporality(ms.Sum().AggregationTemporality())
destSum.SetIsMonotonic(ms.Sum().IsMonotonic())
return splitNumberDataPoints(ms.Sum().DataPoints(), destSum.DataPoints(), size)
case pmetric.MetricTypeHistogram:
destHistogram := dest.SetEmptyHistogram()
destHistogram.SetAggregationTemporality(ms.Histogram().AggregationTemporality())
return splitHistogramDataPoints(ms.Histogram().DataPoints(), destHistogram.DataPoints(), size)
case pmetric.MetricTypeExponentialHistogram:
destHistogram := dest.SetEmptyExponentialHistogram()
destHistogram.SetAggregationTemporality(ms.ExponentialHistogram().AggregationTemporality())
return splitExponentialHistogramDataPoints(ms.ExponentialHistogram().DataPoints(), destHistogram.DataPoints(), size)
case pmetric.MetricTypeSummary:
return splitSummaryDataPoints(ms.Summary().DataPoints(), dest.SetEmptySummary().DataPoints(), size)
}
return size, false
}
func splitNumberDataPoints(src, dst pmetric.NumberDataPointSlice, size int) (int, bool) {
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pmetric.NumberDataPoint) bool {
if i < size {
dp.MoveTo(dst.AppendEmpty())
i++
return true
}
return false
})
return size, false
}
func splitHistogramDataPoints(src, dst pmetric.HistogramDataPointSlice, size int) (int, bool) {
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pmetric.HistogramDataPoint) bool {
if i < size {
dp.MoveTo(dst.AppendEmpty())
i++
return true
}
return false
})
return size, false
}
func splitExponentialHistogramDataPoints(src, dst pmetric.ExponentialHistogramDataPointSlice, size int) (int, bool) {
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
if i < size {
dp.MoveTo(dst.AppendEmpty())
i++
return true
}
return false
})
return size, false
}
func splitSummaryDataPoints(src, dst pmetric.SummaryDataPointSlice, size int) (int, bool) {
dst.EnsureCapacity(size)
i := 0
src.RemoveIf(func(dp pmetric.SummaryDataPoint) bool {
if i < size {
dp.MoveTo(dst.AppendEmpty())
i++
return true
}
return false
})
return size, false
}