Skip to content

Commit

Permalink
[pkg/stanza] Extract trim.Config from SplitterConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski committed Sep 8, 2023
1 parent c03c234 commit 696edd0
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 56 deletions.
29 changes: 29 additions & 0 deletions .chloggen/pkg-stanza-trim-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Extract whitespace trim configuration into trim.Config

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26511]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- PreserveLeading and PreserveTrailing removed from tokenize.SplitterConfig.
- PreserveLeadingWhitespaces and PreserveTrailingWhitespaces removed from tcp.BaseConfig and udp.BaseConfig.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
4 changes: 3 additions & 1 deletion pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const (
Expand Down Expand Up @@ -76,6 +77,7 @@ type Config struct {
MaxBatches int `mapstructure:"max_batches,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"`
TrimConfig trim.Config `mapstructure:",squash,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
}
Expand All @@ -97,7 +99,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
}

// Ensure that splitter is buildable
factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize))
factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func())
if _, err := factory.Build(); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/splitter/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFu

// Build builds Multiline Splitter struct
func (f *customFactory) Build() (bufio.SplitFunc, error) {
return f.flusherCfg.Wrap(f.splitFunc, trim.Whitespace(true, true)), nil
return f.flusherCfg.Wrap(f.splitFunc, trim.Nop), nil
}
6 changes: 5 additions & 1 deletion pkg/stanza/fileconsumer/internal/splitter/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

type multilineFactory struct {
splitterCfg tokenize.SplitterConfig
encoding encoding.Encoding
maxLogSize int
trimFunc trim.Func
}

var _ Factory = (*multilineFactory)(nil)
Expand All @@ -23,15 +25,17 @@ func NewMultilineFactory(
splitterCfg tokenize.SplitterConfig,
encoding encoding.Encoding,
maxLogSize int,
trimFunc trim.Func,
) Factory {
return &multilineFactory{
splitterCfg: splitterCfg,
encoding: encoding,
maxLogSize: maxLogSize,
trimFunc: trimFunc,
}
}

// Build builds Multiline Splitter struct
func (f *multilineFactory) Build() (bufio.SplitFunc, error) {
return f.splitterCfg.Build(f.encoding, false, f.maxLogSize)
return f.splitterCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc)
}
3 changes: 2 additions & 1 deletion pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestMultilineBuild(t *testing.T) {
Expand Down Expand Up @@ -44,7 +45,7 @@ func TestMultilineBuild(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize)
factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
6 changes: 4 additions & 2 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func TestPersistFlusher(t *testing.T) {
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{
Flusher: tokenize.NewFlusherConfig(),
Multiline: mlc,
}, unicode.UTF8, 15)
}, unicode.UTF8, 15, trim.Whitespace)
f.readerConfig.maxLogSize = 15

temp := openTemp(t, t.TempDir())
Expand Down Expand Up @@ -240,6 +241,7 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) {
func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) {
emitChan := make(chan *emitParams, 100)
enc, err := decode.LookupEncoding(defaultEncoding)
trimFunc := trim.Whitespace
require.NoError(t, err)
return &readerFactory{
SugaredLogger: testutil.Logger(t),
Expand All @@ -249,7 +251,7 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte
emit: testEmitFunc(emitChan),
},
fromBeginning: true,
splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize),
splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize, trimFunc),
encoding: enc,
}, emitChan
}
Expand Down
21 changes: 10 additions & 11 deletions pkg/stanza/operator/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,21 @@ type Config struct {

// BaseConfig is the detailed configuration of a tcp input operator.
type BaseConfig struct {
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
MultiLineBuilder MultiLineBuilderFunc
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
MultiLineBuilder MultiLineBuilderFunc
}

type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error)

func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) {
trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
trimFunc := c.TrimConfig.Func()
splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc)
if err != nil {
return nil, err
Expand Down
15 changes: 7 additions & 8 deletions pkg/stanza/operator/input/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ type Config struct {

// BaseConfig is the detailed configuration of a udp input operator.
type BaseConfig struct {
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
ListenAddress string `mapstructure:"listen_address,omitempty"`
OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"`
AddAttributes bool `mapstructure:"add_attributes,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"`
TrimConfig trim.Config `mapstructure:",squash"`
}

// Build will build a udp input operator.
Expand All @@ -92,7 +91,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
}

// Build multiline
trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces)
trimFunc := c.TrimConfig.Func()
splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc)
if err != nil {
return nil, err
Expand Down
25 changes: 16 additions & 9 deletions pkg/stanza/tokenize/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

var noTrim = trim.Whitespace(true, true)

const (
// Those values has been experimentally figured out for windows
sleepDuration time.Duration = time.Millisecond * 80
Expand Down Expand Up @@ -218,7 +216,10 @@ func TestLineStartSplitFunc(t *testing.T) {
LineStartPattern: tc.Pattern,
}

trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
Expand All @@ -228,7 +229,7 @@ func TestLineStartSplitFunc(t *testing.T) {
}

t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) {
splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, noTrim)
splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, trim.Nop)
data := []byte(`LOGSTART`)

t.Run("NotAtEOF", func(t *testing.T) {
Expand Down Expand Up @@ -430,7 +431,10 @@ func TestLineEndSplitFunc(t *testing.T) {
LineEndPattern: tc.Pattern,
}

trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
Expand Down Expand Up @@ -596,7 +600,10 @@ func TestNewlineSplitFunc(t *testing.T) {
}

for _, tc := range testCases {
trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces)
trimFunc := trim.Config{
PreserveLeading: tc.PreserveLeadingWhitespaces,
PreserveTrailing: tc.PreserveTrailingWhitespaces,
}.Func()
splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc)
require.NoError(t, err)
if tc.Flusher != nil {
Expand Down Expand Up @@ -672,14 +679,14 @@ func TestNoopEncodingError(t *testing.T) {
LineEndPattern: "\n",
}

_, err := cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
_, err := cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))

cfg = &MultilineConfig{
LineStartPattern: "\n",
}

_, err = cfg.getSplitFunc(encoding.Nop, false, 0, noTrim)
_, err = cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop)
require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding"))
}

Expand Down Expand Up @@ -740,7 +747,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
splitFunc, err := NewlineSplitFunc(tc.encoding, false, noTrim)
splitFunc, err := NewlineSplitFunc(tc.encoding, false, trim.Nop)
require.NoError(t, err)
scanner := bufio.NewScanner(bytes.NewReader(tc.input))
scanner.Split(splitFunc)
Expand Down
10 changes: 3 additions & 7 deletions pkg/stanza/tokenize/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@ import (

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
}

// NewSplitterConfig returns default SplitterConfig
Expand All @@ -28,12 +26,10 @@ func NewSplitterConfig() SplitterConfig {
}

// Build builds bufio.SplitFunc based on the config
func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing)
func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) {
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc)
if err != nil {
return nil, err
}

return c.Flusher.Wrap(splitFunc, trimFunc), nil
}
31 changes: 18 additions & 13 deletions pkg/stanza/trim/trim.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,29 @@ import (

type Func func([]byte) []byte

func Whitespace(preserveLeading, preserveTrailing bool) Func {
if preserveLeading && preserveTrailing {
return noTrim
type Config struct {
PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"`
}

func (c Config) Func() Func {
if c.PreserveLeading && c.PreserveTrailing {
return Nop
}
if preserveLeading {
return trimTrailingWhitespacesFunc
if c.PreserveLeading {
return Trailing
}
if preserveTrailing {
return trimLeadingWhitespacesFunc
if c.PreserveTrailing {
return Leading
}
return trimWhitespacesFunc
return Whitespace
}

func noTrim(token []byte) []byte {
func Nop(token []byte) []byte {
return token
}

func trimLeadingWhitespacesFunc(data []byte) []byte {
func Leading(data []byte) []byte {
// TrimLeft to strip EOF whitespaces in case of using $ in regex
// For some reason newline and carriage return are being moved to beginning of next log
token := bytes.TrimLeft(data, "\r\n\t ")
Expand All @@ -38,11 +43,11 @@ func trimLeadingWhitespacesFunc(data []byte) []byte {
return token
}

func trimTrailingWhitespacesFunc(data []byte) []byte {
func Trailing(data []byte) []byte {
// TrimRight to strip all whitespaces from the end of log
return bytes.TrimRight(data, "\r\n\t ")
}

func trimWhitespacesFunc(data []byte) []byte {
return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data))
func Whitespace(data []byte) []byte {
return Leading(Trailing(data))
}
7 changes: 5 additions & 2 deletions pkg/stanza/trim/trim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestWhitespace(t *testing.T) {
func TestTrim(t *testing.T) {
// Test all permutations of trimming whitespace
testCases := []struct {
name string
Expand Down Expand Up @@ -50,7 +50,10 @@ func TestWhitespace(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
trimFunc := Whitespace(tc.preserveLeading, tc.preserveTrailing)
trimFunc := Config{
PreserveLeading: tc.preserveLeading,
PreserveTrailing: tc.preserveTrailing,
}.Func()
assert.Equal(t, []byte(tc.expect), trimFunc([]byte(tc.input)))

// Also test that regardless of configuration, an empty []byte in gives an empty []byte out
Expand Down

0 comments on commit 696edd0

Please sign in to comment.