Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapse multiline logs based on a start line. #3024

Merged
merged 15 commits into from
Dec 3, 2020
Merged
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Parsing stages:

Transform stages:

- [multiline](../stages/multiline/): Merges multiple lines, e.g. stack traces, into multiline blocks.
- [template](../stages/template/): Use Go templates to modify extracted data.

Action stages:
Expand Down
67 changes: 67 additions & 0 deletions docs/sources/clients/promtail/stages/multiline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
---
title: multiline
---

# `multiline` stage

The `multiline` stage multiple lines into a multiline block before passing it on to the next stage in the pipeline.
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

A new block is identified by the `firstline` regular expression. Any line that does *not* match the expression is considered to be part of the block of the previous match.

## Schema

```yaml
multiline:
# RE2 regular expression, if matched will start a new multiline block.
# This expresion must be provided.
firstline: <string>

# The maximum wait time will be parsed as a Go duration: https://golang.org/pkg/time/#ParseDuration.
# If now new logs arrive withing this maximum wait time the current block will be sent on.
# This is useful if the opserved application dies with e.g. an exception. No new logs will arrive and the exception
# block is sent *after* the maximum wait time expired.
# It defaults to 3s.
max_wait_time: <duration>
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

# Maximum number of lines a block can have. If block has more lines a new block is started.
# The default is 128 lines.
max_lines: <integer>
```

## Examples

Let's say we have the following logs from a very simple [flask](https://flask.palletsprojects.com) service.

```
[2020-12-03 11:36:20] "GET /hello HTTP/1.1" 200 -
[2020-12-03 11:36:23] ERROR in app: Exception on /error [GET]
Traceback (most recent call last):
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
response = self.full_dispatch_request()
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
reraise(exc_type, exc_value, tb)
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
raise value
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
rv = self.dispatch_request()
File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
return self.view_functions[rule.endpoint](**req.view_args)
File "/home/pallets/src/deployment_tools/hello.py", line 10, in error
raise Exception("Sorry, this route always breaks")
Exception: Sorry, this route always breaks
[2020-12-03 11:36:23] "GET /error HTTP/1.1" 500 -
[2020-12-03 11:36:26] "GET /hello HTTP/1.1" 200 -
[2020-12-03 11:36:27] "GET /hello HTTP/1.1" 200 -
```

We would like to collapse all lines of the traceback into one multiline block. All blocks start with a timestamp in brackets. Thus we configure a `multiline` stage with the `firstline` regular expression `^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]`. This will match the start of the traceback but not the following lines until `Exception: Sorry, this route always breaks`. These will be part of a multiline block and one log entry in Loki.
jeschkies marked this conversation as resolved.
Show resolved Hide resolved

```yaml
multiline:
# Identify timestamps as first line of a multiline block.
firstline: "^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]"

max_wait_time: 3s
```
226 changes: 226 additions & 0 deletions pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
package stages

import (
"bytes"
"fmt"
"regexp"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
)

const (
ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression"
ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v"
ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v"
)

const (
maxLineDefault uint64 = 128
maxWaitDefault = 3 * time.Second
)

// MultilineConfig contains the configuration for a multilineStage
type MultilineConfig struct {
Expression *string `mapstructure:"firstline"`
regex *regexp.Regexp
MaxLines *uint64 `mapstructure:"max_lines"`
MaxWaitTime *string `mapstructure:"max_wait_time"`
maxWait time.Duration
}

func validateMultilineConfig(cfg *MultilineConfig) error {
if cfg == nil || cfg.Expression == nil {
return errors.New(ErrMultilineStageEmptyConfig)
}

expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidRegex, err)
}
cfg.regex = expr

if cfg.MaxWaitTime != nil {
maxWait, err := time.ParseDuration(*cfg.MaxWaitTime)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err)
}
cfg.maxWait = maxWait
} else {
cfg.maxWait = maxWaitDefault
}

if cfg.MaxLines == nil {
cfg.MaxLines = new(uint64)
*cfg.MaxLines = maxLineDefault
}

return nil
}

// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed
type multilineStage struct {
logger log.Logger
cfg *MultilineConfig
}

// multilineState captures the internal state of a running multiline stage.
type multilineState struct {
buffer *bytes.Buffer // The lines of the current multiline block.
startLineEntry Entry // The entry of the start line of a multiline block.
currentLines uint64 // The number of lines of the current multiline block.
}

// newMulitlineStage creates a MulitlineStage from config
func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &MultilineConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateMultilineConfig(cfg)
if err != nil {
return nil, err
}

return &multilineStage{
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: cfg,
}, nil
}

func (m *multilineStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

streams := make(map[model.Fingerprint](chan Entry))
wg := new(sync.WaitGroup)

for e := range in {
key := e.Labels.FastFingerprint()
s, ok := streams[key]
if !ok {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
// Pass through entries until we hit first start line.
if !m.cfg.regex.MatchString(e.Line) {
if Debug {
level.Debug(m.logger).Log("msg", "pass through entry", "stream", key)
}
out <- e
continue
}

if Debug {
level.Debug(m.logger).Log("msg", "creating new stream", "stream", key)
}
s = make(chan Entry)
streams[key] = s

wg.Add(1)
go m.runMultiline(s, out, wg)
}
if Debug {
level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line)
}
s <- e
}

// Close all streams and wait for them to finish being processed.
for _, s := range streams {
close(s)
}
wg.Wait()
}()
return out
}

func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) {
defer wg.Done()

state := &multilineState{
buffer: new(bytes.Buffer),
currentLines: 0,
}

for {
select {
case <-time.After(m.cfg.maxWait):
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String())
}
m.flush(out, state)
case e, ok := <-in:
if Debug {
level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint())
}

if !ok {
if Debug {
level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
}
m.flush(out, state)
return
}

isFirstLine := m.cfg.regex.MatchString(e.Line)
if isFirstLine {
if Debug {
level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
}
m.flush(out, state)

// The start line entry is used to set timestamp and labels in the flush method.
// The timestamps for following lines are ignored for now.
state.startLineEntry = e
}

// Append block line
if state.buffer.Len() > 0 {
state.buffer.WriteRune('\n')
}
state.buffer.WriteString(e.Line)
state.currentLines++

if state.currentLines == *m.cfg.MaxLines {
m.flush(out, state)
}
}
}
}

func (m *multilineStage) flush(out chan Entry, s *multilineState) {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if s.buffer.Len() == 0 {
if Debug {
level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len())
}
return
}

collapsed := Entry{
Extracted: s.startLineEntry.Extracted,
Entry: api.Entry{
Labels: s.startLineEntry.Entry.Labels,
Entry: logproto.Entry{
Timestamp: s.startLineEntry.Entry.Entry.Timestamp,
Line: s.buffer.String(),
},
},
}
s.buffer.Reset()
s.currentLines = 0

out <- collapsed
}

// Name implements Stage
func (m *multilineStage) Name() string {
return StageTypeMultiline
}
Loading