Skip to content

Commit

Permalink
Adds new metric for dropped samples in ingester (cortexproject#4503)
Browse files Browse the repository at this point in the history
* Adding test case for dropping metrics by name to understand better flow of distributor

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Adding test case and new metric for dropped samples

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Updating CHANGELOG with new changes

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Fixing linting problem on distributor file

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Reusing discarded samples metric from validate package

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Compare labelset with len() instead of comparing to nil

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Undoing unnecessary changes on tests and distributor

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Small rename on comment

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Fixing linting offenses

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Reseting validation dropped samples metric to avoid getting metrics from other test runs

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Resolving problems after rebase conflicts

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Registering counter for dropped metrics in test

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Checking if user label drop configuration did not drop __name__ label

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Do not check for name label, adding new test

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
Signed-off-by: Alvin Lin <alvinlin@amazon.com>
  • Loading branch information
pedro-stanaka authored and alvinlin123 committed Jan 14, 2022
1 parent cc7154b commit b64a97d
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* [FEATURE] AlertManager: Add support for SNS Receiver. #4382
=======

* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503
* [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539
* [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if a in-progress query has been canceled (context canceled). #4562
* [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571
Expand Down
13 changes: 13 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if len(l) == 0 {
// all labels are gone, samples will be discarded
validation.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Samples)))
continue
}
ts.Labels = cortexpb.FromLabelsToLabelAdapters(l)
}

Expand All @@ -652,6 +660,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

if len(ts.Labels) == 0 {
validation.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Samples)))

continue
}

Expand Down
217 changes: 177 additions & 40 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

Expand All @@ -1166,6 +1166,47 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
}
}

func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
type testcase struct {
inputSeries labels.Labels
expectedSeries labels.Labels
removeReplica bool
removeLabels []string
}

tc := testcase{
removeReplica: true,
removeLabels: []string{"__name__"},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
expectedSeries: labels.Labels{},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
limits.AcceptHASamples = tc.removeReplica

ds, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1)
_, err = ds[0].Push(ctx, req)
require.Error(t, err)
assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error())
}

func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
Expand Down Expand Up @@ -1254,7 +1295,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
})

// Push the series to the distributor
req := mockWriteRequest(testData.inputSeries, 1, 1)
req := mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1)
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)

Expand Down Expand Up @@ -1312,7 +1353,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
shuffleShardSize: 1,
skipLabelNameValidation: tc.skipLabelNameValidationCfg,
})
req := mockWriteRequest(tc.inputLabels, 42, 100000)
req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000)
req.SkipLabelNameValidation = tc.skipLabelNameValidationReq
_, err := ds[0].Push(ctx, req)
if tc.errExpected {
Expand Down Expand Up @@ -1790,7 +1831,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

for _, series := range fixtures {
req := mockWriteRequest(series.lbls, series.value, series.timestamp)
req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)
}
Expand Down Expand Up @@ -1875,15 +1916,16 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
return m
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest {
samples := []cortexpb.Sample{
{
func mockWriteRequest(lbls []labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest {
samples := make([]cortexpb.Sample, len(lbls))
for i := range lbls {
samples[i] = cortexpb.Sample{
TimestampMs: timestampMs,
Value: value,
},
}
}

return cortexpb.ToWriteRequest([]labels.Labels{lbls}, samples, nil, cortexpb.API)
return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API)
}

type prepConfig struct {
Expand Down Expand Up @@ -2644,27 +2686,33 @@ func TestDistributor_Push_Relabel(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")

type testcase struct {
inputSeries labels.Labels
name string
inputSeries []labels.Labels
expectedSeries labels.Labels
metricRelabelConfigs []*relabel.Config
}

cases := []testcase{
// No relabel config.
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
name: "with no relabel config",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
name: "with hardcoded replace",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
Expand All @@ -2680,37 +2728,126 @@ func TestDistributor_Push_Relabel(t *testing.T) {
},
},
},
{
name: "with drop action",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
metricRelabelConfigs: []*relabel.Config{
{
SourceLabels: []model.LabelName{"__name__"},
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("(foo)"),
},
},
},
}

for _, tc := range cases {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs
t.Run(tc.name, func(t *testing.T) {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs

ds, ingesters, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})
ds, ingesters, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)
// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
}
}
}
})
}
}

func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) {
metricRelabelConfigs := []*relabel.Config{
{
SourceLabels: []model.LabelName{"__name__"},
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("(foo)"),
},
}

inputSeries := []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = metricRelabelConfigs

ds, ingesters, regs := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

regs[0].MustRegister(validation.DiscardedSamples)
validation.DiscardedSamples.Reset()

// Push the series to the distributor
req := mockWriteRequest(inputSeries, 1, 1)
ctx := user.InjectOrgID(context.Background(), "user1")
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
}

metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"}

expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="relabel_configuration",user="user1"} 1
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="user1"} 1
`

require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
}

func countMockIngestersCalls(ingesters []mockIngester, name string) int {
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ const (
// Too many HA clusters is one of the reasons for discarding samples.
TooManyHAClusters = "too_many_ha_clusters"

// DroppedByRelabelConfiguration Samples can also be discarded because of relabeling configuration
DroppedByRelabelConfiguration = "relabel_configuration"
// DroppedByUserConfigurationOverride Samples discarded due to user configuration removing label __name__
DroppedByUserConfigurationOverride = "user_label_removal_configuration"

// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
ExemplarMaxLabelSetLength = 128
Expand Down

0 comments on commit b64a97d

Please sign in to comment.