From c87548d4aea6d9cf053e41364dcd5f8716042cc9 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 28 Oct 2020 11:41:57 +0100 Subject: [PATCH 1/6] Adding a benchmark to compare before and after. Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk_test.go | 99 +++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index fdba84eb4ae78..5424d6b57104c 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -13,6 +13,7 @@ import ( "time" "github.com/dustin/go-humanize" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -777,3 +778,101 @@ func TestBytesWith(t *testing.T) { require.Equal(t, exp, out) } + +var streams = []logproto.Stream{} +var series = []logproto.Series{} + +func BenchmarkBufferedIteratorLabels(b *testing.B) { + c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) + _ = fillChunk(c) + labelsSet := []labels.Labels{ + { + {Name: "cluster", Value: "us-central1"}, + {Name: "stream", Value: "stdout"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/query-frontend"}, + {Name: "container", Value: "query-frontend"}, + {Name: "pod", Value: "query-frontend-6894f97b98-89q2n"}, + }, + { + {Name: "cluster", Value: "us-central2"}, + {Name: "stream", Value: "stderr"}, + {Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"}, + {Name: "namespace", Value: "loki-dev"}, + {Name: "job", Value: "loki-prod/querier"}, + {Name: "container", Value: "querier"}, + {Name: "pod", Value: "querier-6894f97b98-89q2n"}, + }, + } + for _, test := range []string{ + `{app="foo"}`, + `{app="foo"} != "foo"`, + `{app="foo"} != "foo" | logfmt `, + `{app="foo"} != "foo" | logfmt | duration > 10ms`, + `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseLogSelector(test) + if err != nil { + b.Fatal(err) + } + p, err := expr.Pipeline() + if err != nil { + b.Fatal(err) + } + var iters []iter.EntryIterator + for _, lbs := range labelsSet { + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, lbs, p) + if err != nil { + b.Fatal(err) + } + iters = append(iters, it) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { + for it.Next() { + streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}}) + } + } + } + streams = streams[:0] + }) + } + + for _, test := range []string{ + `rate({app="foo"}[1m])`, + `sum by (cluster) (rate({app="foo"}[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, + } { + b.Run(test, func(b *testing.B) { + b.ReportAllocs() + expr, err := logql.ParseSampleExpr(test) + if err != nil { + b.Fatal(err) + } + ex, err := expr.Extractor() + if err != nil { + b.Fatal(err) + } + var iters []iter.SampleIterator + for _, lbs := range labelsSet { + iters = append(iters, 
c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), lbs, ex)) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + for _, it := range iters { + for it.Next() { + series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}}) + } + } + } + series = series[:0] + }) + } +} From 339a4f727aa5186cd038384531e763aa9b5973c5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 3 Nov 2020 16:20:22 +0100 Subject: [PATCH 2/6] Improves labels management in logql v2. Signed-off-by: Cyril Tovena --- pkg/chunkenc/dumb_chunk.go | 8 +- pkg/chunkenc/interface.go | 12 +- pkg/chunkenc/memchunk.go | 56 +++-- pkg/chunkenc/memchunk_test.go | 52 +++-- pkg/ingester/chunk_test.go | 4 +- pkg/ingester/flush_test.go | 2 +- pkg/ingester/instance.go | 4 +- pkg/ingester/stream.go | 10 +- pkg/ingester/stream_test.go | 4 +- pkg/ingester/tailer.go | 8 +- pkg/ingester/transfer_test.go | 2 +- pkg/logentry/stages/match.go | 5 +- pkg/logql/ast.go | 10 +- pkg/logql/ast_test.go | 12 +- pkg/logql/functions.go | 4 +- pkg/logql/log/fmt_test.go | 16 +- pkg/logql/log/label_filter_test.go | 8 +- pkg/logql/log/labels.go | 284 +++++++++++++++++++++-- pkg/logql/log/labels_test.go | 9 +- pkg/logql/log/metrics_extraction.go | 174 +++++++------- pkg/logql/log/metrics_extraction_test.go | 5 +- pkg/logql/log/parser.go | 29 ++- pkg/logql/log/parser_test.go | 12 +- pkg/logql/log/pipeline.go | 83 +++++-- pkg/logql/log/pipeline_test.go | 5 +- pkg/logql/parser_test.go | 6 +- pkg/logql/test_utils.go | 6 +- pkg/storage/batch.go | 50 ++-- pkg/storage/batch_test.go | 49 +--- pkg/storage/lazy_chunk.go | 15 +- pkg/storage/lazy_chunk_test.go | 7 +- 31 files changed, 595 insertions(+), 356 deletions(-) diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 31bd475ad174e..f8977925ffeb0 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -5,11 +5,9 @@ import ( "sort" "time" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) const ( @@ -72,7 +70,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone } // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). 
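The interface change that follows is the core of this series: instead of threading a stream's labels.Labels and a shared logql.Pipeline through every Iterator call, callers bind the labels once with ForStream and hand the chunk a log.StreamPipeline that only ever sees lines. A minimal self-contained sketch of that shape, with a toy containsPipeline filter that is illustrative rather than Loki code:

package main

import (
	"bytes"
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// Pipeline is built once per query; ForStream specializes it per stream.
type Pipeline interface {
	ForStream(lbs labels.Labels) StreamPipeline
}

// StreamPipeline has the labels already bound, so per-line processing
// no longer pays for passing or re-deriving them.
type StreamPipeline interface {
	Process(line []byte) ([]byte, labels.Labels, bool)
}

type containsPipeline struct{ substr []byte }

func (p containsPipeline) ForStream(lbs labels.Labels) StreamPipeline {
	// Label-dependent work (hashing, String(), grouping) happens once here.
	return streamContains{substr: p.substr, lbs: lbs}
}

type streamContains struct {
	substr []byte
	lbs    labels.Labels
}

func (s streamContains) Process(line []byte) ([]byte, labels.Labels, bool) {
	if !bytes.Contains(line, s.substr) {
		return nil, nil, false
	}
	return line, s.lbs, true
}

func main() {
	sp := containsPipeline{substr: []byte("err")}.ForStream(labels.Labels{{Name: "app", Value: "foo"}})
	for _, l := range []string{"ok", "err timeout"} {
		if out, lbs, ok := sp.Process([]byte(l)); ok {
			fmt.Printf("%s kept line: %s\n", lbs, out)
		}
	}
}

(The real StreamPipeline returns a log.LabelsResult instead of raw labels.Labels; that type is introduced later in this patch.)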
-func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) { +func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) @@ -97,7 +95,7 @@ func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, directi }, nil } -func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ labels.Labels, _ logql.SampleExtractor) iter.SampleIterator { +func (c *dumbChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator { return nil } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 6f45e12a41973..de0bfc418fbd2 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -7,11 +7,9 @@ import ( "strings" "time" - "github.com/prometheus/prometheus/pkg/labels" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) // Errors returned by the chunk interface. @@ -100,8 +98,8 @@ type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error - Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) - SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator + Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) + SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. Blocks(mintT, maxtT time.Time) []Block Size() int @@ -126,7 +124,7 @@ type Block interface { // Entries is the amount of entries in the block. Entries() int // Iterator returns an entry iterator for the block. - Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator + Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator // SampleIterator returns a sample iterator for the block. - SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator + SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 546a79e84f864..27d596758e0f1 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -16,11 +16,10 @@ import ( "github.com/cortexproject/cortex/pkg/util" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -471,7 +470,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { } // Iterator implements Chunk. 
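The MemChunk.Iterator below keeps its block-selection logic unchanged; only the pipeline threading moves to log.StreamPipeline. For readers new to the file, the skip condition maxt < b.mint || b.maxt < mint is a plain closed-interval overlap test; a tiny sketch with a hypothetical block type:

package main

import "fmt"

// block mirrors the per-block mint/maxt bookkeeping kept by MemChunk.
type block struct{ mint, maxt int64 }

// overlaps is the negation of the skip test in the diff below:
// a block is consulted iff [mint, maxt] intersects [b.mint, b.maxt].
func (b block) overlaps(mint, maxt int64) bool {
	return !(maxt < b.mint || b.maxt < mint)
}

func main() {
	blocks := []block{{0, 10}, {11, 20}, {21, 30}}
	for _, b := range blocks {
		fmt.Printf("[%d,%d] in query [15,25]: %v\n", b.mint, b.maxt, b.overlaps(15, 25))
	}
}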
-func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, lbs labels.Labels, pipeline logql.Pipeline) (iter.EntryIterator, error) { +func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() its := make([]iter.EntryIterator, 0, len(c.blocks)+1) @@ -479,11 +478,11 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if maxt < b.mint || b.maxt < mint { continue } - its = append(its, encBlock{c.encoding, b}.Iterator(ctx, lbs, pipeline)) + its = append(its, encBlock{c.encoding, b}.Iterator(ctx, pipeline)) } if !c.head.isEmpty() { - its = append(its, c.head.iterator(ctx, direction, mint, maxt, lbs, pipeline)) + its = append(its, c.head.iterator(ctx, direction, mint, maxt, pipeline)) } if direction == logproto.FORWARD { @@ -513,7 +512,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } // Iterator implements Chunk. -func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator { mint, maxt := from.UnixNano(), through.UnixNano() its := make([]iter.SampleIterator, 0, len(c.blocks)+1) @@ -521,11 +520,11 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, if maxt < b.mint || b.maxt < mint { continue } - its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, lbs, extractor)) + its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor)) } if !c.head.isEmpty() { - its = append(its, c.head.sampleIterator(ctx, mint, maxt, lbs, extractor)) + its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor)) } return iter.NewTimeRangedSampleIterator( @@ -557,18 +556,18 @@ type encBlock struct { block } -func (b encBlock) Iterator(ctx context.Context, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, lbs, pipeline) + return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline) } -func (b encBlock) SampleIterator(ctx context.Context, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newSampleIterator(ctx, getReaderPool(b.enc), b.b, lbs, extractor) + return newSampleIterator(ctx, getReaderPool(b.enc), b.b, extractor) } func (b block) Offset() int { @@ -585,7 +584,7 @@ func (b block) MaxTime() int64 { return b.maxt } -func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -601,7 +600,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, for _, e := range hb.entries { chunkStats.HeadChunkBytes 
+= int64(len(e.s)) line := []byte(e.s) - newLine, parsedLbs, ok := pipeline.Process(line, lbs) + newLine, parsedLbs, ok := pipeline.Process(line) if !ok { continue } @@ -630,7 +629,7 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, return iter.NewStreamsIterator(ctx, streamsResult, direction) } -func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -640,7 +639,7 @@ func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, lbs l for _, e := range hb.entries { chunkStats.HeadChunkBytes += int64(len(e.s)) line := []byte(e.s) - value, parsedLabels, ok := extractor.Process(line, lbs) + value, parsedLabels, ok := extractor.Process(line) if !ok { continue } @@ -687,11 +686,9 @@ type bufferedIterator struct { currTs int64 closed bool - - baseLbs labels.Labels } -func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels) *bufferedIterator { +func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte) *bufferedIterator { chunkStats := stats.GetChunkData(ctx) chunkStats.CompressedBytes += int64(len(b)) return &bufferedIterator{ @@ -701,7 +698,6 @@ func newBufferedIterator(ctx context.Context, pool ReaderPool, b []byte, lbs lab bufReader: nil, // will be initialized later pool: pool, decBuf: make([]byte, binary.MaxVarintLen64), - baseLbs: lbs, } } @@ -806,19 +802,19 @@ func (si *bufferedIterator) close() { si.decBuf = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, pipeline logql.Pipeline) iter.EntryIterator { +func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline) iter.EntryIterator { return &entryBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, lbs), + bufferedIterator: newBufferedIterator(ctx, pool, b), pipeline: pipeline, } } type entryBufferedIterator struct { *bufferedIterator - pipeline logql.Pipeline + pipeline log.StreamPipeline cur logproto.Entry - currLabels labels.Labels + currLabels log.LabelsResult } func (e *entryBufferedIterator) Entry() logproto.Entry { @@ -829,7 +825,7 @@ func (e *entryBufferedIterator) Labels() string { return e.currLabels.String() } func (e *entryBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - newLine, lbs, ok := e.pipeline.Process(e.currLine, e.baseLbs) + newLine, lbs, ok := e.pipeline.Process(e.currLine) if !ok { continue } @@ -841,9 +837,9 @@ func (e *entryBufferedIterator) Next() bool { return false } -func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs labels.Labels, extractor logql.SampleExtractor) iter.SampleIterator { +func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, extractor log.StreamSampleExtractor) iter.SampleIterator { it := &sampleBufferedIterator{ - bufferedIterator: newBufferedIterator(ctx, pool, b, lbs), + bufferedIterator: newBufferedIterator(ctx, pool, b), extractor: extractor, } return it @@ -852,15 +848,15 @@ func newSampleIterator(ctx context.Context, pool ReaderPool, b []byte, lbs label type sampleBufferedIterator struct { *bufferedIterator - extractor logql.SampleExtractor + extractor log.StreamSampleExtractor cur logproto.Sample - currLabels labels.Labels + 
currLabels log.LabelsResult } func (e *sampleBufferedIterator) Next() bool { for e.bufferedIterator.Next() { - val, labels, ok := e.extractor.Process(e.currLine, e.baseLbs) + val, labels, ok := e.extractor.Process(e.currLine) if !ok { continue } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 5424d6b57104c..711fdc971ee17 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -36,8 +36,16 @@ var testEncoding = []Encoding{ } var ( - testBlockSize = 256 * 1024 - testTargetSize = 1500 * 1024 + testBlockSize = 256 * 1024 + testTargetSize = 1500 * 1024 + noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) + countExtractor = func() log.StreamSampleExtractor { + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + if err != nil { + panic(err) + } + return ex.ForStream(labels.Labels{}) + }() ) func TestBlocksInclusive(t *testing.T) { @@ -115,7 +123,7 @@ func TestBlock(t *testing.T) { } } - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) idx := 0 @@ -130,7 +138,7 @@ func TestBlock(t *testing.T) { require.NoError(t, it.Close()) require.Equal(t, len(cases), idx) - sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) + sampleIt := chk.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) idx = 0 for sampleIt.Next() { s := sampleIt.Sample() @@ -145,7 +153,7 @@ func TestBlock(t *testing.T) { require.Equal(t, len(cases), idx) t.Run("bounded-iteration", func(t *testing.T) { - it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) idx := 2 @@ -178,7 +186,7 @@ func TestReadFormatV1(t *testing.T) { t.Fatal(err) } - it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := r.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -205,7 +213,7 @@ func TestRoundtripV2(t *testing.T) { assertLines := func(c *MemChunk) { require.Equal(t, enc, c.Encoding()) - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -267,7 +275,7 @@ func TestSerialization(t *testing.T) { bc, err := NewByteChunk(byt, testBlockSize, testTargetSize) require.NoError(t, err) - it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) for i := 0; i < numSamples; i++ { require.True(t, it.Next()) @@ -278,7 +286,7 @@ func TestSerialization(t *testing.T) { } require.NoError(t, it.Error()) - sampleIt 
:= bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), nil, log.CountExtractor.ToSampleExtractor(nil, false, false)) + sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor) for i := 0; i < numSamples; i++ { require.True(t, sampleIt.Next(), i) @@ -321,7 +329,7 @@ func TestChunkFilling(t *testing.T) { require.Equal(t, int64(lines), i) - it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) i = 0 for it.Next() { @@ -464,7 +472,7 @@ func TestChunkStats(t *testing.T) { expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64) ctx := stats.NewContext(context.Background()) - it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -493,7 +501,7 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } ctx = stats.NewContext(context.Background()) - it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, nil, logql.NoopPipeline) + it, err = cb.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -541,7 +549,7 @@ func TestIteratorClose(t *testing.T) { } { c := NewMemChunk(enc, testBlockSize, testTargetSize) inserted := fillChunk(c) - iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, nil, logql.NoopPipeline) + iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) if err != nil { t.Fatal(err) } @@ -592,7 +600,7 @@ func BenchmarkRead(b *testing.B) { for n := 0; n < b.N; n++ { for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -617,7 +625,7 @@ func BenchmarkBackwardIterator(b *testing.B) { _ = fillChunk(c) b.ResetTimer() for n := 0; n < b.N; n++ { - iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.BACKWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -638,7 +646,7 @@ func TestGenerateDataSize(t *testing.T) { bytesRead := uint64(0) for _, c := range chunks { // use forward iterator for benchmark -- backward iterator does extra allocations by keeping entries in memory - iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, nil, logql.NoopPipeline) + iterator, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Now(), logproto.FORWARD, noopStreamPipeline) if err != nil { panic(err) } @@ -672,7 +680,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - iter := h.iterator(context.Background(), 
logproto.BACKWARD, 0, math.MaxInt64, nil, logql.NoopPipeline) + iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) for iter.Next() { _ = iter.Entry() @@ -731,7 +739,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { c := createChunk() // testing headchunk - it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline) require.NoError(t, err) for i := range tt.expect { require.Equal(t, tt.expect[i], it.Next()) @@ -740,7 +748,7 @@ func TestMemChunk_IteratorBounds(t *testing.T) { // testing chunk blocks require.NoError(t, c.cut()) - it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, nil, logql.NoopPipeline) + it, err = c.Iterator(context.Background(), tt.mint, tt.maxt, tt.direction, noopStreamPipeline) require.NoError(t, err) for i := range tt.expect { require.Equal(t, tt.expect[i], it.Next()) @@ -759,7 +767,7 @@ func TestMemchunkLongLine(t *testing.T) { for i := 1; i <= 10; i++ { require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) } - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil, logql.NoopPipeline) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline) require.NoError(t, err) for i := 1; i <= 10; i++ { require.True(t, it.Next()) @@ -824,7 +832,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { } var iters []iter.EntryIterator for _, lbs := range labelsSet { - it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, lbs, p) + it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs)) if err != nil { b.Fatal(err) } @@ -862,7 +870,7 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { } var iters []iter.SampleIterator for _, lbs := range labelsSet { - iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), lbs, ex)) + iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs))) } b.ResetTimer() for n := 0; n < b.N; n++ { diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index fbd1e1028d54c..8ce8866e922ba 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -64,7 +64,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) _ = iter.Close() @@ -73,7 +73,7 @@ func TestIterator(t *testing.T) { for i := 0; i < entries; i++ { from := rand.Intn(entries - 1) len := rand.Intn(entries-from) + 1 - iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, labels.Labels{}, logql.NoopPipeline) + iter, err := chunk.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(labels.Labels{})) 
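The benchmark updates above lean on the standard Go testing idioms that PATCH 1/6 introduced for this before/after comparison: b.ReportAllocs to surface per-op allocations, b.ResetTimer to keep setup out of the measurement, and package-level sinks (the streams and series slices) so the compiler cannot elide the measured work. A minimal sketch of that pattern; the benchmark itself is hypothetical:

package chunkenc_test // hypothetical test file

import "testing"

// sink keeps results alive so the compiler cannot optimize the loop
// away, playing the same role as the package-level streams/series
// slices in PATCH 1/6.
var sink int

func BenchmarkLineLen(b *testing.B) {
	b.ReportAllocs()
	lines := [][]byte{[]byte("foo"), []byte("bar baz")}
	b.ResetTimer() // setup above is excluded from the timing
	for n := 0; n < b.N; n++ {
		for _, l := range lines {
			sink += len(l)
		}
	}
}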
require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) _ = iter.Close() diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 986b6ef1dbc13..51f3a49930355 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -320,7 +320,7 @@ func (s *testStore) getChunksForUser(userID string) []chunk.Chunk { } func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream { - it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, labels.Labels{}, logql.NoopPipeline) + it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{})) require.NoError(t, err) stream := logproto.Stream{ diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 0654af3c8aff6..25f47824d3715 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -211,7 +211,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) ([]iter expr.Matchers(), func(stream *stream) error { ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline) + iter, err := stream.Iterator(ctx, req.Start, req.End, req.Direction, pipeline.ForStream(stream.labels)) if err != nil { return err } @@ -242,7 +242,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams expr.Selector().Matchers(), func(stream *stream) error { ingStats.TotalChunksMatched += int64(len(stream.chunks)) - iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor) + iter, err := stream.SampleIterator(ctx, req.Start, req.End, extractor.ForStream(stream.labels)) if err != nil { return err } diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index 08c374dd6f0af..30e2f07ccd120 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -18,7 +18,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) var ( @@ -258,10 +258,10 @@ func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp t } // Returns an iterator. -func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline logql.Pipeline) (iter.EntryIterator, error) { +func (s *stream) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { - itr, err := c.chunk.Iterator(ctx, from, through, direction, s.labels, pipeline) + itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline) if err != nil { return nil, err } @@ -280,10 +280,10 @@ func (s *stream) Iterator(ctx context.Context, from, through time.Time, directio } // Returns an SampleIterator. 
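As on the entry path, the sample path below binds stream labels once: stream.SampleIterator now receives a log.StreamSampleExtractor that was already specialized with extractor.ForStream(stream.labels). The per-stream shape, simplified here to return labels.Labels rather than log.LabelsResult:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// StreamSampleExtractor mirrors the per-stream interface: labels are
// bound at construction, Process only receives the line.
type StreamSampleExtractor interface {
	Process(line []byte) (float64, labels.Labels, bool)
}

// countExtractor is a toy equivalent of log.CountExtractor: every
// line contributes a sample of 1.
type countExtractor struct{ lbs labels.Labels }

func (c countExtractor) Process(line []byte) (float64, labels.Labels, bool) {
	return 1, c.lbs, true
}

func main() {
	ex := countExtractor{lbs: labels.Labels{{Name: "job", Value: "loki-prod/querier"}}}
	var total float64
	for _, l := range []string{"a", "b", "c"} {
		if v, _, ok := ex.Process([]byte(l)); ok {
			total += v
		}
	}
	fmt.Println("count:", total) // 3
}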
-func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor logql.SampleExtractor) (iter.SampleIterator, error) { +func (s *stream) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) { iterators := make([]iter.SampleIterator, 0, len(s.chunks)) for _, c := range s.chunks { - if itr := c.chunk.SampleIterator(ctx, from, through, s.labels, extractor); itr != nil { + if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil { iterators = append(iterators, itr) } } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index ffaa422725e08..b32ad34dccf13 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -120,7 +120,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(chunks*entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, logql.NoopPipeline.ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorForward(t, iter, int64(from), int64(from+len)) @@ -130,7 +130,7 @@ func TestStreamIterator(t *testing.T) { for i := 0; i < 100; i++ { from := rand.Intn(entries - 1) len := rand.Intn(chunks*entries-from) + 1 - iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline) + iter, err := s.Iterator(context.TODO(), time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, logql.NoopPipeline.ForStream(s.labels)) require.NotNil(t, iter) require.NoError(t, err) testIteratorBackward(t, iter, int64(from), int64(from+len)) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 8c03c8fcc644e..4b25fe10edb6c 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -145,18 +145,18 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error if err != nil { return nil, err } + sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := t.pipeline.Process([]byte(e.Line), lbs) + newLine, parsedLbs, ok := sp.Process([]byte(e.Line)) if !ok { continue } var stream *logproto.Stream - lhash := parsedLbs.Hash() - if stream, ok = streams[lhash]; !ok { + if stream, ok = streams[parsedLbs.Hash()]; !ok { stream = &logproto.Stream{ Labels: parsedLbs.String(), } - streams[lhash] = stream + streams[parsedLbs.Hash()] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ Timestamp: e.Timestamp, diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 694bb38876bfb..e4e5ffc7797bd 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -95,7 +95,7 @@ func TestTransferOut(t *testing.T) { time.Unix(0, 0), time.Unix(10, 0), logproto.FORWARD, - logql.NoopPipeline, + logql.NoopPipeline.ForStream(stream.labels), ) if !assert.NoError(t, err) { continue diff --git a/pkg/logentry/stages/match.go b/pkg/logentry/stages/match.go index cbb68b06dd038..b06f7b280091b 100644 --- a/pkg/logentry/stages/match.go +++ b/pkg/logentry/stages/match.go @@ -132,7 +132,8 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac } } - if newLine, newLabels, ok := m.pipeline.Process([]byte(*entry), 
labels.FromMap(util.ModelLabelSetToMap(lbs))); ok { + sp := m.pipeline.ForStream(labels.FromMap(util.ModelLabelSetToMap(lbs))) + if newLine, newLabels, ok := sp.Process([]byte(*entry)); ok { switch m.action { case MatchActionDrop: // Adds the drop label to not be sent by the api.EntryHandler @@ -142,7 +143,7 @@ func (m *matcherStage) Process(lbs model.LabelSet, extracted map[string]interfac for k := range lbs { delete(lbs, k) } - for _, l := range newLabels { + for _, l := range newLabels.Labels() { lbs[model.LabelName(l.Name)] = model.LabelValue(l.Value) } m.stage.Process(lbs, extracted, t, entry) diff --git a/pkg/logql/ast.go b/pkg/logql/ast.go index 1902d1bfcfcac..1575d84ccb6ec 100644 --- a/pkg/logql/ast.go +++ b/pkg/logql/ast.go @@ -85,7 +85,7 @@ type Pipeline = log.Pipeline type SampleExtractor = log.SampleExtractor var ( - NoopPipeline = log.NoopPipeline + NoopPipeline = log.NewNoopPipeline() ) // PipelineExpr is an expression defining a log pipeline. @@ -109,7 +109,7 @@ func (m MultiStageExpr) Pipeline() (log.Pipeline, error) { return nil, err } if len(stages) == 0 { - return log.NoopPipeline, nil + return NoopPipeline, nil } return log.NewPipeline(stages), nil } @@ -169,7 +169,7 @@ func (e *matchersExpr) String() string { } func (e *matchersExpr) Pipeline() (log.Pipeline, error) { - return log.NoopPipeline, nil + return NoopPipeline, nil } func (e *matchersExpr) HasFilter() bool { @@ -719,7 +719,7 @@ func (e *vectorAggregationExpr) Extractor() (log.SampleExtractor, error) { // inject in the range vector extractor the outer groups to improve performance. // This is only possible if the operation is a sum. Anything else needs all labels. if r, ok := e.left.(*rangeAggregationExpr); ok && e.operation == OpTypeSum { - return r.extractor(e.grouping, true) + return r.extractor(e.grouping, len(e.grouping.groups) == 0) } return e.left.Extractor() } @@ -860,7 +860,7 @@ func (e *literalExpr) String() string { func (e *literalExpr) Selector() LogSelectorExpr { return e } func (e *literalExpr) HasFilter() bool { return false } func (e *literalExpr) Operations() []string { return nil } -func (e *literalExpr) Pipeline() (log.Pipeline, error) { return log.NoopPipeline, nil } +func (e *literalExpr) Pipeline() (log.Pipeline, error) { return NoopPipeline, nil } func (e *literalExpr) Matchers() []*labels.Matcher { return nil } func (e *literalExpr) Extractor() (log.SampleExtractor, error) { return nil, nil } diff --git a/pkg/logql/ast_test.go b/pkg/logql/ast_test.go index 7ebe137aaced2..b71afc84bd711 100644 --- a/pkg/logql/ast_test.go +++ b/pkg/logql/ast_test.go @@ -43,7 +43,7 @@ func Test_logSelectorExpr_String(t *testing.T) { if err != nil { t.Fatalf("failed to get filter: %s", err) } - require.Equal(t, tt.expectFilter, p != log.NoopPipeline) + require.Equal(t, tt.expectFilter, p != NoopPipeline) if expr.String() != tt.selector { t.Fatalf("error expected: %s got: %s", tt.selector, expr.String()) } @@ -132,7 +132,7 @@ func Test_NilFilterDoesntPanic(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - _, _, ok := p.Process([]byte("bleepbloop"), labelBar) + _, _, ok := p.ForStream(labelBar).Process([]byte("bleepbloop")) require.True(t, ok) }) @@ -216,10 +216,11 @@ func Test_FilterMatcher(t *testing.T) { p, err := expr.Pipeline() assert.Nil(t, err) if tt.lines == nil { - assert.Equal(t, p, log.NoopPipeline) + assert.Equal(t, p, NoopPipeline) } else { + sp := p.ForStream(labelBar) for _, lc := range tt.lines { - _, _, ok := p.Process([]byte(lc.l), labelBar) + _, _, ok := 
sp.Process([]byte(lc.l)) assert.Equal(t, lc.e, ok) } } @@ -281,8 +282,9 @@ func BenchmarkContainsFilter(b *testing.B) { b.ResetTimer() + sp := p.ForStream(labelBar) for i := 0; i < b.N; i++ { - if _, _, ok := p.Process(line, labelBar); !ok { + if _, _, ok := sp.Process(line); !ok { b.Fatal("doesn't match") } } diff --git a/pkg/logql/functions.go b/pkg/logql/functions.go index 3a2d6be528cdc..e41ae695aa281 100644 --- a/pkg/logql/functions.go +++ b/pkg/logql/functions.go @@ -68,9 +68,9 @@ func (r rangeAggregationExpr) extractor(gr *grouping, all bool) (log.SampleExtra // otherwise we extract metrics from the log line. switch r.operation { case OpRangeTypeRate, OpRangeTypeCount: - return log.LineExtractorWithStages(log.CountExtractor, stages, groups, without, all) + return log.NewLineSampleExtractor(log.CountExtractor, stages, groups, without, all) case OpRangeTypeBytes, OpRangeTypeBytesRate: - return log.LineExtractorWithStages(log.BytesExtractor, stages, groups, without, all) + return log.NewLineSampleExtractor(log.BytesExtractor, stages, groups, without, all) default: return nil, fmt.Errorf(unsupportedErr, r.operation) } diff --git a/pkg/logql/log/fmt_test.go b/pkg/logql/log/fmt_test.go index b86d6ad06a27e..a7bbf8889f085 100644 --- a/pkg/logql/log/fmt_test.go +++ b/pkg/logql/log/fmt_test.go @@ -41,11 +41,11 @@ func Test_lineFormatter_Format(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) - outLine, _ := tt.fmter.Process(nil, b) + builder := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + builder.Reset() + outLine, _ := tt.fmter.Process(nil, builder) require.Equal(t, tt.want, outLine) - require.Equal(t, tt.wantLbs, b.Labels()) + require.Equal(t, tt.wantLbs, builder.Labels()) }) } } @@ -94,11 +94,11 @@ func Test_labelsFormatter_Format(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.in) - _, _ = tt.fmter.Process(nil, b) + builder := NewBaseLabelsBuilder().ForLabels(tt.in, tt.in.Hash()) + builder.Reset() + _, _ = tt.fmter.Process(nil, builder) sort.Sort(tt.want) - require.Equal(t, tt.want, b.Labels()) + require.Equal(t, tt.want, builder.Labels()) }) } } diff --git a/pkg/logql/log/label_filter_test.go b/pkg/logql/log/label_filter_test.go index 0d909d237b3ac..371e6b601931e 100644 --- a/pkg/logql/log/label_filter_test.go +++ b/pkg/logql/log/label_filter_test.go @@ -151,8 +151,8 @@ func TestBinary_Filter(t *testing.T) { for _, tt := range tests { t.Run(tt.f.String(), func(t *testing.T) { sort.Sort(tt.lbs) - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, got := tt.f.Process(nil, b) require.Equal(t, tt.want, got) sort.Sort(tt.wantLbs) @@ -231,8 +231,8 @@ func TestErrorFiltering(t *testing.T) { for _, tt := range tests { t.Run(tt.f.String(), func(t *testing.T) { sort.Sort(tt.lbs) - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() b.SetErr(tt.err) _, got := tt.f.Process(nil, b) require.Equal(t, tt.want, got) diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index f087d22983275..750e012452d65 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -6,26 +6,128 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) -// LabelsBuilder is the same as labels.Builder but tailored for this package. 
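The labels.go rewrite that follows introduces the two ideas the rest of the series builds on: an immutable LabelsResult that freezes Labels(), String() and Hash() once, and a result cache keyed by label hash shared through BaseLabelsBuilder. The caching pattern stripped to its essentials (grouping and error handling omitted; hash collisions are ignored, as in the patch):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// labelsResult precomputes the derived forms of a label set so hot
// per-line paths can return them without re-allocating.
type labelsResult struct {
	lbs labels.Labels
	s   string
	h   uint64
}

func newLabelsResult(lbs labels.Labels, h uint64) *labelsResult {
	return &labelsResult{lbs: lbs, s: lbs.String(), h: h}
}

// resultCache deduplicates results across lines and streams that end
// up with the same label set, as BaseLabelsBuilder.resultCache does.
type resultCache map[uint64]*labelsResult

func (c resultCache) get(lbs labels.Labels) *labelsResult {
	h := lbs.Hash()
	if r, ok := c[h]; ok {
		return r
	}
	r := newLabelsResult(lbs, h) // String() is computed exactly once
	c[h] = r
	return r
}

func main() {
	cache := resultCache{}
	lbs := labels.Labels{{Name: "app", Value: "foo"}}
	a, b := cache.get(lbs), cache.get(lbs)
	fmt.Println(a == b, a.s, a.h) // true: same cached pointer
}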
-type LabelsBuilder struct { +var ( + emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash()) +) + +type LabelsResult interface { + String() string + Labels() labels.Labels + Hash() uint64 +} + +func NewLabelsResult(lbs labels.Labels, hash uint64) LabelsResult { + return &labelsResult{lbs: lbs, s: lbs.String(), h: hash} +} + +type labelsResult struct { + lbs labels.Labels + s string + h uint64 +} + +func (l labelsResult) String() string { + return l.s +} + +func (l labelsResult) Labels() labels.Labels { + return l.lbs +} + +func (l labelsResult) Hash() uint64 { + return l.h +} + +type hasher struct { + buf []byte // buffer for computing hash without bytes slice allocation. +} + +func newHasher() *hasher { + return &hasher{ + buf: make([]byte, 0, 1024), + } +} + +func (h *hasher) Hash(lbs labels.Labels) uint64 { + var hash uint64 + hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...) + return hash +} + +func (h *hasher) hashWithoutLabels(lbs labels.Labels, groups ...string) uint64 { + var hash uint64 + hash, h.buf = lbs.HashWithoutLabels(h.buf, groups...) + return hash +} + +func (h *hasher) hashForLabels(lbs labels.Labels, groups ...string) uint64 { + var hash uint64 + hash, h.buf = lbs.HashForLabels(h.buf, groups...) + return hash +} + +type BaseLabelsBuilder struct { + // the current base base labels.Labels del []string add []labels.Label err string + + groups []string + without, noLabels bool + + resultCache map[uint64]LabelsResult + *hasher +} + +// LabelsBuilder is the same as labels.Builder but tailored for this package. +type LabelsBuilder struct { + currentLabels labels.Labels + currentResult LabelsResult + + *BaseLabelsBuilder +} + +func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder { + return &BaseLabelsBuilder{ + del: make([]string, 0, 5), + add: make([]labels.Label, 0, 16), + resultCache: make(map[uint64]LabelsResult), + hasher: newHasher(), + groups: groups, + noLabels: noLabels, + without: without, + } } // NewLabelsBuilder creates a new labels builder. -func NewLabelsBuilder() *LabelsBuilder { - return &LabelsBuilder{ - del: make([]string, 0, 5), - add: make([]labels.Label, 0, 5), +func NewBaseLabelsBuilder() *BaseLabelsBuilder { + return NewBaseLabelsBuilderWithGrouping(nil, false, false) +} + +func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBuilder { + if labelResult, ok := b.resultCache[hash]; ok { + res := &LabelsBuilder{ + currentLabels: lbs, + currentResult: labelResult, + BaseLabelsBuilder: b, + } + return res } + labelResult := NewLabelsResult(lbs, hash) + b.resultCache[hash] = labelResult + res := &LabelsBuilder{ + currentLabels: lbs, + currentResult: labelResult, + BaseLabelsBuilder: b, + } + return res } // Reset clears all current state for the builder. -func (b *LabelsBuilder) Reset(base labels.Labels) { - b.base = base +func (b *LabelsBuilder) Reset() { + b.base = b.currentLabels b.del = b.del[:0] b.add = b.add[:0] b.err = "" @@ -47,9 +149,9 @@ func (b *LabelsBuilder) HasErr() bool { return b.err != "" } -// Base returns the base labels unmodified -func (b *LabelsBuilder) Base() labels.Labels { - return b.base +// BaseHas returns the base labels have the given key +func (b *LabelsBuilder) BaseHas(key string) bool { + return b.base.Has(key) } func (b *LabelsBuilder) Get(key string) (string, bool) { @@ -136,12 +238,160 @@ Outer: return res } -func (b *LabelsBuilder) WithoutLabels(names ...string) labels.Labels { - // naive implementation for now. 
- return b.Labels().WithoutLabels(names...) +// Labels returns the labels from the builder. If no modifications +// were made, the original labels are returned. +func (b *LabelsBuilder) LabelsResult() LabelsResult { + // unchanged path. + if len(b.del) == 0 && len(b.add) == 0 { + if b.err == "" { + return b.currentResult + } + // unchanged but with error. + res := append(b.base.Copy(), labels.Label{Name: ErrorLabel, Value: b.err}) + sort.Sort(res) + return b.toResult(res) + } + + // In the general case, labels are removed, modified or moved + // rather than added. + res := make(labels.Labels, 0, len(b.base)+len(b.add)) +Outer: + for _, l := range b.base { + for _, n := range b.del { + if l.Name == n { + continue Outer + } + } + for _, la := range b.add { + if l.Name == la.Name { + continue Outer + } + } + res = append(res, l) + } + res = append(res, b.add...) + if b.err != "" { + res = append(res, labels.Label{Name: ErrorLabel, Value: b.err}) + } + sort.Sort(res) + + return b.toResult(res) +} + +func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult { + hash := b.hasher.Hash(lbs) + if cached, ok := b.resultCache[hash]; ok { + return cached + } + res := NewLabelsResult(lbs, hash) + b.resultCache[hash] = res + return res +} + +// Labels returns the labels from the builder. If no modifications +// were made, the original labels are returned. +func (b *LabelsBuilder) GroupedLabels() LabelsResult { + if b.err != "" { + // We need to return now before applying grouping otherwise the error might get lost. + return b.LabelsResult() + } + if b.noLabels { + return emptyLabelsResult + } + // unchanged path. + if len(b.del) == 0 && len(b.add) == 0 { + if len(b.groups) == 0 { + return b.currentResult + } + return b.toGroup(b.currentLabels) + } + + if b.without { + return b.withoutResult() + } + return b.withResult() +} + +func (b *LabelsBuilder) withResult() LabelsResult { + res := make(labels.Labels, 0, len(b.groups)) +Outer: + for _, g := range b.groups { + for _, n := range b.del { + if g == n { + continue Outer + } + } + for _, la := range b.add { + if g == la.Name { + res = append(res, la) + continue Outer + } + } + for _, l := range b.base { + if g == l.Name { + res = append(res, l) + continue Outer + } + } + } + return b.toResult(res) } -func (b *LabelsBuilder) WithLabels(names ...string) labels.Labels { - // naive implementation for now. - return b.Labels().WithLabels(names...) +func (b *LabelsBuilder) withoutResult() LabelsResult { + size := len(b.base) + len(b.add) - len(b.del) - len(b.groups) + if size < 0 { + size = 0 + } + res := make(labels.Labels, 0, size) +Outer: + for _, l := range b.base { + for _, n := range b.del { + if l.Name == n { + continue Outer + } + } + for _, la := range b.add { + if l.Name == la.Name { + continue Outer + } + } + for _, lg := range b.groups { + if l.Name == lg { + continue Outer + } + } + res = append(res, l) + } +OuterAdd: + for _, la := range b.add { + for _, lg := range b.groups { + if la.Name == lg { + continue OuterAdd + } + } + res = append(res, la) + } + sort.Sort(res) + return b.toResult(res) +} + +func (b *LabelsBuilder) toGroup(from labels.Labels) LabelsResult { + var hash uint64 + if b.without { + hash = b.hasher.hashWithoutLabels(from, b.groups...) + } else { + hash = b.hasher.hashForLabels(from, b.groups...) + } + if cached, ok := b.resultCache[hash]; ok { + return cached + } + var lbs labels.Labels + if b.without { + lbs = from.WithoutLabels(b.groups...) + } else { + lbs = from.WithLabels(b.groups...) 
+ } + res := NewLabelsResult(lbs, hash) + b.resultCache[hash] = res + return res } diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 544499601a887..250090c460f9a 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -8,8 +8,9 @@ import ( ) func TestLabelsBuilder_Get(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(labels.Labels{labels.Label{Name: "already", Value: "in"}}) + lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}} + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() b.Set("foo", "bar") b.Set("bar", "buzz") b.Del("foo") @@ -31,8 +32,8 @@ func TestLabelsBuilder_Get(t *testing.T) { func TestLabelsBuilder_LabelsError(t *testing.T) { lbs := labels.Labels{labels.Label{Name: "already", Value: "in"}} - b := NewLabelsBuilder() - b.Reset(lbs) + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() b.SetErr("err") lbsWithErr := b.Labels() require.Equal( diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go index 98149dc3506fa..f9bd332604f76 100644 --- a/pkg/logql/log/metrics_extraction.go +++ b/pkg/logql/log/metrics_extraction.go @@ -17,96 +17,82 @@ const ( ConvertFloat = "float" ) -// SampleExtractor extracts sample for a log line. -type SampleExtractor interface { - Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) -} - -type SampleExtractorFunc func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) - -func (fn SampleExtractorFunc) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - return fn(line, lbs) -} - // LineExtractor extracts a float64 from a log line. type LineExtractor func([]byte) float64 -// ToSampleExtractor transform a LineExtractor into a SampleExtractor. -// Useful for metric conversion without log Pipeline. -func (l LineExtractor) ToSampleExtractor(groups []string, without bool, noLabels bool) SampleExtractor { - return SampleExtractorFunc(func(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - // todo(cyriltovena) grouping should be done once per stream/chunk not for everyline. - // so for now we'll cover just vector without grouping. This requires changes to SampleExtractor interface. - // For another day ! - if len(groups) == 0 && noLabels { - return l(line), labels.Labels{}, true - } - return l(line), lbs, true - }) -} - var ( CountExtractor LineExtractor = func(line []byte) float64 { return 1. } BytesExtractor LineExtractor = func(line []byte) float64 { return float64(len(line)) } ) +// SampleExtractor extracts sample for a log line. +type SampleExtractor interface { + ForStream(labels labels.Labels) StreamSampleExtractor +} + +type StreamSampleExtractor interface { + Process(line []byte) (float64, LabelsResult, bool) +} + type lineSampleExtractor struct { Stage LineExtractor - groups []string - without bool - noLabels bool - builder *LabelsBuilder + baseBuilder *BaseLabelsBuilder + streamExtractors map[uint64]StreamSampleExtractor } -func (l lineSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { - l.builder.Reset(lbs) - line, ok := l.Stage.Process(line, l.builder) - if !ok { - return 0, nil, false - } - if len(l.groups) != 0 { - if l.without { - return l.LineExtractor(line), l.builder.WithoutLabels(l.groups...), true - } - return l.LineExtractor(line), l.builder.WithLabels(l.groups...), true +// NewLineSampleExtractor creates a SampleExtractor from a LineExtractor. 
+// Multiple log stages are run before converting the log line. +func NewLineSampleExtractor(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { + return &lineSampleExtractor{ + Stage: ReduceStages(stages), + LineExtractor: ex, + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + streamExtractors: make(map[uint64]StreamSampleExtractor), + }, nil +} + +func (l *lineSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + hash := l.baseBuilder.Hash(labels) + if res, ok := l.streamExtractors[hash]; ok { + return res } - if l.noLabels { - // no grouping but it was a vector operation so we return a single vector - return l.LineExtractor(line), labels.Labels{}, true + + res := &streamLineSampleExtractor{ + Stage: l.Stage, + LineExtractor: l.LineExtractor, + builder: l.baseBuilder.ForLabels(labels, hash), } - return l.LineExtractor(line), l.builder.Labels(), true + l.streamExtractors[hash] = res + return res } -// LineExtractorWithStages creates a SampleExtractor from a LineExtractor. -// Multiple log stages are run before converting the log line. -func LineExtractorWithStages(ex LineExtractor, stages []Stage, groups []string, without bool, noLabels bool) (SampleExtractor, error) { - if len(stages) == 0 { - return ex.ToSampleExtractor(groups, without, noLabels), nil +type streamLineSampleExtractor struct { + Stage + LineExtractor + builder *LabelsBuilder +} + +func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { + l.builder.Reset() + line, ok := l.Stage.Process(line, l.builder) + if !ok { + return 0, nil, false } - return lineSampleExtractor{ - Stage: ReduceStages(stages), - LineExtractor: ex, - builder: NewLabelsBuilder(), - groups: groups, - without: without, - noLabels: noLabels, - }, nil + return l.LineExtractor(line), l.builder.GroupedLabels(), true } type convertionFn func(value string) (float64, error) type labelSampleExtractor struct { - preStage Stage - postFilter Stage - builder *LabelsBuilder - + preStage Stage + postFilter Stage labelName string conversionFn convertionFn - groups []string - without bool - noLabels bool + + baseBuilder *BaseLabelsBuilder + streamExtractors map[uint64]StreamSampleExtractor } // LabelExtractorWithStages creates a SampleExtractor that will extract metrics from a labels. 
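Both extractor types cache their per-stream instances in a streamExtractors map keyed by the label hash, and the hash is computed through the reusable hasher buffer (labels.HashWithoutLabels with a scratch []byte) so the lookup itself does not allocate. The memoization pattern in isolation, with a stand-in stream type instead of the real extractors:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

type stream struct{ lbs labels.Labels } // stand-in per-stream extractor

type extractor struct {
	buf     []byte             // scratch buffer reused across Hash calls
	streams map[uint64]*stream // one instance per distinct label set
}

func (e *extractor) hash(lbs labels.Labels) uint64 {
	var h uint64
	h, e.buf = lbs.HashWithoutLabels(e.buf) // no allocation after warm-up
	return h
}

// ForStream returns the cached per-stream instance, building it once.
func (e *extractor) ForStream(lbs labels.Labels) *stream {
	h := e.hash(lbs)
	if s, ok := e.streams[h]; ok {
		return s
	}
	s := &stream{lbs: lbs}
	e.streams[h] = s
	return s
}

func main() {
	e := &extractor{buf: make([]byte, 0, 1024), streams: map[uint64]*stream{}}
	lbs := labels.Labels{{Name: "app", Value: "foo"}}
	fmt.Println(e.ForStream(lbs) == e.ForStream(lbs)) // true: cached
}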
@@ -129,25 +115,43 @@ func LabelExtractorWithStages( default: return nil, errors.Errorf("unsupported conversion operation %s", conversion) } - if len(groups) != 0 && without { + if len(groups) == 0 || without { + without = true groups = append(groups, labelName) sort.Strings(groups) } return &labelSampleExtractor{ - preStage: ReduceStages(preStages), - conversionFn: convFn, - groups: groups, - labelName: labelName, - postFilter: postFilter, - without: without, - builder: NewLabelsBuilder(), - noLabels: noLabels, + preStage: ReduceStages(preStages), + conversionFn: convFn, + labelName: labelName, + postFilter: postFilter, + baseBuilder: NewBaseLabelsBuilderWithGrouping(groups, without, noLabels), + streamExtractors: make(map[uint64]StreamSampleExtractor), }, nil } -func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, labels.Labels, bool) { +type streamLabelSampleExtractor struct { + *labelSampleExtractor + builder *LabelsBuilder +} + +func (l *labelSampleExtractor) ForStream(labels labels.Labels) StreamSampleExtractor { + hash := l.baseBuilder.Hash(labels) + if res, ok := l.streamExtractors[hash]; ok { + return res + } + + res := &streamLabelSampleExtractor{ + labelSampleExtractor: l, + builder: l.baseBuilder.ForLabels(labels, hash), + } + l.streamExtractors[hash] = res + return res +} + +func (l *streamLabelSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) { // Apply the pipeline first. - l.builder.Reset(lbs) + l.builder.Reset() line, ok := l.preStage.Process(line, l.builder) if !ok { return 0, nil, false @@ -168,25 +172,7 @@ func (l *labelSampleExtractor) Process(line []byte, lbs labels.Labels) (float64, if _, ok = l.postFilter.Process(line, l.builder); !ok { return 0, nil, false } - if l.builder.HasErr() { - // we still have an error after post filtering. - // We need to return now before applying grouping otherwise the error might get lost. - return v, l.builder.Labels(), true - } - return v, l.groupLabels(l.builder), true -} - -func (l *labelSampleExtractor) groupLabels(lbs *LabelsBuilder) labels.Labels { - if len(l.groups) != 0 { - if l.without { - return lbs.WithoutLabels(l.groups...) - } - return lbs.WithLabels(l.groups...) 
- } - if l.noLabels { - return labels.Labels{} - } - return lbs.WithoutLabels(l.labelName) + return v, l.builder.GroupedLabels(), true } func convertFloat(v string) (float64, error) { diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go index a0b5d53fc5d6e..d52a6dd5ce370 100644 --- a/pkg/logql/log/metrics_extraction_test.go +++ b/pkg/logql/log/metrics_extraction_test.go @@ -112,10 +112,11 @@ func Test_labelSampleExtractor_Extract(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sort.Sort(tt.in) - outval, outlbs, ok := tt.ex.Process([]byte(""), tt.in) + + outval, outlbs, ok := tt.ex.ForStream(tt.in).Process([]byte("")) require.Equal(t, tt.wantOk, ok) require.Equal(t, tt.want, outval) - require.Equal(t, tt.wantLbs, outlbs) + require.Equal(t, tt.wantLbs, outlbs.Labels()) }) } } diff --git a/pkg/logql/log/parser.go b/pkg/logql/log/parser.go index b979127058d43..aa9ef3d28edab 100644 --- a/pkg/logql/log/parser.go +++ b/pkg/logql/log/parser.go @@ -26,14 +26,12 @@ var ( errMissingCapture = errors.New("at least one named capture must be supplied") ) -func addLabel(lbs *LabelsBuilder) func(key, value string) { - return func(key, value string) { - key = sanitizeKey(key) - if lbs.Base().Has(key) { - key = fmt.Sprintf("%s%s", key, duplicateSuffix) - } - lbs.Set(key, value) +func addLabel(lbs *LabelsBuilder, key, value string) { + key = sanitizeKey(key) + if lbs.BaseHas(key) { + key = fmt.Sprintf("%s%s", key, duplicateSuffix) } + lbs.Set(key, value) } func sanitizeKey(key string) string { @@ -66,20 +64,20 @@ func (j *JSONParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { lbs.SetErr(errJSON) return line, true } - parseMap("", data, addLabel(lbs)) + parseMap("", data, lbs) return line, true } -func parseMap(prefix string, data map[string]interface{}, add func(key, value string)) { +func parseMap(prefix string, data map[string]interface{}, lbs *LabelsBuilder) { for key, val := range data { switch concrete := val.(type) { case map[string]interface{}: - parseMap(jsonKey(prefix, key), concrete, add) + parseMap(jsonKey(prefix, key), concrete, lbs) case string: - add(jsonKey(prefix, key), concrete) + addLabel(lbs, jsonKey(prefix, key), concrete) case float64: f := strconv.FormatFloat(concrete, 'f', -1, 64) - add(jsonKey(prefix, key), f) + addLabel(lbs, jsonKey(prefix, key), f) } } } @@ -138,10 +136,9 @@ func mustNewRegexParser(re string) *RegexpParser { } func (r *RegexpParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { - add := addLabel(lbs) for i, value := range r.regex.FindSubmatch(line) { if name, ok := r.nameIndex[i]; ok { - add(name, string(value)) + addLabel(lbs, name, string(value)) } } return line, true @@ -161,11 +158,11 @@ func NewLogfmtParser() *LogfmtParser { func (l *LogfmtParser) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { l.dec.Reset(line) - add := addLabel(lbs) + for l.dec.ScanKeyval() { key := string(l.dec.Key()) val := string(l.dec.Value()) - add(key, val) + addLabel(lbs, key, val) } if l.dec.Err() != nil { lbs.SetErr(errLogfmt) diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 651add226f8f9..659b7ce17a8a0 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -77,8 +77,8 @@ func Test_jsonParser_Parse(t *testing.T) { for _, tt := range tests { j := NewJSONParser() t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = 
j.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) @@ -169,8 +169,8 @@ func Test_regexpParser_Parse(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = tt.parser.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) @@ -281,8 +281,8 @@ func Test_logfmtParser_Parse(t *testing.T) { p := NewLogfmtParser() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - b := NewLabelsBuilder() - b.Reset(tt.lbs) + b := NewBaseLabelsBuilder().ForLabels(tt.lbs, tt.lbs.Hash()) + b.Reset() _, _ = p.Process(tt.line, b) sort.Sort(tt.want) require.Equal(t, tt.want, b.Labels()) diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go index 3c5d1b6459fa2..a864f7576c9e9 100644 --- a/pkg/logql/log/pipeline.go +++ b/pkg/logql/log/pipeline.go @@ -4,9 +4,17 @@ import ( "github.com/prometheus/prometheus/pkg/labels" ) +var ( + NoopStage Stage = &noopStage{} +) + // Pipeline transform and filter log lines and labels. type Pipeline interface { - Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) + ForStream(labels labels.Labels) StreamPipeline +} + +type StreamPipeline interface { + Process(line []byte) ([]byte, LabelsResult, bool) } // Stage is a single step of a Pipeline. @@ -14,15 +22,32 @@ type Stage interface { Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) } -var ( - NoopPipeline Pipeline = &noopPipeline{} - NoopStage Stage = &noopStage{} -) +func NewNoopPipeline() Pipeline { + return &noopPipeline{ + cache: map[uint64]*noopStreamPipeline{}, + } +} + +type noopPipeline struct { + cache map[uint64]*noopStreamPipeline +} -type noopPipeline struct{} +type noopStreamPipeline struct { + LabelsResult +} -func (noopPipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) { - return line, lbs, true +func (n noopStreamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { + return line, n.LabelsResult, true +} + +func (n *noopPipeline) ForStream(labels labels.Labels) StreamPipeline { + h := labels.Hash() + if cached, ok := n.cache[h]; ok { + return cached + } + sp := &noopStreamPipeline{LabelsResult: NewLabelsResult(labels, h)} + n.cache[h] = sp + return sp } type noopStage struct{} @@ -40,30 +65,52 @@ func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { // pipeline is a combinations of multiple stages. // It can also be reduced into a single stage for convenience. 
type pipeline struct { - stages []Stage - builder *LabelsBuilder + stages []Stage + baseBuilder *BaseLabelsBuilder + + streamPipelines map[uint64]StreamPipeline } func NewPipeline(stages []Stage) Pipeline { + if len(stages) == 0 { + return NewNoopPipeline() + } return &pipeline{ - stages: stages, - builder: NewLabelsBuilder(), + stages: stages, + baseBuilder: NewBaseLabelsBuilder(), + streamPipelines: make(map[uint64]StreamPipeline), } } -func (p *pipeline) Process(line []byte, lbs labels.Labels) ([]byte, labels.Labels, bool) { - var ok bool - if len(p.stages) == 0 { - return line, lbs, true +type streamPipeline struct { + stages []Stage + builder *LabelsBuilder +} + +func (p *pipeline) ForStream(labels labels.Labels) StreamPipeline { + hash := p.baseBuilder.Hash(labels) + if res, ok := p.streamPipelines[hash]; ok { + return res + } + + res := &streamPipeline{ + stages: p.stages, + builder: p.baseBuilder.ForLabels(labels, hash), } - p.builder.Reset(lbs) + p.streamPipelines[hash] = res + return res +} + +func (p *streamPipeline) Process(line []byte) ([]byte, LabelsResult, bool) { + var ok bool + p.builder.Reset() for _, s := range p.stages { line, ok = s.Process(line, p.builder) if !ok { return nil, nil, false } } - return line, p.builder.Labels(), true + return line, p.builder.LabelsResult(), true } // ReduceStages reduces multiple stages into one. diff --git a/pkg/logql/log/pipeline_test.go b/pkg/logql/log/pipeline_test.go index 60b42e55f9392..2038d717486b2 100644 --- a/pkg/logql/log/pipeline_test.go +++ b/pkg/logql/log/pipeline_test.go @@ -10,7 +10,7 @@ import ( var ( resOK bool resLine []byte - resLbs labels.Labels + resLbs LabelsResult ) func Benchmark_Pipeline(b *testing.B) { @@ -39,8 +39,9 @@ func Benchmark_Pipeline(b *testing.B) { {Name: "pod_template_hash", Value: "5896759c79"}, } b.ResetTimer() + sp := p.ForStream(lbs) for n := 0; n < b.N; n++ { - resLine, resLbs, resOK = p.Process(line, lbs) + resLine, resLbs, resOK = sp.Process(line) } } diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go index 10cde5bcf152c..ffbabe2506fbe 100644 --- a/pkg/logql/parser_test.go +++ b/pkg/logql/parser_test.go @@ -1990,13 +1990,13 @@ func Test_PipelineCombined(t *testing.T) { p, err := expr.Pipeline() require.Nil(t, err) - - line, lbs, ok := p.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`), labels.Labels{}) + sp := p.ForStream(labels.Labels{}) + line, lbs, ok := sp.Process([]byte(`level=debug ts=2020-10-02T10:10:42.092268913Z caller=logging.go:66 traceID=a9d4d8a928d8db1 msg="POST /api/prom/api/v1/query_range (200) 1.5s"`)) require.True(t, ok) require.Equal( t, labels.Labels{labels.Label{Name: "caller", Value: "logging.go:66"}, labels.Label{Name: "duration", Value: "1.5s"}, labels.Label{Name: "level", Value: "debug"}, labels.Label{Name: "method", Value: "POST"}, labels.Label{Name: "msg", Value: "POST /api/prom/api/v1/query_range (200) 1.5s"}, labels.Label{Name: "path", Value: "/api/prom/api/v1/query_range"}, labels.Label{Name: "status", Value: "200"}, labels.Label{Name: "traceID", Value: "a9d4d8a928d8db1"}, labels.Label{Name: "ts", Value: "2020-10-02T10:10:42.092268913Z"}}, - lbs, + lbs.Labels(), ) require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line)) } diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go index 01db6fceeb33d..ea26c8127b1d1 100644 --- a/pkg/logql/test_utils.go +++ b/pkg/logql/test_utils.go @@ -97,7 +97,8 @@ func processStream(in 
[]logproto.Stream, pipeline log.Pipeline) []logproto.Strea for _, stream := range in { for _, e := range stream.Entries { - if l, out, ok := pipeline.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok { + sp := pipeline.ForStream(mustParseLabels(stream.Labels)) + if l, out, ok := sp.Process([]byte(e.Line)); ok { var s *logproto.Stream var found bool s, found = resByStream[out.String()] @@ -124,7 +125,8 @@ func processSeries(in []logproto.Stream, ex log.SampleExtractor) []logproto.Seri for _, stream := range in { for _, e := range stream.Entries { - if f, lbs, ok := ex.Process([]byte(e.Line), mustParseLabels(stream.Labels)); ok { + exs := ex.ForStream(mustParseLabels(stream.Labels)) + if f, lbs, ok := exs.Process([]byte(e.Line)); ok { var s *logproto.Series var found bool s, found = resBySeries[lbs.String()] diff --git a/pkg/storage/batch.go b/pkg/storage/batch.go index 471860c0cac2a..31ecbbdfb04f9 100644 --- a/pkg/storage/batch.go +++ b/pkg/storage/batch.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/logql/stats" ) @@ -381,19 +382,21 @@ func (it *logBatchIterator) newChunksIterator(b *chunkBatch) (iter.EntryIterator func (it *logBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for _, chunks := range chks { + if len(chunks) != 0 && len(chunks[0]) != 0 { + streamPipeline := it.pipeline.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName)) + iterator, err := it.buildHeapIterator(chunks, from, through, streamPipeline, nextChunk) + if err != nil { + return nil, err + } - iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) - if err != nil { - return nil, err + result = append(result, iterator) } - - result = append(result, iterator) } return result, nil } -func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.EntryIterator, error) { +func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamPipeline log.StreamPipeline, nextChunk *LazyChunk) (iter.EntryIterator, error) { result := make([]iter.EntryIterator, 0, len(chks)) for i := range chks { @@ -402,7 +405,7 @@ func (it *logBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through if !chks[i][j].IsValid { continue } - iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, it.pipeline, nextChunk) + iterator, err := chks[i][j].Iterator(it.ctx, from, through, it.direction, streamPipeline, nextChunk) if err != nil { return nil, err } @@ -514,17 +517,21 @@ func (it *sampleBatchIterator) newChunksIterator(b *chunkBatch) (iter.SampleIter func (it *sampleBatchIterator) buildIterators(chks map[model.Fingerprint][][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) ([]iter.SampleIterator, error) { result := make([]iter.SampleIterator, 0, len(chks)) for _, chunks := range chks { - iterator, err := it.buildHeapIterator(chunks, from, through, nextChunk) - if err != nil { - return nil, err + if len(chunks) != 0 && len(chunks[0]) != 0 { + streamExtractor := it.extractor.ForStream(chunks[0][0].Chunk.Metric.WithoutLabels(labels.MetricName)) + iterator, err := it.buildHeapIterator(chunks, from, through, streamExtractor, nextChunk) + if err != nil { + return nil, err 
+ } + result = append(result, iterator) } - result = append(result, iterator) + } return result, nil } -func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, nextChunk *LazyChunk) (iter.SampleIterator, error) { +func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, through time.Time, streamExtractor log.StreamSampleExtractor, nextChunk *LazyChunk) (iter.SampleIterator, error) { result := make([]iter.SampleIterator, 0, len(chks)) for i := range chks { @@ -533,7 +540,7 @@ func (it *sampleBatchIterator) buildHeapIterator(chks [][]*LazyChunk, from, thro if !chks[i][j].IsValid { continue } - iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, it.extractor, nextChunk) + iterator, err := chks[i][j].SampleIterator(it.ctx, from, through, streamExtractor, nextChunk) if err != nil { return nil, err } @@ -755,20 +762,3 @@ outer: return css } - -// dropLabels returns a new label set with certain labels dropped -func dropLabels(ls labels.Labels, removals ...string) (dst labels.Labels) { - toDel := make(map[string]struct{}) - for _, r := range removals { - toDel[r] = struct{}{} - } - - for _, l := range ls { - _, remove := toDel[l.Name] - if !remove { - dst = append(dst, l) - } - } - - return dst -} diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index 01742e5514211..fe50e95100d1a 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1231,7 +1231,10 @@ func Test_newSampleBatchChunkIterator(t *testing.T) { for name, tt := range tests { tt := tt t.Run(name, func(t *testing.T) { - it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), log.CountExtractor.ToSampleExtractor(nil, false, false), tt.start, tt.end) + ex, err := log.NewLineSampleExtractor(log.CountExtractor, nil, nil, false, false) + require.NoError(t, err) + + it, err := newSampleBatchIterator(context.Background(), NilMetrics, tt.chunks, tt.batchSize, newMatchers(tt.matchers), ex, tt.start, tt.end) require.NoError(t, err) series, _, err := iter.ReadSampleBatch(it, 1000) _ = it.Close() @@ -1441,7 +1444,7 @@ func TestBuildHeapIterator(t *testing.T) { ctx: ctx, pipeline: logql.NoopPipeline, } - it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), nil) + it, err := b.buildHeapIterator(tc.input, from, from.Add(6*time.Millisecond), b.pipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil) if err != nil { t.Errorf("buildHeapIterator error = %v", err) return @@ -1457,48 +1460,6 @@ func TestBuildHeapIterator(t *testing.T) { } } -func TestDropLabels(t *testing.T) { - - for i, tc := range []struct { - ls labels.Labels - drop []string - expected labels.Labels - }{ - { - ls: labels.Labels{ - labels.Label{ - Name: "a", - Value: "1", - }, - labels.Label{ - Name: "b", - Value: "2", - }, - labels.Label{ - Name: "c", - Value: "3", - }, - }, - drop: []string{"b"}, - expected: labels.Labels{ - labels.Label{ - Name: "a", - Value: "1", - }, - labels.Label{ - Name: "c", - Value: "3", - }, - }, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - dropped := dropLabels(tc.ls, tc.drop...) 
- require.Equal(t, tc.expected, dropped) - }) - } -} - func Test_IsInvalidChunkError(t *testing.T) { tests := []struct { name string diff --git a/pkg/storage/lazy_chunk.go b/pkg/storage/lazy_chunk.go index f868ed5684336..9be9dd3f1d8ea 100644 --- a/pkg/storage/lazy_chunk.go +++ b/pkg/storage/lazy_chunk.go @@ -6,12 +6,11 @@ import ( "time" "github.com/cortexproject/cortex/pkg/chunk" - "github.com/prometheus/prometheus/pkg/labels" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/logql/log" ) // LazyChunk loads the chunk when it is accessed. @@ -33,7 +32,7 @@ func (c *LazyChunk) Iterator( ctx context.Context, from, through time.Time, direction logproto.Direction, - pipeline logql.Pipeline, + pipeline log.StreamPipeline, nextChunk *LazyChunk, ) (iter.EntryIterator, error) { @@ -59,7 +58,7 @@ func (c *LazyChunk) Iterator( // if the block is overlapping cache it with the next chunk boundaries. if nextChunk != nil && IsBlockOverlapping(b, nextChunk, direction) { // todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset. - it := iter.NewCachedIterator(b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline), b.Entries()) + it := iter.NewCachedIterator(b.Iterator(ctx, pipeline), b.Entries()) its = append(its, it) if c.overlappingBlocks == nil { c.overlappingBlocks = make(map[int]iter.CacheEntryIterator) @@ -71,7 +70,7 @@ func (c *LazyChunk) Iterator( delete(c.overlappingBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. - its = append(its, b.Iterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), pipeline)) + its = append(its, b.Iterator(ctx, pipeline)) } if direction == logproto.FORWARD { @@ -106,7 +105,7 @@ func (c *LazyChunk) Iterator( func (c *LazyChunk) SampleIterator( ctx context.Context, from, through time.Time, - extractor logql.SampleExtractor, + extractor log.StreamSampleExtractor, nextChunk *LazyChunk, ) (iter.SampleIterator, error) { @@ -132,7 +131,7 @@ func (c *LazyChunk) SampleIterator( // if the block is overlapping cache it with the next chunk boundaries. if nextChunk != nil && IsBlockOverlapping(b, nextChunk, logproto.FORWARD) { // todo(cyriltovena) we can avoid to drop the metric name for each chunks since many chunks have the same metric/labelset. - it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor), b.Entries()) + it := iter.NewCachedSampleIterator(b.SampleIterator(ctx, extractor), b.Entries()) its = append(its, it) if c.overlappingSampleBlocks == nil { c.overlappingSampleBlocks = make(map[int]iter.CacheSampleIterator) @@ -144,7 +143,7 @@ func (c *LazyChunk) SampleIterator( delete(c.overlappingSampleBlocks, b.Offset()) } // non-overlapping block with the next chunk are not cached. - its = append(its, b.SampleIterator(ctx, dropLabels(c.Chunk.Metric, labels.MetricName), extractor)) + its = append(its, b.SampleIterator(ctx, extractor)) } // build the final iterator bound to the requested time range. 
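The lazy chunk changes above settle on one calling pattern: resolve a stream's labels once via ForStream, then reuse the returned StreamPipeline for every block and entry of that stream, which is the amortization the todo comments about dropping the metric name per chunk were asking for. Below is a minimal, self-contained Go sketch of that hash-keyed caching shape; the streamPipeline/pipeline types and the substring stage are illustrative stand-ins, not Loki's actual log.Pipeline implementation.

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/prometheus/pkg/labels"
)

// streamPipeline processes entries of a single stream: the labels are
// resolved once at construction, so Process only touches the line.
type streamPipeline struct {
	lbs labels.Labels
}

// Process keeps lines containing "foo", a stand-in for real stages.
func (s *streamPipeline) Process(line []byte) ([]byte, labels.Labels, bool) {
	if !strings.Contains(string(line), "foo") {
		return nil, nil, false
	}
	return line, s.lbs, true
}

// pipeline hands out streamPipelines cached by the labels hash, mirroring
// how ForStream amortizes label work across chunks of the same stream.
type pipeline struct {
	cache map[uint64]*streamPipeline
}

func (p *pipeline) ForStream(lbs labels.Labels) *streamPipeline {
	h := lbs.Hash()
	if sp, ok := p.cache[h]; ok {
		return sp
	}
	sp := &streamPipeline{lbs: lbs}
	p.cache[h] = sp
	return sp
}

func main() {
	p := &pipeline{cache: map[uint64]*streamPipeline{}}
	lbs := labels.Labels{{Name: "app", Value: "foo"}}

	sp := p.ForStream(lbs) // hoisted out of the per-entry loop
	for _, line := range []string{"foo bar", "nope"} {
		if out, l, ok := sp.Process([]byte(line)); ok {
			fmt.Println(l.String(), string(out))
		}
	}
	// A second chunk of the same stream reuses the cached streamPipeline.
	fmt.Println(p.ForStream(lbs) == sp) // true
}

Keying the cache on the label hash rather than the label string keeps the per-chunk lookup allocation-free, which is what the buildIterators changes above rely on when they hoist ForStream out of buildHeapIterator.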
diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go
index 98b64c6417a03..bc435e5270d04 100644
--- a/pkg/storage/lazy_chunk_test.go
+++ b/pkg/storage/lazy_chunk_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
+	"github.com/grafana/loki/pkg/logql/log"
 	"github.com/grafana/loki/pkg/util"
 )
 
@@ -46,7 +47,7 @@ func TestLazyChunkIterator(t *testing.T) {
 		},
 	} {
 		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
-			it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline, nil)
+			it, err := tc.chunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, logql.NoopPipeline.ForStream(labels.Labels{labels.Label{Name: "foo", Value: "bar"}}), nil)
 			require.Nil(t, err)
 			streams, _, err := iter.ReadBatch(it, 1000)
 			require.Nil(t, err)
@@ -174,10 +175,10 @@ func (fakeBlock) Entries() int { return 0 }
 func (fakeBlock) Offset() int { return 0 }
 func (f fakeBlock) MinTime() int64 { return f.mint }
 func (f fakeBlock) MaxTime() int64 { return f.maxt }
-func (fakeBlock) Iterator(context.Context, labels.Labels, logql.Pipeline) iter.EntryIterator {
+func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterator {
 	return nil
 }
-func (fakeBlock) SampleIterator(context.Context, labels.Labels, logql.SampleExtractor) iter.SampleIterator {
+func (fakeBlock) SampleIterator(context.Context, log.StreamSampleExtractor) iter.SampleIterator {
 	return nil
 }

From b5e5a04881f4cf040825eb0d10cf73ac7d77346a Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Tue, 3 Nov 2020 17:33:10 +0100
Subject: [PATCH 3/6] Cache grouped result when no changes have occurred.

Signed-off-by: Cyril Tovena
---
 pkg/logql/log/labels.go | 23 +++++++++--------------
 1 file changed, 9 insertions(+), 14 deletions(-)

diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index 750e012452d65..53ec6becef640 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -85,6 +85,7 @@ type BaseLabelsBuilder struct {
 type LabelsBuilder struct {
 	currentLabels labels.Labels
 	currentResult LabelsResult
+	groupedResult LabelsResult
 
 	*BaseLabelsBuilder
 }
@@ -303,7 +304,7 @@ func (b *LabelsBuilder) GroupedLabels() LabelsResult {
 		if len(b.groups) == 0 {
 			return b.currentResult
 		}
-		return b.toGroup(b.currentLabels)
+		return b.toBaseGroup()
 	}
 
 	if b.without {
@@ -375,23 +376,17 @@ OuterAdd:
 	return b.toResult(res)
 }
 
-func (b *LabelsBuilder) toGroup(from labels.Labels) LabelsResult {
-	var hash uint64
-	if b.without {
-		hash = b.hasher.hashWithoutLabels(from, b.groups...)
-	} else {
-		hash = b.hasher.hashForLabels(from, b.groups...)
-	}
-	if cached, ok := b.resultCache[hash]; ok {
-		return cached
+func (b *LabelsBuilder) toBaseGroup() LabelsResult {
+	if b.groupedResult != nil {
+		return b.groupedResult
 	}
 	var lbs labels.Labels
 	if b.without {
-		lbs = from.WithoutLabels(b.groups...)
+		lbs = b.currentLabels.WithoutLabels(b.groups...)
 	} else {
-		lbs = from.WithLabels(b.groups...)
+		lbs = b.currentLabels.WithLabels(b.groups...)
 	}
-	res := NewLabelsResult(lbs, hash)
-	b.resultCache[hash] = res
+	res := NewLabelsResult(lbs, lbs.Hash())
+	b.groupedResult = res
 	return res
 }

From 415ad7cfed68da88385fcb2d2c56aa72fd70c35d Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Tue, 3 Nov 2020 17:55:27 +0100
Subject: [PATCH 4/6] Removes unused methods.
Signed-off-by: Cyril Tovena
---
 pkg/logql/log/labels.go | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index 53ec6becef640..a59eb268bcdd9 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -54,18 +54,6 @@ func (h *hasher) Hash(lbs labels.Labels) uint64 {
 	return hash
 }
 
-func (h *hasher) hashWithoutLabels(lbs labels.Labels, groups ...string) uint64 {
-	var hash uint64
-	hash, h.buf = lbs.HashWithoutLabels(h.buf, groups...)
-	return hash
-}
-
-func (h *hasher) hashForLabels(lbs labels.Labels, groups ...string) uint64 {
-	var hash uint64
-	hash, h.buf = lbs.HashForLabels(h.buf, groups...)
-	return hash
-}
-
 type BaseLabelsBuilder struct {
 	// the current base
 	base labels.Labels

From 9c20705bf89892ce60fc20be626f08c79e756cff Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Wed, 4 Nov 2020 11:37:30 +0100
Subject: [PATCH 5/6] Adds docs and tests.

Signed-off-by: Cyril Tovena
---
 pkg/chunkenc/memchunk_test.go            | 21 ++---
 pkg/logql/log/labels.go                  | 39 +++++----
 pkg/logql/log/labels_test.go             | 100 +++++++++++++++++++++++
 pkg/logql/log/metrics_extraction.go      |  7 +-
 pkg/logql/log/metrics_extraction_test.go | 30 +++++++
 pkg/logql/log/pipeline.go                |  6 +-
 6 files changed, 175 insertions(+), 28 deletions(-)

diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go
index 711fdc971ee17..73cdd46f5be33 100644
--- a/pkg/chunkenc/memchunk_test.go
+++ b/pkg/chunkenc/memchunk_test.go
@@ -814,11 +814,11 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
 		},
 	}
 	for _, test := range []string{
-		`{app="foo"}`,
+		// `{app="foo"}`,
 		`{app="foo"} != "foo"`,
-		`{app="foo"} != "foo" | logfmt `,
-		`{app="foo"} != "foo" | logfmt | duration > 10ms`,
-		`{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`,
+		// `{app="foo"} != "foo" | logfmt `,
+		// `{app="foo"} != "foo" | logfmt | duration > 10ms`,
+		// `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`,
 	} {
 		b.Run(test, func(b *testing.B) {
 			b.ReportAllocs()
@@ -851,12 +851,13 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) {
 	}
 
 	for _, test := range []string{
-		`rate({app="foo"}[1m])`,
-		`sum by (cluster) (rate({app="foo"}[10s]))`,
-		`sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`,
-		`sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`,
-		`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`,
-		`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`,
+		// `rate({app="foo"}[1m])`,
+		// `sum by (cluster) (rate({app="foo"}[10s]))`,
+		// `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`,
+		// `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`,
+		// `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`,
+		// `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`,
+		// `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`,
 	} {
 		b.Run(test, func(b *testing.B) {
 			b.ReportAllocs()
diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go
index a59eb268bcdd9..03bdbf9f272fe 100644
--- a/pkg/logql/log/labels.go
+++ b/pkg/logql/log/labels.go
@@ -10,12 +10,15 @@ var (
 	emptyLabelsResult = NewLabelsResult(labels.Labels{}, labels.Labels{}.Hash())
 )
 
+// LabelsResult is a computed labels result that contains the labels set with associated string and hash.
+// It is mainly used for caching and returning labels computations out of pipelines and stages.
 type LabelsResult interface {
 	String() string
 	Labels() labels.Labels
 	Hash() uint64
 }
 
+// NewLabelsResult creates a new LabelsResult from a labels set and a hash.
 func NewLabelsResult(lbs labels.Labels, hash uint64) LabelsResult {
 	return &labelsResult{lbs: lbs, s: lbs.String(), h: hash}
 }
@@ -42,24 +45,25 @@ type hasher struct {
 	buf []byte // buffer for computing hash without bytes slice allocation.
 }
 
+// newHasher allows computing hashes for labels by reusing the same buffer.
 func newHasher() *hasher {
 	return &hasher{
 		buf: make([]byte, 0, 1024),
 	}
 }
 
+// Hash hashes the labels.
 func (h *hasher) Hash(lbs labels.Labels) uint64 {
 	var hash uint64
 	hash, h.buf = lbs.HashWithoutLabels(h.buf, []string(nil)...)
 	return hash
 }
 
+// BaseLabelsBuilder is a label builder used by pipelines and stages.
+// Only one base builder is used and it contains the cache shared by all LabelsBuilders.
 type BaseLabelsBuilder struct {
-	// the current base
-	base labels.Labels
-	del []string
-	add []labels.Label
-
+	del []string
+	add []labels.Label
 	err string
 
 	groups []string
@@ -71,13 +75,14 @@ type BaseLabelsBuilder struct {
 
 // LabelsBuilder is the same as labels.Builder but tailored for this package.
 type LabelsBuilder struct {
-	currentLabels labels.Labels
+	base          labels.Labels
 	currentResult LabelsResult
 	groupedResult LabelsResult
 
 	*BaseLabelsBuilder
 }
 
+// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
 func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *BaseLabelsBuilder {
 	return &BaseLabelsBuilder{
 		del: make([]string, 0, 5),
@@ -90,15 +95,17 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, without, noLabels bool) *
 	}
 }
 
-// NewLabelsBuilder creates a new labels builder.
+// NewBaseLabelsBuilder creates a new base labels builder.
 func NewBaseLabelsBuilder() *BaseLabelsBuilder {
 	return NewBaseLabelsBuilderWithGrouping(nil, false, false)
 }
 
+// ForLabels creates a labels builder for a given labels set as base.
+// The labels cache is shared across all created LabelsBuilders.
 func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBuilder {
 	if labelResult, ok := b.resultCache[hash]; ok {
 		res := &LabelsBuilder{
-			currentLabels: lbs,
+			base: lbs,
 			currentResult: labelResult,
 			BaseLabelsBuilder: b,
 		}
@@ -107,7 +114,7 @@ func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBui
 	labelResult := NewLabelsResult(lbs, hash)
 	b.resultCache[hash] = labelResult
 	res := &LabelsBuilder{
-		currentLabels: lbs,
+		base: lbs,
 		currentResult: labelResult,
 		BaseLabelsBuilder: b,
 	}
@@ -116,7 +123,6 @@ func (b *BaseLabelsBuilder) ForLabels(lbs labels.Labels, hash uint64) *LabelsBui
 
 // Reset clears all current state for the builder.
 func (b *LabelsBuilder) Reset() {
-	b.base = b.currentLabels
 	b.del = b.del[:0]
 	b.add = b.add[:0]
 	b.err = ""
@@ -143,6 +149,7 @@ func (b *LabelsBuilder) BaseHas(key string) bool {
 	return b.base.Has(key)
 }
 
+// Get returns the value of a label key if it exists.
 func (b *LabelsBuilder) Get(key string) (string, bool) {
 	for _, a := range b.add {
 		if a.Name == key {
@@ -227,8 +234,8 @@ Outer:
 	return res
 }
 
-// Labels returns the labels from the builder. If no modifications
-// were made, the original labels are returned.
+// LabelsResult returns the LabelsResult from the builder.
+// No grouping is applied and the cache is used when possible.
 func (b *LabelsBuilder) LabelsResult() LabelsResult {
 	// unchanged path.
if len(b.del) == 0 && len(b.add) == 0 { @@ -277,8 +284,8 @@ func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult { return res } -// Labels returns the labels from the builder. If no modifications -// were made, the original labels are returned. +// GroupedLabels returns the LabelsResult from the builder. +// Groups are applied and the cache is used when possible. func (b *LabelsBuilder) GroupedLabels() LabelsResult { if b.err != "" { // We need to return now before applying grouping otherwise the error might get lost. @@ -370,9 +377,9 @@ func (b *LabelsBuilder) toBaseGroup() LabelsResult { } var lbs labels.Labels if b.without { - lbs = b.currentLabels.WithoutLabels(b.groups...) + lbs = b.base.WithoutLabels(b.groups...) } else { - lbs = b.currentLabels.WithLabels(b.groups...) + lbs = b.base.WithLabels(b.groups...) } res := NewLabelsResult(lbs, lbs.Hash()) b.groupedResult = res diff --git a/pkg/logql/log/labels_test.go b/pkg/logql/log/labels_test.go index 250090c460f9a..c29afb900227e 100644 --- a/pkg/logql/log/labels_test.go +++ b/pkg/logql/log/labels_test.go @@ -1,6 +1,7 @@ package log import ( + "sort" "testing" "github.com/prometheus/prometheus/pkg/labels" @@ -47,3 +48,102 @@ func TestLabelsBuilder_LabelsError(t *testing.T) { // make sure the original labels is unchanged. require.Equal(t, labels.Labels{labels.Label{Name: "already", Value: "in"}}, lbs) } + +func TestLabelsBuilder_LabelsResult(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "namespace", Value: "loki"}, + labels.Label{Name: "job", Value: "us-central1/loki"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + } + sort.Sort(lbs) + b := NewBaseLabelsBuilder().ForLabels(lbs, lbs.Hash()) + b.Reset() + assertLabelResult(t, lbs, b.LabelsResult()) + b.SetErr("err") + withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"}) + sort.Sort(withErr) + assertLabelResult(t, withErr, b.LabelsResult()) + + b.Set("foo", "bar") + b.Set("namespace", "tempo") + b.Set("buzz", "fuzz") + b.Del("job") + expected := labels.Labels{ + labels.Label{Name: ErrorLabel, Value: "err"}, + labels.Label{Name: "namespace", Value: "tempo"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "buzz", Value: "fuzz"}, + } + sort.Sort(expected) + assertLabelResult(t, expected, b.LabelsResult()) + // cached. + assertLabelResult(t, expected, b.LabelsResult()) +} + +func TestLabelsBuilder_GroupedLabelsResult(t *testing.T) { + lbs := labels.Labels{ + labels.Label{Name: "namespace", Value: "loki"}, + labels.Label{Name: "job", Value: "us-central1/loki"}, + labels.Label{Name: "cluster", Value: "us-central1"}, + } + sort.Sort(lbs) + b := NewBaseLabelsBuilderWithGrouping([]string{"namespace"}, false, false).ForLabels(lbs, lbs.Hash()) + b.Reset() + assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "loki"}}, b.GroupedLabels()) + b.SetErr("err") + withErr := append(lbs, labels.Label{Name: ErrorLabel, Value: "err"}) + sort.Sort(withErr) + assertLabelResult(t, withErr, b.GroupedLabels()) + + b.Reset() + b.Set("foo", "bar") + b.Set("namespace", "tempo") + b.Set("buzz", "fuzz") + b.Del("job") + expected := labels.Labels{ + labels.Label{Name: "namespace", Value: "tempo"}, + } + sort.Sort(expected) + assertLabelResult(t, expected, b.GroupedLabels()) + // cached. 
+	assertLabelResult(t, expected, b.GroupedLabels())
+
+	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, false, false).ForLabels(lbs, lbs.Hash())
+	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
+	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
+	b.Del("job")
+	assertLabelResult(t, labels.Labels{}, b.GroupedLabels())
+	b.Reset()
+	b.Set("namespace", "tempo")
+	assertLabelResult(t, labels.Labels{labels.Label{Name: "job", Value: "us-central1/loki"}}, b.GroupedLabels())
+
+	b = NewBaseLabelsBuilderWithGrouping([]string{"job"}, true, false).ForLabels(lbs, lbs.Hash())
+	b.Del("job")
+	b.Set("foo", "bar")
+	b.Set("job", "something")
+	expected = labels.Labels{
+		labels.Label{Name: "namespace", Value: "loki"},
+		labels.Label{Name: "cluster", Value: "us-central1"},
+		labels.Label{Name: "foo", Value: "bar"},
+	}
+	sort.Sort(expected)
+	assertLabelResult(t, expected, b.GroupedLabels())
+
+}
+
+func assertLabelResult(t *testing.T, lbs labels.Labels, res LabelsResult) {
+	t.Helper()
+	require.Equal(t,
+		lbs,
+		res.Labels(),
+	)
+	require.Equal(t,
+		lbs.Hash(),
+		res.Hash(),
+	)
+	require.Equal(t,
+		lbs.String(),
+		res.String(),
+	)
+}
diff --git a/pkg/logql/log/metrics_extraction.go b/pkg/logql/log/metrics_extraction.go
index f9bd332604f76..b6b2cce3366a9 100644
--- a/pkg/logql/log/metrics_extraction.go
+++ b/pkg/logql/log/metrics_extraction.go
@@ -25,11 +25,12 @@ var (
 	BytesExtractor LineExtractor = func(line []byte) float64 { return float64(len(line)) }
 )
 
-// SampleExtractor extracts sample for a log line.
+// SampleExtractor creates a StreamSampleExtractor that can extract samples for a given log stream.
 type SampleExtractor interface {
 	ForStream(labels labels.Labels) StreamSampleExtractor
 }
 
+// StreamSampleExtractor extracts a sample from a log line.
 type StreamSampleExtractor interface {
 	Process(line []byte) (float64, LabelsResult, bool)
 }
@@ -75,6 +76,10 @@ type streamLineSampleExtractor struct {
 }
 
 func (l *streamLineSampleExtractor) Process(line []byte) (float64, LabelsResult, bool) {
+	// short circuit: no stages to run against the line.
+	if l.Stage == NoopStage {
+		return l.LineExtractor(line), l.builder.GroupedLabels(), true
+	}
 	l.builder.Reset()
 	line, ok := l.Stage.Process(line, l.builder)
 	if !ok {
diff --git a/pkg/logql/log/metrics_extraction_test.go b/pkg/logql/log/metrics_extraction_test.go
index d52a6dd5ce370..68e22177d6a2e 100644
--- a/pkg/logql/log/metrics_extraction_test.go
+++ b/pkg/logql/log/metrics_extraction_test.go
@@ -127,3 +127,33 @@ func mustSampleExtractor(ex SampleExtractor, err error) SampleExtractor {
 	}
 	return ex
 }
+
+func TestNewLineSampleExtractor(t *testing.T) {
+
+	se, err := NewLineSampleExtractor(CountExtractor, nil, nil, false, false)
+	require.NoError(t, err)
+	lbs := labels.Labels{
+		{Name: "namespace", Value: "dev"},
+		{Name: "cluster", Value: "us-central1"},
+	}
+	sort.Sort(lbs)
+	sse := se.ForStream(lbs)
+	f, l, ok := sse.Process([]byte(`foo`))
+	require.True(t, ok)
+	require.Equal(t, 1., f)
+	assertLabelResult(t, lbs, l)
+
+	filter, err := NewFilter("foo", labels.MatchEqual)
+	require.NoError(t, err)
+
+	se, err = NewLineSampleExtractor(BytesExtractor, []Stage{filter.ToStage()}, []string{"namespace"}, false, false)
+	require.NoError(t, err)
+	sse = se.ForStream(lbs)
+	f, l, ok = sse.Process([]byte(`foo`))
+	require.True(t, ok)
+	require.Equal(t, 3., f)
+	assertLabelResult(t, labels.Labels{labels.Label{Name: "namespace", Value: "dev"}}, l)
+	sse = se.ForStream(lbs)
+	_, _, ok = sse.Process([]byte(`nope`))
+	require.False(t, ok)
+}
diff --git a/pkg/logql/log/pipeline.go b/pkg/logql/log/pipeline.go
index a864f7576c9e9..6a7906319d7b6 100644
--- a/pkg/logql/log/pipeline.go
+++ b/pkg/logql/log/pipeline.go
@@ -5,14 +5,16 @@ import (
 )
 
 var (
+	// NoopStage is a stage that doesn't process a log line.
 	NoopStage Stage = &noopStage{}
 )
 
-// Pipeline transform and filter log lines and labels.
+// Pipeline can create pipelines for each log stream.
 type Pipeline interface {
 	ForStream(labels labels.Labels) StreamPipeline
 }
 
+// StreamPipeline transforms and filters log lines and labels.
 type StreamPipeline interface {
 	Process(line []byte) ([]byte, LabelsResult, bool)
 }
@@ -22,6 +24,7 @@ type Stage interface {
 	Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
 }
 
+// NewNoopPipeline creates a pipeline that does not process anything and returns log streams as is.
 func NewNoopPipeline() Pipeline {
 	return &noopPipeline{
 		cache: map[uint64]*noopStreamPipeline{},
@@ -71,6 +74,7 @@ type pipeline struct {
 	streamPipelines map[uint64]StreamPipeline
 }
 
+// NewPipeline creates a new pipeline for a given set of stages.
 func NewPipeline(stages []Stage) Pipeline {
 	if len(stages) == 0 {
 		return NewNoopPipeline()

From beadfcd0032aa64d04acd9db363397254833aec2 Mon Sep 17 00:00:00 2001
From: Cyril Tovena
Date: Wed, 4 Nov 2020 11:39:26 +0100
Subject: [PATCH 6/6] Uncomment tests.
Signed-off-by: Cyril Tovena --- pkg/chunkenc/memchunk_test.go | 22 ++++++++++----------- pkg/logql/log/labels.go | 37 ++++------------------------------- 2 files changed, 15 insertions(+), 44 deletions(-) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 73cdd46f5be33..459f51354b2a9 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -814,11 +814,11 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { }, } for _, test := range []string{ - // `{app="foo"}`, + `{app="foo"}`, `{app="foo"} != "foo"`, - // `{app="foo"} != "foo" | logfmt `, - // `{app="foo"} != "foo" | logfmt | duration > 10ms`, - // `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, + `{app="foo"} != "foo" | logfmt `, + `{app="foo"} != "foo" | logfmt | duration > 10ms`, + `{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`, } { b.Run(test, func(b *testing.B) { b.ReportAllocs() @@ -851,13 +851,13 @@ func BenchmarkBufferedIteratorLabels(b *testing.B) { } for _, test := range []string{ - // `rate({app="foo"}[1m])`, - // `sum by (cluster) (rate({app="foo"}[10s]))`, - // `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`, - // `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, - // `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, - // `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, - // `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, + `rate({app="foo"}[1m])`, + `sum by (cluster) (rate({app="foo"}[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" [10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`, + `sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`, } { b.Run(test, func(b *testing.B) { b.ReportAllocs() diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 03bdbf9f272fe..24283a36c9143 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -64,6 +64,7 @@ func (h *hasher) Hash(lbs labels.Labels) uint64 { type BaseLabelsBuilder struct { del []string add []labels.Label + // nolint(structcheck) https://github.com/golangci/golangci-lint/issues/826 err string groups []string @@ -238,40 +239,10 @@ Outer: // No grouping is applied and the cache is used when possible. func (b *LabelsBuilder) LabelsResult() LabelsResult { // unchanged path. - if len(b.del) == 0 && len(b.add) == 0 { - if b.err == "" { - return b.currentResult - } - // unchanged but with error. - res := append(b.base.Copy(), labels.Label{Name: ErrorLabel, Value: b.err}) - sort.Sort(res) - return b.toResult(res) - } - - // In the general case, labels are removed, modified or moved - // rather than added. - res := make(labels.Labels, 0, len(b.base)+len(b.add)) -Outer: - for _, l := range b.base { - for _, n := range b.del { - if l.Name == n { - continue Outer - } - } - for _, la := range b.add { - if l.Name == la.Name { - continue Outer - } - } - res = append(res, l) + if len(b.del) == 0 && len(b.add) == 0 && b.err == "" { + return b.currentResult } - res = append(res, b.add...) - if b.err != "" { - res = append(res, labels.Label{Name: ErrorLabel, Value: b.err}) - } - sort.Sort(res) - - return b.toResult(res) + return b.toResult(b.Labels()) } func (b *BaseLabelsBuilder) toResult(lbs labels.Labels) LabelsResult {
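Taken as a whole, the series funnels every per-entry labels computation through LabelsResult, so the string and hash forms of a label set are computed once per unique set and reused on every entry. To close, a minimal Go sketch of that idea under the same prometheus labels package the patches use; it mirrors, rather than reproduces, the patch's unexported labelsResult type.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// labelsResult precomputes the string and hash forms of a label set once,
// so hot per-entry loops pay no serialization or hashing cost.
type labelsResult struct {
	lbs labels.Labels
	s   string
	h   uint64
}

func newLabelsResult(lbs labels.Labels) *labelsResult {
	return &labelsResult{lbs: lbs, s: lbs.String(), h: lbs.Hash()}
}

func (r *labelsResult) Labels() labels.Labels { return r.lbs }
func (r *labelsResult) String() string        { return r.s }
func (r *labelsResult) Hash() uint64          { return r.h }

func main() {
	res := newLabelsResult(labels.Labels{
		{Name: "app", Value: "foo"},
		{Name: "cluster", Value: "us-central1"},
	})
	// Consumers read the precomputed forms instead of re-serializing
	// the label set for every log entry.
	for i := 0; i < 3; i++ {
		fmt.Println(res.Hash(), res.String())
	}
}

This is also why the final LabelsResult() simplification above can return the cached currentResult whenever nothing was added, deleted, or errored: the expensive work was already done when the result was first built.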