Skip to content

Commit

Permalink
[receiver/prometheus] Fix start timestamp when timestamp is present (#…
Browse files Browse the repository at this point in the history
…6794)

* Adding fix for issue 6405

* Removing m.intervalStartTimeMs as per review feedback
  • Loading branch information
PaurushGarg authored Dec 16, 2021
1 parent 0a0e7e9 commit 4fedae5
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 111 deletions.
2 changes: 1 addition & 1 deletion receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (mg *metricGroup) toDoubleValueTimeSeries(orderedLabelKeys []string) *metri
var startTs *timestamppb.Timestamp
// gauge/undefined types has no start time
if mg.family.isCumulativeType() {
startTs = timestampFromMs(mg.intervalStartTimeMs)
startTs = timestampFromMs(mg.ts) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}

return &metricspb.TimeSeries{
Expand Down
74 changes: 35 additions & 39 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,33 @@ type MetricFamilyPdata interface {
}

type metricFamilyPdata struct {
mtype pdata.MetricDataType
groups map[string]*metricGroupPdata
name string
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
intervalStartTimeMs int64
mtype pdata.MetricDataType
groups map[string]*metricGroupPdata
name string
mc MetadataCache
droppedTimeseries int
labelKeys map[string]bool
labelKeysOrdered []string
metadata *scrape.MetricMetadata
groupOrders map[string]int
}

// metricGroupPdata, represents a single metric of a metric family. for example a histogram metric is usually represent by
// a couple data complexValue (buckets and count/sum), a group of a metric family always share a same set of tags. for
// simple types like counter and gauge, each data point is a group of itself
type metricGroupPdata struct {
family *metricFamilyPdata
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
intervalStartTimeMs int64
family *metricFamilyPdata
ts int64
ls labels.Labels
count float64
hasCount bool
sum float64
hasSum bool
value float64
complexValue []*dataPoint
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamilyPdata {
func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
Expand Down Expand Up @@ -101,16 +99,15 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logge
}

return &metricFamilyPdata{
mtype: mtype,
groups: make(map[string]*metricGroupPdata),
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
intervalStartTimeMs: intervalStartTimeMs,
mtype: mtype,
groups: make(map[string]*metricGroupPdata),
name: familyName,
mc: mc,
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
groupOrders: make(map[string]int),
}
}

Expand Down Expand Up @@ -176,7 +173,7 @@ func (mg *metricGroupPdata) toDistributionPoint(orderedLabelKeys []string, dest
// The timestamp MUST be in retrieved from milliseconds and converted to nanoseconds.
tsNanos := pdataTimestampFromMs(mg.ts)
if mg.family.isCumulativeTypePdata() {
point.SetStartTimestamp(pdataTimestampFromMs(mg.intervalStartTimeMs))
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}
point.SetTimestamp(tsNanos)
populateAttributesPdata(orderedLabelKeys, mg.ls, point.Attributes())
Expand Down Expand Up @@ -215,7 +212,7 @@ func (mg *metricGroupPdata) toSummaryPoint(orderedLabelKeys []string, dest *pdat
tsNanos := pdataTimestampFromMs(mg.ts)
point.SetTimestamp(tsNanos)
if mg.family.isCumulativeTypePdata() {
point.SetStartTimestamp(pdataTimestampFromMs(mg.intervalStartTimeMs))
point.SetStartTimestamp(tsNanos) // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}
point.SetSum(mg.sum)
point.SetCount(uint64(mg.count))
Expand All @@ -229,7 +226,7 @@ func (mg *metricGroupPdata) toNumberDataPoint(orderedLabelKeys []string, dest *p
tsNanos := pdataTimestampFromMs(mg.ts)
// gauge/undefined types have no start time.
if mg.family.isCumulativeTypePdata() {
startTsNanos = pdataTimestampFromMs(mg.intervalStartTimeMs)
startTsNanos = tsNanos // metrics_adjuster adjusts the startTimestamp to the initial scrape timestamp
}

point := dest.AppendEmpty()
Expand Down Expand Up @@ -265,11 +262,10 @@ func (mf *metricFamilyPdata) loadMetricGroupOrCreate(groupKey string, ls labels.
mg, ok := mf.groups[groupKey]
if !ok {
mg = &metricGroupPdata{
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
intervalStartTimeMs: mf.intervalStartTimeMs,
family: mf,
ts: ts,
ls: ls,
complexValue: make([]*dataPoint, 0),
}
mf.groups[groupKey] = mg
// maintaining data insertion order is helpful to generate stable/reproducible metric output
Expand Down
43 changes: 22 additions & 21 deletions receiver/prometheusreceiver/internal/otlp_metricfamily_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package internal

import (
"testing"
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
Expand Down Expand Up @@ -103,10 +104,10 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
point := pdata.NewHistogramDataPoint()
point.SetCount(10)
point.SetSum(1004.78)
point.SetTimestamp(11 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetTimestamp(pdata.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
point.SetBucketCounts([]uint64{33})
point.SetExplicitBounds([]float64{})
point.SetStartTimestamp(11 * 1e6)
point.SetStartTimestamp(pdata.Timestamp(11 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
Expand All @@ -118,7 +119,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop(), tt.intervalStartTimeMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop()).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down Expand Up @@ -161,8 +162,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.0"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 10, value: 10, metric: "histogram_count"},
{at: 10, value: 12, metric: "histogram_sum"},
{at: 10, value: 10, metric: "summary_count"},
{at: 10, value: 12, metric: "summary_sum"},
{at: 10, value: 8, metric: "value"},
},
},
Expand All @@ -171,8 +172,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.75"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 11, value: 10, metric: "histogram_count"},
{at: 11, value: 1004.78, metric: "histogram_sum"},
{at: 11, value: 10, metric: "summary_count"},
{at: 11, value: 1004.78, metric: "summary_sum"},
{at: 11, value: 33.7, metric: "value"},
},
},
Expand All @@ -181,8 +182,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.50"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 12, value: 10, metric: "histogram_count"},
{at: 12, value: 13, metric: "histogram_sum"},
{at: 12, value: 10, metric: "summary_count"},
{at: 12, value: 13, metric: "summary_sum"},
{at: 12, value: 27, metric: "value"},
},
},
Expand All @@ -191,8 +192,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.90"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 13, value: 10, metric: "histogram_count"},
{at: 13, value: 14, metric: "histogram_sum"},
{at: 13, value: 10, metric: "summary_count"},
{at: 13, value: 14, metric: "summary_sum"},
{at: 13, value: 56, metric: "value"},
},
},
Expand All @@ -201,8 +202,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
{Name: "a", Value: "A"}, {Name: "quantile", Value: "0.99"}, {Name: "b", Value: "B"},
},
scrapes: []*scrape{
{at: 14, value: 10, metric: "histogram_count"},
{at: 14, value: 15, metric: "histogram_sum"},
{at: 14, value: 10, metric: "summary_count"},
{at: 14, value: 15, metric: "summary_sum"},
{at: 14, value: 82, metric: "value"},
},
},
Expand All @@ -227,8 +228,8 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
qn99 := qtL.AppendEmpty()
qn99.SetQuantile(.99)
qn99.SetValue(82)
point.SetTimestamp(14 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(10 * 1e5)
point.SetTimestamp(pdata.Timestamp(14 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(pdata.Timestamp(14 * time.Millisecond)) // the time in milliseconds -> nanoseconds
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
Expand All @@ -240,7 +241,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop(), 1).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop()).(*metricFamilyPdata)
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
Expand Down Expand Up @@ -290,8 +291,8 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
want: func() pdata.NumberDataPoint {
point := pdata.NewNumberDataPoint()
point.SetDoubleVal(33.7)
point.SetTimestamp(13 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(11 * 1e6)
point.SetTimestamp(pdata.Timestamp(13 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(pdata.Timestamp(13 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
Expand All @@ -309,8 +310,8 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
want: func() pdata.NumberDataPoint {
point := pdata.NewNumberDataPoint()
point.SetDoubleVal(99.9)
point.SetTimestamp(28 * 1e6) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(0)
point.SetTimestamp(pdata.Timestamp(28 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
point.SetStartTimestamp(pdata.Timestamp(28 * time.Millisecond)) // the time in milliseconds -> nanoseconds.
attributes := point.Attributes()
attributes.InsertString("a", "A")
attributes.InsertString("b", "B")
Expand All @@ -322,7 +323,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop(), tt.intervalStartTimestampMs).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop()).(*metricFamilyPdata)
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64)
if mf, ok := b.families[metricName]; ok {
curMF = mf
} else {
curMF = newMetricFamilyPdata(metricName, b.mc, b.logger, b.intervalStartTimeMs)
curMF = newMetricFamilyPdata(metricName, b.mc, b.logger)
b.families[familyName] = curMF
}
}
Expand Down
Loading

0 comments on commit 4fedae5

Please sign in to comment.