diff --git a/docs/sources/clients/promtail/pipelines.md b/docs/sources/clients/promtail/pipelines.md index ee1da3cf71da..765717ce897a 100644 --- a/docs/sources/clients/promtail/pipelines.md +++ b/docs/sources/clients/promtail/pipelines.md @@ -206,6 +206,7 @@ Parsing stages: Transform stages: + - [multiline](../stages/multiline/): Merges multiple lines, e.g. stack traces, into multiline blocks. - [template](../stages/template/): Use Go templates to modify extracted data. Action stages: diff --git a/docs/sources/clients/promtail/stages/multiline.md b/docs/sources/clients/promtail/stages/multiline.md new file mode 100644 index 000000000000..9755cb1f0edf --- /dev/null +++ b/docs/sources/clients/promtail/stages/multiline.md @@ -0,0 +1,67 @@ +--- +title: multiline +--- + +# `multiline` stage + +The `multiline` stage multiple lines into a multiline block before passing it on to the next stage in the pipeline. + +A new block is identified by the `firstline` regular expression. Any line that does *not* match the expression is considered to be part of the block of the previous match. + +## Schema + +```yaml +multiline: + # RE2 regular expression, if matched will start a new multiline block. + # This expresion must be provided. + firstline: + + # The maximum wait time will be parsed as a Go duration: https://golang.org/pkg/time/#ParseDuration. + # If now new logs arrive withing this maximum wait time the current block will be sent on. + # This is useful if the opserved application dies with e.g. an exception. No new logs will arrive and the exception + # block is sent *after* the maximum wait time expired. + # It defaults to 3s. + max_wait_time: + + # Maximum number of lines a block can have. If block has more lines a new block is started. + # The default is 128 lines. + max_lines: +``` + +## Examples + +Let's say we have the following logs from a very simple [flask](https://flask.palletsprojects.com) service. + +``` +[2020-12-03 11:36:20] "GET /hello HTTP/1.1" 200 - +[2020-12-03 11:36:23] ERROR in app: Exception on /error [GET] +Traceback (most recent call last): + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app + response = self.full_dispatch_request() + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request + rv = self.handle_user_exception(e) + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception + reraise(exc_type, exc_value, tb) + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise + raise value + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request + rv = self.dispatch_request() + File "/home/pallets/.pyenv/versions/3.8.5/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request + return self.view_functions[rule.endpoint](**req.view_args) + File "/home/pallets/src/deployment_tools/hello.py", line 10, in error + raise Exception("Sorry, this route always breaks") +Exception: Sorry, this route always breaks +[2020-12-03 11:36:23] "GET /error HTTP/1.1" 500 - +[2020-12-03 11:36:26] "GET /hello HTTP/1.1" 200 - +[2020-12-03 11:36:27] "GET /hello HTTP/1.1" 200 - +``` + +We would like to collapse all lines of the traceback into one multiline block. All blocks start with a timestamp in brackets. Thus we configure a `multiline` stage with the `firstline` regular expression `^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]`. This will match the start of the traceback but not the following lines until `Exception: Sorry, this route always breaks`. These will be part of a multiline block and one log entry in Loki. + +```yaml +multiline: + # Identify timestamps as first line of a multiline block. + firstline: "^\[\d{4}-\d{2}-\d{2} \d{1,2}:\d{2}:\d{2}\]" + + max_wait_time: 3s +``` \ No newline at end of file diff --git a/pkg/logentry/stages/multiline.go b/pkg/logentry/stages/multiline.go new file mode 100644 index 000000000000..731d3ae31c9e --- /dev/null +++ b/pkg/logentry/stages/multiline.go @@ -0,0 +1,226 @@ +package stages + +import ( + "bytes" + "fmt" + "regexp" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/mitchellh/mapstructure" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" +) + +const ( + ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression" + ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v" + ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v" +) + +const ( + maxLineDefault uint64 = 128 + maxWaitDefault = 3 * time.Second +) + +// MultilineConfig contains the configuration for a multilineStage +type MultilineConfig struct { + Expression *string `mapstructure:"firstline"` + regex *regexp.Regexp + MaxLines *uint64 `mapstructure:"max_lines"` + MaxWaitTime *string `mapstructure:"max_wait_time"` + maxWait time.Duration +} + +func validateMultilineConfig(cfg *MultilineConfig) error { + if cfg == nil || cfg.Expression == nil { + return errors.New(ErrMultilineStageEmptyConfig) + } + + expr, err := regexp.Compile(*cfg.Expression) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidRegex, err) + } + cfg.regex = expr + + if cfg.MaxWaitTime != nil { + maxWait, err := time.ParseDuration(*cfg.MaxWaitTime) + if err != nil { + return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err) + } + cfg.maxWait = maxWait + } else { + cfg.maxWait = maxWaitDefault + } + + if cfg.MaxLines == nil { + cfg.MaxLines = new(uint64) + *cfg.MaxLines = maxLineDefault + } + + return nil +} + +// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed +type multilineStage struct { + logger log.Logger + cfg *MultilineConfig +} + +// multilineState captures the internal state of a running multiline stage. +type multilineState struct { + buffer *bytes.Buffer // The lines of the current multiline block. + startLineEntry Entry // The entry of the start line of a multiline block. + currentLines uint64 // The number of lines of the current multiline block. +} + +// newMulitlineStage creates a MulitlineStage from config +func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) { + cfg := &MultilineConfig{} + err := mapstructure.WeakDecode(config, cfg) + if err != nil { + return nil, err + } + err = validateMultilineConfig(cfg) + if err != nil { + return nil, err + } + + return &multilineStage{ + logger: log.With(logger, "component", "stage", "type", "multiline"), + cfg: cfg, + }, nil +} + +func (m *multilineStage) Run(in chan Entry) chan Entry { + out := make(chan Entry) + go func() { + defer close(out) + + streams := make(map[model.Fingerprint](chan Entry)) + wg := new(sync.WaitGroup) + + for e := range in { + key := e.Labels.FastFingerprint() + s, ok := streams[key] + if !ok { + // Pass through entries until we hit first start line. + if !m.cfg.regex.MatchString(e.Line) { + if Debug { + level.Debug(m.logger).Log("msg", "pass through entry", "stream", key) + } + out <- e + continue + } + + if Debug { + level.Debug(m.logger).Log("msg", "creating new stream", "stream", key) + } + s = make(chan Entry) + streams[key] = s + + wg.Add(1) + go m.runMultiline(s, out, wg) + } + if Debug { + level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line) + } + s <- e + } + + // Close all streams and wait for them to finish being processed. + for _, s := range streams { + close(s) + } + wg.Wait() + }() + return out +} + +func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) { + defer wg.Done() + + state := &multilineState{ + buffer: new(bytes.Buffer), + currentLines: 0, + } + + for { + select { + case <-time.After(m.cfg.maxWait): + if Debug { + level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.maxWait), "block", state.buffer.String()) + } + m.flush(out, state) + case e, ok := <-in: + if Debug { + level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint()) + } + + if !ok { + if Debug { + level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + } + m.flush(out, state) + return + } + + isFirstLine := m.cfg.regex.MatchString(e.Line) + if isFirstLine { + if Debug { + level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) + } + m.flush(out, state) + + // The start line entry is used to set timestamp and labels in the flush method. + // The timestamps for following lines are ignored for now. + state.startLineEntry = e + } + + // Append block line + if state.buffer.Len() > 0 { + state.buffer.WriteRune('\n') + } + state.buffer.WriteString(e.Line) + state.currentLines++ + + if state.currentLines == *m.cfg.MaxLines { + m.flush(out, state) + } + } + } +} + +func (m *multilineStage) flush(out chan Entry, s *multilineState) { + if s.buffer.Len() == 0 { + if Debug { + level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len()) + } + return + } + + collapsed := Entry{ + Extracted: s.startLineEntry.Extracted, + Entry: api.Entry{ + Labels: s.startLineEntry.Entry.Labels, + Entry: logproto.Entry{ + Timestamp: s.startLineEntry.Entry.Entry.Timestamp, + Line: s.buffer.String(), + }, + }, + } + s.buffer.Reset() + s.currentLines = 0 + + out <- collapsed +} + +// Name implements Stage +func (m *multilineStage) Name() string { + return StageTypeMultiline +} diff --git a/pkg/logentry/stages/multiline_test.go b/pkg/logentry/stages/multiline_test.go new file mode 100644 index 000000000000..9b58bca094af --- /dev/null +++ b/pkg/logentry/stages/multiline_test.go @@ -0,0 +1,157 @@ +package stages + +import ( + "sort" + "sync" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + ww "github.com/weaveworks/common/server" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/promtail/api" +) + +func Test_multilineStage_Process(t *testing.T) { + + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + } + + out := processEntries(stage, + simpleEntry("not a start line before 1", "label"), + simpleEntry("not a start line before 2", "label"), + simpleEntry("START line 1", "label"), + simpleEntry("not a start line", "label"), + simpleEntry("START line 2", "label"), + simpleEntry("START line 3", "label")) + + require.Len(t, out, 5) + require.Equal(t, "not a start line before 1", out[0].Line) + require.Equal(t, "not a start line before 2", out[1].Line) + require.Equal(t, "START line 1\nnot a start line", out[2].Line) + require.Equal(t, "START line 2", out[3].Line) + require.Equal(t, "START line 3", out[4].Line) +} +func Test_multilineStage_MultiStreams(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + } + + out := processEntries(stage, + simpleEntry("START line 1", "one"), + simpleEntry("not a start line 1", "one"), + simpleEntry("START line 1", "two"), + simpleEntry("not a start line 2", "one"), + simpleEntry("START line 2", "two"), + simpleEntry("START line 2", "one"), + simpleEntry("not a start line 1", "one"), + ) + + sort.Slice(out, func(l, r int) bool { + return out[l].Timestamp.Before(out[r].Timestamp) + }) + + require.Len(t, out, 4) + + require.Equal(t, "START line 1\nnot a start line 1\nnot a start line 2", out[0].Line) + require.Equal(t, model.LabelValue("one"), out[0].Labels["value"]) + + require.Equal(t, "START line 1", out[1].Line) + require.Equal(t, model.LabelValue("two"), out[1].Labels["value"]) + + require.Equal(t, "START line 2", out[2].Line) + require.Equal(t, model.LabelValue("two"), out[2].Labels["value"]) + + require.Equal(t, "START line 2\nnot a start line 1", out[3].Line) + require.Equal(t, model.LabelValue("one"), out[3].Labels["value"]) +} + +func Test_multilineStage_MaxWaitTime(t *testing.T) { + // Enable debug logging + cfg := &ww.Config{} + require.Nil(t, cfg.LogLevel.Set("debug")) + util.InitLogger(cfg) + Debug = true + + maxWait := 2 * time.Second + mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString(maxWait.String())} + err := validateMultilineConfig(mcfg) + require.NoError(t, err) + + stage := &multilineStage{ + cfg: mcfg, + logger: util.Logger, + } + + in := make(chan Entry, 2) + out := stage.Run(in) + + // Accumulate result + mu := new(sync.Mutex) + var res []Entry + go func() { + for e := range out { + mu.Lock() + t.Logf("appending %s", e.Line) + res = append(res, e) + mu.Unlock() + } + }() + + // Write input with a delay + go func() { + in <- simpleEntry("START line", "label") + + // Trigger flush due to max wait timeout + time.Sleep(2 * maxWait) + + in <- simpleEntry("not a start line hitting timeout", "label") + + // Signal pipeline we are done. + close(in) + }() + + require.Eventually(t, func() bool { mu.Lock(); defer mu.Unlock(); return len(res) == 2 }, 3*maxWait, time.Second) + require.Equal(t, "START line", res[0].Line) + require.Equal(t, "not a start line hitting timeout", res[1].Line) +} + +func simpleEntry(line, label string) Entry { + return Entry{ + Extracted: map[string]interface{}{}, + Entry: api.Entry{ + Labels: model.LabelSet{"value": model.LabelValue(label)}, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: line, + }, + }, + } + +} diff --git a/pkg/logentry/stages/stage.go b/pkg/logentry/stages/stage.go index 28eb917c2891..82c265074ebc 100644 --- a/pkg/logentry/stages/stage.go +++ b/pkg/logentry/stages/stage.go @@ -27,6 +27,7 @@ const ( StageTypePipeline = "pipeline" StageTypeTenant = "tenant" StageTypeDrop = "drop" + StageTypeMultiline = "multiline" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -139,6 +140,11 @@ func New(logger log.Logger, jobName *string, stageType string, if err != nil { return nil, err } + case StageTypeMultiline: + s, err = newMultilineStage(logger, cfg) + if err != nil { + return nil, err + } default: return nil, errors.Errorf("Unknown stage type: %s", stageType) }