Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve logql parser allocations. #2927

Merged
merged 2 commits into from
Nov 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestDistributor(t *testing.T) {
lines: 100,
mangleLabels: true,
expectedResponse: success,
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: parse error at line 1, col 4: literal not terminated"),
expectedError: httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"),
},
} {
t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
defer t.pipelineMtx.Unlock()

streams := map[uint64]*logproto.Stream{}
lbs, err := util.ParseLabels(stream.Labels)
lbs, err := logql.ParseLabels(stream.Labels)
if err != nil {
return nil, err
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ import (
"sort"
"time"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/prometheus/promql/parser"

"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
Expand Down Expand Up @@ -46,7 +45,7 @@ func (streams Streams) Less(i, j int) bool {
}

// Type implements `promql.Value`
func (Streams) Type() parser.ValueType { return ValueTypeStreams }
func (Streams) Type() promql_parser.ValueType { return ValueTypeStreams }

// String implements `promql.Value`
func (Streams) String() string {
Expand All @@ -55,7 +54,7 @@ func (Streams) String() string {

// Result is the result of a query execution.
type Result struct {
Data parser.Value
Data promql_parser.Value
Statistics stats.Result
}

Expand Down Expand Up @@ -161,7 +160,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
}, err
}

func (q *query) Eval(ctx context.Context) (parser.Value, error) {
func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
ctx, cancel := context.WithTimeout(ctx, q.timeout)
defer cancel()

Expand Down Expand Up @@ -190,7 +189,7 @@ func (q *query) Eval(ctx context.Context) (parser.Value, error) {
}

// evalSample evaluate a sampleExpr
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, error) {
func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.Value, error) {
if lit, ok := expr.(*literalExpr); ok {
return q.evalLiteral(ctx, lit)
}
Expand Down Expand Up @@ -254,7 +253,7 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value,
return result, stepEvaluator.Error()
}

func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (parser.Value, error) {
func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser.Value, error) {
s := promql.Scalar{
T: q.params.Start().UnixNano() / int64(time.Millisecond),
V: expr.value,
Expand Down
8 changes: 4 additions & 4 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
json "github.com/json-iterator/go"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -41,7 +41,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
data interface{}
params interface{}

expected parser.Value
expected promql_parser.Value
}{
{
`{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10,
Expand Down Expand Up @@ -493,7 +493,7 @@ func TestEngine_RangeQuery(t *testing.T) {
data interface{}
params interface{}

expected parser.Value
expected promql_parser.Value
}{
{
`{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, 0, logproto.FORWARD, 10,
Expand Down Expand Up @@ -1649,7 +1649,7 @@ func BenchmarkRangeQuery1000000(b *testing.B) {
benchmarkRangeQuery(int64(1000000), b)
}

var result parser.Value
var result promql_parser.Value

func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
Expand Down
2 changes: 1 addition & 1 deletion pkg/logql/expr.y
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ import (

%%

root: expr { exprlex.(*lexer).expr = $1 };
root: expr { exprlex.(*parser).expr = $1 };

expr:
logExpr { $$ = $1 }
Expand Down
5 changes: 4 additions & 1 deletion pkg/logql/expr.y.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions pkg/logql/lex.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package logql

import (
"strconv"
"strings"
"text/scanner"
"time"
"unicode"

"github.com/dustin/go-humanize"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/util/strutil"
)

var tokens = map[string]int{
Expand Down Expand Up @@ -93,9 +93,7 @@ var functionTokens = map[string]int{

type lexer struct {
scanner.Scanner
errs []ParseError
expr Expr
parser *exprParserImpl
errs []ParseError
}

func (l *lexer) Lex(lval *exprSymType) int {
Expand Down Expand Up @@ -124,7 +122,7 @@ func (l *lexer) Lex(lval *exprSymType) int {

case scanner.String, scanner.RawString:
var err error
lval.str, err = strconv.Unquote(l.TokenText())
lval.str, err = strutil.Unquote(l.TokenText())
if err != nil {
l.Error(err.Error())
return 0
Expand All @@ -133,7 +131,7 @@ func (l *lexer) Lex(lval *exprSymType) int {
}

// scanning duration tokens
if l.TokenText() == "[" {
if r == '[' {
d := ""
for r := l.Next(); r != scanner.EOF; r = l.Next() {
if string(r) == "]" {
Expand All @@ -151,7 +149,9 @@ func (l *lexer) Lex(lval *exprSymType) int {
return 0
}

if tok, ok := functionTokens[l.TokenText()+string(l.Peek())]; ok {
tokenText := l.TokenText()
tokenNext := tokenText + string(l.Peek())
if tok, ok := functionTokens[tokenNext]; ok {
// create a copy to advance to the entire token for testing suffix
sc := l.Scanner
sc.Next()
Expand All @@ -161,20 +161,20 @@ func (l *lexer) Lex(lval *exprSymType) int {
}
}

if tok, ok := functionTokens[l.TokenText()]; ok && isFunction(l.Scanner) {
if tok, ok := functionTokens[tokenText]; ok && isFunction(l.Scanner) {
return tok
}

if tok, ok := tokens[l.TokenText()+string(l.Peek())]; ok {
if tok, ok := tokens[tokenNext]; ok {
l.Next()
return tok
}

if tok, ok := tokens[l.TokenText()]; ok {
if tok, ok := tokens[tokenText]; ok {
return tok
}

lval.str = l.TokenText()
lval.str = tokenText
return IDENTIFIER
}

Expand Down
64 changes: 50 additions & 14 deletions pkg/logql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,26 @@ package logql

import (
"errors"
"sort"
"strings"
"sync"
"text/scanner"

"github.com/prometheus/prometheus/pkg/labels"
promql_parser "github.com/prometheus/prometheus/promql/parser"
)

var parserPool = sync.Pool{
New: func() interface{} {
p := &parser{
p: &exprParserImpl{},
Reader: strings.NewReader(""),
lexer: &lexer{},
}
return p
},
}

func init() {
// Improve the error messages coming out of yacc.
exprErrorVerbose = true
Expand All @@ -18,11 +32,29 @@ func init() {
}
}

type parser struct {
p *exprParserImpl
*lexer
expr Expr
*strings.Reader
}

func (p *parser) Parse() (Expr, error) {
p.lexer.errs = p.lexer.errs[:0]
p.lexer.Scanner.Error = func(_ *scanner.Scanner, msg string) {
p.lexer.Error(msg)
}
e := p.p.Parse(p)
if e != 0 || len(p.lexer.errs) > 0 {
return nil, p.lexer.errs[0]
}
return p.expr, nil
}

// ParseExpr parses a string and returns an Expr.
func ParseExpr(input string) (expr Expr, err error) {
defer func() {
r := recover()
if r != nil {
if r := recover(); r != nil {
var ok bool
if err, ok = r.(error); ok {
if IsParseError(err) {
Expand All @@ -32,18 +64,12 @@ func ParseExpr(input string) (expr Expr, err error) {
}
}
}()
l := lexer{
parser: exprNewParser().(*exprParserImpl),
}
l.Init(strings.NewReader(input))
l.Scanner.Error = func(_ *scanner.Scanner, msg string) {
l.Error(msg)
}
e := l.parser.Parse(&l)
if e != 0 || len(l.errs) > 0 {
return nil, l.errs[0]
}
return l.expr, nil
p := parserPool.Get().(*parser)
defer parserPool.Put(p)

p.Reader.Reset(input)
p.lexer.Init(p.Reader)
return p.Parse()
}

// ParseMatchers parses a string and returns labels matchers, if the expression contains
Expand Down Expand Up @@ -85,3 +111,13 @@ func ParseLogSelector(input string) (LogSelectorExpr, error) {
}
return logSelector, nil
}

// ParseLabels parses labels from a string using logql parser.
func ParseLabels(lbs string) (labels.Labels, error) {
ls, err := promql_parser.ParseMetric(lbs)
if err != nil {
return nil, err
}
sort.Sort(ls)
return ls, nil
}
30 changes: 30 additions & 0 deletions pkg/logql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2000,3 +2000,33 @@ func Test_PipelineCombined(t *testing.T) {
)
require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
}

var c []*labels.Matcher

func Benchmark_ParseMatchers(b *testing.B) {
s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
var err error
for n := 0; n < b.N; n++ {
c, err = ParseMatchers(s)
require.NoError(b, err)
}
}

var lbs labels.Labels

func Benchmark_CompareParseLabels(b *testing.B) {
s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
var err error
b.Run("logql", func(b *testing.B) {
for n := 0; n < b.N; n++ {
c, err = ParseMatchers(s)
require.NoError(b, err)
}
})
b.Run("promql", func(b *testing.B) {
for n := 0; n < b.N; n++ {
lbs, err = ParseLabels(s)
require.NoError(b, err)
}
})
}
4 changes: 2 additions & 2 deletions pkg/logql/range_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
promql_parser "github.com/prometheus/prometheus/promql/parser"

"github.com/grafana/loki/pkg/iter"
)
Expand Down Expand Up @@ -117,7 +117,7 @@ func (r *rangeVectorIterator) load(start, end int64) {
var metric labels.Labels
if metric, ok = r.metrics[lbs]; !ok {
var err error
metric, err = parser.ParseMetric(lbs)
metric, err = promql_parser.ParseMetric(lbs)
if err != nil {
continue
}
Expand Down
Loading