From b64a97db6baf100797e8a5407e884e9a21526597 Mon Sep 17 00:00:00 2001 From: Pedro Tanaka Date: Mon, 3 Jan 2022 21:30:55 +0100 Subject: [PATCH] Adds new metric for dropped samples in ingester (#4503) * Adding test case for dropping metrics by name to understand better flow of distributor Signed-off-by: Pedro Tanaka * Adding test case and new metric for dropped samples Signed-off-by: Pedro Tanaka * Updating CHANGELOG with new changes Signed-off-by: Pedro Tanaka * Fixing linting problem on distributor file Signed-off-by: Pedro Tanaka * Reusing discarded samples metric from validate package Signed-off-by: Pedro Tanaka * Compare labelset with len() instead of comparing to nil Signed-off-by: Pedro Tanaka * Undoing unnecessary changes on tests and distributor Signed-off-by: Pedro Tanaka * Small rename on comment Signed-off-by: Pedro Tanaka * Fixing linting offenses Signed-off-by: Pedro Tanaka * Resetting validation dropped samples metric to avoid getting metrics from other test runs Signed-off-by: Pedro Tanaka * Resolving problems after rebase conflicts Signed-off-by: Pedro Tanaka * Registering counter for dropped metrics in test Signed-off-by: Pedro Tanaka * Checking if user label drop configuration did not drop __name__ label Signed-off-by: Pedro Tanaka * Do not check for name label, adding new test Signed-off-by: Pedro Tanaka Signed-off-by: Alvin Lin --- CHANGELOG.md | 1 + pkg/distributor/distributor.go | 13 ++ pkg/distributor/distributor_test.go | 217 +++++++++++++++++++++++----- pkg/util/validation/validate.go | 5 + 4 files changed, 196 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index abac9a0c9e..e431733378 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [FEATURE] AlertManager: Add support for SNS Receiver. #4382 ======= +* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. 
#4503 * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 * [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled). #4562 * [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index edc959ed99..e9d2b590f2 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -637,6 +637,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 { l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...) + if len(l) == 0 { + // all labels are gone, samples will be discarded + validation.DiscardedSamples.WithLabelValues( + validation.DroppedByRelabelConfiguration, + userID, + ).Add(float64(len(ts.Samples))) + continue + } ts.Labels = cortexpb.FromLabelsToLabelAdapters(l) } @@ -652,6 +660,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co } if len(ts.Labels) == 0 { + validation.DiscardedSamples.WithLabelValues( + validation.DroppedByUserConfigurationOverride, + userID, + ).Add(float64(len(ts.Samples))) + continue } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 02da0678f4..1487db7aa2 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1150,7 +1150,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { }) // Push the series to the distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) + req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) _, err = ds[0].Push(ctx, req) require.NoError(t, err) @@ -1166,6 +1166,47 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) { } } +func 
TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "user") + type testcase struct { + inputSeries labels.Labels + expectedSeries labels.Labels + removeReplica bool + removeLabels []string + } + + tc := testcase{ + removeReplica: true, + removeLabels: []string{"__name__"}, + inputSeries: labels.Labels{ + {Name: "__name__", Value: "some_metric"}, + {Name: "cluster", Value: "one"}, + {Name: "__replica__", Value: "two"}, + }, + expectedSeries: labels.Labels{}, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.DropLabels = tc.removeLabels + limits.AcceptHASamples = tc.removeReplica + + ds, _, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + // Push the series to the distributor + req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1) + _, err = ds[0].Push(ctx, req) + require.Error(t, err) + assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error()) +} + func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") tests := map[string]struct { @@ -1254,7 +1295,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t * }) // Push the series to the distributor - req := mockWriteRequest(testData.inputSeries, 1, 1) + req := mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1) _, err := ds[0].Push(ctx, req) require.NoError(t, err) @@ -1312,7 +1353,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) { shuffleShardSize: 1, skipLabelNameValidation: tc.skipLabelNameValidationCfg, }) - req := mockWriteRequest(tc.inputLabels, 42, 100000) + req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000) req.SkipLabelNameValidation = tc.skipLabelNameValidationReq _, err := ds[0].Push(ctx, req) 
if tc.errExpected { @@ -1790,7 +1831,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "test") for _, series := range fixtures { - req := mockWriteRequest(series.lbls, series.value, series.timestamp) + req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp) _, err := ds[0].Push(ctx, req) require.NoError(t, err) } @@ -1875,15 +1916,16 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher { return m } -func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest { - samples := []cortexpb.Sample{ - { +func mockWriteRequest(lbls []labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest { + samples := make([]cortexpb.Sample, len(lbls)) + for i := range lbls { + samples[i] = cortexpb.Sample{ TimestampMs: timestampMs, Value: value, - }, + } } - return cortexpb.ToWriteRequest([]labels.Labels{lbls}, samples, nil, cortexpb.API) + return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API) } type prepConfig struct { @@ -2644,17 +2686,20 @@ func TestDistributor_Push_Relabel(t *testing.T) { ctx := user.InjectOrgID(context.Background(), "user") type testcase struct { - inputSeries labels.Labels + name string + inputSeries []labels.Labels expectedSeries labels.Labels metricRelabelConfigs []*relabel.Config } cases := []testcase{ - // No relabel config. 
{ - inputSeries: labels.Labels{ - {Name: "__name__", Value: "foo"}, - {Name: "cluster", Value: "one"}, + name: "with no relabel config", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "foo"}, @@ -2662,9 +2707,12 @@ func TestDistributor_Push_Relabel(t *testing.T) { }, }, { - inputSeries: labels.Labels{ - {Name: "__name__", Value: "foo"}, - {Name: "cluster", Value: "one"}, + name: "with hardcoded replace", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, }, expectedSeries: labels.Labels{ {Name: "__name__", Value: "foo"}, @@ -2680,37 +2728,126 @@ func TestDistributor_Push_Relabel(t *testing.T) { }, }, }, + { + name: "with drop action", + inputSeries: []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + }, + expectedSeries: labels.Labels{ + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + metricRelabelConfigs: []*relabel.Config{ + { + SourceLabels: []model.LabelName{"__name__"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("(foo)"), + }, + }, + }, } for _, tc := range cases { - var err error - var limits validation.Limits - flagext.DefaultValues(&limits) - limits.MetricRelabelConfigs = tc.metricRelabelConfigs + t.Run(tc.name, func(t *testing.T) { + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.MetricRelabelConfigs = tc.metricRelabelConfigs - ds, ingesters, _ := prepare(t, prepConfig{ - numIngesters: 2, - happyIngesters: 2, - numDistributors: 1, - shardByAllLabels: true, - limits: &limits, - }) + ds, ingesters, _ := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) - // Push the series to the 
distributor - req := mockWriteRequest(tc.inputSeries, 1, 1) - _, err = ds[0].Push(ctx, req) - require.NoError(t, err) + // Push the series to the distributor + req := mockWriteRequest(tc.inputSeries, 1, 1) + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) - // Since each test pushes only 1 series, we do expect the ingester - // to have received exactly 1 series - for i := range ingesters { - timeseries := ingesters[i].series() - assert.Equal(t, 1, len(timeseries)) - for _, v := range timeseries { - assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + // Since each test pushes only 1 series, we do expect the ingester + // to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) + for _, v := range timeseries { + assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels)) + } } - } + }) + } +} + +func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) { + metricRelabelConfigs := []*relabel.Config{ + { + SourceLabels: []model.LabelName{"__name__"}, + Action: relabel.Drop, + Regex: relabel.MustNewRegexp("(foo)"), + }, + } + + inputSeries := []labels.Labels{ + { + {Name: "__name__", Value: "foo"}, + {Name: "cluster", Value: "one"}, + }, + { + {Name: "__name__", Value: "bar"}, + {Name: "cluster", Value: "two"}, + }, + } + + var err error + var limits validation.Limits + flagext.DefaultValues(&limits) + limits.MetricRelabelConfigs = metricRelabelConfigs + + ds, ingesters, regs := prepare(t, prepConfig{ + numIngesters: 2, + happyIngesters: 2, + numDistributors: 1, + shardByAllLabels: true, + limits: &limits, + }) + + regs[0].MustRegister(validation.DiscardedSamples) + validation.DiscardedSamples.Reset() + + // Push the series to the distributor + req := mockWriteRequest(inputSeries, 1, 1) + ctx := user.InjectOrgID(context.Background(), "user1") + _, err = ds[0].Push(ctx, req) + require.NoError(t, err) + + 
// The relabel config drops one of the two pushed series, so we expect + // each ingester to have received exactly 1 series + for i := range ingesters { + timeseries := ingesters[i].series() + assert.Equal(t, 1, len(timeseries)) } + + metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"} + + expectedMetrics := ` + # HELP cortex_discarded_samples_total The total number of samples that were discarded. + # TYPE cortex_discarded_samples_total counter + cortex_discarded_samples_total{reason="relabel_configuration",user="user1"} 1 + # HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples. + # TYPE cortex_distributor_received_samples_total counter + cortex_distributor_received_samples_total{user="user1"} 1 + ` + + require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...)) } func countMockIngestersCalls(ingesters []mockIngester, name string) int { diff --git a/pkg/util/validation/validate.go b/pkg/util/validation/validate.go index 094c879970..e6cb39777a 100644 --- a/pkg/util/validation/validate.go +++ b/pkg/util/validation/validate.go @@ -57,6 +57,11 @@ const ( // Too many HA clusters is one of the reasons for discarding samples. TooManyHAClusters = "too_many_ha_clusters" + // DroppedByRelabelConfiguration Samples can also be discarded because of relabeling configuration + DroppedByRelabelConfiguration = "relabel_configuration" + // DroppedByUserConfigurationOverride Samples discarded due to user configuration removing label __name__ + DroppedByUserConfigurationOverride = "user_label_removal_configuration" + // The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters // https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars ExemplarMaxLabelSetLength = 128