Skip to content

Commit

Permalink
Do not check for name label, adding new test
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
  • Loading branch information
pedro-stanaka committed Dec 14, 2021
1 parent 75fbf41 commit c29bd27
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 31 deletions.
17 changes: 2 additions & 15 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
if mrc := d.limits.MetricRelabelConfigs(userID); len(mrc) > 0 {
l := relabel.Process(cortexpb.FromLabelAdaptersToLabels(ts.Labels), mrc...)
if len(l) == 0 {
// all labels are gone, therefore the __name__ label is not present, samples will be discarded
// all labels are gone, samples will be discarded
validation.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
Expand All @@ -659,7 +659,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
removeLabel(labelName, &ts.Labels)
}

if len(ts.Labels) == 0 || wasNameLabelRemoved(ts.Labels) {
if len(ts.Labels) == 0 {
validation.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
Expand Down Expand Up @@ -793,19 +793,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
return &cortexpb.WriteResponse{}, firstPartialErr
}

func wasNameLabelRemoved(labels []cortexpb.LabelAdapter) bool {
const nameLabel = "__name__"

for i := 0; i < len(labels); i++ {
pair := labels[i]
if pair.Name == nameLabel {
return false
}
}

return true
}

func sortLabelsIfNeeded(labels []cortexpb.LabelAdapter) {
// no need to run sort.Slice, if labels are already sorted, which is most of the time.
// we can avoid extra memory allocations (mostly interface-related) this way.
Expand Down
58 changes: 42 additions & 16 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1102,17 +1102,6 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
{Name: "__name__", Value: "some_metric"},
},
},
// Edge case: remove label __name__ will drop all user metrics
{
removeReplica: true,
removeLabels: []string{"__name__"},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
expectedSeries: labels.Labels{},
},
// Remove multiple labels and replica.
{
removeReplica: true,
Expand Down Expand Up @@ -1165,22 +1154,59 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

expectedTimeseriesLen := 0
if len(tc.expectedSeries) > 0 {
expectedTimeseriesLen = 1
}
// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, expectedTimeseriesLen, len(timeseries))
assert.Equal(t, 1, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
}
}
}
}

func TestDistributor_Push_LabelRemoval_RemovingNameLabelWillError(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
type testcase struct {
inputSeries labels.Labels
expectedSeries labels.Labels
removeReplica bool
removeLabels []string
}

tc := testcase{
removeReplica: true,
removeLabels: []string{"__name__"},
inputSeries: labels.Labels{
{Name: "__name__", Value: "some_metric"},
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
expectedSeries: labels.Labels{},
}

var err error
var limits validation.Limits
flagext.DefaultValues(&limits)
limits.DropLabels = tc.removeLabels
limits.AcceptHASamples = tc.removeReplica

ds, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1)
_, err = ds[0].Push(ctx, req)
require.Error(t, err)
assert.Equal(t, "rpc error: code = Code(400) desc = sample missing metric name", err.Error())
}

func TestDistributor_Push_ShouldGuaranteeShardingTokenConsistencyOverTheTime(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
tests := map[string]struct {
Expand Down

0 comments on commit c29bd27

Please sign in to comment.