diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index 8ce8866e922ba..99ea0d04a8502 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -14,7 +14,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) func testIteratorForward(t *testing.T, iter iter.EntryIterator, from, through int64) { @@ -64,7 +64,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) _ = iter.Close() @@ -73,7 +73,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(labels.Labels{})) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) _ = iter.Close() diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 51f3a49930355..db3b15d6a7c78 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" @@ -320,7 +321,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk { } func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream { - it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) + it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) require.NoError(t, err) stream := logproto.Stream{ diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index b32ad34dccf13..c9402035eb2a3 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -16,7 +16,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) func TestMaxReturnedStreamsErrors(t *testing.T) { @@ -120,7 +120,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(s.labels)) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -130,7 +130,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(s.labels)) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, log.NewNoopPipeline().ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 4b25fe10edb6c..714db64264b3c 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util" ) @@ -137,7 +138,7 @@ func (t *tailer) send(stream logproto.Stream) error { func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error) { // Optimization: skip filtering entirely, if no filter is set - if t.pipeline == logql.NoopPipeline { + if log.IsNoopPipeline(t.pipeline) { return []logproto.Stream{stream}, nil } streams := map[uint64]*logproto.Stream{} diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index e4e5ffc7797bd..0653b18b7fb98 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -23,7 +23,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) func TestTransferOut(t *testing.T) { @@ -95,7 +95,7 @@ func TestTransferOut(t *testing.T) { time.Unix(0, 0), time.Unix(10, 0), logproto.FORWARD, - logql.NoopPipeline.ForStream(stream.labels), + log.NewNoopPipeline().ForStream(stream.labels), ) if !assert.NoError(t, err) { continue diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 1575d84ccb6ec..fc80c41338347 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -84,10 +84,6 @@ type LogSelectorExpr interface { type Pipeline = log.Pipeline type SampleExtractor = log.SampleExtractor -var ( - NoopPipeline = log.NewNoopPipeline() -) - // PipelineExpr is an expression defining a log pipeline. type PipelineExpr interface { Pipeline() (Pipeline, error) @@ -108,9 +104,6 @@ func (m MultiStageExpr) Pipeline() (log.Pipeline, error) { if err != nil { return nil, err } - if len(stages) == 0 { - return NoopPipeline, nil - } return log.NewPipeline(stages), nil } @@ -169,7 +162,7 @@ func (e *matchersExpr) String() string { } func (e *matchersExpr) Pipeline() (log.Pipeline, error) { - return NoopPipeline, nil + return log.NewNoopPipeline(), nil } func (e *matchersExpr) HasFilter() bool { @@ -860,7 +853,7 @@ func (e *literalExpr) String() string { func (e *literalExpr) Selector() LogSelectorExpr { return e } func (e *literalExpr) HasFilter() bool { return false } func (e *literalExpr) Operations() []string { return nil } -func (e *literalExpr) Pipeline() (log.Pipeline, error) { return NoopPipeline, nil } +func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NewNoopPipeline(), nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil } diff --git a/pkg/logql/ast_test.go b/pkg/logql/ast_test.go index b71afc84bd711..3eb4ccc9a8084 100644 --- a/pkg/logql/ast_test.go +++ b/pkg/logql/ast_test.go @@ -43,7 +43,9 @@ func Test_logSelectorExpr_String(t *testing.T) { if err != nil { t.Fatalf("failed to get filter: %s", err) } - require.Equal(t, tt.expectFilter, p != NoopPipeline) + if !tt.expectFilter { + require.Equal(t, log.NewNoopPipeline(), p) + } if expr.String() != tt.selector { t.Fatalf("error expected: %s got: %s", tt.selector, expr.String()) } @@ -216,7 +218,7 @@ func Test_FilterMatcher(t *testing.T) { p, err := expr.Pipeline() assert.Nil(t, err) if tt.lines == nil { - assert.Equal(t, p, NoopPipeline) + assert.Equal(t, p, log.NewNoopPipeline()) } else { sp := p.ForStream(labelBar) for _, lc := range tt.lines { diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 6a7906319d7b6..d503a2bec35fe 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -35,6 +35,12 @@ type noopPipeline struct { cache map[uint64]*noopStreamPipeline } +// IsNoopPipeline tells if a pipeline is a Noop. +func IsNoopPipeline(p Pipeline) bool { + _, ok := p.(*noopPipeline) + return ok +} + type noopStreamPipeline struct { LabelsResult } diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index fe50e95100d1a..2422f7dffd836 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -946,7 +946,7 @@ func Test_newLogBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), logql.NoopPipeline, tt.direction, tt.start, tt.end) + it, err := newLogBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.NewNoopPipeline(), tt.direction, tt.start, tt.end) require.NoError(t, err) streams, _, err := iter.ReadBatch(it, 1000) _ = it.Close() @@ -1442,7 +1442,7 @@ func TestBuildHeapIterator(t *testing.T) { direction: logproto.FORWARD, }, ctx: ctx, - pipeline: logql.NoopPipeline, + pipeline: log.NewNoopPipeline(), } it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) if err != nil { diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index bc435e5270d04..896295b7aa66e 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -13,7 +13,6 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util" ) @@ -47,7 +46,7 @@ func TestLazyChunkIterator(t *testing.T) { }, } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) + it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) require.Nil(t, err) streams, _, err := iter.ReadBatch(it, 1000) require.Nil(t, err)