Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapse multiline logs based on a start line. #3024

Merged
merged 15 commits into from
Dec 3, 2020
Merged
1 change: 1 addition & 0 deletions docs/sources/clients/promtail/pipelines.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ Parsing stages:

Transform stages:

- [multiline](../stages/multiline/): Merges multiple lines, e.g. stack traces, into multiline blocks.
- [template](../stages/template/): Use Go templates to modify extracted data.

Action stages:
Expand Down
21 changes: 21 additions & 0 deletions docs/sources/clients/promtail/stages/multiline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
title: multiline
---

# `multiline` stage

...

## Schema

```yaml
multiline:
# TODO: document
firstline: <string>

# TODO: document
max_lines: <integer>

# TODO: document
max_wait_time: <duration>
```
195 changes: 195 additions & 0 deletions pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package stages

import (
"bytes"
"fmt"
"regexp"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

const (
ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression"
ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v"
ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v"
)

const maxLineDefault uint64 = 128

// MultilineConfig contains the configuration for a multilineStage
type MultilineConfig struct {
Expression *string `mapstructure:"firstline"`
regex *regexp.Regexp
MaxLines *uint64 `mapstructure:"max_lines"`
MaxWaitTime *string `mapstructure:"max_wait_time"`
maxWait time.Duration
}

func validateMultilineConfig(cfg *MultilineConfig) error {
if cfg == nil || cfg.Expression == nil || cfg.MaxWaitTime == nil {
return errors.New(ErrMultilineStageEmptyConfig)
}

expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidRegex, err)
}
cfg.regex = expr

maxWait, err := time.ParseDuration(*cfg.MaxWaitTime)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err)
}
cfg.maxWait = maxWait

if cfg.MaxLines == nil {
cfg.MaxLines = new(uint64)
*cfg.MaxLines = maxLineDefault
}

return nil
}

// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed
type multilineStage struct {
logger log.Logger
cfg *MultilineConfig
}

// multilineState captures the internal state of a running multiline stage.
type multilineState struct {
buffer *bytes.Buffer // The lines of the current multiline block.
startLineEntry Entry // The entry of the start line of a multiline block.
currentLines uint64 // The number of lines of the current multiline block.
}

// newMulitlineStage creates a MulitlineStage from config
func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &MultilineConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateMultilineConfig(cfg)
if err != nil {
return nil, err
}

return &multilineStage{
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: cfg,
}, nil
}

func (m *multilineStage) Run(in chan Entry) chan Entry {
out := make(chan Entry)
go func() {
defer close(out)

streams := make(map[model.Fingerprint](chan Entry))
wg := new(sync.WaitGroup)

for e := range in {
key := e.Labels.FastFingerprint()
s, ok := streams[key]
if !ok {
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
level.Debug(m.logger).Log("msg", "creating new stream", "stream", key)
s = make(chan Entry)
streams[key] = s

wg.Add(1)
go m.runMultiline(s, out, wg)
}
level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line)
s <- e
}

// Close all streams and wait for them to finish being processed.
for _, s := range streams {
close(s)
}
wg.Wait()
}()
return out
}

func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) {
defer wg.Done()

state := &multilineState{
buffer: new(bytes.Buffer),
currentLines: 0,
}

for {
select {
case <-time.After(m.cfg.maxWait):
cyriltovena marked this conversation as resolved.
Show resolved Hide resolved
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String())
m.flush(out, state)
case e, ok := <-in:
level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint())

if !ok {
level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
m.flush(out, state)
return
}

isFirstLine := m.cfg.regex.MatchString(e.Line)
if isFirstLine {
level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
m.flush(out, state)

// The start line entry is used to set timestamp and labels in the flush method.
// The timestamps for following lines are ignored for now.
state.startLineEntry = e
}

// Append block line
if state.buffer.Len() > 0 {
state.buffer.WriteRune('\n')
}
state.buffer.WriteString(e.Line)
state.currentLines++

if state.currentLines == *m.cfg.MaxLines {
m.flush(out, state)
}
}
}
}

func (m *multilineStage) flush(out chan Entry, s *multilineState) {
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
if s.buffer.Len() == 0 {
level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len())
return
}

collapsed := &Entry{
jeschkies marked this conversation as resolved.
Show resolved Hide resolved
Extracted: s.startLineEntry.Extracted,
Entry: api.Entry{
Labels: s.startLineEntry.Entry.Labels,
Entry: logproto.Entry{
Timestamp: s.startLineEntry.Entry.Entry.Timestamp,
Line: s.buffer.String(),
},
},
}
s.buffer.Reset()
s.currentLines = 0

out <- *collapsed
}

// Name implements Stage
func (m *multilineStage) Name() string {
return StageTypeMultiline
}
154 changes: 154 additions & 0 deletions pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package stages

import (
"sort"
"sync"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)

func Test_multilineStage_Process(t *testing.T) {

// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util.InitLogger(cfg)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
err := validateMultilineConfig(mcfg)
require.NoError(t, err)

stage := &multilineStage{
cfg: mcfg,
logger: util.Logger,
}

out := processEntries(stage,
simpleEntry("START line 1", "label"),
simpleEntry("not a start line", "label"),
simpleEntry("START line 2", "label"),
simpleEntry("START line 3", "label"))

require.Len(t, out, 3)
require.Equal(t, "START line 1\nnot a start line", out[0].Line)
require.Equal(t, "START line 2", out[1].Line)
require.Equal(t, "START line 3", out[2].Line)
}
func Test_multilineStage_MultiStreams(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util.InitLogger(cfg)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
err := validateMultilineConfig(mcfg)
require.NoError(t, err)

stage := &multilineStage{
cfg: mcfg,
logger: util.Logger,
}

out := processEntries(stage,
simpleEntry("START line 1", "one"),
simpleEntry("not a start line 1", "one"),
simpleEntry("START line 1", "two"),
simpleEntry("not a start line 2", "one"),
simpleEntry("START line 2", "two"),
simpleEntry("START line 2", "one"),
simpleEntry("not a start line 1", "one"),
)

sort.Slice(out, func(l, r int) bool {
return out[l].Timestamp.Before(out[r].Timestamp)
})

require.Len(t, out, 4)

require.Equal(t, "START line 1\nnot a start line 1\nnot a start line 2", out[0].Line)
require.Equal(t, model.LabelValue("one"), out[0].Labels["value"])

require.Equal(t, "START line 1", out[1].Line)
require.Equal(t, model.LabelValue("two"), out[1].Labels["value"])

require.Equal(t, "START line 2", out[2].Line)
require.Equal(t, model.LabelValue("two"), out[2].Labels["value"])

require.Equal(t, "START line 2\nnot a start line 1", out[3].Line)
require.Equal(t, model.LabelValue("one"), out[3].Labels["value"])
}

func Test_multilineStage_MaxWaitTime(t *testing.T) {
// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util.InitLogger(cfg)
Debug = true

maxWait := time.Duration(2 * time.Second)
mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString(maxWait.String())}
err := validateMultilineConfig(mcfg)
require.NoError(t, err)

stage := &multilineStage{
cfg: mcfg,
logger: util.Logger,
}

in := make(chan Entry, 2)
out := stage.Run(in)

// Accumulate result
mu := new(sync.Mutex)
var res []Entry
go func() {
for e := range out {
mu.Lock()
t.Logf("appending %s", e.Line)
res = append(res, e)
mu.Unlock()
}
return
}()

// Write input with a delay
go func() {
in <- simpleEntry("START line", "label")

// Trigger flush due to max wait timeout
time.Sleep(2 * maxWait)

in <- simpleEntry("not a start line hitting timeout", "label")

// Signal pipeline we are done.
close(in)
return
}()

require.Eventually(t, func() bool { mu.Lock(); defer mu.Unlock(); return len(res) == 2 }, time.Duration(3*maxWait), time.Second)
require.Equal(t, "START line", res[0].Line)
require.Equal(t, "not a start line hitting timeout", res[1].Line)
}

func simpleEntry(line, label string) Entry {
return Entry{
Extracted: map[string]interface{}{},
Entry: api.Entry{
Labels: model.LabelSet{"value": model.LabelValue(label)},
Entry: logproto.Entry{
Timestamp: time.Now(),
Line: line,
},
},
}

}
Loading