Introduce an unpack parser. #3420

Merged 3 commits on Mar 3, 2021
24 changes: 14 additions & 10 deletions docs/sources/clients/promtail/stages/pack.md

**Loki 2.2 also includes a new [`unpack`](../../../../logql/#unpack) parser to work with the pack stage.**

For example:

```logql
{cluster="us-central1", job="myjob"} | unpack
```

This will automatically unpack the embedded labels and replace the log line with the original log line.

### More Examples

Use the packed labels for filtering:

```logql
{cluster="us-central1", job="myjob"} | unpack | container="myapp"
```

You can even use the `json` parser on the unpacked line if your original message was JSON:

```logql
{cluster="us-central1", job="myjob"} | unpack | container="myapp" | json | val_from_original_log_json="foo"
```

Or any other parser:

```logql
{cluster="us-central1", job="myjob"} | unpack | container="myapp" | logfmt | val_from_original_log_json="foo"
```
29 changes: 27 additions & 2 deletions docs/sources/logql/_index.md
In case of errors, for instance if the line is not in the expected format, the log line won't be filtered but instead will get a new `__error__` label added.

If an extracted label key name already exists in the original log stream, the extracted label key will be suffixed with the `_extracted` keyword to make the distinction between the two labels. You can forcefully override the original label using a [label formatter expression](#labels-format-expression). However, if an extracted key appears twice, only the latest label value will be kept.

We currently support [json](#json), [logfmt](#logfmt), [regexp](#regexp) and [unpack](#unpack) parsers.

It's easier to use the predefined parsers like `json` and `logfmt` when you can, falling back to `regexp` when the log lines have unusual structure. Multiple parsers can be used during the same log pipeline, which is useful when you want to parse complex logs ([see examples](#multiple-parsers)).
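
As a quick illustration of chaining parsers, suppose each JSON log line embeds a logfmt-formatted `request` field (the stream selector and field name here are assumed for illustration, not taken from this change):

```logql
{job="myjob"} | json | line_format "{{.request}}" | logfmt
```

Here `json` extracts the top-level fields, `line_format` replaces the log line with the embedded `request` value, and `logfmt` then parses that rewritten line.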

##### Json

The **json** parser operates in two modes:

```
  "headers" => `{"Accept": "*/*", "User-Agent": "curl/7.68.0"}`
```

##### logfmt

The **logfmt** parser can be added using `| logfmt` and will extract all keys and values from the [logfmt](https://brandur.org/logfmt) formatted log line.

For example, a logfmt log line containing `status=200` will get, among others, this label extracted:

```
  "status" => "200"
```

##### regexp

Unlike logfmt and json, which implicitly extract all values and take no parameters, the **regexp** parser takes a single parameter `| regexp "<re>"`, which is a regular expression using [Golang](https://golang.org/) [RE2 syntax](https://github.com/google/re2/wiki/Syntax).

The regular expression must contain at least one named sub-match (e.g. `(?P<name>re)`); each sub-match will extract a different label.

For example, a regular expression with a `(?P<duration>...)` sub-match can extract, among others, this label:

```
  "duration" => "1.5s"
```
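
A minimal sketch of such a query (the log line format and label names here are assumed for illustration):

```logql
{job="myjob"} | regexp "(?P<method>\\w+) (?P<path>[^ ]+) (?P<duration>[0-9.]+s)" | duration > 1s
```

Each named sub-match becomes a label, and duration-shaped values like `1.5s` can then be used directly in label filter expressions.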

##### unpack

The **unpack** parser will parse a JSON log line, unpacking all embedded labels from the [`pack`](../clients/promtail/stages/pack/) stage.
**A special property `_entry` will also be used to replace the original log line.**

For example, using `| unpack` with the following log line:

```json
{
"container": "myapp",
"pod": "pod-3223f",
"_entry": "original log message"
}
```

extracts the `container` and `pod` labels, and makes `original log message` the new log line.

> You can combine `unpack` with the `json` parser (or any other parser) if the original embedded log line is of a specific format.

#### Label Filter Expression

9 changes: 3 additions & 6 deletions pkg/logentry/stages/pack.go
import (
"github.com/mitchellh/mapstructure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
logql_log "github.com/grafana/loki/pkg/logql/log"
)

var (
func (w *Packed) UnmarshalJSON(data []byte) error {
w.Labels = map[string]string{}
for k, v := range *m {
// _entry key goes to the Entry field, everything else becomes a label
if k == logql_log.PackedEntryKey {
if s, ok := v.(string); ok {
w.Entry = s
} else {

// MarshalJSON creates a Packed struct as JSON where the Labels are flattened into the top level of the object
func (w Packed) MarshalJSON() ([]byte, error) {

// Marshal the entry to properly escape if it's json or contains quotes
b, err := json.Marshal(w.Entry)
if err != nil {
buf.WriteString(",")
}
// Add the line entry
buf.WriteString("\"" + logql_log.PackedEntryKey + "\":")
buf.Write(b)

buf.WriteString("}")
16 changes: 8 additions & 8 deletions pkg/logentry/stages/pack_test.go
import (
ww "github.com/weaveworks/common/server"

"github.com/grafana/loki/pkg/logproto"
logql_log "github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/promtail/api"
)

func Test_packStage_Run(t *testing.T) {
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
Labels: model.LabelSet{},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"bar\":\"baz\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"extr1\":\"etr1val\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "{\"ex\\\"tr2\":\"\\\"fd\\\"\",\"foo\":\"bar\",\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
},
Entry: logproto.Entry{
Timestamp: time.Unix(1, 0), // Ignored in test execution below
Line: "{\"" + logql_log.PackedEntryKey + "\":\"test line 1\"}",
},
},
},
} else {
assert.Equal(t, tt.expectedEntry.Timestamp, out[0].Timestamp)
}

})
}
}
3 changes: 3 additions & 0 deletions pkg/logql/ast.go
func (e *labelParserExpr) Stage() (log.Stage, error) {
return log.NewLogfmtParser(), nil
case OpParserTypeRegexp:
return log.NewRegexpParser(e.param)
case OpParserTypeUnpack:
return log.NewUnpackParser(), nil
default:
return nil, fmt.Errorf("unknown parser operator: %s", e.op)
}
const (
OpParserTypeJSON = "json"
OpParserTypeLogfmt = "logfmt"
OpParserTypeRegexp = "regexp"
OpParserTypeUnpack = "unpack"

OpFmtLine = "line_format"
OpFmtLabel = "label_format"
3 changes: 3 additions & 0 deletions pkg/logql/ast_test.go
func Test_logSelectorExpr_String(t *testing.T) {
{`{foo="bar", bar!="baz"} |~ "" |= "" |~ ".*"`, false},
{`{foo="bar", bar!="baz"} != "bip" !~ ".+bop" | json`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | logfmt`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | unpack | foo>5`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | logfmt | b>=10GB`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)"`, true},
{`{foo="bar"} |= "baz" |~ "blip" != "flip" !~ "flap" | regexp "(?P<foo>foo|bar)" | ( ( foo<5.01 , bar>20ms ) or foo="bar" ) | line_format "blip{{.boop}}bap" | label_format foo=bar,bar="blip{{.blop}}"`, true},
func Test_SampleExpr_String(t *testing.T) {
`sum(count_over_time({job="mysql"}[5m]))`,
`sum(count_over_time({job="mysql"} | json [5m]))`,
`sum(count_over_time({job="mysql"} | logfmt [5m]))`,
`sum(count_over_time({job="mysql"} | unpack | json [5m]))`,
`sum(count_over_time({job="mysql"} | regexp "(?P<foo>foo|bar)" [5m]))`,
`topk(10,sum(rate({region="us-east1"}[5m])) by (name))`,
`topk by (name)(10,sum(rate({region="us-east1"}[5m])))`,
func Test_parserExpr_Parser(t *testing.T) {
wantErr bool
}{
{"json", OpParserTypeJSON, "", log.NewJSONParser(), false},
{"unpack", OpParserTypeUnpack, "", log.NewUnpackParser(), false},
{"logfmt", OpParserTypeLogfmt, "", log.NewLogfmtParser(), false},
{"regexp", OpParserTypeRegexp, "(?P<foo>foo)", mustNewRegexParser("(?P<foo>foo)"), false},
{"regexp err ", OpParserTypeRegexp, "foo", nil, true},
3 changes: 2 additions & 1 deletion pkg/logql/expr.y
OPEN_PARENTHESIS CLOSE_PARENTHESIS BY WITHOUT COUNT_OVER_TIME RATE SUM AVG MAX MIN COUNT STDDEV STDVAR BOTTOMK TOPK
BYTES_OVER_TIME BYTES_RATE BOOL JSON REGEXP LOGFMT PIPE LINE_FMT LABEL_FMT UNWRAP AVG_OVER_TIME SUM_OVER_TIME MIN_OVER_TIME
MAX_OVER_TIME STDVAR_OVER_TIME STDDEV_OVER_TIME QUANTILE_OVER_TIME BYTES_CONV DURATION_CONV DURATION_SECONDS_CONV
ABSENT_OVER_TIME LABEL_REPLACE UNPACK

// Operators are listed with increasing precedence.
%left <binOp> OR
labelParser:
JSON { $$ = newLabelParserExpr(OpParserTypeJSON, "") }
| LOGFMT { $$ = newLabelParserExpr(OpParserTypeLogfmt, "") }
| REGEXP STRING { $$ = newLabelParserExpr(OpParserTypeRegexp, $2) }
| UNPACK { $$ = newLabelParserExpr(OpParserTypeUnpack, "") }
;

jsonExpressionParser: