diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 31bd475ad174e..f8977925ffeb0 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -5,11 +5,9 @@ import ( "sort" "time" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) const ( @@ -72,7 +70,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone } // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) { +func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) @@ -97,7 +95,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi }, nil } -func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ labels.Labels, _ logql.SampleExtractor) iter.SampleIterator { +func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator { return nil } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 6f45e12a41973..de0bfc418fbd2 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -7,11 +7,9 @@ import ( "strings" "time" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) // Errors returned by the chunk interface. @@ -100,8 +98,8 @@ type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error - Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) - SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator + Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) + SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. Blocks(mintT, maxtT time.Time) []Block Size() int @@ -126,7 +124,7 @@ type Block interface { // Entries is the amount of entries in the block. Entries() int // Iterator returns an entry iterator for the block. - Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator + Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator // SampleIterator returns a sample iterator for the block. - SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator + SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 546a79e84f864..27d596758e0f1 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -16,11 +16,10 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -471,7 +470,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { } // Iterator implements Chunk. -func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) { +func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() its := make([]iter.EntryIterator, 0, len(c.blocks)+1) @@ -479,11 +478,11 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if maxt < b.mint || b.maxt < mint { continue } - its = append(its, encBlock{c.encoding, b}.Iterator(ctx, lbs, pipeline)) + its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline)) } if !c.head.isEmpty() { - its = append(its, c.head.iterator(ctx, direction, mint, maxt, lbs, pipeline)) + its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline)) } if direction == logproto.FORWARD { @@ -513,7 +512,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } // Iterator implements Chunk. -func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator { mint, maxt := from.UnixNano(), through.UnixNano() its := make([]iter.SampleIterator, 0, len(c.blocks)+1) @@ -521,11 +520,11 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, if maxt < b.mint || b.maxt < mint { continue } - its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, lbs, extractor)) + its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor)) } if !c.head.isEmpty() { - its = append(its, c.head.sampleIterator(ctx, mint, maxt, lbs, extractor)) + its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor)) } return iter.NewTimeRangedSampleIterator( @@ -557,18 +556,18 @@ type encBlock struct { block } -func (b encBlock) Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, lbs, pipeline) + return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline) } -func (b encBlock) SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newSampleIterator(ctx, getReaderPool(b.enc), b.b, lbs, extractor) + return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor) } func (b block) Offset() int { @@ -585,7 +584,7 @@ func (b block) MaxTime() int64 { return b.maxt } -func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -601,7 +600,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, for _, e := range hb.entries { chunkStats.HeadChunkBytes += int64(len(e.s)) line := []byte(e.s) - newLine, parsedLbs, ok := pipeline.Process(line, lbs) + newLine, parsedLbs, ok := pipeline.Process(line) if !ok { continue } @@ -630,7 +629,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, return iter.NewStreamsIterator(ctx, streamsResult, direction) } -func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -640,7 +639,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l for _, e := range hb.entries { chunkStats.HeadChunkBytes += int64(len(e.s)) line := []byte(e.s) - value, parsedLabels, ok := extractor.Process(line, lbs) + value, parsedLabels, ok := extractor.Process(line) if !ok { continue } @@ -687,11 +686,9 @@ type bufferedIterator struct { currTs int64 closed bool - - baseLbs labels.Labels } -func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator { chunkStats := stats.GetChunkData(ctx) chunkStats.CompressedBytes += int64(len(b)) return &bufferedIterator{ @@ -701,7 +698,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs lab bufReader: nil, // will be initialized later pool: pool, decBuf: make([]byte, binary.MaxVarintLen64), - baseLbs: lbs, } } @@ -806,19 +802,19 @@ func (si *bufferedIterator) close() { si.decBuf = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator { return &entryBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, lbs), + bufferedIterator: newBufferedIterator(ctx, pool, b), pipeline: pipeline, } } type entryBufferedIterator struct { *bufferedIterator - pipeline logql.Pipeline + pipeline log.StreamPipeline cur logproto.Entry - currLabels labels.Labels + currLabels log.LabelsResult } func (e *entryBufferedIterator) Entry() logproto.Entry { @@ -829,7 +825,7 @@ func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() } func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - newLine, lbs, ok := e.pipeline.Process(e.currLine, e.baseLbs) + newLine, lbs, ok := e.pipeline.Process(e.currLine) if !ok { continue } @@ -841,9 +837,9 @@ func (e *entryBufferedIterator) Next() bool { return false } -func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator { it := &sampleBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, lbs), + bufferedIterator: newBufferedIterator(ctx, pool, b), extractor: extractor, } return it @@ -852,15 +848,15 @@ func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs label type sampleBufferedIterator struct { *bufferedIterator - extractor logql.SampleExtractor + extractor log.StreamSampleExtractor cur logproto.Sample - currLabels labels.Labels + currLabels log.LabelsResult } func (e *sampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - val, labels, ok := e.extractor.Process(e.currLine, e.baseLbs) + val, labels, ok := e.extractor.Process(e.currLine) if !ok { continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index fdba84eb4ae78..459f51354b2a9 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -35,8 +36,16 @@ var testEncoding = []Encoding{ } var ( - testBlockSize = 256 * 1024 - testTargetSize = 1500 * 1024 + testBlockSize = 256 * 1024 + testTargetSize = 1500 * 1024 + noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) + countExtractor = func() log.StreamSampleExtractor { + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + if err != nil { + panic(err) + } + return ex.ForStream(labels.Labels{}) + }() ) func TestBlocksInclusive(t *testing.T) { @@ -114,7 +123,7 @@ func TestBlock(t *testing.T) { } } - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) idx := 0 @@ -129,7 +138,7 @@ func TestBlock(t *testing.T) { require.NoError(t, it.Close()) require.Equal(t, len(cases), idx) - sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) + sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) idx = 0 for sampleIt.Next() { s := sampleIt.Sample() @@ -144,7 +153,7 @@ func TestBlock(t *testing.T) { require.Equal(t, len(cases), idx) t.Run("bounded-iteration", func(t *testing.T) { - it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) idx := 2 @@ -177,7 +186,7 @@ func TestReadFormatV1(t *testing.T) { t.Fatal(err) } - it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -204,7 +213,7 @@ func TestRoundtripV2(t *testing.T) { assertLines := func(c *MemChunk) { require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -266,7 +275,7 @@ func TestSerialization(t *testing.T) { bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) require.NoError(t, err) - it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) for i := 0; i < numSamples; i++ { require.True(t, it.Next()) @@ -277,7 +286,7 @@ func TestSerialization(t *testing.T) { } require.NoError(t, it.Error()) - sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) for i := 0; i < numSamples; i++ { require.True(t, sampleIt.Next(), i) @@ -320,7 +329,7 @@ func TestChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) i = 0 for it.Next() { @@ -463,7 +472,7 @@ func TestChunkStats(t *testing.T) { expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64) ctx := stats.NewContext(context.Background()) - it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -492,7 +501,7 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } ctx = stats.NewContext(context.Background()) - it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline) + it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -540,7 +549,7 @@ func TestIteratorClose(t *testing.T) { } { c := NewMemChunk(enc, testBlockSize, testTargetSize) inserted := fillChunk(c) - iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil, logql.NoopPipeline) + iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -591,7 +600,7 @@ func BenchmarkRead(b *testing.B) { for n := 0; n < b.N; n++ { for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -616,7 +625,7 @@ func BenchmarkBackwardIterator(b *testing.B) { _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -637,7 +646,7 @@ func TestGenerateDataSize(t *testing.T) { bytesRead := uint64(0) for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -671,7 +680,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, nil, logql.NoopPipeline) + iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) for iter.Next() { _ = iter.Entry() @@ -730,7 +739,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { c := createChunk() // testing headchunk - it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline) require.NoError(t, err) for i := range tt.expect { require.Equal(t, tt.expect[i], it.Next()) @@ -739,7 +748,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { // testing chunk blocks require.NoError(t, c.cut()) - it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline) require.NoError(t, err) for i := range tt.expect { require.Equal(t, tt.expect[i], it.Next()) @@ -758,7 +767,7 @@ func TestMemchunkLongLine(t *testing.T) { for i := 1; i <= 10; i++ { require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) } - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) for i := 1; i <= 10; i++ { require.True(t, it.Next()) @@ -777,3 +786,102 @@ func TestBytesWith(t *testing.T) { require.Equal(t, exp, out) } + +var streams = []logproto.Stream{} +var series = []logproto.Series{} + +func BenchmarkBufferedIteratorLabels(b *testing.B) { + c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) + _ = fillChunk(c) + labelsSet := []labels.Labels{ + { + {Name: "cluster", Value: "us-central1"}, + {Name: "stream", Value: "stdout"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/query-frontend"}, + {Name: "container", Value: "query-frontend"}, + {Name: "pod", Value: "query-frontend-6894f97b98-89q2n"}, + }, + { + {Name: "cluster", Value: "us-central2"}, + {Name: "stream", Value: "stderr"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/querier"}, + {Name: "container", Value: "querier"}, + {Name: "pod", Value: "querier-6894f97b98-89q2n"}, + }, + } + for _, test := range []string{ + `{app="foo"}`, + `{app="foo"} != "foo"`, + `{app="foo"} != "foo" | logfmt `, + `{app="foo"} != "foo" | logfmt | duration > 10ms`, + `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseLogSelector(test) + if err != nil { + b.Fatal(err) + } + p, err := expr.Pipeline() + if err != nil { + b.Fatal(err) + } + var iters []iter.EntryIterator + for _, lbs := range labelsSet { + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs)) + if err != nil { + b.Fatal(err) + } + iters = append(iters, it) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { + for it.Next() { + streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}}) + } + } + } + streams = streams[:0] + }) + } + + for _, test := range []string{ + `rate({app="foo"}[1m])`, + `sum by (cluster) (rate({app="foo"}[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseSampleExpr(test) + if err != nil { + b.Fatal(err) + } + ex, err := expr.Extractor() + if err != nil { + b.Fatal(err) + } + var iters []iter.SampleIterator + for _, lbs := range labelsSet { + iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs))) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { + for it.Next() { + series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}}) + } + } + } + series = series[:0] + }) + } +} diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index fbd1e1028d54c..8ce8866e922ba 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -64,7 +64,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) _ = iter.Close() @@ -73,7 +73,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, labels.Labels{}, logql.NoopPipeline) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(labels.Labels{})) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) _ = iter.Close() diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 986b6ef1dbc13..51f3a49930355 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -320,7 +320,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk { } func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream { - it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline) + it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) require.NoError(t, err) stream := logproto.Stream{ diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 0654af3c8aff6..25f47824d3715 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -211,7 +211,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter expr.Matchers(), func(stream *stream) error { ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline) + iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels)) if err != nil { return err } @@ -242,7 +242,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams expr.Selector().Matchers(), func(stream *stream) error { ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor) + iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor.ForStream(stream.labels)) if err != nil { return err } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 08c374dd6f0af..30e2f07ccd120 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) var ( @@ -258,10 +258,10 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t } // Returns an iterator. -func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline logql.Pipeline) (iter.EntryIterator, error) { +func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { - itr, err := c.chunk.Iterator(ctx, from, through, direction, s.labels, pipeline) + itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline) if err != nil { return nil, err } @@ -280,10 +280,10 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio } // Returns an SampleIterator. -func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor logql.SampleExtractor) (iter.SampleIterator, error) { +func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) { iterators := make([]iter.SampleIterator, 0, len(s.chunks)) for _, c := range s.chunks { - if itr := c.chunk.SampleIterator(ctx, from, through, s.labels, extractor); itr != nil { + if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil { iterators = append(iterators, itr) } } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index ffaa422725e08..b32ad34dccf13 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -120,7 +120,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -130,7 +130,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 8c03c8fcc644e..4b25fe10edb6c 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -145,18 +145,18 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error if err != nil { return nil, err } + sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := t.pipeline.Process([]byte(e.Line), lbs) + newLine, parsedLbs, ok := sp.Process([]byte(e.Line)) if !ok { continue } var stream *logproto.Stream - lhash := parsedLbs.Hash() - if stream, ok = streams[lhash]; !ok { + if stream, ok = streams[parsedLbs.Hash()]; !ok { stream = &logproto.Stream{ Labels: parsedLbs.String(), } - streams[lhash] = stream + streams[parsedLbs.Hash()] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: e.Timestamp, diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 694bb38876bfb..e4e5ffc7797bd 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -95,7 +95,7 @@ func TestTransferOut(t *testing.T) { time.Unix(0, 0), time.Unix(10, 0), logproto.FORWARD, - logql.NoopPipeline, + logql.NoopPipeline.ForStream(stream.labels), ) if !assert.NoError(t, err) { continue diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index cbb68b06dd038..b06f7b280091b 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -132,7 +132,8 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac } } - if newLine, newLabels, ok := m.pipeline.Process([]byte(*entry), labels.FromMap(util.ModelLabelSetToMap(lbs))); ok { + sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs))) + if newLine, newLabels, ok := sp.Process([]byte(*entry)); ok { switch m.action { case MatchActionDrop: // Adds the drop label to not be sent by the api.EntryHandler @@ -142,7 +143,7 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac for k := range lbs { delete(lbs, k) } - for _, l := range newLabels { + for _, l := range newLabels.Labels() { lbs[model.LabelName(l.Name)] = model.LabelValue(l.Value) } m.stage.Process(lbs, extracted, t, entry) diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 1902d1bfcfcac..1575d84ccb6ec 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -85,7 +85,7 @@ type Pipeline = log.Pipeline type SampleExtractor = log.SampleExtractor var ( - NoopPipeline = log.NoopPipeline + NoopPipeline = log.NewNoopPipeline() ) // PipelineExpr is an expression defining a log pipeline. @@ -109,7 +109,7 @@ func (m MultiStageExpr) Pipeline() (log.Pipeline, error) { return nil, err } if len(stages) == 0 { - return log.NoopPipeline, nil + return NoopPipeline, nil } return log.NewPipeline(stages), nil } @@ -169,7 +169,7 @@ func (e *matchersExpr) String() string { } func (e *matchersExpr) Pipeline() (log.Pipeline, error) { - return log.NoopPipeline, nil + return NoopPipeline, nil } func (e *matchersExpr) HasFilter() bool { @@ -719,7 +719,7 @@ func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) { // inject in the range vector extractor the outer groups to improve performance. // This is only possible if the operation is a sum. Anything else needs all labels. if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum { - return r.extractor(e.grouping, true) + return r.extractor(e.grouping, len(e.grouping.groups) == 0) } return e.left.Extractor() } @@ -860,7 +860,7 @@ func (e *literalExpr) String() string { func (e *literalExpr) Selector() LogSelectorExpr { return e } func (e *literalExpr) HasFilter() bool { return false } func (e *literalExpr) Operations() []string { return nil } -func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NoopPipeline, nil } +func (e *literalExpr) Pipeline() (log.Pipeline, error) { return NoopPipeline, nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil } diff --git a/pkg/logql/ast_test.go b/pkg/logql/ast_test.go index 7ebe137aaced2..b71afc84bd711 100644 --- a/pkg/logql/ast_test.go +++ b/pkg/logql/ast_test.go @@ -43,7 +43,7 @@ func Test_logSelectorExpr_String(t *testing.T) { if err != nil { t.Fatalf("failed to get filter: %s", err) } - require.Equal(t, tt.expectFilter, p != log.NoopPipeline) + require.Equal(t, tt.expectFilter, p != NoopPipeline) if expr.String() != tt.selector { t.Fatalf("error expected: %s got: %s", tt.selector, expr.String()) } @@ -132,7 +132,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - _, _, ok := p.Process([]byte("bleepbloop"), labelBar) + _, _, ok := p.ForStream(labelBar).Process([]byte("bleepbloop")) require.True(t, ok) }) @@ -216,10 +216,11 @@ func Test_FilterMatcher(t *testing.T) { p, err := expr.Pipeline() assert.Nil(t, err) if tt.lines == nil { - assert.Equal(t, p, log.NoopPipeline) + assert.Equal(t, p, NoopPipeline) } else { + sp := p.ForStream(labelBar) for _, lc := range tt.lines { - _, _, ok := p.Process([]byte(lc.l), labelBar) + _, _, ok := sp.Process([]byte(lc.l)) assert.Equal(t, lc.e, ok) } } @@ -281,8 +282,9 @@ func BenchmarkContainsFilter(b *testing.B) { b.ResetTimer() + sp := p.ForStream(labelBar) for i := 0; i < b.N; i++ { - if _, _, ok := p.Process(line, labelBar); !ok { + if _, _, ok := sp.Process(line); !ok { b.Fatal("doesn't match") } } diff --git a/pkg/logql/functions.go b/pkg/logql/functions.go index 3a2d6be528cdc..e41ae695aa281 100644 --- a/pkg/logql/functions.go +++ b/pkg/logql/functions.go @@ -68,9 +68,9 @@ func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtra // otherwise we extract metrics from the log line. switch r.operation { case OpRangeTypeRate, OpRangeTypeCount: - return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all) + return log.NewLineSampleExtractor(log.CountExtractor, stages, groups, without, all) case OpRangeTypeBytes, OpRangeTypeBytesRate: - return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all) + return log.NewLineSampleExtractor(log.BytesExtractor, stages, groups, without, all) default: return nil, fmt.Errorf(unsupportedErr, r.operation) } diff --git a/pkg/logql/log/fmt_test.go b/pkg/logql/log/fmt_test.go index b86d6ad06a27e..a7bbf8889f085 100644 --- a/pkg/logql/log/fmt_test.go +++ b/pkg/logql/log/fmt_test.go @@ -41,11 +41,11 @@ func Test_lineFormatter_Format(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) - outLine, _ := tt.fmter.Process(nil, b) + builder := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + builder.Reset() + outLine, _ := tt.fmter.Process(nil, builder) require.Equal(t, tt.want, outLine) - require.Equal(t, tt.wantLbs, b.Labels()) + require.Equal(t, tt.wantLbs, builder.Labels()) }) } } @@ -94,11 +94,11 @@ func Test_labelsFormatter_Format(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.in) - _, _ = tt.fmter.Process(nil, b) + builder := NewBaseLabelsBuilder().ForLabels(tt.in, tt.in.Hash()) + builder.Reset() + _, _ = tt.fmter.Process(nil, builder) sort.Sort(tt.want) - require.Equal(t, tt.want, b.Labels()) + require.Equal(t, tt.want, builder.Labels()) }) } } diff --git a/pkg/logql/log/label_filter_test.go b/pkg/logql/log/label_filter_test.go index 0d909d237b3ac..371e6b601931e 100644 --- a/pkg/logql/log/label_filter_test.go +++ b/pkg/logql/log/label_filter_test.go @@ -151,8 +151,8 @@ func TestBinary_Filter(t *testing.T) { for _, tt := range tests { t.Run(tt.f.String(), func(t *testing.T) { sort.Sort(tt.lbs) - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, got := tt.f.Process(nil, b) require.Equal(t, tt.want, got) sort.Sort(tt.wantLbs) @@ -231,8 +231,8 @@ func TestErrorFiltering(t *testing.T) { for _, tt := range tests { t.Run(tt.f.String(), func(t *testing.T) { sort.Sort(tt.lbs) - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() b.SetErr(tt.err) _, got := tt.f.Process(nil, b) require.Equal(t, tt.want, got) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index f087d22983275..24283a36c9143 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -6,26 +6,124 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) +var ( + emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash()) +) + +// LabelsResult is a computed labels result that contains the labels set with associated string and hash. +// The is mainly used for caching and returning labels computations out of pipelines and stages. +type LabelsResult interface { + String() string + Labels() labels.Labels + Hash() uint64 +} + +// NewLabelsResult creates a new LabelsResult from a labels set and a hash. +func NewLabelsResult(lbs labels.Labels, hash uint64) LabelsResult { + return &labelsResult{lbs: lbs, s: lbs.String(), h: hash} +} + +type labelsResult struct { + lbs labels.Labels + s string + h uint64 +} + +func (l labelsResult) String() string { + return l.s +} + +func (l labelsResult) Labels() labels.Labels { + return l.lbs +} + +func (l labelsResult) Hash() uint64 { + return l.h +} + +type hasher struct { + buf []byte // buffer for computing hash without bytes slice allocation. +} + +// newHasher allow to compute hashes for labels by reusing the same buffer. +func newHasher() *hasher { + return &hasher{ + buf: make([]byte, 0, 1024), + } +} + +// Hash hashes the labels +func (h *hasher) Hash(lbs labels.Labels) uint64 { + var hash uint64 + hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...) + return hash +} + +// BaseLabelsBuilder is a label builder used by pipeline and stages. +// Only one base builder is used and it contains cache for each LabelsBuilders. +type BaseLabelsBuilder struct { + del []string + add []labels.Label + // nolint(structcheck) https://github.com/golangci/golangci-lint/issues/826 + err string + + groups []string + without, noLabels bool + + resultCache map[uint64]LabelsResult + *hasher +} + // LabelsBuilder is the same as labels.Builder but tailored for this package. type LabelsBuilder struct { - base labels.Labels - del []string - add []labels.Label + base labels.Labels + currentResult LabelsResult + groupedResult LabelsResult - err string + *BaseLabelsBuilder +} + +// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. +func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder { + return &BaseLabelsBuilder{ + del: make([]string, 0, 5), + add: make([]labels.Label, 0, 16), + resultCache: make(map[uint64]LabelsResult), + hasher: newHasher(), + groups: groups, + noLabels: noLabels, + without: without, + } } -// NewLabelsBuilder creates a new labels builder. -func NewLabelsBuilder() *LabelsBuilder { - return &LabelsBuilder{ - del: make([]string, 0, 5), - add: make([]labels.Label, 0, 5), +// NewLabelsBuilder creates a new base labels builder. +func NewBaseLabelsBuilder() *BaseLabelsBuilder { + return NewBaseLabelsBuilderWithGrouping(nil, false, false) +} + +// ForLabels creates a labels builder for a given labels set as base. +// The labels cache is shared across all created LabelsBuilders. +func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBuilder { + if labelResult, ok := b.resultCache[hash]; ok { + res := &LabelsBuilder{ + base: lbs, + currentResult: labelResult, + BaseLabelsBuilder: b, + } + return res + } + labelResult := NewLabelsResult(lbs, hash) + b.resultCache[hash] = labelResult + res := &LabelsBuilder{ + base: lbs, + currentResult: labelResult, + BaseLabelsBuilder: b, } + return res } // Reset clears all current state for the builder. -func (b *LabelsBuilder) Reset(base labels.Labels) { - b.base = base +func (b *LabelsBuilder) Reset() { b.del = b.del[:0] b.add = b.add[:0] b.err = "" @@ -47,11 +145,12 @@ func (b *LabelsBuilder) HasErr() bool { return b.err != "" } -// Base returns the base labels unmodified -func (b *LabelsBuilder) Base() labels.Labels { - return b.base +// BaseHas returns the base labels have the given key +func (b *LabelsBuilder) BaseHas(key string) bool { + return b.base.Has(key) } +// Get returns the value of a labels key if it exists. func (b *LabelsBuilder) Get(key string) (string, bool) { for _, a := range b.add { if a.Name == key { @@ -136,12 +235,124 @@ Outer: return res } -func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels { - // naive implementation for now. - return b.Labels().WithoutLabels(names...) +// LabelsResult returns the LabelsResult from the builder. +// No grouping is applied and the cache is used when possible. +func (b *LabelsBuilder) LabelsResult() LabelsResult { + // unchanged path. + if len(b.del) == 0 && len(b.add) == 0 && b.err == "" { + return b.currentResult + } + return b.toResult(b.Labels()) +} + +func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult { + hash := b.hasher.Hash(lbs) + if cached, ok := b.resultCache[hash]; ok { + return cached + } + res := NewLabelsResult(lbs, hash) + b.resultCache[hash] = res + return res } -func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels { - // naive implementation for now. - return b.Labels().WithLabels(names...) +// GroupedLabels returns the LabelsResult from the builder. +// Groups are applied and the cache is used when possible. +func (b *LabelsBuilder) GroupedLabels() LabelsResult { + if b.err != "" { + // We need to return now before applying grouping otherwise the error might get lost. + return b.LabelsResult() + } + if b.noLabels { + return emptyLabelsResult + } + // unchanged path. + if len(b.del) == 0 && len(b.add) == 0 { + if len(b.groups) == 0 { + return b.currentResult + } + return b.toBaseGroup() + } + + if b.without { + return b.withoutResult() + } + return b.withResult() +} + +func (b *LabelsBuilder) withResult() LabelsResult { + res := make(labels.Labels, 0, len(b.groups)) +Outer: + for _, g := range b.groups { + for _, n := range b.del { + if g == n { + continue Outer + } + } + for _, la := range b.add { + if g == la.Name { + res = append(res, la) + continue Outer + } + } + for _, l := range b.base { + if g == l.Name { + res = append(res, l) + continue Outer + } + } + } + return b.toResult(res) +} + +func (b *LabelsBuilder) withoutResult() LabelsResult { + size := len(b.base) + len(b.add) - len(b.del) - len(b.groups) + if size < 0 { + size = 0 + } + res := make(labels.Labels, 0, size) +Outer: + for _, l := range b.base { + for _, n := range b.del { + if l.Name == n { + continue Outer + } + } + for _, la := range b.add { + if l.Name == la.Name { + continue Outer + } + } + for _, lg := range b.groups { + if l.Name == lg { + continue Outer + } + } + res = append(res, l) + } +OuterAdd: + for _, la := range b.add { + for _, lg := range b.groups { + if la.Name == lg { + continue OuterAdd + } + } + res = append(res, la) + } + sort.Sort(res) + return b.toResult(res) +} + +func (b *LabelsBuilder) toBaseGroup() LabelsResult { + if b.groupedResult != nil { + return b.groupedResult + } + var lbs labels.Labels + if b.without { + lbs = b.base.WithoutLabels(b.groups...) + } else { + lbs = b.base.WithLabels(b.groups...) + } + res := NewLabelsResult(lbs, lbs.Hash()) + b.groupedResult = res + return res } diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 544499601a887..c29afb900227e 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -1,6 +1,7 @@ package log import ( + "sort" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -8,8 +9,9 @@ import ( ) func TestLabelsBuilder_Get(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(labels.Labels{labels.Label{Name: "already", Value: "in"}}) + lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}} + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() b.Set("foo", "bar") b.Set("bar", "buzz") b.Del("foo") @@ -31,8 +33,8 @@ func TestLabelsBuilder_Get(t *testing.T) { func TestLabelsBuilder_LabelsError(t *testing.T) { lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}} - b := NewLabelsBuilder() - b.Reset(lbs) + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() b.SetErr("err") lbsWithErr := b.Labels() require.Equal( @@ -46,3 +48,102 @@ func TestLabelsBuilder_LabelsError(t *testing.T) { // make sure the original labels is unchanged. require.Equal(t, labels.Labels{labels.Label{Name: "already", Value: "in"}}, lbs) } + +func TestLabelsBuilder_LabelsResult(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "namespace", Value: "loki"}, + labels.Label{Name: "job", Value: "us-central1/loki"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + } + sort.Sort(lbs) + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() + assertLabelResult(t, lbs, b.LabelsResult()) + b.SetErr("err") + withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"}) + sort.Sort(withErr) + assertLabelResult(t, withErr, b.LabelsResult()) + + b.Set("foo", "bar") + b.Set("namespace", "tempo") + b.Set("buzz", "fuzz") + b.Del("job") + expected := labels.Labels{ + labels.Label{Name: ErrorLabel, Value: "err"}, + labels.Label{Name: "namespace", Value: "tempo"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "buzz", Value: "fuzz"}, + } + sort.Sort(expected) + assertLabelResult(t, expected, b.LabelsResult()) + // cached. + assertLabelResult(t, expected, b.LabelsResult()) +} + +func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "namespace", Value: "loki"}, + labels.Label{Name: "job", Value: "us-central1/loki"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + } + sort.Sort(lbs) + b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash()) + b.Reset() + assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels()) + b.SetErr("err") + withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"}) + sort.Sort(withErr) + assertLabelResult(t, withErr, b.GroupedLabels()) + + b.Reset() + b.Set("foo", "bar") + b.Set("namespace", "tempo") + b.Set("buzz", "fuzz") + b.Del("job") + expected := labels.Labels{ + labels.Label{Name: "namespace", Value: "tempo"}, + } + sort.Sort(expected) + assertLabelResult(t, expected, b.GroupedLabels()) + // cached. + assertLabelResult(t, expected, b.GroupedLabels()) + + b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash()) + assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) + assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) + b.Del("job") + assertLabelResult(t, labels.Labels{}, b.GroupedLabels()) + b.Reset() + b.Set("namespace", "tempo") + assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels()) + + b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash()) + b.Del("job") + b.Set("foo", "bar") + b.Set("job", "something") + expected = labels.Labels{ + labels.Label{Name: "namespace", Value: "loki"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + labels.Label{Name: "foo", Value: "bar"}, + } + sort.Sort(expected) + assertLabelResult(t, expected, b.GroupedLabels()) + +} + +func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) { + t.Helper() + require.Equal(t, + lbs, + res.Labels(), + ) + require.Equal(t, + lbs.Hash(), + res.Hash(), + ) + require.Equal(t, + lbs.String(), + res.String(), + ) +} diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 98149dc3506fa..b6b2cce3366a9 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -17,96 +17,87 @@ const ( ConvertFloat = "float" ) -// SampleExtractor extracts sample for a log line. -type SampleExtractor interface { - Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) -} - -type SampleExtractorFunc func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) - -func (fn SampleExtractorFunc) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - return fn(line, lbs) -} - // LineExtractor extracts a float64 from a log line. type LineExtractor func([]byte) float64 -// ToSampleExtractor transform a LineExtractor into a SampleExtractor. -// Useful for metric conversion without log Pipeline. -func (l LineExtractor) ToSampleExtractor(groups []string, without bool, noLabels bool) SampleExtractor { - return SampleExtractorFunc(func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - // todo(cyriltovena) grouping should be done once per stream/chunk not for everyline. - // so for now we'll cover just vector without grouping. This requires changes to SampleExtractor interface. - // For another day ! - if len(groups) == 0 && noLabels { - return l(line), labels.Labels{}, true - } - return l(line), lbs, true - }) -} - var ( CountExtractor LineExtractor = func(line []byte) float64 { return 1. } BytesExtractor LineExtractor = func(line []byte) float64 { return float64(len(line)) } ) +// SampleExtractor creates StreamSampleExtractor that can extract samples for a given log stream. +type SampleExtractor interface { + ForStream(labels labels.Labels) StreamSampleExtractor +} + +// StreamSampleExtractor extracts sample for a log line. +type StreamSampleExtractor interface { + Process(line []byte) (float64, LabelsResult, bool) +} + type lineSampleExtractor struct { Stage LineExtractor - groups []string - without bool - noLabels bool - builder *LabelsBuilder + baseBuilder *BaseLabelsBuilder + streamExtractors map[uint64]StreamSampleExtractor } -func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - l.builder.Reset(lbs) - line, ok := l.Stage.Process(line, l.builder) - if !ok { - return 0, nil, false - } - if len(l.groups) != 0 { - if l.without { - return l.LineExtractor(line), l.builder.WithoutLabels(l.groups...), true - } - return l.LineExtractor(line), l.builder.WithLabels(l.groups...), true +// NewLineSampleExtractor creates a SampleExtractor from a LineExtractor. +// Multiple log stages are run before converting the log line. +func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { + return &lineSampleExtractor{ + Stage: ReduceStages(stages), + LineExtractor: ex, + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + streamExtractors: make(map[uint64]StreamSampleExtractor), + }, nil +} + +func (l *lineSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + hash := l.baseBuilder.Hash(labels) + if res, ok := l.streamExtractors[hash]; ok { + return res } - if l.noLabels { - // no grouping but it was a vector operation so we return a single vector - return l.LineExtractor(line), labels.Labels{}, true + + res := &streamLineSampleExtractor{ + Stage: l.Stage, + LineExtractor: l.LineExtractor, + builder: l.baseBuilder.ForLabels(labels, hash), } - return l.LineExtractor(line), l.builder.Labels(), true + l.streamExtractors[hash] = res + return res } -// LineExtractorWithStages creates a SampleExtractor from a LineExtractor. -// Multiple log stages are run before converting the log line. -func LineExtractorWithStages(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { - if len(stages) == 0 { - return ex.ToSampleExtractor(groups, without, noLabels), nil +type streamLineSampleExtractor struct { + Stage + LineExtractor + builder *LabelsBuilder +} + +func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { + // short circuit. + if l.Stage == NoopStage { + return l.LineExtractor(line), l.builder.GroupedLabels(), true } - return lineSampleExtractor{ - Stage: ReduceStages(stages), - LineExtractor: ex, - builder: NewLabelsBuilder(), - groups: groups, - without: without, - noLabels: noLabels, - }, nil + l.builder.Reset() + line, ok := l.Stage.Process(line, l.builder) + if !ok { + return 0, nil, false + } + return l.LineExtractor(line), l.builder.GroupedLabels(), true } type convertionFn func(value string) (float64, error) type labelSampleExtractor struct { - preStage Stage - postFilter Stage - builder *LabelsBuilder - + preStage Stage + postFilter Stage labelName string conversionFn convertionFn - groups []string - without bool - noLabels bool + + baseBuilder *BaseLabelsBuilder + streamExtractors map[uint64]StreamSampleExtractor } // LabelExtractorWithStages creates a SampleExtractor that will extract metrics from a labels. @@ -129,25 +120,43 @@ func LabelExtractorWithStages( default: return nil, errors.Errorf("unsupported conversion operation %s", conversion) } - if len(groups) != 0 && without { + if len(groups) == 0 || without { + without = true groups = append(groups, labelName) sort.Strings(groups) } return &labelSampleExtractor{ - preStage: ReduceStages(preStages), - conversionFn: convFn, - groups: groups, - labelName: labelName, - postFilter: postFilter, - without: without, - builder: NewLabelsBuilder(), - noLabels: noLabels, + preStage: ReduceStages(preStages), + conversionFn: convFn, + labelName: labelName, + postFilter: postFilter, + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + streamExtractors: make(map[uint64]StreamSampleExtractor), }, nil } -func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { +type streamLabelSampleExtractor struct { + *labelSampleExtractor + builder *LabelsBuilder +} + +func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + hash := l.baseBuilder.Hash(labels) + if res, ok := l.streamExtractors[hash]; ok { + return res + } + + res := &streamLabelSampleExtractor{ + labelSampleExtractor: l, + builder: l.baseBuilder.ForLabels(labels, hash), + } + l.streamExtractors[hash] = res + return res +} + +func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { // Apply the pipeline first. - l.builder.Reset(lbs) + l.builder.Reset() line, ok := l.preStage.Process(line, l.builder) if !ok { return 0, nil, false @@ -168,25 +177,7 @@ func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, if _, ok = l.postFilter.Process(line, l.builder); !ok { return 0, nil, false } - if l.builder.HasErr() { - // we still have an error after post filtering. - // We need to return now before applying grouping otherwise the error might get lost. - return v, l.builder.Labels(), true - } - return v, l.groupLabels(l.builder), true -} - -func (l *labelSampleExtractor) groupLabels(lbs *LabelsBuilder) labels.Labels { - if len(l.groups) != 0 { - if l.without { - return lbs.WithoutLabels(l.groups...) - } - return lbs.WithLabels(l.groups...) - } - if l.noLabels { - return labels.Labels{} - } - return lbs.WithoutLabels(l.labelName) + return v, l.builder.GroupedLabels(), true } func convertFloat(v string) (float64, error) { diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index a0b5d53fc5d6e..68e22177d6a2e 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -112,10 +112,11 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sort.Sort(tt.in) - outval, outlbs, ok := tt.ex.Process([]byte(""), tt.in) + + outval, outlbs, ok := tt.ex.ForStream(tt.in).Process([]byte("")) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) - require.Equal(t, tt.wantLbs, outlbs) + require.Equal(t, tt.wantLbs, outlbs.Labels()) }) } } @@ -126,3 +127,33 @@ func mustSampleExtractor(ex SampleExtractor, err error) SampleExtractor { } return ex } + +func TestNewLineSampleExtractor(t *testing.T) { + + se, err := NewLineSampleExtractor(CountExtractor, nil, nil, false, false) + require.NoError(t, err) + lbs := labels.Labels{ + {Name: "namespace", Value: "dev"}, + {Name: "cluster", Value: "us-central1"}, + } + sort.Sort(lbs) + sse := se.ForStream(lbs) + f, l, ok := sse.Process([]byte(`foo`)) + require.True(t, ok) + require.Equal(t, 1., f) + assertLabelResult(t, lbs, l) + + filter, err := NewFilter("foo", labels.MatchEqual) + require.NoError(t, err) + + se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false) + require.NoError(t, err) + sse = se.ForStream(lbs) + f, l, ok = sse.Process([]byte(`foo`)) + require.True(t, ok) + require.Equal(t, 3., f) + assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l) + sse = se.ForStream(lbs) + _, _, ok = sse.Process([]byte(`nope`)) + require.False(t, ok) +} diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index b979127058d43..aa9ef3d28edab 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -26,14 +26,12 @@ var ( errMissingCapture = errors.New("at least one named capture must be supplied") ) -func addLabel(lbs *LabelsBuilder) func(key, value string) { - return func(key, value string) { - key = sanitizeKey(key) - if lbs.Base().Has(key) { - key = fmt.Sprintf("%s%s", key, duplicateSuffix) - } - lbs.Set(key, value) +func addLabel(lbs *LabelsBuilder, key, value string) { + key = sanitizeKey(key) + if lbs.BaseHas(key) { + key = fmt.Sprintf("%s%s", key, duplicateSuffix) } + lbs.Set(key, value) } func sanitizeKey(key string) string { @@ -66,20 +64,20 @@ func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { lbs.SetErr(errJSON) return line, true } - parseMap("", data, addLabel(lbs)) + parseMap("", data, lbs) return line, true } -func parseMap(prefix string, data map[string]interface{}, add func(key, value string)) { +func parseMap(prefix string, data map[string]interface{}, lbs *LabelsBuilder) { for key, val := range data { switch concrete := val.(type) { case map[string]interface{}: - parseMap(jsonKey(prefix, key), concrete, add) + parseMap(jsonKey(prefix, key), concrete, lbs) case string: - add(jsonKey(prefix, key), concrete) + addLabel(lbs, jsonKey(prefix, key), concrete) case float64: f := strconv.FormatFloat(concrete, 'f', -1, 64) - add(jsonKey(prefix, key), f) + addLabel(lbs, jsonKey(prefix, key), f) } } } @@ -138,10 +136,9 @@ func mustNewRegexParser(re string) *RegexpParser { } func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { - add := addLabel(lbs) for i, value := range r.regex.FindSubmatch(line) { if name, ok := r.nameIndex[i]; ok { - add(name, string(value)) + addLabel(lbs, name, string(value)) } } return line, true @@ -161,11 +158,11 @@ func NewLogfmtParser() *LogfmtParser { func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { l.dec.Reset(line) - add := addLabel(lbs) + for l.dec.ScanKeyval() { key := string(l.dec.Key()) val := string(l.dec.Value()) - add(key, val) + addLabel(lbs, key, val) } if l.dec.Err() != nil { lbs.SetErr(errLogfmt) diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 651add226f8f9..659b7ce17a8a0 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -77,8 +77,8 @@ func Test_jsonParser_Parse(t *testing.T) { for _, tt := range tests { j := NewJSONParser() t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = j.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) @@ -169,8 +169,8 @@ func Test_regexpParser_Parse(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = tt.parser.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) @@ -281,8 +281,8 @@ func Test_logfmtParser_Parse(t *testing.T) { p := NewLogfmtParser() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = p.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 3c5d1b6459fa2..6a7906319d7b6 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -4,9 +4,19 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -// Pipeline transform and filter log lines and labels. +var ( + // NoopStage is a stage that doesn't process a log line. + NoopStage Stage = &noopStage{} +) + +// Pipeline can create pipelines for each log stream. type Pipeline interface { - Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) + ForStream(labels labels.Labels) StreamPipeline +} + +// StreamPipeline transform and filter log lines and labels. +type StreamPipeline interface { + Process(line []byte) ([]byte, LabelsResult, bool) } // Stage is a single step of a Pipeline. @@ -14,15 +24,33 @@ type Stage interface { Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) } -var ( - NoopPipeline Pipeline = &noopPipeline{} - NoopStage Stage = &noopStage{} -) +// NewNoopPipeline creates a pipelines that does not process anything and returns log streams as is. +func NewNoopPipeline() Pipeline { + return &noopPipeline{ + cache: map[uint64]*noopStreamPipeline{}, + } +} + +type noopPipeline struct { + cache map[uint64]*noopStreamPipeline +} -type noopPipeline struct{} +type noopStreamPipeline struct { + LabelsResult +} -func (noopPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) { - return line, lbs, true +func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { + return line, n.LabelsResult, true +} + +func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { + h := labels.Hash() + if cached, ok := n.cache[h]; ok { + return cached + } + sp := &noopStreamPipeline{LabelsResult: NewLabelsResult(labels, h)} + n.cache[h] = sp + return sp } type noopStage struct{} @@ -40,30 +68,53 @@ func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { // pipeline is a combinations of multiple stages. // It can also be reduced into a single stage for convenience. type pipeline struct { - stages []Stage - builder *LabelsBuilder + stages []Stage + baseBuilder *BaseLabelsBuilder + + streamPipelines map[uint64]StreamPipeline } +// NewPipeline creates a new pipeline for a given set of stages. func NewPipeline(stages []Stage) Pipeline { + if len(stages) == 0 { + return NewNoopPipeline() + } return &pipeline{ - stages: stages, - builder: NewLabelsBuilder(), + stages: stages, + baseBuilder: NewBaseLabelsBuilder(), + streamPipelines: make(map[uint64]StreamPipeline), } } -func (p *pipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) { - var ok bool - if len(p.stages) == 0 { - return line, lbs, true +type streamPipeline struct { + stages []Stage + builder *LabelsBuilder +} + +func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline { + hash := p.baseBuilder.Hash(labels) + if res, ok := p.streamPipelines[hash]; ok { + return res + } + + res := &streamPipeline{ + stages: p.stages, + builder: p.baseBuilder.ForLabels(labels, hash), } - p.builder.Reset(lbs) + p.streamPipelines[hash] = res + return res +} + +func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { + var ok bool + p.builder.Reset() for _, s := range p.stages { line, ok = s.Process(line, p.builder) if !ok { return nil, nil, false } } - return line, p.builder.Labels(), true + return line, p.builder.LabelsResult(), true } // ReduceStages reduces multiple stages into one. diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 60b42e55f9392..2038d717486b2 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -10,7 +10,7 @@ import ( var ( resOK bool resLine []byte - resLbs labels.Labels + resLbs LabelsResult ) func Benchmark_Pipeline(b *testing.B) { @@ -39,8 +39,9 @@ func Benchmark_Pipeline(b *testing.B) { {Name: "pod_template_hash", Value: "5896759c79"}, } b.ResetTimer() + sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = p.Process(line, lbs) + resLine, resLbs, resOK = sp.Process(line) } } diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index 10cde5bcf152c..ffbabe2506fbe 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -1990,13 +1990,13 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - - line, lbs, ok := p.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.Labels{}) + sp := p.ForStream(labels.Labels{}) + line, lbs, ok := sp.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) require.True(t, ok) require.Equal( t, labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}}, - lbs, + lbs.Labels(), ) require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line)) } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 01db6fceeb33d..ea26c8127b1d1 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -97,7 +97,8 @@ func processStream(in []logproto.Stream, pipeline log.Pipeline) []logproto.Strea for _, stream := range in { for _, e := range stream.Entries { - if l, out, ok := pipeline.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok { + sp := pipeline.ForStream(mustParseLabels(stream.Labels)) + if l, out, ok := sp.Process([]byte(e.Line)); ok { var s *logproto.Stream var found bool s, found = resByStream[out.String()] @@ -124,7 +125,8 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri for _, stream := range in { for _, e := range stream.Entries { - if f, lbs, ok := ex.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok { + exs := ex.ForStream(mustParseLabels(stream.Labels)) + if f, lbs, ok := exs.Process([]byte(e.Line)); ok { var s *logproto.Series var found bool s, found = resBySeries[lbs.String()] diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 471860c0cac2a..31ecbbdfb04f9 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -381,19 +382,21 @@ func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { + if len(chunks) != 0 && len(chunks[0]) != 0 { + streamPipeline := it.pipeline.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName)) + iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk) + if err != nil { + return nil, err + } - iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) - if err != nil { - return nil, err + result = append(result, iterator) } - - result = append(result, iterator) } return result, nil } -func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.EntryIterator, error) { +func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for i := range chks { @@ -402,7 +405,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through if !chks[i][j].IsValid { continue } - iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, it.pipeline, nextChunk) + iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, streamPipeline, nextChunk) if err != nil { return nil, err } @@ -514,17 +517,21 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) { result := make([]iter.SampleIterator, 0, len(chks)) for _, chunks := range chks { - iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) - if err != nil { - return nil, err + if len(chunks) != 0 && len(chunks[0]) != 0 { + streamExtractor := it.extractor.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName)) + iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk) + if err != nil { + return nil, err + } + result = append(result, iterator) } - result = append(result, iterator) + } return result, nil } -func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.SampleIterator, error) { +func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) { result := make([]iter.SampleIterator, 0, len(chks)) for i := range chks { @@ -533,7 +540,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro if !chks[i][j].IsValid { continue } - iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, it.extractor, nextChunk) + iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, streamExtractor, nextChunk) if err != nil { return nil, err } @@ -755,20 +762,3 @@ outer: return css } - -// dropLabels returns a new label set with certain labels dropped -func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) { - toDel := make(map[string]struct{}) - for _, r := range removals { - toDel[r] = struct{}{} - } - - for _, l := range ls { - _, remove := toDel[l.Name] - if !remove { - dst = append(dst, l) - } - } - - return dst -} diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 01742e5514211..fe50e95100d1a 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1231,7 +1231,10 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.CountExtractor.ToSampleExtractor(nil, false, false), tt.start, tt.end) + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + require.NoError(t, err) + + it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end) require.NoError(t, err) series, _, err := iter.ReadSampleBatch(it, 1000) _ = it.Close() @@ -1441,7 +1444,7 @@ func TestBuildHeapIterator(t *testing.T) { ctx: ctx, pipeline: logql.NoopPipeline, } - it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), nil) + it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) if err != nil { t.Errorf("buildHeapIterator error = %v", err) return @@ -1457,48 +1460,6 @@ func TestBuildHeapIterator(t *testing.T) { } } -func TestDropLabels(t *testing.T) { - - for i, tc := range []struct { - ls labels.Labels - drop []string - expected labels.Labels - }{ - { - ls: labels.Labels{ - labels.Label{ - Name: "a", - Value: "1", - }, - labels.Label{ - Name: "b", - Value: "2", - }, - labels.Label{ - Name: "c", - Value: "3", - }, - }, - drop: []string{"b"}, - expected: labels.Labels{ - labels.Label{ - Name: "a", - Value: "1", - }, - labels.Label{ - Name: "c", - Value: "3", - }, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - dropped := dropLabels(tc.ls, tc.drop...) - require.Equal(t, tc.expected, dropped) - }) - } -} - func Test_IsInvalidChunkError(t *testing.T) { tests := []struct { name string diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index f868ed5684336..9be9dd3f1d8ea 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -6,12 +6,11 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) // LazyChunk loads the chunk when it is accessed. @@ -33,7 +32,7 @@ func (c *LazyChunk) Iterator( ctx context.Context, from, through time.Time, direction logproto.Direction, - pipeline logql.Pipeline, + pipeline log.StreamPipeline, nextChunk *LazyChunk, ) (iter.EntryIterator, error) { @@ -59,7 +58,7 @@ func (c *LazyChunk) Iterator( // if the block is overlapping cache it with the next chunk boundaries. if nextChunk != nil && IsBlockOverlapping(b, nextChunk, direction) { // todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset. - it := iter.NewCachedIterator(b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline), b.Entries()) + it := iter.NewCachedIterator(b.Iterator(ctx, pipeline), b.Entries()) its = append(its, it) if c.overlappingBlocks == nil { c.overlappingBlocks = make(map[int]iter.CacheEntryIterator) @@ -71,7 +70,7 @@ func (c *LazyChunk) Iterator( delete(c.overlappingBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. - its = append(its, b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline)) + its = append(its, b.Iterator(ctx, pipeline)) } if direction == logproto.FORWARD { @@ -106,7 +105,7 @@ func (c *LazyChunk) Iterator( func (c *LazyChunk) SampleIterator( ctx context.Context, from, through time.Time, - extractor logql.SampleExtractor, + extractor log.StreamSampleExtractor, nextChunk *LazyChunk, ) (iter.SampleIterator, error) { @@ -132,7 +131,7 @@ func (c *LazyChunk) SampleIterator( // if the block is overlapping cache it with the next chunk boundaries. if nextChunk != nil && IsBlockOverlapping(b, nextChunk, logproto.FORWARD) { // todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset. - it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor), b.Entries()) + it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, extractor), b.Entries()) its = append(its, it) if c.overlappingSampleBlocks == nil { c.overlappingSampleBlocks = make(map[int]iter.CacheSampleIterator) @@ -144,7 +143,7 @@ func (c *LazyChunk) SampleIterator( delete(c.overlappingSampleBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. - its = append(its, b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor)) + its = append(its, b.SampleIterator(ctx, extractor)) } // build the final iterator bound to the requested time range. diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 98b64c6417a03..bc435e5270d04 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util" ) @@ -46,7 +47,7 @@ func TestLazyChunkIterator(t *testing.T) { }, } { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline, nil) + it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) require.Nil(t, err) streams, _, err := iter.ReadBatch(it, 1000) require.Nil(t, err) @@ -174,10 +175,10 @@ func (fakeBlock) Entries() int { return 0 } func (fakeBlock) Offset() int { return 0 } func (f fakeBlock) MinTime() int64 { return f.mint } func (f fakeBlock) MaxTime() int64 { return f.maxt } -func (fakeBlock) Iterator(context.Context, labels.Labels, logql.Pipeline) iter.EntryIterator { +func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterator { return nil } -func (fakeBlock) SampleIterator(context.Context, labels.Labels, logql.SampleExtractor) iter.SampleIterator { +func (fakeBlock) SampleIterator(context.Context, log.StreamSampleExtractor) iter.SampleIterator { return nil }