From 696edd092a682bec07b818c7ec0d74de91583a98 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 7 Sep 2023 10:35:13 -0600 Subject: [PATCH] [pkg/stanza] Extract trim.Config from SplitterConfig --- .chloggen/pkg-stanza-trim-2.yaml | 29 +++++++++++++++++ pkg/stanza/fileconsumer/config.go | 4 ++- .../fileconsumer/internal/splitter/custom.go | 2 +- .../internal/splitter/multiline.go | 6 +++- .../internal/splitter/multiline_test.go | 3 +- pkg/stanza/fileconsumer/reader_test.go | 6 ++-- pkg/stanza/operator/input/tcp/tcp.go | 21 ++++++------- pkg/stanza/operator/input/udp/udp.go | 15 +++++---- pkg/stanza/tokenize/multiline_test.go | 25 +++++++++------ pkg/stanza/tokenize/splitter.go | 10 ++---- pkg/stanza/trim/trim.go | 31 +++++++++++-------- pkg/stanza/trim/trim_test.go | 7 +++-- 12 files changed, 103 insertions(+), 56 deletions(-) create mode 100755 .chloggen/pkg-stanza-trim-2.yaml diff --git a/.chloggen/pkg-stanza-trim-2.yaml b/.chloggen/pkg-stanza-trim-2.yaml new file mode 100755 index 000000000000..7e7ef2687abf --- /dev/null +++ b/.chloggen/pkg-stanza-trim-2.yaml @@ -0,0 +1,29 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Extract whitespace trim configuration into trim.Config + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26511] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - PreserveLeading and PreserveTrailing removed from tokenize.SplitterConfig. + - PreserveLeadingWhitespaces and PreserveTrailingWhitespaces removed from tcp.BaseConfig and udp.BaseConfig. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 7c7346aec199..f015d3d8a4fa 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -21,6 +21,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) const ( @@ -76,6 +77,7 @@ type Config struct { MaxBatches int `mapstructure:"max_batches,omitempty"` DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"` + TrimConfig trim.Config `mapstructure:",squash,omitempty"` Encoding string `mapstructure:"encoding,omitempty"` Header *HeaderConfig `mapstructure:"header,omitempty"` } @@ -97,7 +99,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager, } // Ensure that splitter is buildable - factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize)) + factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func()) if _, err := factory.Build(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index f10c9b349042..712c63eb1427 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -26,5 +26,5 @@ func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFu // Build builds Multiline Splitter struct func (f *customFactory) Build() (bufio.SplitFunc, error) { - return f.flusherCfg.Wrap(f.splitFunc, trim.Whitespace(true, true)), nil + return f.flusherCfg.Wrap(f.splitFunc, trim.Nop), nil } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline.go b/pkg/stanza/fileconsumer/internal/splitter/multiline.go index 9bec7abd6b33..a8b882cd3c00 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline.go @@ -9,12 +9,14 @@ import ( "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) type multilineFactory struct { splitterCfg tokenize.SplitterConfig encoding encoding.Encoding maxLogSize int + trimFunc trim.Func } var _ Factory = (*multilineFactory)(nil) @@ -23,15 +25,17 @@ func NewMultilineFactory( splitterCfg tokenize.SplitterConfig, encoding encoding.Encoding, maxLogSize int, + trimFunc trim.Func, ) Factory { return &multilineFactory{ splitterCfg: splitterCfg, encoding: encoding, maxLogSize: maxLogSize, + trimFunc: trimFunc, } } // Build builds Multiline Splitter struct func (f *multilineFactory) Build() (bufio.SplitFunc, error) { - return f.splitterCfg.Build(f.encoding, false, f.maxLogSize) + return f.splitterCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc) } diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index a7d252b3bdde..cd8559245ede 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -11,6 +11,7 @@ import ( "golang.org/x/text/encoding/unicode" 
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestMultilineBuild(t *testing.T) { @@ -44,7 +45,7 @@ func TestMultilineBuild(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize) + factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop) got, err := factory.Build() if (err != nil) != tt.wantErr { t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go index 64c2e3468094..1d49c4075767 100644 --- a/pkg/stanza/fileconsumer/reader_test.go +++ b/pkg/stanza/fileconsumer/reader_test.go @@ -20,6 +20,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestPersistFlusher(t *testing.T) { @@ -180,7 +181,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) { f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{ Flusher: tokenize.NewFlusherConfig(), Multiline: mlc, - }, unicode.UTF8, 15) + }, unicode.UTF8, 15, trim.Whitespace) f.readerConfig.maxLogSize = 15 temp := openTemp(t, t.TempDir()) @@ -240,6 +241,7 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) { func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) { emitChan := make(chan *emitParams, 100) enc, err := decode.LookupEncoding(defaultEncoding) + trimFunc := trim.Whitespace require.NoError(t, err) return &readerFactory{ SugaredLogger: testutil.Logger(t), @@ -249,7 +251,7 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte emit: testEmitFunc(emitChan), }, fromBeginning: true, - splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize), + splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize, trimFunc), encoding: enc, }, emitChan } diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index cf60af119a37..283652bb9ad2 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -69,22 +69,21 @@ type Config struct { // BaseConfig is the detailed configuration of a tcp input operator. 
type BaseConfig struct { - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` - ListenAddress string `mapstructure:"listen_address,omitempty"` - TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"` - AddAttributes bool `mapstructure:"add_attributes,omitempty"` - OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` - MultiLineBuilder MultiLineBuilderFunc + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty"` + TLS *configtls.TLSServerSetting `mapstructure:"tls,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` + TrimConfig trim.Config `mapstructure:",squash"` + MultiLineBuilder MultiLineBuilderFunc } type MultiLineBuilderFunc func(enc encoding.Encoding) (bufio.SplitFunc, error) func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { - trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces) + trimFunc := c.TrimConfig.Func() splitFunc, err := c.Multiline.Build(enc, true, int(c.MaxLogSize), trimFunc) if err != nil { return nil, err diff --git a/pkg/stanza/operator/input/udp/udp.go b/pkg/stanza/operator/input/udp/udp.go index cdb2ac539f93..6141ad8d24ce 100644 --- a/pkg/stanza/operator/input/udp/udp.go +++ b/pkg/stanza/operator/input/udp/udp.go @@ -61,13 +61,12 @@ type Config struct { // BaseConfig is the details configuration of a udp input operator. type BaseConfig struct { - ListenAddress string `mapstructure:"listen_address,omitempty"` - OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` - AddAttributes bool `mapstructure:"add_attributes,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeadingWhitespaces bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailingWhitespaces bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` + ListenAddress string `mapstructure:"listen_address,omitempty"` + OneLogPerPacket bool `mapstructure:"one_log_per_packet,omitempty"` + AddAttributes bool `mapstructure:"add_attributes,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + Multiline tokenize.MultilineConfig `mapstructure:"multiline,omitempty"` + TrimConfig trim.Config `mapstructure:",squash"` } // Build will build a udp input operator. 
@@ -92,7 +91,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } // Build multiline - trimFunc := trim.Whitespace(c.PreserveLeadingWhitespaces, c.PreserveTrailingWhitespaces) + trimFunc := c.TrimConfig.Func() splitFunc, err := c.Multiline.Build(enc, true, MaxUDPSize, trimFunc) if err != nil { return nil, err diff --git a/pkg/stanza/tokenize/multiline_test.go b/pkg/stanza/tokenize/multiline_test.go index 555009bf836a..a8c85db245e5 100644 --- a/pkg/stanza/tokenize/multiline_test.go +++ b/pkg/stanza/tokenize/multiline_test.go @@ -20,8 +20,6 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) -var noTrim = trim.Whitespace(true, true) - const ( // Those values has been experimentally figured out for windows sleepDuration time.Duration = time.Millisecond * 80 @@ -218,7 +216,10 @@ func TestLineStartSplitFunc(t *testing.T) { LineStartPattern: tc.Pattern, } - trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + trimFunc := trim.Config{ + PreserveLeading: tc.PreserveLeadingWhitespaces, + PreserveTrailing: tc.PreserveTrailingWhitespaces, + }.Func() splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) if tc.Flusher != nil { @@ -228,7 +229,7 @@ func TestLineStartSplitFunc(t *testing.T) { } t.Run("FirstMatchHitsEndOfBuffer", func(t *testing.T) { - splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, noTrim) + splitFunc := LineStartSplitFunc(regexp.MustCompile("LOGSTART"), false, trim.Nop) data := []byte(`LOGSTART`) t.Run("NotAtEOF", func(t *testing.T) { @@ -430,7 +431,10 @@ func TestLineEndSplitFunc(t *testing.T) { LineEndPattern: tc.Pattern, } - trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + trimFunc := trim.Config{ + PreserveLeading: tc.PreserveLeadingWhitespaces, + PreserveTrailing: tc.PreserveTrailingWhitespaces, + }.Func() splitFunc, err := cfg.getSplitFunc(unicode.UTF8, false, 0, trimFunc) require.NoError(t, err) if tc.Flusher != nil { @@ -596,7 +600,10 @@ func TestNewlineSplitFunc(t *testing.T) { } for _, tc := range testCases { - trimFunc := trim.Whitespace(tc.PreserveLeadingWhitespaces, tc.PreserveTrailingWhitespaces) + trimFunc := trim.Config{ + PreserveLeading: tc.PreserveLeadingWhitespaces, + PreserveTrailing: tc.PreserveTrailingWhitespaces, + }.Func() splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) require.NoError(t, err) if tc.Flusher != nil { @@ -672,14 +679,14 @@ func TestNoopEncodingError(t *testing.T) { LineEndPattern: "\n", } - _, err := cfg.getSplitFunc(encoding.Nop, false, 0, noTrim) + _, err := cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) cfg = &MultilineConfig{ LineStartPattern: "\n", } - _, err = cfg.getSplitFunc(encoding.Nop, false, 0, noTrim) + _, err = cfg.getSplitFunc(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } @@ -740,7 +747,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - splitFunc, err := NewlineSplitFunc(tc.encoding, false, noTrim) + splitFunc, err := NewlineSplitFunc(tc.encoding, false, trim.Nop) require.NoError(t, err) scanner := bufio.NewScanner(bytes.NewReader(tc.input)) scanner.Split(splitFunc) diff --git 
a/pkg/stanza/tokenize/splitter.go b/pkg/stanza/tokenize/splitter.go index a457bc47637b..3cae0b137ded 100644 --- a/pkg/stanza/tokenize/splitter.go +++ b/pkg/stanza/tokenize/splitter.go @@ -13,10 +13,8 @@ import ( // SplitterConfig consolidates MultilineConfig and FlusherConfig type SplitterConfig struct { - Flusher FlusherConfig `mapstructure:",squash,omitempty"` - Multiline MultilineConfig `mapstructure:"multiline,omitempty"` - PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"` - PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` + Flusher FlusherConfig `mapstructure:",squash,omitempty"` + Multiline MultilineConfig `mapstructure:"multiline,omitempty"` } // NewSplitterConfig returns default SplitterConfig @@ -28,12 +26,10 @@ func NewSplitterConfig() SplitterConfig { } // Build builds bufio.SplitFunc based on the config -func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) { - trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing) +func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int, trimFunc trim.Func) (bufio.SplitFunc, error) { splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc) if err != nil { return nil, err } - return c.Flusher.Wrap(splitFunc, trimFunc), nil } diff --git a/pkg/stanza/trim/trim.go b/pkg/stanza/trim/trim.go index b49e6f7af143..90118dfc6543 100644 --- a/pkg/stanza/trim/trim.go +++ b/pkg/stanza/trim/trim.go @@ -9,24 +9,29 @@ import ( type Func func([]byte) []byte -func Whitespace(preserveLeading, preserveTrailing bool) Func { - if preserveLeading && preserveTrailing { - return noTrim +type Config struct { + PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"` + PreserveTrailing bool `mapstructure:"preserve_trailing_whitespaces,omitempty"` +} + +func (c Config) Func() Func { + if c.PreserveLeading && c.PreserveTrailing { + return Nop } - if preserveLeading { - return trimTrailingWhitespacesFunc + if c.PreserveLeading { + return Trailing } - if preserveTrailing { - return trimLeadingWhitespacesFunc + if c.PreserveTrailing { + return Leading } - return trimWhitespacesFunc + return Whitespace } -func noTrim(token []byte) []byte { +func Nop(token []byte) []byte { return token } -func trimLeadingWhitespacesFunc(data []byte) []byte { +func Leading(data []byte) []byte { // TrimLeft to strip EOF whitespaces in case of using $ in regex // For some reason newline and carriage return are being moved to beginning of next log token := bytes.TrimLeft(data, "\r\n\t ") @@ -38,11 +43,11 @@ func trimLeadingWhitespacesFunc(data []byte) []byte { return token } -func trimTrailingWhitespacesFunc(data []byte) []byte { +func Trailing(data []byte) []byte { // TrimRight to strip all whitespaces from the end of log return bytes.TrimRight(data, "\r\n\t ") } -func trimWhitespacesFunc(data []byte) []byte { - return trimLeadingWhitespacesFunc(trimTrailingWhitespacesFunc(data)) +func Whitespace(data []byte) []byte { + return Leading(Trailing(data)) } diff --git a/pkg/stanza/trim/trim_test.go b/pkg/stanza/trim/trim_test.go index c603d59f1c29..114a645853ee 100644 --- a/pkg/stanza/trim/trim_test.go +++ b/pkg/stanza/trim/trim_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestWhitespace(t *testing.T) { +func TestTrim(t *testing.T) { // Test all permutations of trimming whitespace testCases := []struct { name string @@ -50,7 +50,10 @@ func TestWhitespace(t *testing.T) { for _, 
tc := range testCases { t.Run(tc.name, func(t *testing.T) { - trimFunc := Whitespace(tc.preserveLeading, tc.preserveTrailing) + trimFunc := Config{ + PreserveLeading: tc.preserveLeading, + PreserveTrailing: tc.preserveTrailing, + }.Func() assert.Equal(t, []byte(tc.expect), trimFunc([]byte(tc.input))) // Also test that regardless of configuration, an empty []byte in gives an empty []byte out
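
For downstream consumers of these packages, the practical effect of this patch is that whitespace trimming is no longer configured on tokenize.SplitterConfig (or via the tcp/udp Preserve*Whitespaces fields) but through a standalone trim.Config whose Func() result is passed explicitly into the splitter builders. The snippet below is a rough migration sketch, not part of this patch; the encoding, max log size, and preserve flag values are placeholders chosen for illustration.

// Migration sketch for callers of pkg/stanza/tokenize and pkg/stanza/trim
// after this change (assumed caller code, values are illustrative).
package main

import (
	"fmt"

	"golang.org/x/text/encoding/unicode"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

func main() {
	// Before: trimFunc := trim.Whitespace(preserveLeading, preserveTrailing)
	//         splitFunc, err := splitterCfg.Build(enc, false, maxLogSize)
	// After:  build a trim.Func from trim.Config and pass it to Build explicitly.
	trimCfg := trim.Config{
		PreserveLeading:  false, // placeholder values for illustration
		PreserveTrailing: true,
	}

	splitterCfg := tokenize.NewSplitterConfig()
	splitFunc, err := splitterCfg.Build(unicode.UTF8, false, 1024*1024, trimCfg.Func())
	if err != nil {
		panic(err)
	}
	_ = splitFunc // hand the split func to a scanner or reader as before

	// The individual trim functions are now exported directly as well.
	fmt.Printf("%q\n", trim.Whitespace([]byte("  body \r\n"))) // "body"
	fmt.Printf("%q\n", trim.Nop([]byte("  body \r\n")))        // unchanged
}

Note that trim.Config keeps the same mapstructure tags (preserve_leading_whitespaces, preserve_trailing_whitespaces) and is embedded with ",squash" in the fileconsumer, tcp, and udp configs, so the user-facing YAML keys are unchanged; the breaking change is confined to the Go API, which is why the changelog entry is marked change_logs: [api].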