Skip to content

Commit

Permalink
Merge pull request #706 from grafana/jvp/otlp-ct-zero-timestamp-update
Browse files Browse the repository at this point in the history
OTLP: Increase valid created timestamp threshold to 5 minutes
  • Loading branch information
jesusvazquez authored Oct 3, 2024
2 parents 24c9a62 + d2e6c5b commit 35ec40c
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 71 deletions.
21 changes: 16 additions & 5 deletions storage/remote/otlptranslator/prometheusremotewrite/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,9 +574,10 @@ func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi
}
}

// validIntervalForStartTimestamps is hardcoded to 2 minutes in milliseconds. We can't know in advance whats the scrape
// interval of a series.
const validIntervalForStartTimestamps = 120_000
// defaultIntervalForStartTimestamps is hardcoded to 5 minutes in milliseconds.
// Assuming a DPM of 1 and knowing that Grafana's $__rate_interval is typically 4 times the write interval that would give
// us 4 minutes. We add an extra minute for delays.
const defaultIntervalForStartTimestamps = int64(300_000)

// handleStartTime adds a zero sample at startTs only if startTs is within validIntervalForStartTimestamps of the sample timestamp.
// The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support,
Expand All @@ -595,8 +596,13 @@ func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb
return
}

threshold := defaultIntervalForStartTimestamps
if settings.ValidIntervalCreatedTimestampZeroIngestion != 0 {
threshold = settings.ValidIntervalCreatedTimestampZeroIngestion.Milliseconds()
}

// The difference between the start and the actual timestamp is more than a reasonable time, so we skip this sample.
if ts-startTs > validIntervalForStartTimestamps {
if ts-startTs > threshold {
return
}

Expand All @@ -619,8 +625,13 @@ func (c *PrometheusConverter) handleHistogramStartTime(startTs, sampleTs int64,
return
}

threshold := defaultIntervalForStartTimestamps
if settings.ValidIntervalCreatedTimestampZeroIngestion != 0 {
threshold = settings.ValidIntervalCreatedTimestampZeroIngestion.Milliseconds()
}

// The difference between the start and the actual timestamp is more than a reasonable time, so we skip this sample.
if sampleTs-startTs > validIntervalForStartTimestamps {
if sampleTs-startTs > threshold {
return
}

Expand Down
138 changes: 116 additions & 22 deletions storage/remote/otlptranslator/prometheusremotewrite/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,14 @@ func Test_convertTimeStamp(t *testing.T) {
func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
now := time.Now()
nowUnixNano := pcommon.Timestamp(now.UnixNano())
nowMinus20s := pcommon.Timestamp(now.Add(-20 * time.Second).UnixNano())
nowMinus2m30s := pcommon.Timestamp(now.Add(-2 * time.Minute).Add(-30 * time.Second).UnixNano())
nowMinus6m := pcommon.Timestamp(now.Add(-20 * time.Second).UnixNano())
nowMinus1h := pcommon.Timestamp(now.Add(-1 * time.Hour).UnixNano())
tests := []struct {
name string
metric func() pmetric.Metric
want func() map[uint64]*prompb.TimeSeries
overrideValidInterval time.Duration
metric func() pmetric.Metric
want func() map[uint64]*prompb.TimeSeries
name string
}{
{
name: "summary with start time equal to sample timestamp",
Expand Down Expand Up @@ -245,15 +247,63 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
},
},
{
name: "summary with start time within two minutes to sample timestamp",
name: "summary with start time within default valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_summary")
metric.SetEmptySummary()

dp := metric.Summary().DataPoints().AppendEmpty()
dp.SetTimestamp(nowUnixNano)
dp.SetStartTimestamp(nowMinus2m30s)

return metric
},
want: func() map[uint64]*prompb.TimeSeries {
labels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_summary" + countStr},
}
createdLabels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_summary" + createdSuffix},
}
sumLabels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_summary" + sumStr},
}
return map[uint64]*prompb.TimeSeries{
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(createdLabels): {
Labels: createdLabels,
Samples: []prompb.Sample{
{Value: float64(convertTimeStamp(nowMinus2m30s)), Timestamp: convertTimeStamp(nowUnixNano)},
},
},
}
},
overrideValidInterval: 10 * time.Minute,
},
{
name: "summary with start time within overiden valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_summary")
metric.SetEmptySummary()

dp := metric.Summary().DataPoints().AppendEmpty()
dp.SetTimestamp(nowUnixNano)
dp.SetStartTimestamp(nowMinus20s)
dp.SetStartTimestamp(nowMinus6m)

return metric
},
Expand All @@ -271,28 +321,28 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
timeSeriesSignature(labels): {
Labels: labels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus20s)},
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(sumLabels): {
Labels: sumLabels,
Samples: []prompb.Sample{
{Value: 0, Timestamp: convertTimeStamp(nowMinus20s)},
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
},
},
timeSeriesSignature(createdLabels): {
Labels: createdLabels,
Samples: []prompb.Sample{
{Value: float64(convertTimeStamp(nowMinus20s)), Timestamp: convertTimeStamp(nowUnixNano)},
{Value: float64(convertTimeStamp(nowMinus6m)), Timestamp: convertTimeStamp(nowUnixNano)},
},
},
}
},
},
{
name: "summary with start time older than two minutes to sample timestamp",
name: "summary with start time older than default valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_summary")
Expand Down Expand Up @@ -382,8 +432,9 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
metric.Summary().DataPoints(),
pcommon.NewResource(),
Settings{
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval,
},
metric.Name(),
log.NewNopLogger(),
Expand Down Expand Up @@ -513,12 +564,14 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) {
now := time.Now()
nowUnixNano := pcommon.Timestamp(now.UnixNano())
nowMinus20s := pcommon.Timestamp(now.Add(-20 * time.Second).UnixNano())
nowMinus2m30s := pcommon.Timestamp(now.Add(-2 * time.Minute).Add(-30 * time.Second).UnixNano())
nowMinus6m := pcommon.Timestamp(now.Add(-6 * time.Minute).UnixNano())
nowMinus1h := pcommon.Timestamp(now.Add(-1 * time.Hour).UnixNano())
tests := []struct {
name string
metric func() pmetric.Metric
want func() map[uint64]*prompb.TimeSeries
overrideValidInterval time.Duration
metric func() pmetric.Metric
want func() map[uint64]*prompb.TimeSeries
name string
}{
{
name: "histogram with start time",
Expand Down Expand Up @@ -592,15 +645,54 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) {
},
},
{
name: "histogram with start time within two minutes to sample timestamp",
name: "histogram with start time within default valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_exponential_hist")
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

pt := metric.ExponentialHistogram().DataPoints().AppendEmpty()
pt.SetTimestamp(nowUnixNano)
pt.SetStartTimestamp(nowMinus2m30s)

return metric
},
want: func() map[uint64]*prompb.TimeSeries {
labels := []prompb.Label{
{Name: model.MetricNameLabel, Value: "test_exponential_hist"},
}
return map[uint64]*prompb.TimeSeries{
timeSeriesSignature(labels): {
Labels: labels,
Histograms: []prompb.Histogram{
{
Timestamp: convertTimeStamp(nowMinus2m30s),
},
{
Timestamp: convertTimeStamp(nowUnixNano),
Count: &prompb.Histogram_CountInt{
CountInt: 0,
},
ZeroCount: &prompb.Histogram_ZeroCountInt{
ZeroCountInt: 0,
},
ZeroThreshold: defaultZeroThreshold,
},
},
},
}
},
},
{
name: "histogram with start time within overiden valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_exponential_hist")
metric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)

pt := metric.ExponentialHistogram().DataPoints().AppendEmpty()
pt.SetTimestamp(nowUnixNano)
pt.SetStartTimestamp(nowMinus20s)
pt.SetStartTimestamp(nowMinus6m)

return metric
},
Expand All @@ -613,7 +705,7 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) {
Labels: labels,
Histograms: []prompb.Histogram{
{
Timestamp: convertTimeStamp(nowMinus20s),
Timestamp: convertTimeStamp(nowMinus6m),
},
{
Timestamp: convertTimeStamp(nowUnixNano),
Expand All @@ -629,9 +721,10 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) {
},
}
},
overrideValidInterval: 10 * time.Minute,
},
{
name: "histogram with start time older than two minutes to sample timestamp",
name: "histogram with start time older than default valid interval to sample timestamp",
metric: func() pmetric.Metric {
metric := pmetric.NewMetric()
metric.SetName("test_exponential_hist")
Expand Down Expand Up @@ -677,8 +770,9 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) {
metric.ExponentialHistogram().DataPoints(),
pcommon.NewResource(),
Settings{
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
ExportCreatedMetric: true,
EnableCreatedTimestampZeroIngestion: true,
ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval,
},
metric.Name(),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/go-kit/log"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand All @@ -34,20 +35,21 @@ import (
)

type Settings struct {
Namespace string
ExternalLabels map[string]string
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
SendMetadata bool
PromoteResourceAttributes []string
EnableCreatedTimestampZeroIngestion bool
ExternalLabels map[string]string
Namespace string
PromoteResourceAttributes []string
DisableTargetInfo bool
ExportCreatedMetric bool
AddMetricSuffixes bool
SendMetadata bool
EnableCreatedTimestampZeroIngestion bool
ValidIntervalCreatedTimestampZeroIngestion time.Duration
}

type StartTsAndTs struct {
Labels []prompb.Label
StartTs int64
Ts int64
Labels []prompb.Label
}

// PrometheusConverter converts from OTel write format to Prometheus remote write format.
Expand Down
27 changes: 15 additions & 12 deletions storage/remote/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,25 +482,27 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs *

// NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and
// writes them to the provided appendable.
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool) http.Handler {
func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool, validIntervalCTZeroIngestion time.Duration) http.Handler {
rwHandler := &writeHandler{
logger: logger,
appendable: appendable,
}

return &otlpWriteHandler{
logger: logger,
rwHandler: rwHandler,
configFunc: configFunc,
enableCTZeroIngestion: enableCTZeroIngestion,
logger: logger,
rwHandler: rwHandler,
configFunc: configFunc,
enableCTZeroIngestion: enableCTZeroIngestion,
validIntervalCTZeroIngestion: validIntervalCTZeroIngestion,
}
}

type otlpWriteHandler struct {
logger log.Logger
rwHandler *writeHandler
configFunc func() config.Config
enableCTZeroIngestion bool
logger log.Logger
rwHandler *writeHandler
configFunc func() config.Config
enableCTZeroIngestion bool
validIntervalCTZeroIngestion time.Duration
}

func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand All @@ -515,9 +517,10 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

converter := otlptranslator.NewPrometheusConverter()
annots, err := converter.FromMetrics(r.Context(), req.Metrics(), otlptranslator.Settings{
AddMetricSuffixes: true,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion,
AddMetricSuffixes: true,
PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes,
EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion,
ValidIntervalCreatedTimestampZeroIngestion: h.validIntervalCTZeroIngestion,
}, h.logger)
if err != nil {
level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err)
Expand Down
2 changes: 1 addition & 1 deletion storage/remote/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func TestOTLPWriteHandler(t *testing.T) {
return config.Config{
OTLPConfig: config.DefaultOTLPConfig,
}
}, false)
}, false, 0)

recorder := httptest.NewRecorder()
handler.ServeHTTP(recorder, req)
Expand Down
3 changes: 2 additions & 1 deletion web/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func NewAPI(
acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg,
otlpEnabled bool,
enableCTZeroIngestion bool,
validIntervalCTZeroIngestion time.Duration,
) *API {
a := &API{
QueryEngine: qe,
Expand Down Expand Up @@ -296,7 +297,7 @@ func NewAPI(
a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs)
}
if otlpEnabled {
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion)
a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion, validIntervalCTZeroIngestion)
}

return a
Expand Down
Loading

0 comments on commit 35ec40c

Please sign in to comment.