Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix throughput calculation from collectors metrics #1621

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions frontend/endpoints/collector_metrics/cluster_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num

// From this point on, we are updating the existing destination metrics
var throughputPtr *int64
var dataSentInInterval int64
var dataSentInInterval float64
dtm := currentVal.clusterCollectorsTraffic[clusterCollectorID]

// the metric data in 'dp' represent the number of spans/metrics/logs sent by the exporter
Expand All @@ -208,20 +208,20 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num
case exporterSentSpansMetricName:
throughputPtr = &dtm.tracesThroughput
spansInInterval := int64(dp.DoubleValue()) - dtm.sentSpans
dataSentInInterval = int64(float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize())
dtm.tracesDataSent += dataSentInInterval
dataSentInInterval = float64(spansInInterval) * dm.avgCalculator.lastCalculatedAvgSpanSize()
dtm.tracesDataSent += int64(dataSentInInterval)
dtm.sentSpans = int64(dp.DoubleValue())
case exporterSentMetricsMetricName:
throughputPtr = &dtm.metricsThroughput
metricsInInterval := int64(dp.DoubleValue()) - dtm.sentMetrics
dataSentInInterval = int64(float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize())
dtm.metricsDataSent += dataSentInInterval
dataSentInInterval = float64(metricsInInterval) * dm.avgCalculator.lastCalculatedAvgMetricSize()
dtm.metricsDataSent += int64(dataSentInInterval)
dtm.sentMetrics = int64(dp.DoubleValue())
case exporterSentLogsMetricName:
throughputPtr = &dtm.logsThroughput
logsInInterval := int64(dp.DoubleValue()) - dtm.sentLogs
dataSentInInterval = int64(float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize())
dtm.logsDataSent += dataSentInInterval
dataSentInInterval = float64(logsInInterval) * dm.avgCalculator.lastCalculatedAvgLogSize()
dtm.logsDataSent += int64(dataSentInInterval)
dtm.sentLogs = int64(dp.DoubleValue())
}

Expand All @@ -235,14 +235,7 @@ func (dm *destinationsMetrics) updateDestinationMetricsByExporter(dp pmetric.Num
return
}

timeDiff := newTime.Sub(oldTime).Seconds()

var throughput int64
// calculate throughput only if the new value is greater than the old value and the time difference is positive
// otherwise, the throughput is set to 0
if dataSentInInterval > 0 && timeDiff > 0 {
throughput = (dataSentInInterval) / int64(timeDiff)
}
throughput := calculateThroughput(dataSentInInterval, newTime, oldTime)

*throughputPtr = throughput
}
Expand Down
9 changes: 1 addition & 8 deletions frontend/endpoints/collector_metrics/node_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,7 @@ func (sm *sourcesMetrics) updateSourceMetrics(dp pmetric.NumberDataPoint, metric
return
}

timeDiff := newTime.Sub(oldTime).Seconds()

var throughput int64
// calculate throughput only if the new value is greater than the old value and the time difference is positive
// otherwise, the throughput is set to 0
if newVal > oldVal && timeDiff > 0 {
throughput = (newVal - oldVal) / int64(timeDiff)
}
throughput := calculateThroughput(float64(newVal-oldVal), newTime, oldTime)

*throughputPtr = throughput
}
Expand Down
14 changes: 14 additions & 0 deletions frontend/endpoints/collector_metrics/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package collectormetrics

import "time"

func calculateThroughput(diff float64, currentTime, prevTime time.Time) int64 {
elapsed := currentTime.Sub(prevTime).Seconds()

var throughput int64
if diff > 0 && elapsed > 0 {
throughput = int64(diff / elapsed)
}

return throughput
}
33 changes: 33 additions & 0 deletions frontend/endpoints/collector_metrics/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package collectormetrics

import (
"testing"
"time"
)

func TestCalculateThroughput(t *testing.T) {
cases := []struct {
name string
diff float64
currentTime time.Time
prevTime time.Time
want int64
}{
{name: "no diff and no time change", diff: 0, currentTime: time.Now(), prevTime: time.Now(), want: 0},
{name: "diff of 1 in 1 second", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 1},
{name: "diff of 1 in 2 seconds", diff: 1, currentTime: time.Now(), prevTime: time.Now().Add(-2 * time.Second), want: 0},
{name: "diff of 0 in 1 second", diff: 0, currentTime: time.Now(), prevTime: time.Now().Add(-1 * time.Second), want: 0},
{name: "diff of 100 in 10 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-10 * time.Second), want: 10},
{name: "diff of 100 in 0.1 seconds", diff: 100, currentTime: time.Now(), prevTime: time.Now().Add(-100 * time.Millisecond), want: 1000},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
got := calculateThroughput(c.diff, c.currentTime, c.prevTime)
if got != c.want {
t.Errorf("calculateThroughput(%f, %v, %v) = %d, want %d", c.diff, c.currentTime, c.prevTime, got, c.want)
}
})
}

}
Loading