Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collapse multiline logs based on a start line. #2971

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions pkg/logentry/stages/multiline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package stages

import (
"bytes"
"regexp"
"time"

"github.com/go-kit/kit/log"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)

const (
ErrMultilineStageEmptyConfig = "multiline stage config must define `firstline` regular expression"
ErrMultilineStageInvalidRegex = "multiline stage first line regex compilation error: %v"
)

const MultilineDropReason = "multiline collapse"

// MultilineConfig contains the configuration for a multilineStage
type MultilineConfig struct {
Expression *string `mapstructure:"firstline"`
regex *regexp.Regexp
}

func validateMultilineConfig(cfg *MultilineConfig) error {
if cfg == nil ||
(cfg.Expression == nil) {
return errors.New(ErrMultilineStageEmptyConfig)
}
expr, err := regexp.Compile(*cfg.Expression)
if err != nil {
return errors.Errorf(ErrMultilineStageInvalidRegex, err)
}
cfg.regex = expr

return nil
}

// newMulitlineStage creates a MulitlineStage from config
func newMultilineStage(logger log.Logger, config interface{}) (Stage, error) {
cfg := &MultilineConfig{}
err := mapstructure.WeakDecode(config, cfg)
if err != nil {
return nil, err
}
err = validateMultilineConfig(cfg)
if err != nil {
return nil, err
}

return &multilineStage{
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: cfg,
buffer: new(bytes.Buffer),
}, nil
}

// dropMultiline matches lines to determine whether the following lines belong to a block and should be collapsed
type multilineStage struct {
logger log.Logger
cfg *MultilineConfig
buffer *bytes.Buffer
}

// Process implements Stage
func (m *multilineStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
isFirstLine := m.cfg.regex.MatchString(*entry)

if isFirstLine {
previous := m.buffer.String()

m.buffer.Reset()
m.buffer.WriteString(*entry)

*entry = previous
} else {
// Append block line
if m.buffer.Len() > 0 {
m.buffer.WriteRune('\n')
}
m.buffer.WriteString(*entry)

// Adds the drop label to not be sent by the api.EntryHandler
labels[dropLabel] = model.LabelValue(MultilineDropReason)
}
}

// Name implements Stage
func (m *multilineStage) Name() string {
return StageTypeMultiline
}
43 changes: 43 additions & 0 deletions pkg/logentry/stages/multiline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package stages

import (
"bytes"
"testing"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
ww "github.com/weaveworks/common/server"
)

func Test_multilineStage_Process(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm sure I did not cover all the edge cases.

// Enable debug logging
cfg := &ww.Config{}
require.Nil(t, cfg.LogLevel.Set("debug"))
util.InitLogger(cfg)
Debug = true

mcfg := &MultilineConfig{Expression: ptrFromString("^START")}
err := validateMultilineConfig(mcfg)
require.NoError(t, err)

stage := &multilineStage{
cfg: mcfg,
logger: util.Logger,
buffer: new(bytes.Buffer),
}

stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), ptrFromString("START line 1"))
stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), ptrFromString("not a start line"))

nextStart := "START line 2"
stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), &nextStart)

require.Equal(t, "START line 1\nnot a start line", nextStart)
Comment on lines +34 to +37
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the confusing part. A new line is pushed but we get the older multiline blog. Ideally a pipeline stage would be able to push out lines without a call. This could be done with channels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we are processing inputs from channels and push changes out to channels the multiline stage could add a simple timeout to flush a block:

struct  Entry type {
    labels model.LabelSet,
    extracted map[string]interface{},
    t *time.Time,
    entry *string
}

// Process implements Stage
func (m *multilineStage) Process(entries <- chan Entry) <- chan Entry {
  out := make(chan Entry)
  currentMultiline := make(bytes.Buffer)
 
  loop { // This should be in a goroutine
    select {
    case next := <-entries:
        if isNewStart(entry) {
            // Flush
            out <- NewEntry(currentMultiline.String())
            currentMultiline.reset()
        }
        currentMultiline.WriteString(entry.entry)
    case <-time.After(timeout * time.Second):
            // Flush
            out <- currentMultiline.String()
            currentMultiline.reset()
    }
  }
  return out
}

Process is not a fitting name any more since it is called once for all incoming entries. Anyhow, I think it illustrates how the flush time out becomes much simpler.


nextStart = "START line 3"
stage.Process(model.LabelSet{}, map[string]interface{}{}, ptrFromTime(time.Now()), &nextStart)

require.Equal(t, "START line 2", nextStart)
}
6 changes: 6 additions & 0 deletions pkg/logentry/stages/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
StageTypePipeline = "pipeline"
StageTypeTenant = "tenant"
StageTypeDrop = "drop"
StageTypeMultiline = "multiline"
)

// Stage takes an existing set of labels, timestamp and log entry and returns either a possibly mutated
Expand Down Expand Up @@ -118,6 +119,11 @@ func New(logger log.Logger, jobName *string, stageType string,
if err != nil {
return nil, err
}
case StageTypeMultiline:
s, err = newMultilineStage(logger, cfg)
if err != nil {
return nil, err
}
default:
return nil, errors.Errorf("Unknown stage type: %s", stageType)
}
Expand Down