Logqv2 optimization (#2778)
* Adds logfmt, regexp and json logql parser

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook the ast with parsers.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with memchunk.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with the storage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* hook parser with ingesters

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fixes all tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Refactor to pipeline and implement ast parsing.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the lexer for duration and range

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes all tests and add some for label filters

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add label and line format.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add tests for fmt label and line with validations.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Polishing parsers and add some more test cases

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Finish the unwrap parser, still need to add more tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Indent this hell.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Moar tests and it works.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests which lead me to find a bug in the lexer

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add more tests and fix all engine tests

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes match stage in promtail pipelines.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Hook Pipeline into ingester, tailer and storage.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Correctly setup sharding for logqlv2

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes precedences issue with label filters and add moar tests ✌️

* Adds quantile_over_time, grouping for non-associative range aggregation parsing and moar tests

* Extract with grouping

* Adds parsing duration on unwrap

* Improve the lexer to support more common identifier as functions.

Also add duration conversion for unwrap.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes the frontend logs to include org_id.

The auth middleware was happening after the stats one and so org_id was not set 🤦.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Support byte sizes in label filters.

This patch extends the duration label filter with support for byte sizes
such as `1kB` and `42MiB`.

* Wip on error handling.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes json parser with prometheus label name rules.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* fixup! Support byte sizes in label filters.

* Wip error handling, commit before big refactoring.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Refactoring in progress.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Work in progress.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Got something that builds and throws __error__ labels properly now.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Add error handling + fixes groupings and post filtering.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* 400 on pipeline errors.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes a race in the log pipeline.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Ensure the key is parsable and valid.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Cleanup and code documentation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes frontend handler.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes old test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fix go1.15 local failing test.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes race conditions in the batch iterator.

We should never advance an iterator in parallel. Previously the code built new iterators while advancing the previous one, and because building an iterator can itself advance it, this created a race condition. This changeset makes sure we only fetch chunks in advance, then build the iterators and iterate over them in sequence.

Also add support for labels in the cacheIterator which is required for logqlv2.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Handle panic in the store goroutine.

A panic in the store could crash Loki, because it happened in a separate goroutine.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Now that races are gone we can use a global decoder.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improve labels handling in all LogQL stages.

benchmark before:

```
pkg: github.com/grafana/loki/pkg/logql/log
Benchmark_Pipeline
Benchmark_Pipeline-16    	  107580	     10271 ns/op	    6387 B/op	      67 allocs/op
PASS
ok  	github.com/grafana/loki/pkg/logql/log	1.228s
```

after
```
pkg: github.com/grafana/loki/pkg/logql/log
Benchmark_Pipeline
Benchmark_Pipeline-16    	  199170	      5670 ns/op	    3307 B/op	      54 allocs/op
PASS
ok  	github.com/grafana/loki/pkg/logql/log	1.201s
```

Far fewer allocations!

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes error label filtering.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Reviews comment.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
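
The "Fixes race conditions in the batch iterator" change above can be sketched roughly as follows. This is a minimal illustration and not Loki's code: `chunk`, `fetchChunks`, `prefetch` and `readAll` are hypothetical names, and the "storage" is faked. The point it tries to show is that only raw chunks are fetched ahead of time in a background goroutine, while building and advancing the iteration stays in a single goroutine.

```go
package main

import "fmt"

// chunk is a hypothetical stand-in for a stored chunk of log entries.
type chunk struct{ entries []string }

// fetchChunks pretends to load one batch of chunks from storage.
func fetchChunks(batch int) []chunk {
	return []chunk{{entries: []string{
		fmt.Sprintf("batch %d line a", batch),
		fmt.Sprintf("batch %d line b", batch),
	}}}
}

// prefetch loads batches in the background. Only raw chunks cross the
// channel; nothing is iterated in this goroutine.
func prefetch(batches int) <-chan []chunk {
	out := make(chan []chunk, 1) // one batch fetched ahead of the consumer
	go func() {
		defer close(out)
		for b := 0; b < batches; b++ {
			out <- fetchChunks(b)
		}
	}()
	return out
}

// readAll walks every batch in the caller's goroutine, one after the other,
// so no iteration is ever advanced from two goroutines.
func readAll(batches int) []string {
	var lines []string
	for chunks := range prefetch(batches) {
		for _, c := range chunks {
			lines = append(lines, c.entries...)
		}
	}
	return lines
}

func main() {
	for _, l := range readAll(2) {
		fmt.Println(l)
	}
}
```

The channel buffer of one is an arbitrary choice for the sketch; it only bounds how far fetching runs ahead of iteration.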
cyriltovena and jeschkies authored Oct 21, 2020
1 parent 9405299 commit dd5fceb
Showing 15 changed files with 593 additions and 327 deletions.
6 changes: 3 additions & 3 deletions pkg/logql/ast.go
@@ -112,11 +112,11 @@ func (m MultiStageExpr) Pipeline() (log.Pipeline, error) {
 	if len(stages) == 0 {
 		return log.NoopPipeline, nil
 	}
-	return stages, nil
+	return log.NewPipeline(stages), nil
 }

-func (m MultiStageExpr) stages() (log.MultiStage, error) {
-	c := make(log.MultiStage, 0, len(m))
+func (m MultiStageExpr) stages() ([]log.Stage, error) {
+	c := make([]log.Stage, 0, len(m))
 	for _, e := range m {
 		p, err := e.Stage()
 		if err != nil {
10 changes: 5 additions & 5 deletions pkg/logql/functions.go
@@ -16,7 +16,7 @@ func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
 	if err := r.validate(); err != nil {
 		return nil, err
 	}
-	stages := log.MultiStage{}
+	var stages []log.Stage
 	if p, ok := r.left.left.(*pipelineExpr); ok {
 		// if the expression is a pipeline then take all stages into account first.
 		st, err := p.pipeline.stages()
@@ -40,18 +40,18 @@ func (r rangeAggregationExpr) Extractor() (log.SampleExtractor, error) {
 			groups = r.grouping.groups
 			without = r.grouping.without
 		}
-		return stages.WithLabelExtractor(
+		return log.LabelExtractorWithStages(
 			r.left.unwrap.identifier,
-			convOp, groups, without,
+			convOp, groups, without, stages,
 			log.ReduceAndLabelFilter(r.left.unwrap.postFilters),
 		)
 	}
 	// otherwise we extract metrics from the log line.
 	switch r.operation {
 	case OpRangeTypeRate, OpRangeTypeCount:
-		return stages.WithLineExtractor(log.CountExtractor)
+		return log.LineExtractorWithStages(log.CountExtractor, stages)
 	case OpRangeTypeBytes, OpRangeTypeBytesRate:
-		return stages.WithLineExtractor(log.BytesExtractor)
+		return log.LineExtractorWithStages(log.BytesExtractor, stages)
 	default:
 		return nil, fmt.Errorf(unsupportedErr, r.operation)
 	}
10 changes: 5 additions & 5 deletions pkg/logql/log/filter.go
@@ -39,7 +39,7 @@ func (n notFilter) Filter(line []byte) bool {
 }

 func (n notFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, lbs Labels) ([]byte, bool) {
+	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
 		return line, n.Filter(line)
 	})
 }
@@ -81,7 +81,7 @@ func (a andFilter) Filter(line []byte) bool {
 }

 func (a andFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, lbs Labels) ([]byte, bool) {
+	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
 		return line, a.Filter(line)
 	})
 }
@@ -120,7 +120,7 @@ func (a orFilter) Filter(line []byte) bool {
 }

 func (a orFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, lbs Labels) ([]byte, bool) {
+	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
 		return line, a.Filter(line)
 	})
 }
@@ -148,7 +148,7 @@ func (r regexpFilter) Filter(line []byte) bool {
 }

 func (r regexpFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, lbs Labels) ([]byte, bool) {
+	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
 		return line, r.Filter(line)
 	})
 }
@@ -166,7 +166,7 @@ func (l containsFilter) Filter(line []byte) bool {
 }

 func (l containsFilter) ToStage() Stage {
-	return StageFunc(func(line []byte, lbs Labels) ([]byte, bool) {
+	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
 		return line, l.Filter(line)
 	})
 }
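
Every hunk in `filter.go` applies the same pattern: a line filter is adapted to a stage through a function type, and the labels argument is ignored with `_` because line filters never touch labels. A self-contained sketch of that adapter, with simplified stand-in types (`LabelsBuilder` is an empty placeholder here, and `toStage` is a hypothetical free function standing in for the `ToStage` methods):

```go
package main

import (
	"bytes"
	"fmt"
)

// LabelsBuilder is an empty placeholder for the real builder type.
type LabelsBuilder struct{}

// Stage processes a line together with its labels.
type Stage interface {
	Process(line []byte, lbs *LabelsBuilder) ([]byte, bool)
}

// StageFunc lets an ordinary function satisfy Stage.
type StageFunc func(line []byte, lbs *LabelsBuilder) ([]byte, bool)

func (fn StageFunc) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
	return fn(line, lbs)
}

// Filterer decides whether a line is kept, looking only at its bytes.
type Filterer interface {
	Filter(line []byte) bool
}

type containsFilter struct{ match []byte }

func (c containsFilter) Filter(line []byte) bool { return bytes.Contains(line, c.match) }

type notFilter struct{ Filterer }

func (n notFilter) Filter(line []byte) bool { return !n.Filterer.Filter(line) }

// toStage adapts any Filterer to a Stage; the labels argument is unused.
func toStage(f Filterer) Stage {
	return StageFunc(func(line []byte, _ *LabelsBuilder) ([]byte, bool) {
		return line, f.Filter(line)
	})
}

func main() {
	noDebug := toStage(notFilter{containsFilter{match: []byte("debug")}})
	for _, l := range []string{"debug: cache hit", "error: timeout"} {
		_, keep := noDebug.Process([]byte(l), nil)
		fmt.Println(l, keep)
	}
}
```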
21 changes: 14 additions & 7 deletions pkg/logql/log/fmt.go
@@ -51,10 +51,10 @@ func NewFormatter(tmpl string) (*LineFormatter, error) {
 	}, nil
 }

-func (lf *LineFormatter) Process(_ []byte, lbs Labels) ([]byte, bool) {
+func (lf *LineFormatter) Process(_ []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	lf.buf.Reset()
 	// todo(cyriltovena): handle error
-	_ = lf.Template.Execute(lf.buf, lbs)
+	_ = lf.Template.Execute(lf.buf, lbs.Labels().Map())
 	// todo(cyriltovena): we might want to reuse the input line or a bytes buffer.
 	res := make([]byte, len(lf.buf.Bytes()))
 	copy(res, lf.buf.Bytes())
@@ -138,17 +138,24 @@ func validate(fmts []LabelFmt) error {
 	return nil
 }

-func (lf *LabelsFormatter) Process(l []byte, lbs Labels) ([]byte, bool) {
+func (lf *LabelsFormatter) Process(l []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	var data interface{}
 	for _, f := range lf.formats {
 		if f.Rename {
-			lbs[f.Name] = lbs[f.Value]
-			delete(lbs, f.Value)
+			v, ok := lbs.Get(f.Value)
+			if ok {
+				lbs.Set(f.Name, v)
+				lbs.Del(f.Value)
+			}
 			continue
 		}
 		lf.buf.Reset()
 		//todo (cyriltovena): handle error
-		_ = f.tmpl.Execute(lf.buf, lbs)
-		lbs[f.Name] = lf.buf.String()
+		if data == nil {
+			data = lbs.Labels().Map()
+		}
+		_ = f.tmpl.Execute(lf.buf, data)
+		lbs.Set(f.Name, lf.buf.String())
 	}
 	return l, true
 }
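
The new `LabelsFormatter.Process` does two things worth spelling out: a rename only happens when the source label exists, and the template data is built lazily, once, the first time a template format is reached. The sketch below mimics that control flow with a plain `map[string]string` in place of `*LabelsBuilder`; `labelFmt`, `newTemplateFmt`, `newRenameFmt` and `format` are hypothetical names for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// labelFmt is either a rename (name takes the value of label src) or a template.
type labelFmt struct {
	name   string
	src    string
	rename bool
	tmpl   *template.Template
}

func newTemplateFmt(name, text string) labelFmt {
	return labelFmt{name: name, tmpl: template.Must(template.New("label").Parse(text))}
}

func newRenameFmt(dst, src string) labelFmt {
	return labelFmt{name: dst, src: src, rename: true}
}

// format applies fmts in order, mutating lbs in place.
func format(lbs map[string]string, fmts []labelFmt) {
	var data map[string]string // snapshot for templates, built on first use
	var buf bytes.Buffer
	for _, f := range fmts {
		if f.rename {
			// Rename only when the source label is present.
			if v, ok := lbs[f.src]; ok {
				lbs[f.name] = v
				delete(lbs, f.src)
			}
			continue
		}
		if data == nil {
			data = make(map[string]string, len(lbs))
			for k, v := range lbs {
				data[k] = v
			}
		}
		buf.Reset()
		_ = f.tmpl.Execute(&buf, data) // errors ignored, as in the diff's todo
		lbs[f.name] = buf.String()
	}
}

func main() {
	lbs := map[string]string{"foo": "blip", "bar": "blop"}
	format(lbs, []labelFmt{
		newTemplateFmt("blip", "{{.foo}} and {{.bar}}"),
		newRenameFmt("bar", "foo"),
	})
	fmt.Println(lbs)
}
```

One consequence of the lazy snapshot, visible in this sketch, is that every template in the same formatter sees the labels as they were when the first template ran, not the values written by earlier formats.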
48 changes: 28 additions & 20 deletions pkg/logql/log/fmt_test.go
@@ -1,47 +1,51 @@
 package log

 import (
+	"sort"
 	"testing"

+	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/stretchr/testify/require"
 )

 func Test_lineFormatter_Format(t *testing.T) {
 	tests := []struct {
 		name  string
 		fmter *LineFormatter
-		lbs   map[string]string
+		lbs   labels.Labels

 		want    []byte
-		wantLbs map[string]string
+		wantLbs labels.Labels
 	}{
 		{
 			"combining",
 			newMustLineFormatter("foo{{.foo}}buzz{{ .bar }}"),
-			map[string]string{"foo": "blip", "bar": "blop"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
 			[]byte("fooblipbuzzblop"),
-			map[string]string{"foo": "blip", "bar": "blop"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
 		},
 		{
 			"missing",
 			newMustLineFormatter("foo {{.foo}}buzz{{ .bar }}"),
-			map[string]string{"bar": "blop"},
+			labels.Labels{{Name: "bar", Value: "blop"}},
 			[]byte("foo buzzblop"),
-			map[string]string{"bar": "blop"},
+			labels.Labels{{Name: "bar", Value: "blop"}},
 		},
 		{
 			"function",
 			newMustLineFormatter("foo {{.foo | ToUpper }} buzz{{ .bar }}"),
-			map[string]string{"foo": "blip", "bar": "blop"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
 			[]byte("foo BLIP buzzblop"),
-			map[string]string{"foo": "blip", "bar": "blop"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
 		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			outLine, _ := tt.fmter.Process(nil, tt.lbs)
+			b := NewLabelsBuilder()
+			b.Reset(tt.lbs)
+			outLine, _ := tt.fmter.Process(nil, b)
 			require.Equal(t, tt.want, outLine)
-			require.Equal(t, tt.wantLbs, tt.lbs)
+			require.Equal(t, tt.wantLbs, b.Labels())
 		})
 	}
 }
@@ -59,38 +63,42 @@ func Test_labelsFormatter_Format(t *testing.T) {
 		name  string
 		fmter *LabelsFormatter

-		in   Labels
-		want Labels
+		in   labels.Labels
+		want labels.Labels
 	}{
 		{
 			"combined with template",
 			mustNewLabelsFormatter([]LabelFmt{NewTemplateLabelFmt("foo", "{{.foo}} and {{.bar}}")}),
-			map[string]string{"foo": "blip", "bar": "blop"},
-			map[string]string{"foo": "blip and blop", "bar": "blop"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
+			labels.Labels{{Name: "foo", Value: "blip and blop"}, {Name: "bar", Value: "blop"}},
 		},
 		{
 			"combined with template and rename",
 			mustNewLabelsFormatter([]LabelFmt{
 				NewTemplateLabelFmt("blip", "{{.foo}} and {{.bar}}"),
 				NewRenameLabelFmt("bar", "foo"),
 			}),
-			map[string]string{"foo": "blip", "bar": "blop"},
-			map[string]string{"blip": "blip and blop", "bar": "blip"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
+			labels.Labels{{Name: "blip", Value: "blip and blop"}, {Name: "bar", Value: "blip"}},
 		},
 		{
 			"fn",
 			mustNewLabelsFormatter([]LabelFmt{
 				NewTemplateLabelFmt("blip", "{{.foo | ToUpper }} and {{.bar}}"),
 				NewRenameLabelFmt("bar", "foo"),
 			}),
-			map[string]string{"foo": "blip", "bar": "blop"},
-			map[string]string{"blip": "BLIP and blop", "bar": "blip"},
+			labels.Labels{{Name: "foo", Value: "blip"}, {Name: "bar", Value: "blop"}},
+			labels.Labels{{Name: "blip", Value: "BLIP and blop"}, {Name: "bar", Value: "blip"}},
 		},
 	}
+
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _ = tt.fmter.Process(nil, tt.in)
-			require.Equal(t, tt.want, tt.in)
+			b := NewLabelsBuilder()
+			b.Reset(tt.in)
+			_, _ = tt.fmter.Process(nil, b)
+			sort.Sort(tt.want)
+			require.Equal(t, tt.want, b.Labels())
 		})
 	}
 }
48 changes: 24 additions & 24 deletions pkg/logql/log/label_filter.go
@@ -82,7 +82,7 @@ func NewOrLabelFilter(left LabelFilterer, right LabelFilterer) *BinaryLabelFilte
 	}
 }

-func (b *BinaryLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
+func (b *BinaryLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
 	line, lok := b.Left.Process(line, lbs)
 	if !b.and && lok {
 		return line, true
@@ -110,8 +110,8 @@ func (b *BinaryLabelFilter) String() string {

 type noopLabelFilter struct{}

-func (noopLabelFilter) String() string                                 { return "" }
-func (noopLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) { return line, true }
+func (noopLabelFilter) String() string                                         { return "" }
+func (noopLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) { return line, true }

 // ReduceAndLabelFilter Reduces multiple label filterer into one using binary and operation.
 func ReduceAndLabelFilter(filters []LabelFilterer) LabelFilterer {
@@ -144,19 +144,19 @@ func NewBytesLabelFilter(t LabelFilterType, name string, b uint64) *BytesLabelFi
 	}
 }

-func (d *BytesLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
-	if lbs.HasError() {
+func (d *BytesLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	if lbs.HasErr() {
 		// if there's an error only the string matchers can filter it out.
 		return line, true
 	}
-	v, ok := lbs[d.Name]
+	v, ok := lbs.Get(d.Name)
 	if !ok {
 		// we have not found this label.
 		return line, false
 	}
 	value, err := humanize.ParseBytes(v)
 	if err != nil {
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}
 	switch d.Type {
@@ -173,7 +173,7 @@ func (d *BytesLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
 	case LabelFilterLesserThanOrEqual:
 		return line, value <= d.Value
 	default:
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}
 }
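
The control flow of `BytesLabelFilter.Process` can be summarised in a small sketch: a line that already carries an error passes through, a missing label drops the line, and an unparsable value records an error but still keeps the line so that a later string matcher can decide. The code below is only an illustration under stated assumptions: `parseBytes` is a tiny hand-written stand-in for `humanize.ParseBytes` that knows a handful of suffixes, and `builder` and `greaterThan` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

var errLabelFilter = errors.New("label filter error")

// parseBytes is a minimal stand-in for a byte-size parser. It treats kB, MB
// and GB as powers of 1000 and KiB, MiB and GiB as powers of 1024.
func parseBytes(s string) (uint64, error) {
	units := []struct {
		suffix string
		mult   uint64
	}{
		// Longer suffixes come first so "B" does not shadow them.
		{"KiB", 1 << 10}, {"MiB", 1 << 20}, {"GiB", 1 << 30},
		{"kB", 1000}, {"MB", 1000 * 1000}, {"GB", 1000 * 1000 * 1000},
		{"B", 1},
	}
	for _, u := range units {
		if strings.HasSuffix(s, u.suffix) {
			num := strings.TrimSpace(strings.TrimSuffix(s, u.suffix))
			n, err := strconv.ParseUint(num, 10, 64)
			if err != nil {
				return 0, err
			}
			return n * u.mult, nil
		}
	}
	return strconv.ParseUint(s, 10, 64)
}

// builder is a simplified stand-in for *LabelsBuilder.
type builder struct {
	labels map[string]string
	err    error
}

// greaterThan reports whether the line is kept by a "label > threshold" filter.
func greaterThan(b *builder, name string, threshold uint64) bool {
	if b.err != nil {
		return true // only string matchers may filter errored lines
	}
	v, ok := b.labels[name]
	if !ok {
		return false // label not found
	}
	n, err := parseBytes(v)
	if err != nil {
		b.err = errLabelFilter
		return true
	}
	return n > threshold
}

func main() {
	for _, v := range []string{"42MiB", "1kB", "oops"} {
		b := &builder{labels: map[string]string{"size": v}}
		fmt.Println(v, greaterThan(b, "size", 1000), b.err)
	}
}
```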
@@ -198,19 +198,19 @@ func NewDurationLabelFilter(t LabelFilterType, name string, d time.Duration) *Du
 	}
 }

-func (d *DurationLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
-	if lbs.HasError() {
+func (d *DurationLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	if lbs.HasErr() {
 		// if there's an error only the string matchers can filter out.
 		return line, true
 	}
-	v, ok := lbs[d.Name]
+	v, ok := lbs.Get(d.Name)
 	if !ok {
 		// we have not found this label.
 		return line, false
 	}
 	value, err := time.ParseDuration(v)
 	if err != nil {
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}
 	switch d.Type {
@@ -227,7 +227,7 @@ func (d *DurationLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
 	case LabelFilterLesserThanOrEqual:
 		return line, value <= d.Value
 	default:
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}
 }
@@ -252,23 +252,19 @@ func NewNumericLabelFilter(t LabelFilterType, name string, v float64) *NumericLa
 	}
 }

-func (n *NumericLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
-	if lbs.HasError() {
+func (n *NumericLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	if lbs.HasErr() {
 		// if there's an error only the string matchers can filter out.
 		return line, true
 	}
-	if lbs.HasError() {
-		// if there's an error only the string matchers can filter out.
-		return line, true
-	}
-	v, ok := lbs[n.Name]
+	v, ok := lbs.Get(n.Name)
 	if !ok {
 		// we have not found this label.
 		return line, false
 	}
 	value, err := strconv.ParseFloat(v, 64)
 	if err != nil {
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}
 	switch n.Type {
@@ -285,7 +281,7 @@ func (n *NumericLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
 	case LabelFilterLesserThanOrEqual:
 		return line, value <= n.Value
 	default:
-		lbs.SetError(errLabelFilter)
+		lbs.SetErr(errLabelFilter)
 		return line, true
 	}

@@ -308,6 +304,10 @@ func NewStringLabelFilter(m *labels.Matcher) *StringLabelFilter {
 	}
 }

-func (s *StringLabelFilter) Process(line []byte, lbs Labels) ([]byte, bool) {
-	return line, s.Matches(lbs[s.Name])
+func (s *StringLabelFilter) Process(line []byte, lbs *LabelsBuilder) ([]byte, bool) {
+	if s.Name == ErrorLabel {
+		return line, s.Matches(lbs.GetErr())
+	}
+	v, _ := lbs.Get(s.Name)
+	return line, s.Matches(v)
 }
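
The last hunk gives the string filter a special case: when the filter targets the error label, it matches against the builder's recorded error instead of a regular label. A rough sketch of that branch, assuming the error label is named `__error__` as the commit message suggests; `builder` and `stringFilter` are simplified, hypothetical stand-ins, and the matcher is reduced to a plain predicate.

```go
package main

import "fmt"

// errorLabel is assumed to be the name under which pipeline errors surface.
const errorLabel = "__error__"

// builder is a simplified stand-in for *LabelsBuilder: the error is kept
// apart from the regular labels.
type builder struct {
	labels map[string]string
	err    string
}

// stringFilter keeps a line when matches returns true for the label value.
type stringFilter struct {
	name    string
	matches func(value string) bool
}

func (s stringFilter) keep(b *builder) bool {
	if s.name == errorLabel {
		// The error is not a regular label, so it is looked up separately.
		return s.matches(b.err)
	}
	// A missing label is matched as the empty string.
	return s.matches(b.labels[s.name])
}

func main() {
	noError := stringFilter{name: errorLabel, matches: func(v string) bool { return v == "" }}
	isProd := stringFilter{name: "env", matches: func(v string) bool { return v == "prod" }}

	ok := &builder{labels: map[string]string{"env": "prod"}}
	failed := &builder{labels: map[string]string{"env": "prod"}, err: "JSONParserErr"}

	fmt.Println(noError.keep(ok), noError.keep(failed))
	fmt.Println(isProd.keep(ok), isProd.keep(failed))
}
```

Combined with the numeric, duration and bytes filters letting errored lines through, this is what allows a query to drop or select failed lines explicitly with a matcher on the error label. The error string `JSONParserErr` in the example is made up for the sketch.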