diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go
index 9597f7b525b9b..88005ca04530b 100644
--- a/pkg/distributor/distributor_test.go
+++ b/pkg/distributor/distributor_test.go
@@ -70,7 +70,7 @@ func TestDistributor(t *testing.T) {
 			lines:            100,
 			mangleLabels:     true,
 			expectedResponse: success,
-			expectedError:    httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: parse error at line 1, col 4: literal not terminated"),
+			expectedError:    httpgrpc.Errorf(http.StatusBadRequest, "error parsing labels: 1:4: parse error: unterminated quoted string"),
 		},
 	} {
 		t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) {
diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go
index 73ebb23b71900..d1670e4c29047 100644
--- a/pkg/ingester/tailer.go
+++ b/pkg/ingester/tailer.go
@@ -153,7 +153,7 @@ func (t *tailer) processStream(stream logproto.Stream) ([]logproto.Stream, error
 	defer t.pipelineMtx.Unlock()
 
 	streams := map[uint64]*logproto.Stream{}
-	lbs, err := util.ParseLabels(stream.Labels)
+	lbs, err := logql.ParseLabels(stream.Labels)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go
index af161d94f47ff..6885a64ae8d19 100644
--- a/pkg/logql/engine.go
+++ b/pkg/logql/engine.go
@@ -8,14 +8,13 @@ import (
 	"sort"
 	"time"
 
-	"github.com/go-kit/kit/log/level"
-	"github.com/prometheus/prometheus/promql/parser"
-
 	"github.com/cortexproject/cortex/pkg/util/spanlogger"
+	"github.com/go-kit/kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 
 	"github.com/grafana/loki/pkg/helpers"
 	"github.com/grafana/loki/pkg/iter"
@@ -46,7 +45,7 @@ func (streams Streams) Less(i, j int) bool {
 }
 
 // Type implements `promql.Value`
-func (Streams) Type() parser.ValueType { return ValueTypeStreams }
+func (Streams) Type() promql_parser.ValueType { return ValueTypeStreams }
 
 // String implements `promql.Value`
 func (Streams) String() string {
@@ -55,7 +54,7 @@ func (Streams) String() string {
 
 // Result is the result of a query execution.
 type Result struct {
-	Data       parser.Value
+	Data       promql_parser.Value
 	Statistics stats.Result
 }
 
@@ -161,7 +160,7 @@ func (q *query) Exec(ctx context.Context) (Result, error) {
 	}, err
 }
 
-func (q *query) Eval(ctx context.Context) (parser.Value, error) {
+func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) {
 	ctx, cancel := context.WithTimeout(ctx, q.timeout)
 	defer cancel()
 
@@ -190,7 +189,7 @@ func (q *query) Eval(ctx context.Context) (parser.Value, error) {
 }
 
 // evalSample evaluate a sampleExpr
-func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value, error) {
+func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.Value, error) {
 	if lit, ok := expr.(*literalExpr); ok {
 		return q.evalLiteral(ctx, lit)
 	}
@@ -254,7 +253,7 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (parser.Value,
 	return result, stepEvaluator.Error()
 }
 
-func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (parser.Value, error) {
+func (q *query) evalLiteral(_ context.Context, expr *literalExpr) (promql_parser.Value, error) {
 	s := promql.Scalar{
 		T: q.params.Start().UnixNano() / int64(time.Millisecond),
 		V: expr.value,
diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go
index 5acd40eb762e5..a7de24b7abf7c 100644
--- a/pkg/logql/engine_test.go
+++ b/pkg/logql/engine_test.go
@@ -13,7 +13,7 @@ import (
 	json "github.com/json-iterator/go"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/promql/parser"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -41,7 +41,7 @@ func TestEngine_LogsInstantQuery(t *testing.T) {
 		data   interface{}
 		params interface{}
 
-		expected parser.Value
+		expected promql_parser.Value
 	}{
 		{
 			`{app="foo"}`, time.Unix(30, 0), logproto.FORWARD, 10,
@@ -493,7 +493,7 @@ func TestEngine_RangeQuery(t *testing.T) {
 		data   interface{}
 		params interface{}
 
-		expected parser.Value
+		expected promql_parser.Value
 	}{
 		{
 			`{app="foo"}`, time.Unix(0, 0), time.Unix(30, 0), time.Second, 0, logproto.FORWARD, 10,
@@ -1649,7 +1649,7 @@ func BenchmarkRangeQuery1000000(b *testing.B) {
 	benchmarkRangeQuery(int64(1000000), b)
 }
 
-var result parser.Value
+var result promql_parser.Value
 
 func benchmarkRangeQuery(testsize int64, b *testing.B) {
 	b.ReportAllocs()
diff --git a/pkg/logql/expr.y b/pkg/logql/expr.y
index e43e15f484f9a..9b1d8dd7eff96 100644
--- a/pkg/logql/expr.y
+++ b/pkg/logql/expr.y
@@ -101,7 +101,7 @@ import (
 
 %%
 
-root: expr { exprlex.(*lexer).expr = $1 };
+root: expr { exprlex.(*parser).expr = $1 };
 
 expr:
       logExpr { $$ = $1 }
diff --git a/pkg/logql/expr.y.go b/pkg/logql/expr.y.go
index 5bdaf8120b7f2..887211767025d 100644
--- a/pkg/logql/expr.y.go
+++ b/pkg/logql/expr.y.go
@@ -4,6 +4,7 @@ package logql
 
 import __yyfmt__ "fmt"
 
+
 import (
 	"github.com/grafana/loki/pkg/logql/log"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -200,6 +201,7 @@ const exprEofCode = 1
 const exprErrCode = 2
 const exprInitialStackSize = 16
 
+
 var exprExca = [...]int{
 	-1, 1,
 	1, -1,
@@ -408,6 +410,7 @@ var exprErrorMessages = [...]struct {
 	msg string
 }{}
 
+
 /* parser for yacc output */
 
 var (
@@ -740,7 +743,7 @@ exprdefault:
 	case 1:
 		exprDollar = exprS[exprpt-1 : exprpt+1]
 		{
-			exprlex.(*lexer).expr = exprDollar[1].Expr
+			exprlex.(*parser).expr = exprDollar[1].Expr
 		}
 	case 2:
 		exprDollar = exprS[exprpt-1 : exprpt+1]
diff --git a/pkg/logql/lex.go b/pkg/logql/lex.go
index c977d008d81f3..ccc5c1a7d4639 100644
--- a/pkg/logql/lex.go
+++ b/pkg/logql/lex.go
@@ -1,7 +1,6 @@
 package logql
 
 import (
-	"strconv"
 	"strings"
 	"text/scanner"
 	"time"
@@ -9,6 +8,7 @@ import (
 
 	"github.com/dustin/go-humanize"
 	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/util/strutil"
 )
 
 var tokens = map[string]int{
@@ -93,9 +93,7 @@ var functionTokens = map[string]int{
 
 type lexer struct {
 	scanner.Scanner
-	errs   []ParseError
-	expr   Expr
-	parser *exprParserImpl
+	errs []ParseError
 }
 
 func (l *lexer) Lex(lval *exprSymType) int {
@@ -124,7 +122,7 @@ func (l *lexer) Lex(lval *exprSymType) int {
 
 	case scanner.String, scanner.RawString:
 		var err error
-		lval.str, err = strconv.Unquote(l.TokenText())
+		lval.str, err = strutil.Unquote(l.TokenText())
 		if err != nil {
 			l.Error(err.Error())
 			return 0
@@ -133,7 +131,7 @@ func (l *lexer) Lex(lval *exprSymType) int {
 	}
 
 	// scanning duration tokens
-	if l.TokenText() == "[" {
+	if r == '[' {
 		d := ""
 		for r := l.Next(); r != scanner.EOF; r = l.Next() {
 			if string(r) == "]" {
@@ -151,7 +149,9 @@ func (l *lexer) Lex(lval *exprSymType) int {
 		return 0
 	}
 
-	if tok, ok := functionTokens[l.TokenText()+string(l.Peek())]; ok {
+	tokenText := l.TokenText()
+	tokenNext := tokenText + string(l.Peek())
+	if tok, ok := functionTokens[tokenNext]; ok {
 		// create a copy to advance to the entire token for testing suffix
 		sc := l.Scanner
 		sc.Next()
@@ -161,20 +161,20 @@ func (l *lexer) Lex(lval *exprSymType) int {
 		}
 	}
 
-	if tok, ok := functionTokens[l.TokenText()]; ok && isFunction(l.Scanner) {
+	if tok, ok := functionTokens[tokenText]; ok && isFunction(l.Scanner) {
 		return tok
 	}
 
-	if tok, ok := tokens[l.TokenText()+string(l.Peek())]; ok {
+	if tok, ok := tokens[tokenNext]; ok {
 		l.Next()
 		return tok
 	}
 
-	if tok, ok := tokens[l.TokenText()]; ok {
+	if tok, ok := tokens[tokenText]; ok {
 		return tok
 	}
 
-	lval.str = l.TokenText()
+	lval.str = tokenText
 	return IDENTIFIER
 }
 
diff --git a/pkg/logql/parser.go b/pkg/logql/parser.go
index 135cfeea14851..9556395a9f098 100644
--- a/pkg/logql/parser.go
+++ b/pkg/logql/parser.go
@@ -2,12 +2,26 @@ package logql
 
 import (
 	"errors"
+	"sort"
 	"strings"
+	"sync"
 	"text/scanner"
 
 	"github.com/prometheus/prometheus/pkg/labels"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 )
 
+var parserPool = sync.Pool{
+	New: func() interface{} {
+		p := &parser{
+			p:      &exprParserImpl{},
+			Reader: strings.NewReader(""),
+			lexer:  &lexer{},
+		}
+		return p
+	},
+}
+
 func init() {
 	// Improve the error messages coming out of yacc.
 	exprErrorVerbose = true
@@ -18,11 +32,29 @@ func init() {
 	}
 }
 
+type parser struct {
+	p *exprParserImpl
+	*lexer
+	expr Expr
+	*strings.Reader
+}
+
+func (p *parser) Parse() (Expr, error) {
+	p.lexer.errs = p.lexer.errs[:0]
+	p.lexer.Scanner.Error = func(_ *scanner.Scanner, msg string) {
+		p.lexer.Error(msg)
+	}
+	e := p.p.Parse(p)
+	if e != 0 || len(p.lexer.errs) > 0 {
+		return nil, p.lexer.errs[0]
+	}
+	return p.expr, nil
+}
+
 // ParseExpr parses a string and returns an Expr.
 func ParseExpr(input string) (expr Expr, err error) {
 	defer func() {
-		r := recover()
-		if r != nil {
+		if r := recover(); r != nil {
 			var ok bool
 			if err, ok = r.(error); ok {
 				if IsParseError(err) {
@@ -32,18 +64,12 @@ func ParseExpr(input string) (expr Expr, err error) {
 			}
 		}
 	}()
-	l := lexer{
-		parser: exprNewParser().(*exprParserImpl),
-	}
-	l.Init(strings.NewReader(input))
-	l.Scanner.Error = func(_ *scanner.Scanner, msg string) {
-		l.Error(msg)
-	}
-	e := l.parser.Parse(&l)
-	if e != 0 || len(l.errs) > 0 {
-		return nil, l.errs[0]
-	}
-	return l.expr, nil
+	p := parserPool.Get().(*parser)
+	defer parserPool.Put(p)
+
+	p.Reader.Reset(input)
+	p.lexer.Init(p.Reader)
+	return p.Parse()
 }
 
 // ParseMatchers parses a string and returns labels matchers, if the expression contains
@@ -85,3 +111,13 @@ func ParseLogSelector(input string) (LogSelectorExpr, error) {
 	}
 	return logSelector, nil
 }
+
+// ParseLabels parses labels from a string using logql parser.
+func ParseLabels(lbs string) (labels.Labels, error) {
+	ls, err := promql_parser.ParseMetric(lbs)
+	if err != nil {
+		return nil, err
+	}
+	sort.Sort(ls)
+	return ls, nil
+}
diff --git a/pkg/logql/parser_test.go b/pkg/logql/parser_test.go
index ffbabe2506fbe..ee3a02ea8bb9e 100644
--- a/pkg/logql/parser_test.go
+++ b/pkg/logql/parser_test.go
@@ -2000,3 +2000,33 @@ func Test_PipelineCombined(t *testing.T) {
 	)
 	require.Equal(t, string([]byte(`1.5s|POST|200`)), string(line))
 }
+
+var c []*labels.Matcher
+
+func Benchmark_ParseMatchers(b *testing.B) {
+	s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
+	var err error
+	for n := 0; n < b.N; n++ {
+		c, err = ParseMatchers(s)
+		require.NoError(b, err)
+	}
+}
+
+var lbs labels.Labels
+
+func Benchmark_CompareParseLabels(b *testing.B) {
+	s := `{cpu="10",endpoint="https",instance="10.253.57.87:9100",job="node-exporter",mode="idle",namespace="observability",pod="node-exporter-l454v",service="node-exporter"}`
+	var err error
+	b.Run("logql", func(b *testing.B) {
+		for n := 0; n < b.N; n++ {
+			c, err = ParseMatchers(s)
+			require.NoError(b, err)
+		}
+	})
+	b.Run("promql", func(b *testing.B) {
+		for n := 0; n < b.N; n++ {
+			lbs, err = ParseLabels(s)
+			require.NoError(b, err)
+		}
+	})
+}
diff --git a/pkg/logql/range_vector.go b/pkg/logql/range_vector.go
index 2f25e3a162791..dc6ad42d80246 100644
--- a/pkg/logql/range_vector.go
+++ b/pkg/logql/range_vector.go
@@ -5,7 +5,7 @@ import (
 
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/promql/parser"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 
 	"github.com/grafana/loki/pkg/iter"
 )
@@ -117,7 +117,7 @@ func (r *rangeVectorIterator) load(start, end int64) {
 		var metric labels.Labels
 		if metric, ok = r.metrics[lbs]; !ok {
 			var err error
-			metric, err = parser.ParseMetric(lbs)
+			metric, err = promql_parser.ParseMetric(lbs)
 			if err != nil {
 				continue
 			}
diff --git a/pkg/logql/range_vector_test.go b/pkg/logql/range_vector_test.go
index 18e166ae8bea1..24f29b796d2c6 100644
--- a/pkg/logql/range_vector_test.go
+++ b/pkg/logql/range_vector_test.go
@@ -7,7 +7,7 @@ import (
 	"time"
 
 	"github.com/prometheus/prometheus/promql"
-	"github.com/prometheus/prometheus/promql/parser"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/pkg/iter"
@@ -28,8 +28,8 @@ var samples = []logproto.Sample{
 	{Timestamp: time.Unix(100, 1).UnixNano(), Hash: 11, Value: 1.},
 }
 
-var labelFoo, _ = parser.ParseMetric("{app=\"foo\"}")
-var labelBar, _ = parser.ParseMetric("{app=\"bar\"}")
+var labelFoo, _ = promql_parser.ParseMetric("{app=\"foo\"}")
+var labelBar, _ = promql_parser.ParseMetric("{app=\"bar\"}")
 
 func newSampleIterator() iter.SampleIterator {
 	return iter.NewHeapSampleIterator(context.Background(), []iter.SampleIterator{
diff --git a/pkg/logql/test_utils.go b/pkg/logql/test_utils.go
index ea26c8127b1d1..c15970bcf726d 100644
--- a/pkg/logql/test_utils.go
+++ b/pkg/logql/test_utils.go
@@ -10,7 +10,7 @@ import (
 	"github.com/cespare/xxhash/v2"
 	"github.com/cortexproject/cortex/pkg/querier/astmapper"
 	"github.com/prometheus/prometheus/pkg/labels"
-	"github.com/prometheus/prometheus/promql/parser"
+	promql_parser "github.com/prometheus/prometheus/promql/parser"
 
 	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
@@ -271,7 +271,7 @@ func randomStreams(nStreams, nEntries, nShards int, labelNames []string) (stream
 }
 
 func mustParseLabels(s string) labels.Labels {
-	labels, err := parser.ParseMetric(s)
+	labels, err := promql_parser.ParseMetric(s)
 	if err != nil {
 		logger.Fatalf("Failed to parse %s", s)
 	}
diff --git a/pkg/promtail/targets/lokipush/pushtarget.go b/pkg/promtail/targets/lokipush/pushtarget.go
index ab64646f75e70..dc91720f39b81 100644
--- a/pkg/promtail/targets/lokipush/pushtarget.go
+++ b/pkg/promtail/targets/lokipush/pushtarget.go
@@ -6,7 +6,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/cortexproject/cortex/pkg/util"
+	cortex_util "github.com/cortexproject/cortex/pkg/util"
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
 	"github.com/imdario/mergo"
@@ -80,7 +80,7 @@ func (t *PushTarget) run() error {
 	// We don't want the /debug and /metrics endpoints running
 	t.config.Server.RegisterInstrumentation = false
 
-	util.InitLogger(&t.config.Server)
+	cortex_util.InitLogger(&t.config.Server)
 
 	srv, err := server.New(t.config.Server)
 	if err != nil {
@@ -109,18 +109,13 @@ func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
 	}
 	var lastErr error
 	for _, stream := range req.Streams {
-		matchers, err := logql.ParseMatchers(stream.Labels)
+		ls, err := logql.ParseLabels(stream.Labels)
 		if err != nil {
 			lastErr = err
 			continue
 		}
 
-		lb := labels.NewBuilder(make(labels.Labels, 0, len(matchers)+len(t.config.Labels)))
-
-		// Add stream labels
-		for i := range matchers {
-			lb.Set(matchers[i].Name, matchers[i].Value)
-		}
+		lb := labels.NewBuilder(ls)
 
 		// Add configured labels
 		for k, v := range t.config.Labels {
diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go
index 4fb0df56d03fb..0a086b3368128 100644
--- a/pkg/storage/hack/main.go
+++ b/pkg/storage/hack/main.go
@@ -22,8 +22,8 @@ import (
 
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/logproto"
+	"github.com/grafana/loki/pkg/logql"
 	lstore "github.com/grafana/loki/pkg/storage"
-	"github.com/grafana/loki/pkg/util"
 	"github.com/grafana/loki/pkg/util/validation"
 )
 
@@ -100,14 +100,14 @@ func fillStore() error {
 			wgPush.Add(1)
 			go func(j int) {
 				defer wgPush.Done()
-				lbs, err := util.ToClientLabels(fmt.Sprintf("{foo=\"bar\",level=\"%d\"}", j))
+				lbs, err := logql.ParseLabels(fmt.Sprintf("{foo=\"bar\",level=\"%d\"}", j))
 				if err != nil {
 					panic(err)
 				}
-				labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(lbs))
+				labelsBuilder := labels.NewBuilder(lbs)
 				labelsBuilder.Set(labels.MetricName, "logs")
 				metric := labelsBuilder.Labels()
-				fp := client.FastFingerprint(lbs)
+				fp := client.Fingerprint(lbs)
 				chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, 262144, 1572864)
 				for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
 					entry := &logproto.Entry{
diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go
index 95af65e1fdeec..dd480d805cf57 100644
--- a/pkg/storage/util_test.go
+++ b/pkg/storage/util_test.go
@@ -20,7 +20,6 @@ import (
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql"
-	"github.com/grafana/loki/pkg/util"
 )
 
 var fooLabelsWithName = "{foo=\"bar\", __name__=\"logs\"}"
@@ -88,15 +87,14 @@ func newLazyInvalidChunk(stream logproto.Stream) *LazyChunk {
 }
 
 func newChunk(stream logproto.Stream) chunk.Chunk {
-	lbs, err := util.ToClientLabels(stream.Labels)
+	lbs, err := logql.ParseLabels(stream.Labels)
 	if err != nil {
 		panic(err)
 	}
-	l := client.FromLabelAdaptersToLabels(lbs)
-	if !l.Has(labels.MetricName) {
-		builder := labels.NewBuilder(l)
+	if !lbs.Has(labels.MetricName) {
+		builder := labels.NewBuilder(lbs)
 		builder.Set(labels.MetricName, "logs")
-		l = builder.Labels()
+		lbs = builder.Labels()
 	}
 	from, through := model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano()), model.TimeFromUnixNano(stream.Entries[0].Timestamp.UnixNano())
 	chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0)
@@ -110,7 +108,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
 		_ = chk.Append(&e)
 	}
 	chk.Close()
-	c := chunk.NewChunk("fake", client.Fingerprint(l), l, chunkenc.NewFacade(chk, 0, 0), from, through)
+	c := chunk.NewChunk("fake", client.Fingerprint(lbs), lbs, chunkenc.NewFacade(chk, 0, 0), from, through)
 	// force the checksum creation
 	if err := c.Encode(); err != nil {
 		panic(err)
diff --git a/pkg/util/conv.go b/pkg/util/conv.go
index 0fec5e392ad3f..39a49020657bd 100644
--- a/pkg/util/conv.go
+++ b/pkg/util/conv.go
@@ -3,56 +3,23 @@ package util
 import (
 	"math"
 	"sort"
-	"strings"
 	"time"
 	"unsafe"
 
 	"github.com/cortexproject/cortex/pkg/ingester/client"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
-
-	"github.com/grafana/loki/pkg/logql"
+	"github.com/prometheus/prometheus/promql/parser"
 )
 
-type byLabel []client.LabelAdapter
-
-func (s byLabel) Len() int           { return len(s) }
-func (s byLabel) Less(i, j int) bool { return strings.Compare(s[i].Name, s[j].Name) < 0 }
-func (s byLabel) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-
 // ToClientLabels parses the labels and converts them to the Cortex type.
 func ToClientLabels(labels string) ([]client.LabelAdapter, error) {
-	matchers, err := logql.ParseMatchers(labels)
+	ls, err := parser.ParseMetric(labels)
 	if err != nil {
 		return nil, err
 	}
-	result := make([]client.LabelAdapter, 0, len(matchers))
-	for _, m := range matchers {
-		result = append(result, client.LabelAdapter{
-			Name:  m.Name,
-			Value: m.Value,
-		})
-	}
-	sort.Sort(byLabel(result))
-	return result, nil
-}
-
-// ParseLabels parses labels from a string using logql parser.
-func ParseLabels(lbs string) (labels.Labels, error) {
-	matchers, err := logql.ParseMatchers(lbs)
-	if err != nil {
-		return nil, err
-	}
-
-	result := make(labels.Labels, 0, len(matchers))
-	for _, m := range matchers {
-		result = append(result, labels.Label{
-			Name:  m.Name,
-			Value: m.Value,
-		})
-	}
-	sort.Sort(result)
-	return result, nil
+	sort.Sort(ls)
+	return client.FromLabelsToLabelAdapters(ls), nil
 }
 
 // ModelLabelSetToMap convert a model.LabelSet to a map[string]string