Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds new metric for dropped samples in ingester #4503

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## master / unreleased

* [ENHANCEMENT] Keep track of discarded samples due to relabel configuration in `cortex_discarded_samples_total`. #4503
* [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539
* [CHANGE] query-frontend: Do not print anything in the logs of `query-frontend` if an in-progress query has been canceled (context canceled). #4562
* [ENHANCEMENT] Ruler: Add `-ruler.disable-rule-group-label` to disable the `rule_group` label on exported metrics. #4571
Expand Down
13 changes: 13 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if len(l) == 0 {
// all labels are gone, samples will be discarded
validation.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Samples)))
continue
}
pedro-stanaka marked this conversation as resolved.
Show resolved Hide resolved
ts.Labels = cortexpb.FromLabelsToLabelAdapters(l)
}

Expand All @@ -652,6 +660,11 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}

if len(ts.Labels) == 0 {
validation.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Samples)))

continue
}

Expand Down
217 changes: 177 additions & 40 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1150,7 +1150,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

Expand All @@ -1166,6 +1166,47 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
}
}

func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
type testcase struct {
inputSeries labels.Labels
expectedSeries labels.Labels
removeReplica bool
removeLabels []string
}

tc := testcase{
removeReplica: true,
removeLabels: []string{"__name__"},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
expectedSeries: labels.Labels{},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
limits.AcceptHASamples = tc.removeReplica

ds, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1)
_, err = ds[0].Push(ctx, req)
require.Error(t, err)
assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error())
}

func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
Expand Down Expand Up @@ -1254,7 +1295,7 @@ func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *
})

// Push the series to the distributor
req := mockWriteRequest(testData.inputSeries, 1, 1)
req := mockWriteRequest([]labels.Labels{testData.inputSeries}, 1, 1)
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)

Expand Down Expand Up @@ -1312,7 +1353,7 @@ func TestDistributor_Push_LabelNameValidation(t *testing.T) {
shuffleShardSize: 1,
skipLabelNameValidation: tc.skipLabelNameValidationCfg,
})
req := mockWriteRequest(tc.inputLabels, 42, 100000)
req := mockWriteRequest([]labels.Labels{tc.inputLabels}, 42, 100000)
req.SkipLabelNameValidation = tc.skipLabelNameValidationReq
_, err := ds[0].Push(ctx, req)
if tc.errExpected {
Expand Down Expand Up @@ -1790,7 +1831,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "test")

for _, series := range fixtures {
req := mockWriteRequest(series.lbls, series.value, series.timestamp)
req := mockWriteRequest([]labels.Labels{series.lbls}, series.value, series.timestamp)
_, err := ds[0].Push(ctx, req)
require.NoError(t, err)
}
Expand Down Expand Up @@ -1875,15 +1916,16 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
return m
}

func mockWriteRequest(lbls labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest {
samples := []cortexpb.Sample{
{
func mockWriteRequest(lbls []labels.Labels, value float64, timestampMs int64) *cortexpb.WriteRequest {
samples := make([]cortexpb.Sample, len(lbls))
for i := range lbls {
samples[i] = cortexpb.Sample{
TimestampMs: timestampMs,
Value: value,
},
}
}

return cortexpb.ToWriteRequest([]labels.Labels{lbls}, samples, nil, cortexpb.API)
return cortexpb.ToWriteRequest(lbls, samples, nil, cortexpb.API)
}

type prepConfig struct {
Expand Down Expand Up @@ -2644,27 +2686,33 @@ func TestDistributor_Push_Relabel(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")

type testcase struct {
inputSeries labels.Labels
name string
inputSeries []labels.Labels
expectedSeries labels.Labels
metricRelabelConfigs []*relabel.Config
}

cases := []testcase{
// No relabel config.
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
name: "with no relabel config",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
{
inputSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
name: "with hardcoded replace",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "foo"},
Expand All @@ -2680,37 +2728,126 @@ func TestDistributor_Push_Relabel(t *testing.T) {
},
},
},
{
name: "with drop action",
inputSeries: []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
},
expectedSeries: labels.Labels{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
metricRelabelConfigs: []*relabel.Config{
{
SourceLabels: []model.LabelName{"__name__"},
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("(foo)"),
},
},
},
}

for _, tc := range cases {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs
t.Run(tc.name, func(t *testing.T) {
var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = tc.metricRelabelConfigs

ds, ingesters, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})
ds, ingesters, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)
// Push the series to the distributor
req := mockWriteRequest(tc.inputSeries, 1, 1)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
// Each test case is left with exactly 1 series after relabeling, so we
// expect every ingester to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
}
}
}
})
}
}

func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing.T) {
metricRelabelConfigs := []*relabel.Config{
{
SourceLabels: []model.LabelName{"__name__"},
Action: relabel.Drop,
Regex: relabel.MustNewRegexp("(foo)"),
},
}

inputSeries := []labels.Labels{
{
{Name: "__name__", Value: "foo"},
{Name: "cluster", Value: "one"},
},
{
{Name: "__name__", Value: "bar"},
{Name: "cluster", Value: "two"},
},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = metricRelabelConfigs

ds, ingesters, regs := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

regs[0].MustRegister(validation.DiscardedSamples)
validation.DiscardedSamples.Reset()

// Push the series to the distributor
req := mockWriteRequest(inputSeries, 1, 1)
ctx := user.InjectOrgID(context.Background(), "user1")
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
}

metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"}

expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="relabel_configuration",user="user1"} 1
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="user1"} 1
`

require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
}

func countMockIngestersCalls(ingesters []mockIngester, name string) int {
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ const (
// Too many HA clusters is one of the reasons for discarding samples.
TooManyHAClusters = "too_many_ha_clusters"

// DroppedByRelabelConfiguration is the discard reason for samples dropped by the relabeling configuration.
DroppedByRelabelConfiguration = "relabel_configuration"
// DroppedByUserConfigurationOverride is the discard reason for samples dropped because the user configuration removed the __name__ label.
DroppedByUserConfigurationOverride = "user_label_removal_configuration"

// The combined length of the label names and values of an Exemplar's LabelSet MUST NOT exceed 128 UTF-8 characters
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#exemplars
ExemplarMaxLabelSetLength = 128
Expand Down