diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index ebe531e2ab4b..cf6e8c841ea8 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -438,6 +438,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log pushSize += len(entry.Line) } stream.Entries = stream.Entries[:n] + if len(stream.Entries) == 0 { + // Empty stream after validating all the entries + continue + } shardStreamsCfg := d.validator.Limits.ShardStreams(tenantID) if shardStreamsCfg.Enabled { diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 19019e62dd4a..e0621aa9b9df 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -589,6 +589,26 @@ func Test_TruncateLogLines(t *testing.T) { }) } +func Test_DiscardEmptyStreamsAfterValidation(t *testing.T) { + setup := func() (*validation.Limits, *mockIngester) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.MaxLineSize = 5 + return limits, &mockIngester{} + } + + t.Run("it discards invalid entries and discards resulting empty streams completely", func(t *testing.T) { + limits, ingester := setup() + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) + + _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) + require.Equal(t, err, httpgrpc.Errorf(http.StatusBadRequest, fmt.Sprintf(validation.LineTooLongErrorMsg, 5, "{foo=\"bar\", service_name=\"unknown_service\"}", 10))) + topVal := ingester.Peek() + require.Nil(t, topVal) + }) +} + func TestStreamShard(t *testing.T) { // setup base stream. baseStream := logproto.Stream{}