Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(detected_fields): return parsed labels when parsers are passed #14047

Merged
merged 5 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 72 additions & 17 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/v3/pkg/storage/stores/index/stats"
listutil "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/httpreq"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
util_validation "github.com/grafana/loki/v3/pkg/util/validation"

Expand Down Expand Up @@ -1075,6 +1076,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
// just inject the header to categorize labels
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// just incject the header to categorize labels
// just inject the header to categorize labels

ctx = httpreq.InjectHeader(ctx, httpreq.LokiEncodingFlagsHeader, (string)(httpreq.FlagCategorizeLabels))
params := logql.SelectLogParams{
QueryRequest: &logproto.QueryRequest{
Start: req.Start,
Expand All @@ -1099,8 +1102,9 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
if err != nil {
return nil, err
}
parsers := getParsersFromExpr(expr)

detectedFields := parseDetectedFields(req.FieldLimit, streams)
detectedFields := parseDetectedFields(req.FieldLimit, streams, parsers)

fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
Expand Down Expand Up @@ -1128,21 +1132,44 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
}, nil
}

// getParsersFromExpr walks expr and returns the names of the parsers
// ("logfmt" and/or "json") used by its stages. Each name appears at most once,
// in the order the walk first encounters it. The result is an empty, non-nil
// slice when no matching parser stage is present.
func getParsersFromExpr(expr syntax.LogSelectorExpr) []string {
	parsers := make([]string, 0)

	// add records a parser name once, preserving first-seen order.
	add := func(parser string) {
		if !slices.Contains(parsers, parser) {
			parsers = append(parsers, parser)
		}
	}

	// The callback has no return value, so returning from it only ends the
	// visit of the current node; it cannot stop the walk. No early bail-out is
	// attempted here for that reason.
	expr.Walk(func(e syntax.Expr) {
		switch concrete := e.(type) {
		case *syntax.LogfmtParserExpr, *syntax.LogfmtExpressionParser:
			add("logfmt")
		case *syntax.JSONExpressionParser:
			add("json")
		case *syntax.LabelParserExpr:
			// Only the JSON flavor of the generic label parser is reported.
			if concrete.Op == syntax.OpParserTypeJSON {
				add("json")
			}
		}
	})

	return parsers
}

// parsedFields is the per-field state kept while detecting fields; instances
// are created by newParsedFields.
type parsedFields struct {
// sketch is the HyperLogLog sketch attached to the field.
// NOTE(review): presumably used to estimate the number of distinct values
// seen for the field — confirm where values are inserted.
sketch *hyperloglog.Sketch
// fieldType is the detected type of the field; newParsedFields initializes
// it to logproto.DetectedFieldString.
fieldType logproto.DetectedFieldType
// parsers holds the names of the parsers associated with the field.
parsers []string
}

func newParsedFields(parser *string) *parsedFields {
p := ""
if parser != nil {
p = *parser
}
func newParsedFields(parsers []string) *parsedFields {
return &parsedFields{
sketch: hyperloglog.New(),
fieldType: logproto.DetectedFieldString,
parsers: []string{p},
parsers: parsers,
}
}

Expand Down Expand Up @@ -1193,10 +1220,10 @@ func determineType(value string) logproto.DetectedFieldType {
return logproto.DetectedFieldString
}

func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
func parseDetectedFields(limit uint32, streams logqlmodel.Streams, queryParsers []string) map[string]*parsedFields {
detectedFields := make(map[string]*parsedFields, limit)
fieldCount := uint32(0)
emtpyparser := ""
emtpyparsers := []string{}

for _, stream := range streams {
streamLbls, err := syntax.ParseLabels(stream.Labels)
Expand All @@ -1209,7 +1236,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
for k, vals := range structuredMetadata {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
df = newParsedFields(&emtpyparser)
df = newParsedFields(emtpyparsers)
detectedFields[k] = df
fieldCount++
}
Expand All @@ -1231,11 +1258,15 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
}
}

detected, parser := parseLine(entry.Line, streamLbls)
for k, vals := range detected {
parsers := queryParsers
parsedLabels := getParsedLabels(entry)
if len(parsedLabels) == 0 {
parsedLabels, parsers = parseLine(entry.Line, streamLbls)
}
for k, vals := range parsedLabels {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
df = newParsedFields(parser)
df = newParsedFields(parsers)
detectedFields[k] = df
fieldCount++
}
Expand All @@ -1244,8 +1275,10 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
continue
}

if !slices.Contains(df.parsers, *parser) {
df.parsers = append(df.parsers, *parser)
for _, parser := range parsers {
if !slices.Contains(df.parsers, parser) {
df.parsers = append(df.parsers, parser)
}
}

detectType := true
Expand All @@ -1266,6 +1299,28 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
return detectedFields
}

// getParsedLabels groups the entry's parsed labels by name and returns, for
// each label name, the distinct values seen for it. The order of the values
// within each slice follows map iteration and is therefore unspecified.
func getParsedLabels(entry push.Entry) map[string][]string {
	// First pass: collect a set of values per label name to drop duplicates.
	seen := make(map[string]map[string]struct{})
	for _, parsed := range entry.Parsed {
		set, found := seen[parsed.Name]
		if !found {
			set = make(map[string]struct{})
			seen[parsed.Name] = set
		}
		set[parsed.Value] = struct{}{}
	}

	// Second pass: flatten each value set into a slice.
	out := make(map[string][]string, len(seen))
	for name, set := range seen {
		values := make([]string, 0, len(set))
		for value := range set {
			values = append(values, value)
		}
		out[name] = values
	}

	return out
}

func getStructuredMetadata(entry push.Entry) map[string][]string {
labels := map[string]map[string]struct{}{}
for _, lbl := range entry.StructuredMetadata {
Expand All @@ -1288,7 +1343,7 @@ func getStructuredMetadata(entry push.Entry) map[string][]string {
return result
}

func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *string) {
func parseLine(line string, streamLbls labels.Labels) (map[string][]string, []string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)

Expand Down Expand Up @@ -1326,7 +1381,7 @@ func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *str
result[lbl] = vals
}

return result, &parser
return result, []string{parser}
}

// streamsForFieldDetection reads the streams from the iterator and returns them sorted.
Expand Down
44 changes: 44 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2052,3 +2052,47 @@ func BenchmarkQuerierDetectedFields(b *testing.B) {
assert.NoError(b, err)
}
}

// Test_getParsersFromExpr checks that getParsersFromExpr reports the parser
// names used by a log selector, for both plain and expression parser stages.
func Test_getParsersFromExpr(t *testing.T) {
	cases := []struct {
		name     string
		query    string
		expected []string
	}{
		{
			name:     "detects logfmt parser",
			query:    `{foo="bar"} | logfmt`,
			expected: []string{"logfmt"},
		},
		{
			name:     "detects json parser",
			query:    `{foo="bar"} | json`,
			expected: []string{"json"},
		},
		{
			name:     "detects multiple parsers",
			query:    `{foo="bar"} | logfmt | json`,
			expected: []string{"logfmt", "json"},
		},
		{
			name:     "detects logfmt expression parser",
			query:    `{foo="bar"} | logfmt msg="message"`,
			expected: []string{"logfmt"},
		},
		{
			name:     "detects json expression parser",
			query:    `{foo="bar"} | json first_server="servers[0]"`,
			expected: []string{"json"},
		},
		{
			name:     "detects multiple expression parsers",
			query:    `{foo="bar"} | logfmt msg="message" | json first_server="servers[0]"`,
			expected: []string{"logfmt", "json"},
		},
	}

	for _, tc := range cases {
		// Subtests run synchronously, so using tc inside the closure is safe.
		t.Run(tc.name, func(t *testing.T) {
			expr, err := syntax.ParseLogSelector(tc.query, true)
			require.NoError(t, err)
			assert.Equal(t, tc.expected, getParsersFromExpr(expr))
		})
	}
}
Loading