[receiver/prometheus] Fix start timestamp when timestamp is present #6794

Merged
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/metricfamily.go
@@ -394,7 +394,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
var startTs *timestamppb.Timestamp
// gauge/undefined types have no start time
if mg.family.isCumulativeType() {
-startTs = timestampFromMs(mg.intervalStartTimeMs)
+startTs = timestampFromMs(mg.ts) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}

return &metricspb.TimeSeries{
74 changes: 35 additions & 39 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
@@ -35,35 +35,33 @@ type MetricFamilyPdata interface {
}

type metricFamilyPdata struct {
-mtype pdata.MetricDataType
-groups map[string]*metricGroupPdata
-name string
-mc MetadataCache
-droppedTimeseries int
-labelKeys map[string]bool
-labelKeysOrdered []string
-metadata *scrape.MetricMetadata
-groupOrders map[string]int
-intervalStartTimeMs int64
+mtype pdata.MetricDataType
+groups map[string]*metricGroupPdata
+name string
+mc MetadataCache
+droppedTimeseries int
+labelKeys map[string]bool
+labelKeysOrdered []string
+metadata *scrape.MetricMetadata
+groupOrders map[string]int
}

// metricGroupPdata represents a single metric of a metric family. For example, a histogram metric is usually represented by
// a couple of complexValue data points (buckets and count/sum); a group of a metric family always shares the same set of tags. For
// simple types like counter and gauge, each data point is a group of itself
type metricGroupPdata struct {
-family *metricFamilyPdata
-ts int64
-ls labels.Labels
-count float64
-hasCount bool
-sum float64
-hasSum bool
-value float64
-complexValue []*dataPoint
-intervalStartTimeMs int64
+family *metricFamilyPdata
+ts int64
+ls labels.Labels
+count float64
+hasCount bool
+sum float64
+hasSum bool
+value float64
+complexValue []*dataPoint
}

-func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamilyPdata {
+func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
@@ -101,16 +99,15 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logge
}

return &metricFamilyPdata{
-mtype: mtype,
-groups: make(map[string]*metricGroupPdata),
-name: familyName,
-mc: mc,
-droppedTimeseries: 0,
-labelKeys: make(map[string]bool),
-labelKeysOrdered: make([]string, 0),
-metadata: &metadata,
-groupOrders: make(map[string]int),
-intervalStartTimeMs: intervalStartTimeMs,
+mtype: mtype,
+groups: make(map[string]*metricGroupPdata),
+name: familyName,
+mc: mc,
+droppedTimeseries: 0,
+labelKeys: make(map[string]bool),
+labelKeysOrdered: make([]string, 0),
+metadata: &metadata,
+groupOrders: make(map[string]int),
}
}

@@ -176,7 +173,7 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest
// The timestamp MUST be retrieved in milliseconds and converted to nanoseconds.
tsNanos := pdataTimestampFromMs(mg.ts)
if mg.family.isCumulativeTypePdata() {
-point.SetStartTimestamp(pdataTimestampFromMs(mg.intervalStartTimeMs))
+point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}
point.SetTimestamp(tsNanos)
populateAttributesPdata(orderedLabelKeys, mg.ls, point.Attributes())
@@ -215,7 +212,7 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat
tsNanos := pdataTimestampFromMs(mg.ts)
point.SetTimestamp(tsNanos)
if mg.family.isCumulativeTypePdata() {
-point.SetStartTimestamp(pdataTimestampFromMs(mg.intervalStartTimeMs))
+point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}
point.SetSum(mg.sum)
point.SetCount(uint64(mg.count))
@@ -229,7 +226,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
tsNanos := pdataTimestampFromMs(mg.ts)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
-startTsNanos = pdataTimestampFromMs(mg.intervalStartTimeMs)
+startTsNanos = tsNanos // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}

point := dest.AppendEmpty()
@@ -265,11 +262,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroupPdata{
-family: mf,
-ts: ts,
-ls: ls,
-complexValue: make([]*dataPoint, 0),
-intervalStartTimeMs: mf.intervalStartTimeMs,
+family: mf,
+ts: ts,
+ls: ls,
+complexValue: make([]*dataPoint, 0),
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
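For reference, below is a minimal standalone sketch of the timestamp semantics this diff relies on. The helper name pdataTimestampFromMs appears in this package, but the body shown here and the plain uint64 stand-in for pdata.Timestamp are assumptions made so the snippet runs on its own: Prometheus scrape timestamps arrive in milliseconds, OTLP points carry Unix nanoseconds, and after this change cumulative points are emitted with StartTimestamp equal to their own Timestamp, which the metrics_adjuster later rewrites to the series' initial scrape timestamp.

// Illustrative sketch only, not part of the PR; pdata.Timestamp is modeled
// here as a bare uint64 nanosecond count so the example is self-contained.
package main

import (
	"fmt"
	"time"
)

// timestampFromMs converts a Prometheus millisecond timestamp to Unix
// nanoseconds, mirroring what a pdataTimestampFromMs-style helper does.
func timestampFromMs(ms int64) uint64 {
	return uint64(time.Duration(ms) * time.Millisecond)
}

type numberPoint struct {
	startTimestamp uint64 // Unix nanoseconds
	timestamp      uint64 // Unix nanoseconds
	value          float64
}

func main() {
	tsNanos := timestampFromMs(11) // 11 ms -> 11_000_000 ns

	// With this PR, a cumulative point starts out with StartTimestamp equal to
	// its own timestamp; the metrics_adjuster then resets StartTimestamp to the
	// initial scrape timestamp observed for that series.
	p := numberPoint{startTimestamp: tsNanos, timestamp: tsNanos, value: 33.7}
	fmt.Printf("start=%dns ts=%dns value=%v\n", p.startTimestamp, p.timestamp, p.value)
}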
43 changes: 22 additions & 21 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
@@ -16,6 +16,7 @@ package internal

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
@@ -103,10 +104,10 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
point := pdata.NewHistogramDataPoint()
point.SetCount(10)
point.SetSum(1004.78)
-point.SetTimestamp(11 * 1e6) // the time in milliseconds -> nanoseconds.
+point.SetTimestamp(pdata.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
point.SetBucketCounts([]uint64{33})
point.SetExplicitBounds([]float64{})
-point.SetStartTimestamp(11 * 1e6)
+point.SetStartTimestamp(pdata.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
@@ -118,7 +119,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
-mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop(), tt.intervalStartTimeMs).(*metricFamilyPdata)
+mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop()).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
@@ -161,8 +162,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 10, value: 10, metric: "histogram_count"},
{at: 10, value: 12, metric: "histogram_sum"},
{at: 10, value: 10, metric: "summary_count"},
+{at: 10, value: 12, metric: "summary_sum"},
{at: 10, value: 8, metric: "value"},
},
},
@@ -171,8 +172,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
-{at: 11, value: 10, metric: "histogram_count"},
-{at: 11, value: 1004.78, metric: "histogram_sum"},
+{at: 11, value: 10, metric: "summary_count"},
+{at: 11, value: 1004.78, metric: "summary_sum"},
{at: 11, value: 33.7, metric: "value"},
},
},
@@ -181,8 +182,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
-{at: 12, value: 10, metric: "histogram_count"},
-{at: 12, value: 13, metric: "histogram_sum"},
+{at: 12, value: 10, metric: "summary_count"},
+{at: 12, value: 13, metric: "summary_sum"},
{at: 12, value: 27, metric: "value"},
},
},
@@ -191,8 +192,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
-{at: 13, value: 10, metric: "histogram_count"},
-{at: 13, value: 14, metric: "histogram_sum"},
+{at: 13, value: 10, metric: "summary_count"},
+{at: 13, value: 14, metric: "summary_sum"},
{at: 13, value: 56, metric: "value"},
},
},
@@ -201,8 +202,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.99"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
-{at: 14, value: 10, metric: "histogram_count"},
-{at: 14, value: 15, metric: "histogram_sum"},
+{at: 14, value: 10, metric: "summary_count"},
+{at: 14, value: 15, metric: "summary_sum"},
{at: 14, value: 82, metric: "value"},
},
},
@@ -227,8 +228,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
qn99 := qtL.AppendEmpty()
qn99.SetQuantile(.99)
qn99.SetValue(82)
-point.SetTimestamp(14 * 1e6) // the time in milliseconds -> nanoseconds.
-point.SetStartTimestamp(10 * 1e5)
+point.SetTimestamp(pdata.Timestamp(14 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
+point.SetStartTimestamp(pdata.Timestamp(14 * time.Millisecond)) // the time in milliseconds -> nanoseconds
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
@@ -240,7 +241,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
-mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop(), 1).(*metricFamilyPdata)
+mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop()).(*metricFamilyPdata)
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
@@ -290,8 +291,8 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
want: func() pdata.NumberDataPoint {
point := pdata.NewNumberDataPoint()
point.SetDoubleVal(33.7)
-point.SetTimestamp(13 * 1e6) // the time in milliseconds -> nanoseconds.
-point.SetStartTimestamp(11 * 1e6)
+point.SetTimestamp(pdata.Timestamp(13 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
+point.SetStartTimestamp(pdata.Timestamp(13 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
@@ -309,8 +310,8 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
want: func() pdata.NumberDataPoint {
point := pdata.NewNumberDataPoint()
point.SetDoubleVal(99.9)
-point.SetTimestamp(28 * 1e6) // the time in milliseconds -> nanoseconds.
-point.SetStartTimestamp(0)
+point.SetTimestamp(pdata.Timestamp(28 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
+point.SetStartTimestamp(pdata.Timestamp(28 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
@@ -322,7 +323,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
-mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop(), tt.intervalStartTimestampMs).(*metricFamilyPdata)
+mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop()).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
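A side note on the test rewrite above: time.Millisecond is the time.Duration constant 1,000,000 nanoseconds, and pdata.Timestamp in this collector version is a Unix-nanosecond count, so pdata.Timestamp(11 * time.Millisecond) and the old literal 11 * 1e6 denote the same value; the Duration spelling just states the unit explicitly. A tiny self-contained check of that equivalence follows (a plain uint64 stands in for pdata.Timestamp, which is an assumption made to keep the snippet dependency-free).

package main

import (
	"fmt"
	"time"
)

func main() {
	// 11 ms expressed as a Duration is 11_000_000 ns, identical to the old 11 * 1e6 literal.
	fmt.Println(uint64(11*time.Millisecond) == uint64(11*1e6)) // prints: true
}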
@@ -178,7 +178,7 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64)
if mf, ok := b.families[metricName]; ok {
curMF = mf
} else {
-curMF = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs)
+curMF = newMetricFamilyPdata(metricName, b.mc, b.logger)
b.families[familyName] = curMF
}
}