From e60e114ef787ddd972d89e828bc3472a6eb6d731 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 29 Jul 2022 12:10:28 -0500 Subject: [PATCH 01/11] filter out stale spans from metrics generator --- CHANGELOG.md | 4 +++ integration/e2e/metrics_generator_test.go | 22 ++++++++++++-- modules/generator/config.go | 11 ++++--- modules/generator/instance.go | 35 +++++++++++++++++++++++ 4 files changed, 66 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a69a576c92..bc8107f4c16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,8 +82,12 @@ Jsonnet users will now need to specify a storage request and limit for the gener * [BUGFIX] Prevent ingester panic "cannot grow buffer" [#1258](https://github.com/grafana/tempo/issues/1258) (@mdisibio) * [BUGFIX] metrics-generator: do not remove x-scope-orgid header in single tenant modus [#1554](https://github.com/grafana/tempo/pull/1554) (@kvrhdn) * [BUGFIX] Fixed issue where backend does not support `root.name` and `root.service.name` [#1589](https://github.com/grafana/tempo/pull/1589) (@kvrhdn) +<<<<<<< HEAD * [BUGFIX] Fixed ingester to continue starting up after block replay error [#1603](https://github.com/grafana/tempo/issues/1603) (@mdisibio) * [BUGFIX] Fix issue relating to usage stats and GCS returning empty strings as tenantID [#1625](https://github.com/grafana/tempo/pull/1625) (@ie-pham) +======= +* [ENHANCEMENT] metrics-generator: filter out older spans before metrics are aggregated [#1612](https://github.com/grafana/tempo/pull/1612) (@ie-pham) +>>>>>>> c66c53e4 (filter out stale spans from metrics generator) ## v1.4.1 / 2022-05-05 diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index c9a0d150635..8c355b6e2f7 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -103,6 +103,24 @@ func TestMetricsGenerator(t *testing.T) { }) require.NoError(t, err) + //also send one with old timestamp + err = c.EmitBatch(context.Background(), &thrift.Batch{ + Process: &thrift.Process{ServiceName: "app"}, + Spans: []*thrift.Span{ + { + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: r.Int63(), + ParentSpanId: parentSpanID, + OperationName: "app-handle", + StartTime: time.Now().UnixMicro() - (3000 * 1000000), + Duration: int64(1 * time.Second / time.Microsecond), + Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, + }, + }, + }) + require.NoError(t, err) + // Fetch metrics from Prometheus once they are received var metricFamilies map[string]*io_prometheus_client.MetricFamily for { @@ -164,8 +182,8 @@ func TestMetricsGenerator(t *testing.T) { assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls)) // Verify metrics - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_received_total")) - + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(3), "tempo_metrics_generator_spans_received_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1), "tempo_metrics_generator_spans_discarded_total")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total")) diff --git a/modules/generator/config.go b/modules/generator/config.go index e96637b5f6b..bc63d8af58f 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -19,10 +19,11 @@ const ( // Config for a generator. type Config struct { - Ring RingConfig `yaml:"ring"` - Processor ProcessorConfig `yaml:"processor"` - Registry registry.Config `yaml:"registry"` - Storage storage.Config `yaml:"storage"` + Ring RingConfig `yaml:"ring"` + Processor ProcessorConfig `yaml:"processor"` + Registry registry.Config `yaml:"registry"` + Storage storage.Config `yaml:"storage"` + MaxSpanAge int64 `yaml:"max_span_age_sec"` } // RegisterFlagsAndApplyDefaults registers the flags. @@ -31,6 +32,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f) + // setting default for max span age before discarding to 30 sec + cfg.MaxSpanAge = 30 } type ProcessorConfig struct { diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 431b269a3c0..708680d0f39 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/tempo/modules/generator/registry" "github.com/grafana/tempo/modules/generator/storage" "github.com/grafana/tempo/pkg/tempopb" + v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" ) var ( @@ -44,6 +45,11 @@ var ( Name: "metrics_generator_bytes_received_total", Help: "The total number of proto bytes received per tenant", }, []string{"tenant"}) + metricSpansDiscarded = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "tempo", + Name: "metrics_generator_spans_discarded_total", + Help: "The total number of discarded spans received per tenant", + }, []string{"tenant", "reason"}) ) type instance struct { @@ -248,8 +254,14 @@ func (i *instance) updateProcessorMetrics() { } func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) { + fmt.Println("Before filter") + count(req) + i.updatePushMetrics(req) + fmt.Println("After filter") + count(req) + i.processorsMtx.RLock() defer i.processorsMtx.RUnlock() @@ -261,14 +273,27 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) func (i *instance) updatePushMetrics(req *tempopb.PushSpansRequest) { size := 0 spanCount := 0 + expiredSpans := 0 for _, b := range req.Batches { size += b.Size() for _, ils := range b.InstrumentationLibrarySpans { spanCount += len(ils.Spans) + // filter spans that have end time > max_age + var newSpansArr []*v1.Span + time_now := time.Now().UnixNano() + for _, span := range ils.Spans { + if span.EndTimeUnixNano >= uint64(time_now-i.cfg.MaxSpanAge*1000000000) { + newSpansArr = append(newSpansArr, span) + } else { + expiredSpans++ + } + } + ils.Spans = newSpansArr } } metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(size)) metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount)) + metricSpansDiscarded.WithLabelValues(i.instanceID, "max_age_reached").Add(float64(expiredSpans)) } // shutdown stops the instance and flushes any remaining data. After shutdown @@ -290,3 +315,13 @@ func (i *instance) shutdown() { level.Error(i.logger).Log("msg", "closing wal failed", "tenant", i.instanceID, "err", err) } } + +func count(req *tempopb.PushSpansRequest) { + spanCount := 0 + for _, b := range req.Batches { + for _, ils := range b.InstrumentationLibrarySpans { + spanCount += len(ils.Spans) + } + } + fmt.Println("Span count:", spanCount) +} From 583608aec1b41202f5552f28016e86f96ea0acd9 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Mon, 1 Aug 2022 14:05:39 -0500 Subject: [PATCH 02/11] removed debug steps/logs --- modules/generator/instance.go | 23 +++-------------------- 1 file changed, 3 insertions(+), 20 deletions(-) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 708680d0f39..2f695638599 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -254,14 +254,7 @@ func (i *instance) updateProcessorMetrics() { } func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) { - fmt.Println("Before filter") - count(req) - i.updatePushMetrics(req) - - fmt.Println("After filter") - count(req) - i.processorsMtx.RLock() defer i.processorsMtx.RUnlock() @@ -280,9 +273,9 @@ func (i *instance) updatePushMetrics(req *tempopb.PushSpansRequest) { spanCount += len(ils.Spans) // filter spans that have end time > max_age var newSpansArr []*v1.Span - time_now := time.Now().UnixNano() + timeNow := time.Now().UnixNano() for _, span := range ils.Spans { - if span.EndTimeUnixNano >= uint64(time_now-i.cfg.MaxSpanAge*1000000000) { + if span.EndTimeUnixNano >= uint64(timeNow-i.cfg.MaxSpanAge*1000000000) { newSpansArr = append(newSpansArr, span) } else { expiredSpans++ @@ -314,14 +307,4 @@ func (i *instance) shutdown() { if err != nil { level.Error(i.logger).Log("msg", "closing wal failed", "tenant", i.instanceID, "err", err) } -} - -func count(req *tempopb.PushSpansRequest) { - spanCount := 0 - for _, b := range req.Batches { - for _, ils := range b.InstrumentationLibrarySpans { - spanCount += len(ils.Spans) - } - } - fmt.Println("Span count:", spanCount) -} +} \ No newline at end of file From 3a3de904dcd0b5004a00c3d1925806d6d5be4143 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Mon, 1 Aug 2022 14:07:42 -0500 Subject: [PATCH 03/11] gofmt --- modules/generator/instance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 2f695638599..85ed49a5dc5 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -307,4 +307,4 @@ func (i *instance) shutdown() { if err != nil { level.Error(i.logger).Log("msg", "closing wal failed", "tenant", i.instanceID, "err", err) } -} \ No newline at end of file +} From 22e829356f90056497f7524149d6b6352c983b44 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Wed, 10 Aug 2022 17:35:00 -0500 Subject: [PATCH 04/11] addressed some review comments --- docs/tempo/website/configuration/_index.md | 6 ++++ integration/e2e/metrics_generator_test.go | 24 +++++++++++++-- modules/generator/config.go | 18 ++++++----- modules/generator/instance.go | 35 ++++++++++++++-------- 4 files changed, 59 insertions(+), 24 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 198f345efda..82ffbb9054a 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -275,6 +275,12 @@ metrics_generator: # https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write remote_write: [- ] + + metrics: + # This option only allows spans with start time that occur within the configured duration to be + # considered in metrics generation + # This is to filter out spans that are outdated + [ingestion_time_range_slack: | default = 2m] ``` ## Query-frontend diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index 8c355b6e2f7..937de160464 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -113,7 +113,25 @@ func TestMetricsGenerator(t *testing.T) { SpanId: r.Int63(), ParentSpanId: parentSpanID, OperationName: "app-handle", - StartTime: time.Now().UnixMicro() - (3000 * 1000000), + StartTime: time.Now().Add(-50 * time.Minute).UnixMicro(), + Duration: int64(1 * time.Second / time.Microsecond), + Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, + }, + }, + }) + require.NoError(t, err) + + //also send one with timestamp 10 days in the future + err = c.EmitBatch(context.Background(), &thrift.Batch{ + Process: &thrift.Process{ServiceName: "app"}, + Spans: []*thrift.Span{ + { + TraceIdLow: traceIDLow, + TraceIdHigh: traceIDHigh, + SpanId: r.Int63(), + ParentSpanId: parentSpanID, + OperationName: "app-handle", + StartTime: time.Now().Add(10 * 24 * time.Hour).UnixMicro(), Duration: int64(1 * time.Second / time.Microsecond), Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, }, @@ -182,8 +200,8 @@ func TestMetricsGenerator(t *testing.T) { assert.Equal(t, 1.0, sumValues(metricFamilies, "traces_spanmetrics_latency_sum", lbls)) // Verify metrics - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(3), "tempo_metrics_generator_spans_received_total")) - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1), "tempo_metrics_generator_spans_discarded_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_discarded_spans_total")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total")) diff --git a/modules/generator/config.go b/modules/generator/config.go index bc63d8af58f..dc569a64b18 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -2,11 +2,11 @@ package generator import ( "flag" - "github.com/grafana/tempo/modules/generator/processor/servicegraphs" "github.com/grafana/tempo/modules/generator/processor/spanmetrics" "github.com/grafana/tempo/modules/generator/registry" "github.com/grafana/tempo/modules/generator/storage" + "time" ) const ( @@ -19,11 +19,13 @@ const ( // Config for a generator. type Config struct { - Ring RingConfig `yaml:"ring"` - Processor ProcessorConfig `yaml:"processor"` - Registry registry.Config `yaml:"registry"` - Storage storage.Config `yaml:"storage"` - MaxSpanAge int64 `yaml:"max_span_age_sec"` + Ring RingConfig `yaml:"ring"` + Processor ProcessorConfig `yaml:"processor"` + Registry registry.Config `yaml:"registry"` + Storage storage.Config `yaml:"storage"` + // MetricsIngestionSlack is the max amount of time passed since a span's start time + // for the span to be considered in metrics generation + MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"` } // RegisterFlagsAndApplyDefaults registers the flags. @@ -32,8 +34,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f) - // setting default for max span age before discarding to 30 sec - cfg.MaxSpanAge = 30 + // setting default for max span age before discarding to 2m + cfg.MetricsIngestionSlack = 2 * time.Minute } type ProcessorConfig struct { diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 85ed49a5dc5..70a7479698d 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -47,11 +47,14 @@ var ( }, []string{"tenant"}) metricSpansDiscarded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", - Name: "metrics_generator_spans_discarded_total", + Name: "metrics_generator_discarded_spans_total", Help: "The total number of discarded spans received per tenant", }, []string{"tenant", "reason"}) ) +const reason_outside_time_range_slack = "outside_metrics_ingestion_slack" +const future_slack = 5 * time.Hour * 24 + type instance struct { cfg *Config @@ -254,7 +257,7 @@ func (i *instance) updateProcessorMetrics() { } func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) { - i.updatePushMetrics(req) + i.preprocessSpans(req) i.processorsMtx.RLock() defer i.processorsMtx.RUnlock() @@ -263,30 +266,36 @@ func (i *instance) pushSpans(ctx context.Context, req *tempopb.PushSpansRequest) } } -func (i *instance) updatePushMetrics(req *tempopb.PushSpansRequest) { +func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { size := 0 spanCount := 0 - expiredSpans := 0 + expiredSpanCount := 0 for _, b := range req.Batches { size += b.Size() for _, ils := range b.InstrumentationLibrarySpans { spanCount += len(ils.Spans) - // filter spans that have end time > max_age - var newSpansArr []*v1.Span - timeNow := time.Now().UnixNano() + // filter spans that have end time > max_age and end time more than 5 days in the future + newSpansArr := make([]*v1.Span, len(ils.Spans)) + timeNow := time.Now() + index := 0 for _, span := range ils.Spans { - if span.EndTimeUnixNano >= uint64(timeNow-i.cfg.MaxSpanAge*1000000000) { - newSpansArr = append(newSpansArr, span) + if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(future_slack).UnixNano()) { + newSpansArr[index] = span + index++ } else { - expiredSpans++ + expiredSpanCount++ } } - ils.Spans = newSpansArr + ils.Spans = newSpansArr[0:index] } } - metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(size)) + i.updatePushMetrics(size, spanCount, expiredSpanCount) +} + +func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) { + metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested)) metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount)) - metricSpansDiscarded.WithLabelValues(i.instanceID, "max_age_reached").Add(float64(expiredSpans)) + metricSpansDiscarded.WithLabelValues(i.instanceID, reason_outside_time_range_slack).Add(float64(expiredSpanCount)) } // shutdown stops the instance and flushes any remaining data. After shutdown From f29b089b9b0a0860fbeff3844b8a0493362c300b Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Wed, 10 Aug 2022 17:37:15 -0500 Subject: [PATCH 05/11] fixed docs --- docs/tempo/website/configuration/_index.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index 82ffbb9054a..a1f041b9d7b 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -276,11 +276,10 @@ metrics_generator: remote_write: [- ] - metrics: - # This option only allows spans with start time that occur within the configured duration to be - # considered in metrics generation - # This is to filter out spans that are outdated - [ingestion_time_range_slack: | default = 2m] + # This option only allows spans with start time that occur within the configured duration to be + # considered in metrics generation + # This is to filter out spans that are outdated + [ingestion_time_range_slack: | default = 2m] ``` ## Query-frontend From aaf5def98c4b0ed01f37cb345113c8e68c5d56fb Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 12 Aug 2022 10:10:27 -0500 Subject: [PATCH 06/11] more review comment addressing --- modules/generator/instance.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 70a7479698d..f35ef09de78 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -47,13 +47,12 @@ var ( }, []string{"tenant"}) metricSpansDiscarded = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "tempo", - Name: "metrics_generator_discarded_spans_total", + Name: "metrics_generator_spans_discarded_total", Help: "The total number of discarded spans received per tenant", }, []string{"tenant", "reason"}) ) const reason_outside_time_range_slack = "outside_metrics_ingestion_slack" -const future_slack = 5 * time.Hour * 24 type instance struct { cfg *Config @@ -279,7 +278,7 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { timeNow := time.Now() index := 0 for _, span := range ils.Spans { - if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(future_slack).UnixNano()) { + if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(i.cfg.MetricsIngestionSlack).UnixNano()) { newSpansArr[index] = span index++ } else { From 86bdcb3cfc76522f808dcc82b8c612b6e43c8650 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Wed, 17 Aug 2022 11:11:16 -0500 Subject: [PATCH 07/11] testing stuff --- modules/generator/instance.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index f35ef09de78..ff2d0548bd5 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -50,6 +50,13 @@ var ( Name: "metrics_generator_spans_discarded_total", Help: "The total number of discarded spans received per tenant", }, []string{"tenant", "reason"}) + + metricIngestionLatency = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "tempo", + Name: "metrics_generator_ingestion_latency", + Help: "The ingestion latency", + Buckets: prometheus.LinearBuckets(0, 15000, 5), + }) ) const reason_outside_time_range_slack = "outside_metrics_ingestion_slack" @@ -278,6 +285,8 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { timeNow := time.Now() index := 0 for _, span := range ils.Spans { + latency := (timeNow.UnixNano() - int64(span.EndTimeUnixNano)) / 1000000 + metricIngestionLatency.Observe(float64(latency)) if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(i.cfg.MetricsIngestionSlack).UnixNano()) { newSpansArr[index] = span index++ From d60a85f59fd8ec940841e95f4c74440ea6813e96 Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Thu, 8 Sep 2022 14:18:40 -0500 Subject: [PATCH 08/11] rebase --- CHANGELOG.md | 5 +---- integration/e2e/metrics_generator_test.go | 6 +++--- modules/generator/config.go | 2 +- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc8107f4c16..8424ba57825 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * [ENHANCEMENT] Add PodDisruptionBudget to ingesters in jsonnet [#1691](https://github.com/grafana/tempo/pull/1691) (@joe-elliott) * [ENHANCEMENT] Add cli command an existing file to tempodb's current parquet schema. [#1706](https://github.com/grafana/tempo/pull/1707) (@joe-elliott) * [ENHANCEMENT] Add query parameter to search API for traceQL queries [#1729](https://github.com/grafana/tempo/pull/1729) (@kvrhdn) +* [ENHANCEMENT] metrics-generator: filter out older spans before metrics are aggregated [#1612](https://github.com/grafana/tempo/pull/1612) (@ie-pham) * [BUGFIX] Honor caching and buffering settings when finding traces by id [#1697](https://github.com/grafana/tempo/pull/1697) (@joe-elliott) * [BUGFIX] Correctly propagate errors from the iterator layer up through the queriers [#1723](https://github.com/grafana/tempo/pull/1723) (@joe-elliott) @@ -82,12 +83,8 @@ Jsonnet users will now need to specify a storage request and limit for the gener * [BUGFIX] Prevent ingester panic "cannot grow buffer" [#1258](https://github.com/grafana/tempo/issues/1258) (@mdisibio) * [BUGFIX] metrics-generator: do not remove x-scope-orgid header in single tenant modus [#1554](https://github.com/grafana/tempo/pull/1554) (@kvrhdn) * [BUGFIX] Fixed issue where backend does not support `root.name` and `root.service.name` [#1589](https://github.com/grafana/tempo/pull/1589) (@kvrhdn) -<<<<<<< HEAD * [BUGFIX] Fixed ingester to continue starting up after block replay error [#1603](https://github.com/grafana/tempo/issues/1603) (@mdisibio) * [BUGFIX] Fix issue relating to usage stats and GCS returning empty strings as tenantID [#1625](https://github.com/grafana/tempo/pull/1625) (@ie-pham) -======= -* [ENHANCEMENT] metrics-generator: filter out older spans before metrics are aggregated [#1612](https://github.com/grafana/tempo/pull/1612) (@ie-pham) ->>>>>>> c66c53e4 (filter out stale spans from metrics generator) ## v1.4.1 / 2022-05-05 diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index 937de160464..935bfca3c8d 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -103,7 +103,7 @@ func TestMetricsGenerator(t *testing.T) { }) require.NoError(t, err) - //also send one with old timestamp + //also send one with 5 minutes old timestamp err = c.EmitBatch(context.Background(), &thrift.Batch{ Process: &thrift.Process{ServiceName: "app"}, Spans: []*thrift.Span{ @@ -113,7 +113,7 @@ func TestMetricsGenerator(t *testing.T) { SpanId: r.Int63(), ParentSpanId: parentSpanID, OperationName: "app-handle", - StartTime: time.Now().Add(-50 * time.Minute).UnixMicro(), + StartTime: time.Now().UnixMicro(), Duration: int64(1 * time.Second / time.Microsecond), Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, }, @@ -201,7 +201,7 @@ func TestMetricsGenerator(t *testing.T) { // Verify metrics assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(4), "tempo_metrics_generator_spans_received_total")) - assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_discarded_spans_total")) + assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(2), "tempo_metrics_generator_spans_discarded_total")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(1000), "tempo_metrics_generator_registry_max_active_series")) assert.NoError(t, tempoMetricsGenerator.WaitSumMetrics(e2e.Equals(25), "tempo_metrics_generator_registry_series_added_total")) diff --git a/modules/generator/config.go b/modules/generator/config.go index dc569a64b18..a7070ca044f 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -35,7 +35,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f) // setting default for max span age before discarding to 2m - cfg.MetricsIngestionSlack = 2 * time.Minute + cfg.MetricsIngestionSlack = 30 * time.Minutes } type ProcessorConfig struct { From 07cb52194914ced8876490dcacb84918b73f9a0b Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 9 Sep 2022 11:30:14 -0500 Subject: [PATCH 09/11] rebased --- docs/tempo/website/configuration/_index.md | 2 +- integration/e2e/metrics_generator_test.go | 2 +- modules/generator/config.go | 4 ++-- modules/generator/instance.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index a1f041b9d7b..8530b816844 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -279,7 +279,7 @@ metrics_generator: # This option only allows spans with start time that occur within the configured duration to be # considered in metrics generation # This is to filter out spans that are outdated - [ingestion_time_range_slack: | default = 2m] + [ingestion_time_range_slack: | default = 30s] ``` ## Query-frontend diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index 935bfca3c8d..53c13e453a6 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -113,7 +113,7 @@ func TestMetricsGenerator(t *testing.T) { SpanId: r.Int63(), ParentSpanId: parentSpanID, OperationName: "app-handle", - StartTime: time.Now().UnixMicro(), + StartTime: time.Now().Add(-5 * time.Minute).UnixMicro(), Duration: int64(1 * time.Second / time.Microsecond), Tags: []*thrift.Tag{{Key: "span.kind", VStr: stringPtr("server")}}, }, diff --git a/modules/generator/config.go b/modules/generator/config.go index a7070ca044f..be0eeab4307 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -34,8 +34,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Processor.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f) cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f) - // setting default for max span age before discarding to 2m - cfg.MetricsIngestionSlack = 30 * time.Minutes + // setting default for max span age before discarding to 30s + cfg.MetricsIngestionSlack = 30 * time.Second } type ProcessorConfig struct { diff --git a/modules/generator/instance.go b/modules/generator/instance.go index ff2d0548bd5..edd64d8fb95 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -307,7 +307,7 @@ func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSp } // shutdown stops the instance and flushes any remaining data. After shutdown -// is called pushSpans should not be called anymore. +// is called pushSpans should not be called anymore. func (i *instance) shutdown() { close(i.shutdownCh) From 948b8e0c6802d90b5a62f5d024350494ed801e5e Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 9 Sep 2022 11:34:31 -0500 Subject: [PATCH 10/11] removing debug stuff --- modules/generator/instance.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/modules/generator/instance.go b/modules/generator/instance.go index edd64d8fb95..8b28615fdc6 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -50,13 +50,6 @@ var ( Name: "metrics_generator_spans_discarded_total", Help: "The total number of discarded spans received per tenant", }, []string{"tenant", "reason"}) - - metricIngestionLatency = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "tempo", - Name: "metrics_generator_ingestion_latency", - Help: "The ingestion latency", - Buckets: prometheus.LinearBuckets(0, 15000, 5), - }) ) const reason_outside_time_range_slack = "outside_metrics_ingestion_slack" @@ -285,8 +278,6 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { timeNow := time.Now() index := 0 for _, span := range ils.Spans { - latency := (timeNow.UnixNano() - int64(span.EndTimeUnixNano)) / 1000000 - metricIngestionLatency.Observe(float64(latency)) if span.EndTimeUnixNano >= uint64(timeNow.Add(-i.cfg.MetricsIngestionSlack).UnixNano()) && span.EndTimeUnixNano <= uint64(timeNow.Add(i.cfg.MetricsIngestionSlack).UnixNano()) { newSpansArr[index] = span index++ From e5ac4b8b9f4aff673fc999404e0d37462fc2f3cf Mon Sep 17 00:00:00 2001 From: Jennie Pham Date: Fri, 9 Sep 2022 11:40:21 -0500 Subject: [PATCH 11/11] lint --- integration/e2e/metrics_generator_test.go | 2 +- modules/generator/config.go | 3 ++- modules/generator/instance.go | 6 +++--- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/integration/e2e/metrics_generator_test.go b/integration/e2e/metrics_generator_test.go index 53c13e453a6..caae31d37fd 100644 --- a/integration/e2e/metrics_generator_test.go +++ b/integration/e2e/metrics_generator_test.go @@ -121,7 +121,7 @@ func TestMetricsGenerator(t *testing.T) { }) require.NoError(t, err) - //also send one with timestamp 10 days in the future + //also send one with timestamp 10 days in the future err = c.EmitBatch(context.Background(), &thrift.Batch{ Process: &thrift.Process{ServiceName: "app"}, Spans: []*thrift.Span{ diff --git a/modules/generator/config.go b/modules/generator/config.go index be0eeab4307..0d275aff91e 100644 --- a/modules/generator/config.go +++ b/modules/generator/config.go @@ -2,11 +2,12 @@ package generator import ( "flag" + "time" + "github.com/grafana/tempo/modules/generator/processor/servicegraphs" "github.com/grafana/tempo/modules/generator/processor/spanmetrics" "github.com/grafana/tempo/modules/generator/registry" "github.com/grafana/tempo/modules/generator/storage" - "time" ) const ( diff --git a/modules/generator/instance.go b/modules/generator/instance.go index 8b28615fdc6..17cb57d0a39 100644 --- a/modules/generator/instance.go +++ b/modules/generator/instance.go @@ -52,7 +52,7 @@ var ( }, []string{"tenant", "reason"}) ) -const reason_outside_time_range_slack = "outside_metrics_ingestion_slack" +const reasonOutsideTimeRangeSlack = "outside_metrics_ingestion_slack" type instance struct { cfg *Config @@ -294,11 +294,11 @@ func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) { func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) { metricBytesIngested.WithLabelValues(i.instanceID).Add(float64(bytesIngested)) metricSpansIngested.WithLabelValues(i.instanceID).Add(float64(spanCount)) - metricSpansDiscarded.WithLabelValues(i.instanceID, reason_outside_time_range_slack).Add(float64(expiredSpanCount)) + metricSpansDiscarded.WithLabelValues(i.instanceID, reasonOutsideTimeRangeSlack).Add(float64(expiredSpanCount)) } // shutdown stops the instance and flushes any remaining data. After shutdown -// is called pushSpans should not be called anymore. +// is called pushSpans should not be called anymore. func (i *instance) shutdown() { close(i.shutdownCh)