From 62e399051754fd17ce9a446dc2ea0f201dd647be Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 30 Sep 2021 22:42:05 +0200 Subject: [PATCH 01/14] Adding test case for dropping metrics by name to understand better flow of distributor Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor_test.go | 155 +++++++++++++++++----------- 1 file changed, 95 insertions(+), 60 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 02da0678f4..c30bfe090b 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1082,7 +1082,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") type testcase struct { - inputSeries labels.Labels + inputSeries []labels.Labels expectedSeries labels.Labels removeReplica bool removeLabels []string @@ -1093,11 +1093,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { { removeReplica: true, removeLabels: []string{"cluster"}, - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, {Name: "__replica__", Value: "two"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, }, @@ -1106,13 +1106,13 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { { removeReplica: true, removeLabels: []string{"foo", "some"}, - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, {Name: "__replica__", Value: "two"}, {Name: "foo", Value: "bar"}, {Name: "some", Value: "thing"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, @@ -1121,11 +1121,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { // Don't remove any labels. 
{ removeReplica: false, - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "some_metric"}, {Name: "__replica__", Value: "two"}, {Name: "cluster", Value: "one"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "__replica__", Value: "two"}, @@ -1169,16 +1169,16 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - inputSeries labels.Labels + inputSeries []labels.Labels expectedSeries labels.Labels expectedToken uint32 }{ "metric_1 with value_1": { - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1187,12 +1187,12 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, "metric_1 with value_1 and dropped label due to config": { - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, {Name: "dropped", Value: "unused"}, // will be dropped, doesn't need to be in correct order - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1201,12 +1201,12 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, "metric_1 with value_1 and dropped HA replica label": { - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, {Name: "__replica__", Value: "replica_1"}, - }, 
+ }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1215,10 +1215,10 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, "metric_2 with value_1": { - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "metric_2"}, {Name: "key", Value: "value_1"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_2"}, {Name: "key", Value: "value_1"}, @@ -1226,10 +1226,10 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xa60906f2, }, "metric_1 with value_2": { - inputSeries: labels.Labels{ + inputSeries: []labels.Labels{{ {Name: "__name__", Value: "metric_1"}, {Name: "key", Value: "value_2"}, - }, + }}, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "key", Value: "value_2"}, @@ -1280,24 +1280,24 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - inputLabels labels.Labels + inputLabels []labels.Labels skipLabelNameValidationCfg bool skipLabelNameValidationReq bool errExpected bool errMessage string }{ "label name validation is on by default": { - inputLabels: inputLabels, + inputLabels: []labels.Labels{inputLabels}, errExpected: true, errMessage: `sample invalid label: "999.illegal" metric "foo{999.illegal=\"baz\"}"`, }, "label name validation can be skipped via config": { - inputLabels: inputLabels, + inputLabels: []labels.Labels{inputLabels}, skipLabelNameValidationCfg: true, errExpected: false, }, "label name validation can be skipped via WriteRequest parameter": { - inputLabels: inputLabels, + inputLabels: []labels.Labels{inputLabels}, skipLabelNameValidationReq: true, errExpected: false, }, @@ -1790,7 +1790,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { ctx := 
user.InjectOrgID(context.Background(), "test") for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) + req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp) _, err := ds[0].Push(ctx, req) require.NoError(t, err) } @@ -1875,15 +1875,16 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { return m } -func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest { - samples := []cortexpb.Sample{ - { +func mockWriteRequest(lbls []labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest { + samples := make([]cortexpb.Sample, len(lbls)) + for i := range lbls { + samples[i] = cortexpb.Sample{ TimestampMs: timestampMs, Value: value, - }, + } } - return cortexpb.ToWriteRequest([]labels.Labels{lbls}, samples, nil, cortexpb.API) + return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API) } type prepConfig struct { @@ -2641,20 +2642,24 @@ func TestSortLabels(t *testing.T) { } func TestDistributor_Push_Relabel(t *testing.T) { - ctx := user.InjectOrgID(context.Background(), "user") + const orgID = "user" + ctx := user.InjectOrgID(context.Background(), orgID) type testcase struct { - inputSeries labels.Labels + name string + inputSeries []labels.Labels expectedSeries labels.Labels metricRelabelConfigs []*relabel.Config } cases := []testcase{ - // No relabel config. 
{ - inputSeries: labels.Labels{ - {Name: "__name__", Value: "foo"}, - {Name: "cluster", Value: "one"}, + name: "with no relabel config", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "foo"}, @@ -2662,9 +2667,12 @@ func TestDistributor_Push_Relabel(t *testing.T) { }, }, { - inputSeries: labels.Labels{ - {Name: "__name__", Value: "foo"}, - {Name: "cluster", Value: "one"}, + name: "with hardcoded replace", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "foo"}, @@ -2680,36 +2688,63 @@ func TestDistributor_Push_Relabel(t *testing.T) { }, }, }, + { + name: "with drop action", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + }, + expectedSeries: labels.Labels{ + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + metricRelabelConfigs: []*relabel.Config{ + { + SourceLabels: []model.LabelName{"__name__"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("(foo)"), + }, + }, + }, } for _, tc := range cases { - var err error - var limits validation.Limits - flagext.DefaultValues(&limits) - limits.MetricRelabelConfigs = tc.metricRelabelConfigs + t.Run(tc.name, func(t *testing.T) { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.MetricRelabelConfigs = tc.metricRelabelConfigs - ds, ingesters, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: &limits, - }) + ds, ingesters, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) - // Push the series to the 
distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) - _, err = ds[0].Push(ctx, req) - require.NoError(t, err) - // Since each test pushes only 1 series, we do expect the ingester - // to have received exactly 1 series - for i := range ingesters { - timeseries := ingesters[i].series() - assert.Equal(t, 1, len(timeseries)) - for _, v := range timeseries { - assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + // Push the series to the distributor + req := mockWriteRequest(tc.inputSeries, 1, 1) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + } } - } + }) } } From f70552a3208150eaafa64015fdd3250ea65fa189 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 30 Sep 2021 23:16:48 +0200 Subject: [PATCH 02/14] Adding test case and new metric for dropped samples Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 10 ++++- pkg/distributor/distributor_test.go | 59 +++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index edc959ed99..aacd4a1c2f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -94,6 +94,7 @@ type Distributor struct { // Metrics queryDuration *instrument.HistogramCollector receivedSamples *prometheus.CounterVec + discardedSamples *prometheus.CounterVec receivedExemplars *prometheus.CounterVec receivedMetadata *prometheus.CounterVec incomingSamples *prometheus.CounterVec @@ -250,6 +251,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_received_samples_total", Help: "The total number 
of received samples, excluding rejected and deduped samples.", }, []string{"user"}), + discardedSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "distributor_discarded_samples_total", + Help: "The total number of samples which were discarded due to relabel configuration.", + }, []string{"user"}), receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_received_exemplars_total", @@ -652,6 +658,8 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } if len(ts.Labels) == 0 { + // the __name__ label is not present, metric will be discarded + d.discardedSamples.WithLabelValues(userID).Add(float64(len(ts.Samples))) continue } @@ -707,7 +715,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples)) - d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars))) + d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) if len(seriesKeys) == 0 && len(metadataKeys) == 0 { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c30bfe090b..57d180f71e 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2748,6 +2748,65 @@ func TestDistributor_Push_Relabel(t *testing.T) { } } +func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) { + metricRelabelConfigs := []*relabel.Config{ + { + SourceLabels: []model.LabelName{"__name__"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("(foo)"), + }, + } + + inputSeries := []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + } + + var err error + 
var limits validation.Limits + flagext.DefaultValues(&limits) + limits.MetricRelabelConfigs = metricRelabelConfigs + + ds, ingesters, r, regs := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + reg := regs[0] + defer stopAll(ds, r) + + // Push the series to the distributor + req := mockWriteRequest(inputSeries, 1, 1) + ctx := user.InjectOrgID(context.Background(), "user") + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + } + + metrics := []string{"cortex_distributor_received_samples_total", "cortex_distributor_discarded_samples_total"} + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{user="user"} 1 + # HELP cortex_distributor_discarded_samples_total The total number of samples which were discarded due to relabel configuration. 
+ # TYPE cortex_distributor_discarded_samples_total counter + cortex_distributor_discarded_samples_total{user="user"} 1 + `), metrics...)) +} + func countMockIngestersCalls(ingesters []mockIngester, name string) int { count := 0 for i := 0; i < len(ingesters); i++ { From 550d5b64547870455fb882b9d9f0ef8200da4eb6 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 30 Sep 2021 23:24:34 +0200 Subject: [PATCH 03/14] Updating CHANGELOG with new changes Signed-off-by: Pedro Tanaka --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eaa4b21d8d..b43019e99f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased +* [ENHANCEMENT] Add metric `distributor_discarded_samples_total` exposing the total of dropped samples due to relabeling configuration. #4503 * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 * [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled). #4562 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. 
#4571 From a6c5884a592a607815531022c5c4a667702cc52b Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 30 Sep 2021 23:28:33 +0200 Subject: [PATCH 04/14] Fixing linting problem on distributor file Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index aacd4a1c2f..342b71f925 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -94,7 +94,7 @@ type Distributor struct { // Metrics queryDuration *instrument.HistogramCollector receivedSamples *prometheus.CounterVec - discardedSamples *prometheus.CounterVec + discardedSamples *prometheus.CounterVec receivedExemplars *prometheus.CounterVec receivedMetadata *prometheus.CounterVec incomingSamples *prometheus.CounterVec From 65c70d1dcc601466aa4e49f176728b9b955ddc5b Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 1 Oct 2021 09:00:30 +0200 Subject: [PATCH 05/14] Reusing discarded samples metric from validate package Signed-off-by: Pedro Tanaka --- CHANGELOG.md | 2 +- pkg/distributor/distributor.go | 15 +++++++-------- pkg/distributor/distributor_test.go | 9 +++++---- pkg/util/validation/validate.go | 23 +++++++++++++---------- 4 files changed, 26 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b43019e99f..6f57553897 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,7 @@ ## master / unreleased -* [ENHANCEMENT] Add metric `distributor_discarded_samples_total` exposing the total of dropped samples due to relabeling configuration. #4503 +* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503 * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 * [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled). 
#4562 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 342b71f925..2e72c3bb58 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -94,7 +94,6 @@ type Distributor struct { // Metrics queryDuration *instrument.HistogramCollector receivedSamples *prometheus.CounterVec - discardedSamples *prometheus.CounterVec receivedExemplars *prometheus.CounterVec receivedMetadata *prometheus.CounterVec incomingSamples *prometheus.CounterVec @@ -251,11 +250,6 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove Name: "distributor_received_samples_total", Help: "The total number of received samples, excluding rejected and deduped samples.", }, []string{"user"}), - discardedSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Namespace: "cortex", - Name: "distributor_discarded_samples_total", - Help: "The total number of samples which were discarded due to relabel configuration.", - }, []string{"user"}), receivedExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Namespace: "cortex", Name: "distributor_received_exemplars_total", @@ -643,6 +637,13 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) 
+ if l == nil { + // all labels are gone, therefore the __name__ label is not present, metric will be discarded + validation.DiscardedSamples.WithLabelValues( + validation.DroppedByRelabelConfiguration, + userID, + ).Add(float64(len(ts.Samples))) + } ts.Labels = cortexpb.FromLabelsToLabelAdapters(l) } @@ -658,8 +659,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } if len(ts.Labels) == 0 { - // the __name__ label is not present, metric will be discarded - d.discardedSamples.WithLabelValues(userID).Add(float64(len(ts.Samples))) continue } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 57d180f71e..ceee08e864 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2781,6 +2781,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing limits: &limits, }) reg := regs[0] + reg.MustRegister(validation.DiscardedSamples) defer stopAll(ds, r) // Push the series to the distributor @@ -2796,14 +2797,14 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing assert.Equal(t, 1, len(timeseries)) } - metrics := []string{"cortex_distributor_received_samples_total", "cortex_distributor_discarded_samples_total"} + metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"} require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="relabel_configuration",user="user"} 1 # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. 
# TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{user="user"} 1 - # HELP cortex_distributor_discarded_samples_total The total number of samples which were discarded due to relabel configuration. - # TYPE cortex_distributor_discarded_samples_total counter - cortex_distributor_discarded_samples_total{user="user"} 1 `), metrics...)) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 094c879970..a062bfc6ec 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -34,16 +34,16 @@ const ( // ErrQueryTooLong is used in chunk store, querier and query frontend. ErrQueryTooLong = "the query time range exceeds the limit (query length: %s, limit: %s)" - missingMetricName = "missing_metric_name" - invalidMetricName = "metric_name_invalid" - greaterThanMaxSampleAge = "greater_than_max_sample_age" - maxLabelNamesPerSeries = "max_label_names_per_series" - tooFarInFuture = "too_far_in_future" - invalidLabel = "label_invalid" - labelNameTooLong = "label_name_too_long" - duplicateLabelNames = "duplicate_label_names" - labelsNotSorted = "labels_not_sorted" - labelValueTooLong = "label_value_too_long" + missingMetricName = "missing_metric_name" + invalidMetricName = "metric_name_invalid" + greaterThanMaxSampleAge = "greater_than_max_sample_age" + maxLabelNamesPerSeries = "max_label_names_per_series" + tooFarInFuture = "too_far_in_future" + invalidLabel = "label_invalid" + labelNameTooLong = "label_name_too_long" + duplicateLabelNames = "duplicate_label_names" + labelsNotSorted = "labels_not_sorted" + labelValueTooLong = "label_value_too_long" // Exemplar-specific validation reasons exemplarLabelsMissing = "exemplar_labels_missing" @@ -57,6 +57,9 @@ const ( // Too many HA clusters is one of the reasons for discarding samples. 
TooManyHAClusters = "too_many_ha_clusters" + // DroppedByRelabelConfiguration Samples can also be discarded because of relabeling configuration + DroppedByRelabelConfiguration = "relabel_configuration" + // The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars ExemplarMaxLabelSetLength = 128 From 10efbbbabe2828533a3d9f5b3d2572ddac51713c Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 1 Oct 2021 14:11:37 +0200 Subject: [PATCH 06/14] Compare labelset with len() instead of comparing to nil Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2e72c3bb58..b18fef4a38 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -637,12 +637,13 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) - if l == nil { + if len(l) == 0 { // all labels are gone, therefore the __name__ label is not present, metric will be discarded validation.DiscardedSamples.WithLabelValues( validation.DroppedByRelabelConfiguration, userID, ).Add(float64(len(ts.Samples))) + continue } ts.Labels = cortexpb.FromLabelsToLabelAdapters(l) } @@ -658,10 +659,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co removeLabel(labelName, &ts.Labels) } - if len(ts.Labels) == 0 { - continue - } - // We rely on sorted labels in different places: // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns // different tokens, which is bad. 
From 3d430b1cd018400498b68ac79a7d365e3e5c0b75 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 1 Oct 2021 14:20:37 +0200 Subject: [PATCH 07/14] Undoing unnecessary changes on tests and distributor Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 2 +- pkg/distributor/distributor_test.go | 53 ++++++++++++++--------------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b18fef4a38..ba9048a306 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -711,7 +711,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } d.receivedSamples.WithLabelValues(userID).Add(float64(validatedSamples)) - d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) + d.receivedExemplars.WithLabelValues(userID).Add((float64(validatedExemplars))) d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) if len(seriesKeys) == 0 && len(metadataKeys) == 0 { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ceee08e864..edb9b1fed1 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1082,7 +1082,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") type testcase struct { - inputSeries []labels.Labels + inputSeries labels.Labels expectedSeries labels.Labels removeReplica bool removeLabels []string @@ -1093,11 +1093,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { { removeReplica: true, removeLabels: []string{"cluster"}, - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, {Name: "__replica__", Value: "two"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, }, @@ -1106,13 +1106,13 @@ func 
TestDistributor_Push_LabelRemoval(t *testing.T) { { removeReplica: true, removeLabels: []string{"foo", "some"}, - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, {Name: "__replica__", Value: "two"}, {Name: "foo", Value: "bar"}, {Name: "some", Value: "thing"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "cluster", Value: "one"}, @@ -1121,11 +1121,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { // Don't remove any labels. { removeReplica: false, - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "__replica__", Value: "two"}, {Name: "cluster", Value: "one"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "some_metric"}, {Name: "__replica__", Value: "two"}, @@ -1150,7 +1150,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { }) // Push the series to the distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) + req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) _, err = ds[0].Push(ctx, req) require.NoError(t, err) @@ -1169,16 +1169,16 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - inputSeries []labels.Labels + inputSeries labels.Labels expectedSeries labels.Labels expectedToken uint32 }{ "metric_1 with value_1": { - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1187,12 +1187,12 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, 
"metric_1 with value_1 and dropped label due to config": { - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, {Name: "dropped", Value: "unused"}, // will be dropped, doesn't need to be in correct order - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1201,12 +1201,12 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, "metric_1 with value_1 and dropped HA replica label": { - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, {Name: "key", Value: "value_1"}, {Name: "__replica__", Value: "replica_1"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "cluster", Value: "cluster_1"}, @@ -1215,10 +1215,10 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xec0a2e9d, }, "metric_2 with value_1": { - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "metric_2"}, {Name: "key", Value: "value_1"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_2"}, {Name: "key", Value: "value_1"}, @@ -1226,10 +1226,10 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * expectedToken: 0xa60906f2, }, "metric_1 with value_2": { - inputSeries: []labels.Labels{{ + inputSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "key", Value: "value_2"}, - }}, + }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "metric_1"}, {Name: "key", Value: "value_2"}, @@ -1254,7 +1254,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * }) // Push the series to the distributor - req := mockWriteRequest(testData.inputSeries, 1, 1) + req 
:= mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1) _, err := ds[0].Push(ctx, req) require.NoError(t, err) @@ -1280,24 +1280,24 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { - inputLabels []labels.Labels + inputLabels labels.Labels skipLabelNameValidationCfg bool skipLabelNameValidationReq bool errExpected bool errMessage string }{ "label name validation is on by default": { - inputLabels: []labels.Labels{inputLabels}, + inputLabels: inputLabels, errExpected: true, errMessage: `sample invalid label: "999.illegal" metric "foo{999.illegal=\"baz\"}"`, }, "label name validation can be skipped via config": { - inputLabels: []labels.Labels{inputLabels}, + inputLabels: inputLabels, skipLabelNameValidationCfg: true, errExpected: false, }, "label name validation can be skipped via WriteRequest parameter": { - inputLabels: []labels.Labels{inputLabels}, + inputLabels: inputLabels, skipLabelNameValidationReq: true, errExpected: false, }, @@ -1312,7 +1312,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { shuffleShardSize: 1, skipLabelNameValidation: tc.skipLabelNameValidationCfg, }) - req := mockWriteRequest(tc.inputLabels, 42, 100000) + req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000) req.SkipLabelNameValidation = tc.skipLabelNameValidationReq _, err := ds[0].Push(ctx, req) if tc.errExpected { @@ -2642,8 +2642,7 @@ func TestSortLabels(t *testing.T) { } func TestDistributor_Push_Relabel(t *testing.T) { - const orgID = "user" - ctx := user.InjectOrgID(context.Background(), orgID) + ctx := user.InjectOrgID(context.Background(), "user") type testcase struct { name string From 3767e369ec0a2eaf13a360d2bd4c692ace867e58 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 1 Oct 2021 14:29:07 +0200 Subject: [PATCH 08/14] Small rename on comment Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index ba9048a306..b9325eb14b 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -638,7 +638,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) if len(l) == 0 { - // all labels are gone, therefore the __name__ label is not present, metric will be discarded + // all labels are gone, therefore the __name__ label is not present, samples will be discarded validation.DiscardedSamples.WithLabelValues( validation.DroppedByRelabelConfiguration, userID, From 4725dc7fa0446c10a71082a0b85a9f0f28a06c97 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 14 Oct 2021 11:50:56 +0200 Subject: [PATCH 09/14] Fixing linting offenses Signed-off-by: Pedro Tanaka --- pkg/util/validation/validate.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index a062bfc6ec..c58fcd0fd8 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -34,16 +34,16 @@ const ( // ErrQueryTooLong is used in chunk store, querier and query frontend. 
ErrQueryTooLong = "the query time range exceeds the limit (query length: %s, limit: %s)" - missingMetricName = "missing_metric_name" - invalidMetricName = "metric_name_invalid" - greaterThanMaxSampleAge = "greater_than_max_sample_age" - maxLabelNamesPerSeries = "max_label_names_per_series" - tooFarInFuture = "too_far_in_future" - invalidLabel = "label_invalid" - labelNameTooLong = "label_name_too_long" - duplicateLabelNames = "duplicate_label_names" - labelsNotSorted = "labels_not_sorted" - labelValueTooLong = "label_value_too_long" + missingMetricName = "missing_metric_name" + invalidMetricName = "metric_name_invalid" + greaterThanMaxSampleAge = "greater_than_max_sample_age" + maxLabelNamesPerSeries = "max_label_names_per_series" + tooFarInFuture = "too_far_in_future" + invalidLabel = "label_invalid" + labelNameTooLong = "label_name_too_long" + duplicateLabelNames = "duplicate_label_names" + labelsNotSorted = "labels_not_sorted" + labelValueTooLong = "label_value_too_long" // Exemplar-specific validation reasons exemplarLabelsMissing = "exemplar_labels_missing" From c1f9f80427ed882dcec08a2eb5b9214ea604d16e Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 14 Oct 2021 12:22:56 +0200 Subject: [PATCH 10/14] Resetting validation dropped samples metric to avoid getting metrics from other test runs Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor_test.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index edb9b1fed1..c9410026b5 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2779,13 +2779,13 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing shardByAllLabels: true, limits: &limits, }) - reg := regs[0] - reg.MustRegister(validation.DiscardedSamples) defer stopAll(ds, r) + validation.DiscardedSamples.Reset() + // Push the series to the distributor req :=
mockWriteRequest(inputSeries, 1, 1) - ctx := user.InjectOrgID(context.Background(), "user") + ctx := user.InjectOrgID(context.Background(), "user1") _, err = ds[0].Push(ctx, req) require.NoError(t, err) @@ -2797,14 +2797,17 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing } metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"} - require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` + + expectedMetrics := ` # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter cortex_discarded_samples_total{reason="relabel_configuration",user="user"} 1 # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. # TYPE cortex_distributor_received_samples_total counter - cortex_distributor_received_samples_total{user="user"} 1 - `), metrics...)) + cortex_distributor_received_samples_total{user="user1"} 1 + ` + + testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...) 
} func countMockIngestersCalls(ingesters []mockIngester, name string) int { From 71125491570940acdccd2abb1b13dfaa8921b78c Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Thu, 14 Oct 2021 12:31:38 +0200 Subject: [PATCH 11/14] Resolving problems after rebase conflicts Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c9410026b5..23bab6575a 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2720,7 +2720,7 @@ func TestDistributor_Push_Relabel(t *testing.T) { flagext.DefaultValues(&limits) limits.MetricRelabelConfigs = tc.metricRelabelConfigs - ds, ingesters, _ := prepare(t, prepConfig{ + ds, ingesters, _ := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, @@ -2728,7 +2728,6 @@ func TestDistributor_Push_Relabel(t *testing.T) { limits: &limits, }) - // Push the series to the distributor req := mockWriteRequest(tc.inputSeries, 1, 1) _, err = ds[0].Push(ctx, req) @@ -2772,14 +2771,13 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing flagext.DefaultValues(&limits) limits.MetricRelabelConfigs = metricRelabelConfigs - ds, ingesters, r, regs := prepare(t, prepConfig{ + ds, ingesters, regs := prepare(t, prepConfig{ numIngesters: 2, happyIngesters: 2, numDistributors: 1, shardByAllLabels: true, limits: &limits, }) - defer stopAll(ds, r) validation.DiscardedSamples.Reset() @@ -2807,7 +2805,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing cortex_distributor_received_samples_total{user="user1"} 1 ` - testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...) 
+ require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...)) } func countMockIngestersCalls(ingesters []mockIngester, name string) int { From 76b68c191843ab75d1010dfeeef2dbc3176b9cb5 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Wed, 3 Nov 2021 07:26:59 +0100 Subject: [PATCH 12/14] Registering counter for dropped metrics in test Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 23bab6575a..b61e98c928 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2779,6 +2779,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing limits: &limits, }) + regs[0].MustRegister(validation.DiscardedSamples) validation.DiscardedSamples.Reset() // Push the series to the distributor @@ -2799,7 +2800,7 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing expectedMetrics := ` # HELP cortex_discarded_samples_total The total number of samples that were discarded. # TYPE cortex_discarded_samples_total counter - cortex_discarded_samples_total{reason="relabel_configuration",user="user"} 1 + cortex_discarded_samples_total{reason="relabel_configuration",user="user1"} 1 # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. 
# TYPE cortex_distributor_received_samples_total counter cortex_distributor_received_samples_total{user="user1"} 1 From 75fbf414d9a104d12334c0b37a5675390f6898ef Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Wed, 1 Dec 2021 09:58:36 +0100 Subject: [PATCH 13/14] Checking if user label drop configuration did not drop __name__ label Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 22 ++++++++++++++++++++++ pkg/distributor/distributor_test.go | 17 ++++++++++++++++- pkg/util/validation/validate.go | 2 ++ 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index b9325eb14b..eaf943ac6a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -659,6 +659,15 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co removeLabel(labelName, &ts.Labels) } + if len(ts.Labels) == 0 || wasNameLabelRemoved(ts.Labels) { + validation.DiscardedExemplars.WithLabelValues( + validation.DroppedByUserConfigurationOverride, + userID, + ).Add(float64(len(ts.Samples))) + + continue + } + // We rely on sorted labels in different places: // 1) When computing token for labels, and sharding by all labels. Here different order of labels returns // different tokens, which is bad. @@ -784,6 +793,19 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co return &cortexpb.WriteResponse{}, firstPartialErr } +func wasNameLabelRemoved(labels []cortexpb.LabelAdapter) bool { + const nameLabel = "__name__" + + for i := 0; i < len(labels); i++ { + pair := labels[i] + if pair.Name == nameLabel { + return false + } + } + + return true +} + func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { // no need to run sort.Slice, if labels are already sorted, which is most of the time. // we can avoid extra memory allocations (mostly interface-related) this way. 
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b61e98c928..b5eb239de9 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1102,6 +1102,17 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { {Name: "__name__", Value: "some_metric"}, }, }, + // Edge case: remove label __name__ will drop all user metrics + { + removeReplica: true, + removeLabels: []string{"__name__"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{}, + }, // Remove multiple labels and replica. { removeReplica: true, @@ -1154,11 +1165,15 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { _, err = ds[0].Push(ctx, req) require.NoError(t, err) + expectedTimeseriesLen := 0 + if len(tc.expectedSeries) > 0 { + expectedTimeseriesLen = 1 + } // Since each test pushes only 1 series, we do expect the ingester // to have received exactly 1 series for i := range ingesters { timeseries := ingesters[i].series() - assert.Equal(t, 1, len(timeseries)) + assert.Equal(t, expectedTimeseriesLen, len(timeseries)) for _, v := range timeseries { assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) } diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index c58fcd0fd8..e6cb39777a 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -59,6 +59,8 @@ const ( // DroppedByRelabelConfiguration Samples can also be discarded because of relabeling configuration DroppedByRelabelConfiguration = "relabel_configuration" + // DroppedByUserConfigurationOverride Samples discarded due to user configuration removing label __name__ + DroppedByUserConfigurationOverride = "user_label_removal_configuration" // The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters // 
https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars From c29bd27a35c2dd16a17a139a8815b588f19c77a9 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Fri, 3 Dec 2021 09:43:29 +0100 Subject: [PATCH 14/14] Do not check for name label, adding new test Signed-off-by: Pedro Tanaka --- pkg/distributor/distributor.go | 17 +-------- pkg/distributor/distributor_test.go | 58 +++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 31 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index eaf943ac6a..e9d2b590f2 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -638,7 +638,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) if len(l) == 0 { - // all labels are gone, therefore the __name__ label is not present, samples will be discarded + // all labels are gone, samples will be discarded validation.DiscardedSamples.WithLabelValues( validation.DroppedByRelabelConfiguration, userID, @@ -659,7 +659,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co removeLabel(labelName, &ts.Labels) } - if len(ts.Labels) == 0 || wasNameLabelRemoved(ts.Labels) { + if len(ts.Labels) == 0 { validation.DiscardedExemplars.WithLabelValues( validation.DroppedByUserConfigurationOverride, userID, @@ -793,19 +793,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co return &cortexpb.WriteResponse{}, firstPartialErr } -func wasNameLabelRemoved(labels []cortexpb.LabelAdapter) bool { - const nameLabel = "__name__" - - for i := 0; i < len(labels); i++ { - pair := labels[i] - if pair.Name == nameLabel { - return false - } - } - - return true -} - func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) { // no need to run sort.Slice, if labels are 
already sorted, which is most of the time. // we can avoid extra memory allocations (mostly interface-related) this way. diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index b5eb239de9..1487db7aa2 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1102,17 +1102,6 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { {Name: "__name__", Value: "some_metric"}, }, }, - // Edge case: remove label __name__ will drop all user metrics - { - removeReplica: true, - removeLabels: []string{"__name__"}, - inputSeries: labels.Labels{ - {Name: "__name__", Value: "some_metric"}, - {Name: "cluster", Value: "one"}, - {Name: "__replica__", Value: "two"}, - }, - expectedSeries: labels.Labels{}, - }, // Remove multiple labels and replica. { removeReplica: true, @@ -1165,15 +1154,11 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { _, err = ds[0].Push(ctx, req) require.NoError(t, err) - expectedTimeseriesLen := 0 - if len(tc.expectedSeries) > 0 { - expectedTimeseriesLen = 1 - } // Since each test pushes only 1 series, we do expect the ingester // to have received exactly 1 series for i := range ingesters { timeseries := ingesters[i].series() - assert.Equal(t, expectedTimeseriesLen, len(timeseries)) + assert.Equal(t, 1, len(timeseries)) for _, v := range timeseries { assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) } @@ -1181,6 +1166,47 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { } } +func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "user") + type testcase struct { + inputSeries labels.Labels + expectedSeries labels.Labels + removeReplica bool + removeLabels []string + } + + tc := testcase{ + removeReplica: true, + removeLabels: []string{"__name__"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: 
"__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{}, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + limits.AcceptHASamples = tc.removeReplica + + ds, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + // Push the series to the distributor + req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) + _, err = ds[0].Push(ctx, req) + require.Error(t, err) + assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error()) +} + func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct {