diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper.go b/storage/remote/otlptranslator/prometheusremotewrite/helper.go index abec70ff7..84cd309d6 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper.go @@ -574,9 +574,10 @@ func (c *PrometheusConverter) addTimeSeriesIfNeeded(lbls []prompb.Label, startTi } } -// validIntervalForStartTimestamps is hardcoded to 2 minutes in milliseconds. We can't know in advance whats the scrape -// interval of a series. -const validIntervalForStartTimestamps = 120_000 +// defaultIntervalForStartTimestamps is hardcoded to 5 minutes in milliseconds. +// Assuming a DPM of 1 and knowing that Grafana's $__rate_interval is typically 4 times the write interval that would give +// us 4 minutes. We add an extra minute for delays. +const defaultIntervalForStartTimestamps = int64(300_000) -// handleStartTime adds a zero sample at startTs only if startTs is within validIntervalForStartTimestamps of the sample timestamp. +// handleStartTime adds a zero sample at startTs only if startTs is within the valid interval (settings.ValidIntervalCreatedTimestampZeroIngestion, or defaultIntervalForStartTimestamps when that is zero) of the sample timestamp. // The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support, @@ -595,8 +596,13 @@ func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb return } + threshold := defaultIntervalForStartTimestamps + if settings.ValidIntervalCreatedTimestampZeroIngestion != 0 { + threshold = settings.ValidIntervalCreatedTimestampZeroIngestion.Milliseconds() + } + // The difference between the start and the actual timestamp is more than a reasonable time, so we skip this sample. 
- if ts-startTs > validIntervalForStartTimestamps { + if ts-startTs > threshold { return } @@ -619,8 +625,13 @@ func (c *PrometheusConverter) handleHistogramStartTime(startTs, sampleTs int64, return } + threshold := defaultIntervalForStartTimestamps + if settings.ValidIntervalCreatedTimestampZeroIngestion != 0 { + threshold = settings.ValidIntervalCreatedTimestampZeroIngestion.Milliseconds() + } + // The difference between the start and the actual timestamp is more than a reasonable time, so we skip this sample. - if sampleTs-startTs > validIntervalForStartTimestamps { + if sampleTs-startTs > threshold { return } diff --git a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go index f17822a67..32315e15a 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/helper_test.go @@ -192,12 +192,14 @@ func Test_convertTimeStamp(t *testing.T) { func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { now := time.Now() nowUnixNano := pcommon.Timestamp(now.UnixNano()) - nowMinus20s := pcommon.Timestamp(now.Add(-20 * time.Second).UnixNano()) + nowMinus2m30s := pcommon.Timestamp(now.Add(-2 * time.Minute).Add(-30 * time.Second).UnixNano()) + nowMinus6m := pcommon.Timestamp(now.Add(-6 * time.Minute).UnixNano()) nowMinus1h := pcommon.Timestamp(now.Add(-1 * time.Hour).UnixNano()) tests := []struct { - name string - metric func() pmetric.Metric - want func() map[uint64]*prompb.TimeSeries + overrideValidInterval time.Duration + metric func() pmetric.Metric + want func() map[uint64]*prompb.TimeSeries + name string }{ { name: "summary with start time equal to sample timestamp", @@ -245,7 +247,55 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { }, }, { - name: "summary with start time within two minutes to sample timestamp", + name: "summary with start time within default valid interval to 
sample timestamp", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_summary") + metric.SetEmptySummary() + + dp := metric.Summary().DataPoints().AppendEmpty() + dp.SetTimestamp(nowUnixNano) + dp.SetStartTimestamp(nowMinus2m30s) + + return metric + }, + want: func() map[uint64]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + countStr}, + } + createdLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + createdSuffix}, + } + sumLabels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_summary" + sumStr}, + } + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { + Labels: labels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)}, + {Value: 0, Timestamp: convertTimeStamp(nowUnixNano)}, + }, + }, + timeSeriesSignature(sumLabels): { + Labels: sumLabels, + Samples: []prompb.Sample{ + {Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)}, + {Value: 0, Timestamp: convertTimeStamp(nowUnixNano)}, + }, + }, + timeSeriesSignature(createdLabels): { + Labels: createdLabels, + Samples: []prompb.Sample{ + {Value: float64(convertTimeStamp(nowMinus2m30s)), Timestamp: convertTimeStamp(nowUnixNano)}, + }, + }, + } + }, + }, + { + overrideValidInterval: 10 * time.Minute, + name: "summary with start time within overiden valid interval to sample timestamp", metric: func() pmetric.Metric { metric := pmetric.NewMetric() metric.SetName("test_summary") @@ -253,7 +303,7 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { dp := metric.Summary().DataPoints().AppendEmpty() dp.SetTimestamp(nowUnixNano) - dp.SetStartTimestamp(nowMinus20s) + dp.SetStartTimestamp(nowMinus6m) return metric }, @@ -271,28 +321,28 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { timeSeriesSignature(labels): { Labels: labels, Samples: []prompb.Sample{ - {Value: 0, Timestamp: 
convertTimeStamp(nowMinus20s)}, + {Value: 0, Timestamp: convertTimeStamp(nowMinus6m)}, {Value: 0, Timestamp: convertTimeStamp(nowUnixNano)}, }, }, timeSeriesSignature(sumLabels): { Labels: sumLabels, Samples: []prompb.Sample{ - {Value: 0, Timestamp: convertTimeStamp(nowMinus20s)}, + {Value: 0, Timestamp: convertTimeStamp(nowMinus6m)}, {Value: 0, Timestamp: convertTimeStamp(nowUnixNano)}, }, }, timeSeriesSignature(createdLabels): { Labels: createdLabels, Samples: []prompb.Sample{ - {Value: float64(convertTimeStamp(nowMinus20s)), Timestamp: convertTimeStamp(nowUnixNano)}, + {Value: float64(convertTimeStamp(nowMinus6m)), Timestamp: convertTimeStamp(nowUnixNano)}, }, }, } }, }, { - name: "summary with start time older than two minutes to sample timestamp", + name: "summary with start time older than default valid interval to sample timestamp", metric: func() pmetric.Metric { metric := pmetric.NewMetric() metric.SetName("test_summary") @@ -382,8 +432,9 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) { metric.Summary().DataPoints(), pcommon.NewResource(), Settings{ - ExportCreatedMetric: true, - EnableCreatedTimestampZeroIngestion: true, + ExportCreatedMetric: true, + EnableCreatedTimestampZeroIngestion: true, + ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval, }, metric.Name(), log.NewNopLogger(), @@ -513,12 +564,14 @@ func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) { func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { now := time.Now() nowUnixNano := pcommon.Timestamp(now.UnixNano()) - nowMinus20s := pcommon.Timestamp(now.Add(-20 * time.Second).UnixNano()) + nowMinus2m30s := pcommon.Timestamp(now.Add(-2 * time.Minute).Add(-30 * time.Second).UnixNano()) + nowMinus6m := pcommon.Timestamp(now.Add(-6 * time.Minute).UnixNano()) nowMinus1h := pcommon.Timestamp(now.Add(-1 * time.Hour).UnixNano()) tests := []struct { - name string - metric func() pmetric.Metric - want func() 
map[uint64]*prompb.TimeSeries + overrideValidInterval time.Duration + metric func() pmetric.Metric + want func() map[uint64]*prompb.TimeSeries + name string }{ { name: "histogram with start time", @@ -592,7 +645,46 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { }, }, { - name: "histogram with start time within two minutes to sample timestamp", + name: "histogram with start time within default valid interval to sample timestamp", + metric: func() pmetric.Metric { + metric := pmetric.NewMetric() + metric.SetName("test_exponential_hist") + metric.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + + pt := metric.ExponentialHistogram().DataPoints().AppendEmpty() + pt.SetTimestamp(nowUnixNano) + pt.SetStartTimestamp(nowMinus2m30s) + + return metric + }, + want: func() map[uint64]*prompb.TimeSeries { + labels := []prompb.Label{ + {Name: model.MetricNameLabel, Value: "test_exponential_hist"}, + } + return map[uint64]*prompb.TimeSeries{ + timeSeriesSignature(labels): { + Labels: labels, + Histograms: []prompb.Histogram{ + { + Timestamp: convertTimeStamp(nowMinus2m30s), + }, + { + Timestamp: convertTimeStamp(nowUnixNano), + Count: &prompb.Histogram_CountInt{ + CountInt: 0, + }, + ZeroCount: &prompb.Histogram_ZeroCountInt{ + ZeroCountInt: 0, + }, + ZeroThreshold: defaultZeroThreshold, + }, + }, + }, + } + }, + }, + { + name: "histogram with start time within overiden valid interval to sample timestamp", metric: func() pmetric.Metric { metric := pmetric.NewMetric() metric.SetName("test_exponential_hist") @@ -600,7 +692,7 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { pt := metric.ExponentialHistogram().DataPoints().AppendEmpty() pt.SetTimestamp(nowUnixNano) - pt.SetStartTimestamp(nowMinus20s) + pt.SetStartTimestamp(nowMinus6m) return metric }, @@ -613,7 +705,7 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { Labels: labels, 
Histograms: []prompb.Histogram{ { - Timestamp: convertTimeStamp(nowMinus20s), + Timestamp: convertTimeStamp(nowMinus6m), }, { Timestamp: convertTimeStamp(nowUnixNano), @@ -629,9 +721,10 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { }, } }, + overrideValidInterval: 10 * time.Minute, }, { - name: "histogram with start time older than two minutes to sample timestamp", + name: "histogram with start time older than default valid interval to sample timestamp", metric: func() pmetric.Metric { metric := pmetric.NewMetric() metric.SetName("test_exponential_hist") @@ -677,8 +770,9 @@ func TestPrometheusConverter_AddExponentialHistogramDataPoints(t *testing.T) { metric.ExponentialHistogram().DataPoints(), pcommon.NewResource(), Settings{ - ExportCreatedMetric: true, - EnableCreatedTimestampZeroIngestion: true, + ExportCreatedMetric: true, + EnableCreatedTimestampZeroIngestion: true, + ValidIntervalCreatedTimestampZeroIngestion: tt.overrideValidInterval, }, metric.Name(), ) diff --git a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go index 0cb9e6a3f..a74caabc5 100644 --- a/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go +++ b/storage/remote/otlptranslator/prometheusremotewrite/metrics_to_prw.go @@ -22,6 +22,7 @@ import ( "fmt" "sort" "strings" + "time" "github.com/go-kit/log" "go.opentelemetry.io/collector/pdata/pcommon" @@ -34,20 +35,21 @@ import ( ) type Settings struct { - Namespace string - ExternalLabels map[string]string - DisableTargetInfo bool - ExportCreatedMetric bool - AddMetricSuffixes bool - SendMetadata bool - PromoteResourceAttributes []string - EnableCreatedTimestampZeroIngestion bool + ExternalLabels map[string]string + Namespace string + PromoteResourceAttributes []string + DisableTargetInfo bool + ExportCreatedMetric bool + AddMetricSuffixes bool + SendMetadata bool + EnableCreatedTimestampZeroIngestion bool 
+ ValidIntervalCreatedTimestampZeroIngestion time.Duration } type StartTsAndTs struct { + Labels []prompb.Label StartTs int64 Ts int64 - Labels []prompb.Label } // PrometheusConverter converts from OTel write format to Prometheus remote write format. diff --git a/storage/remote/write_handler.go b/storage/remote/write_handler.go index 56dff04f8..43c72a357 100644 --- a/storage/remote/write_handler.go +++ b/storage/remote/write_handler.go @@ -482,25 +482,27 @@ func (h *writeHandler) appendV2(app storage.Appender, req *writev2.Request, rs * // NewOTLPWriteHandler creates a http.Handler that accepts OTLP write requests and // writes them to the provided appendable. -func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool) http.Handler { +func NewOTLPWriteHandler(logger log.Logger, appendable storage.Appendable, configFunc func() config.Config, enableCTZeroIngestion bool, validIntervalCTZeroIngestion time.Duration) http.Handler { rwHandler := &writeHandler{ logger: logger, appendable: appendable, } return &otlpWriteHandler{ - logger: logger, - rwHandler: rwHandler, - configFunc: configFunc, - enableCTZeroIngestion: enableCTZeroIngestion, + logger: logger, + rwHandler: rwHandler, + configFunc: configFunc, + enableCTZeroIngestion: enableCTZeroIngestion, + validIntervalCTZeroIngestion: validIntervalCTZeroIngestion, } } type otlpWriteHandler struct { - logger log.Logger - rwHandler *writeHandler - configFunc func() config.Config - enableCTZeroIngestion bool + logger log.Logger + rwHandler *writeHandler + configFunc func() config.Config + enableCTZeroIngestion bool + validIntervalCTZeroIngestion time.Duration } func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { @@ -515,9 +517,10 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { converter := otlptranslator.NewPrometheusConverter() annots, err := converter.FromMetrics(r.Context(), 
req.Metrics(), otlptranslator.Settings{ - AddMetricSuffixes: true, - PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, - EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion, + AddMetricSuffixes: true, + PromoteResourceAttributes: otlpCfg.PromoteResourceAttributes, + EnableCreatedTimestampZeroIngestion: h.enableCTZeroIngestion, + ValidIntervalCreatedTimestampZeroIngestion: h.validIntervalCTZeroIngestion, }, h.logger) if err != nil { level.Warn(h.logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) diff --git a/storage/remote/write_test.go b/storage/remote/write_test.go index d162de130..17b35ec9e 100644 --- a/storage/remote/write_test.go +++ b/storage/remote/write_test.go @@ -384,7 +384,7 @@ func TestOTLPWriteHandler(t *testing.T) { return config.Config{ OTLPConfig: config.DefaultOTLPConfig, } - }, false) + }, false, 0) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) diff --git a/web/api/v1/api.go b/web/api/v1/api.go index 48283a6c5..ff700d835 100644 --- a/web/api/v1/api.go +++ b/web/api/v1/api.go @@ -252,6 +252,7 @@ func NewAPI( acceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg, otlpEnabled bool, enableCTZeroIngestion bool, + validIntervalCTZeroIngestion time.Duration, ) *API { a := &API{ QueryEngine: qe, @@ -296,7 +297,7 @@ func NewAPI( a.remoteWriteHandler = remote.NewWriteHandler(logger, registerer, ap, acceptRemoteWriteProtoMsgs) } if otlpEnabled { - a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion) + a.otlpWriteHandler = remote.NewOTLPWriteHandler(logger, ap, configFunc, enableCTZeroIngestion, validIntervalCTZeroIngestion) } return a diff --git a/web/api/v1/errors_test.go b/web/api/v1/errors_test.go index 40173d159..2076c7238 100644 --- a/web/api/v1/errors_test.go +++ b/web/api/v1/errors_test.go @@ -141,6 +141,7 @@ func createPrometheusAPI(t *testing.T, q storage.SampleAndChunkQueryable) *route 
config.RemoteWriteProtoMsgs{config.RemoteWriteProtoMsgV1, config.RemoteWriteProtoMsgV2}, false, false, + 0, ) promRouter := route.New().WithPrefix("/api/v1") diff --git a/web/web.go b/web/web.go index 2c5f9a230..1bbf0b8f5 100644 --- a/web/web.go +++ b/web/web.go @@ -244,32 +244,35 @@ type Options struct { Version *PrometheusVersion Flags map[string]string - ListenAddresses []string - CORSOrigin *regexp.Regexp - ReadTimeout time.Duration - MaxConnections int - ExternalURL *url.URL - RoutePrefix string - UseLocalAssets bool - UserAssetsPath string - ConsoleTemplatesPath string - ConsoleLibrariesPath string - EnableLifecycle bool - EnableAdminAPI bool - PageTitle string - RemoteReadSampleLimit int - RemoteReadConcurrencyLimit int - RemoteReadBytesInFrame int - EnableRemoteWriteReceiver bool - EnableOTLPWriteReceiver bool - EnableCreatedTimestampZeroIngestion bool - IsAgent bool - AppName string + ListenAddresses []string + CORSOrigin *regexp.Regexp + ReadTimeout time.Duration + MaxConnections int + ExternalURL *url.URL + RoutePrefix string + UseLocalAssets bool + UserAssetsPath string + ConsoleTemplatesPath string + ConsoleLibrariesPath string + EnableLifecycle bool + EnableAdminAPI bool + PageTitle string + RemoteReadSampleLimit int + RemoteReadConcurrencyLimit int + RemoteReadBytesInFrame int + EnableRemoteWriteReceiver bool + EnableOTLPWriteReceiver bool + IsAgent bool + AppName string AcceptRemoteWriteProtoMsgs []config.RemoteWriteProtoMsg Gatherer prometheus.Gatherer Registerer prometheus.Registerer + + // Grafana Cloud additions, kept in a separate group to simplify merging updates from upstream. + EnableCreatedTimestampZeroIngestion bool + ValidIntervalCreatedTimestampZeroIngestion time.Duration } // New initializes a new web Handler. 
@@ -359,6 +362,7 @@ func New(logger log.Logger, o *Options) *Handler { o.AcceptRemoteWriteProtoMsgs, o.EnableOTLPWriteReceiver, o.EnableCreatedTimestampZeroIngestion, + o.ValidIntervalCreatedTimestampZeroIngestion, ) if o.RoutePrefix != "/" {