Skip to content

Commit

Permalink
Collapse multiline logs based on a start line.
Browse files Browse the repository at this point in the history
Summary:
This is a very simple approach based on grafana#1380 to provide multiline
or block log entries in promtail.

A `multiline` stage is added to pipelines. This stage matches a start
line. Once a start line is matched, all following lines are appended
to an entry and not passed on to downstream stages. Once a new start
line is matched, the former block of lines is sent.

If no new line arrives within `max_wait_time`, the block is flushed to
the next stage and a new block is started.
  • Loading branch information
jeschkies committed Dec 3, 2020
1 parent c4faa57 commit 6e70704
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
125 changes: 125 additions & 0 deletions pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package stages

import (
"bytes"
"regexp"
"time"

"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
)

// Error messages produced while validating a multiline stage configuration.
const (
// ErrMultilineStageEmptyConfig is returned when the config is nil or has no `firstline` expression.
ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression"
// ErrMultilineStageInvalidRegex is a format string wrapping the regexp compilation error of `firstline`.
ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v"
// ErrMultilineStageInvalidMaxWaitTime is a format string wrapping the duration parse error of `max_wait_time`.
ErrMultilineStageInvalidMaxWaitTime = "multiline stage `max_wait_time` parse error: %v"
)

// MultilineConfig contains the configuration for a multilineStage.
// The exported fields are decoded from the stage config; the unexported
// fields are derived from them by validateMultilineConfig.
type MultilineConfig struct {
// Expression is the regular expression (config key `firstline`) that marks the first line of a block.
Expression *string `mapstructure:"firstline"`
// MaxWaitTime is a duration string (config key `max_wait_time`) parsed with time.ParseDuration.
MaxWaitTime *string `mapstructure:"max_wait_time"`
// maxWait is the parsed form of MaxWaitTime.
maxWait time.Duration
// regex is the compiled form of Expression.
regex *regexp.Regexp
}

// validateMultilineConfig checks cfg and fills in its derived fields.
//
// It compiles cfg.Expression into cfg.regex and parses cfg.MaxWaitTime into
// cfg.maxWait. `max_wait_time` is optional: when it is not set, a default of
// three seconds is used instead of dereferencing a nil pointer.
//
// It returns an error if cfg is nil, if `firstline` is missing or does not
// compile, or if `max_wait_time` is set but is not a valid duration.
func validateMultilineConfig(cfg *MultilineConfig) error {
	// defaultMaxWait is applied when `max_wait_time` is omitted from the config.
	const defaultMaxWait = 3 * time.Second

	if cfg == nil || cfg.Expression == nil {
		return errors.New(ErrMultilineStageEmptyConfig)
	}

	expr, err := regexp.Compile(*cfg.Expression)
	if err != nil {
		return errors.Errorf(ErrMultilineStageInvalidRegex, err)
	}
	cfg.regex = expr

	if cfg.MaxWaitTime == nil {
		cfg.maxWait = defaultMaxWait
		return nil
	}

	maxWait, err := time.ParseDuration(*cfg.MaxWaitTime)
	if err != nil {
		return errors.Errorf(ErrMultilineStageInvalidMaxWaitTime, err)
	}
	cfg.maxWait = maxWait

	return nil
}

// multilineStage matches lines to determine whether the following lines belong to a block and should be collapsed
// into a single entry.
type multilineStage struct {
// logger is the stage logger set up by newMultilineStage.
logger log.Logger
// cfg holds the validated configuration, including the compiled first-line regex and the max wait duration.
cfg *MultilineConfig
// buffer accumulates the lines of the current block, separated by newlines.
buffer *bytes.Buffer
// startLineEntry is the entry that matched the first-line regex; its labels, extracted values and
// timestamp are reused for the collapsed entry.
startLineEntry Entry
}

// newMultilineStage builds a multiline stage from an untyped stage config.
// The config is decoded into a MultilineConfig and validated; any decode or
// validation error is returned unchanged.
func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) {
	var decoded MultilineConfig
	if err := mapstructure.WeakDecode(config, &decoded); err != nil {
		return nil, err
	}
	if err := validateMultilineConfig(&decoded); err != nil {
		return nil, err
	}

	stage := &multilineStage{
		logger: log.With(logger, "component", "stage", "type", "multiline"),
		cfg:    &decoded,
		buffer: &bytes.Buffer{},
	}
	return stage, nil
}

// Run implements Stage. It starts a goroutine that reads entries from in,
// collapses consecutive lines into blocks and sends each block on the
// returned channel.
//
// A block starts at a line matching the configured first-line regex and
// absorbs every following line until the next match. A pending block is
// emitted when:
//   - a new first line arrives,
//   - no entry arrives for cfg.maxWait, or
//   - in is closed, so the last block is not lost on shutdown.
//
// The returned channel is closed once in is closed and the final block has
// been sent.
func (m *multilineStage) Run(in chan Entry) chan Entry {
	out := make(chan Entry)
	go func() {
		defer close(out)
		for {
			select {
			case <-time.After(m.cfg.maxWait):
				// No new line arrived in time: consider the block complete.
				m.flush(out)
			case e, ok := <-in:
				if !ok {
					// Input is exhausted: emit whatever is still buffered
					// instead of silently dropping it.
					m.flush(out)
					return
				}

				if m.cfg.regex.MatchString(e.Line) {
					// A new block begins, so the previous one is complete.
					m.flush(out)
					// TODO: we only consider the labels and timestamp from the first entry. Should merge all entries?
					m.startLineEntry = e
				}

				// Append block line, separating it from the previous one.
				if m.buffer.Len() > 0 {
					m.buffer.WriteRune('\n')
				}
				m.buffer.WriteString(e.Line)
			}
		}
	}()
	return out
}

// flush sends the buffered block on out as a single entry and empties the
// buffer. The entry takes its labels, extracted values and timestamp from the
// block's start line. Nothing is sent when the buffer is empty.
func (m *multilineStage) flush(out chan Entry) {
	if m.buffer.Len() > 0 {
		first := m.startLineEntry
		block := Entry{
			Labels:    first.Labels,
			Extracted: first.Extracted,
			Timestamp: first.Timestamp,
			Line:      m.buffer.String(),
		}
		m.buffer.Reset()

		out <- block
	}
}

// Name implements Stage. It returns the stage type identifier StageTypeMultiline.
func (m *multilineStage) Name() string {
return StageTypeMultiline
}
47 changes: 47 additions & 0 deletions pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package stages

import (
"bytes"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)

// Test_multilineStage_Process feeds four lines through a multiline stage and
// checks that they are collapsed into three blocks, one per "START" line.
func Test_multilineStage_Process(t *testing.T) {

	// Enable debug logging
	cfg := &ww.Config{}
	require.Nil(t, cfg.LogLevel.Set("debug"))
	util.InitLogger(cfg)
	Debug = true

	// Set max_wait_time explicitly so validation does not depend on how a
	// missing value is handled.
	mcfg := &MultilineConfig{Expression: ptrFromString("^START"), MaxWaitTime: ptrFromString("3s")}
	err := validateMultilineConfig(mcfg)
	require.NoError(t, err)

	stage := &multilineStage{
		cfg:    mcfg,
		logger: util.Logger,
		buffer: new(bytes.Buffer),
	}

	out := processEntries(stage, simpleEntry("START line 1"), simpleEntry("not a start line"), simpleEntry("START line 2"), simpleEntry("START line 3"))

	// Fail with a clear message rather than an index-out-of-range panic.
	require.Len(t, out, 3)
	require.Equal(t, "START line 1\nnot a start line", out[0].Line)
	require.Equal(t, "START line 2", out[1].Line)
	require.Equal(t, "START line 3", out[2].Line)
}

// simpleEntry returns a test Entry carrying line, with an empty label set,
// an empty extracted map and the current time as its timestamp.
func simpleEntry(line string) Entry {
	entry := Entry{
		Labels:    model.LabelSet{},
		Line:      ptrFromString(line),
		Extracted: map[string]interface{}{},
		Timestamp: ptrFromTime(time.Now()),
	}
	return entry
}
6 changes: 6 additions & 0 deletions pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
)

// Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
Expand Down Expand Up @@ -139,6 +140,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeMultiline:
s, err = newMultilineStage(logger, cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}
Expand Down

0 comments on commit 6e70704

Please sign in to comment.