Skip to content

Commit

Permalink
[pkg/stanza] Extract Encoding from SplitterConfig (#26511)
Browse files: browse the repository at this point in the history
  • Loading branch information
djaglowski authored Sep 7, 2023
1 parent d722dae commit c1d70f2
Show file tree
Hide file tree
Showing 12 changed files with 88 additions and 66 deletions.
27 changes: 27 additions & 0 deletions .chloggen/pkg-stanza-encoding-2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move tokenize.SplitterConfig.Encoding to fileconsumer.Config.Encoding

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26511]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
27 changes: 15 additions & 12 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
const (
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 1024
defaultEncoding = "utf-8"
)

var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
Expand All @@ -51,6 +52,7 @@ func NewConfig() *Config {
IncludeFilePathResolved: false,
PollInterval: 200 * time.Millisecond,
Splitter: tokenize.NewSplitterConfig(),
Encoding: defaultEncoding,
StartAt: "end",
FingerprintSize: fingerprint.DefaultSize,
MaxLogSize: defaultMaxLogSize,
Expand All @@ -74,6 +76,7 @@ type Config struct {
MaxBatches int `mapstructure:"max_batches,omitempty"`
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
}

Expand All @@ -88,8 +91,13 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
return nil, err
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, err
}

// Ensure that splitter is buildable
factory := splitter.NewMultilineFactory(c.Splitter, int(c.MaxLogSize))
factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize))
if _, err := factory.Build(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,13 +138,13 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
return nil, fmt.Errorf("invalid start_at location '%s'", c.StartAt)
}

enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to find encoding: %w", err)
}

var hCfg *header.Config
if c.Header != nil {
enc, err := decode.LookupEncoding(c.Splitter.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create encoding: %w", err)
}

hCfg, err = header.NewConfig(c.Header.Pattern, c.Header.MetadataOperators, enc)
if err != nil {
return nil, fmt.Errorf("failed to build header config: %w", err)
Expand All @@ -148,11 +156,6 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
return nil, err
}

enc, err := decode.LookupEncoding(c.Splitter.Encoding)
if err != nil {
return nil, err
}

return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
Expand Down Expand Up @@ -220,7 +223,7 @@ func (c Config) validate() error {
return errors.New("`max_batches` must not be negative")
}

enc, err := decode.LookupEncoding(c.Splitter.Encoding)
enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,15 @@ func TestUnmarshal(t *testing.T) {
Name: "encoding_lower",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.Splitter.Encoding = "utf-16le"
cfg.Encoding = "utf-16le"
return newMockOperatorConfig(cfg)
}(),
},
{
Name: "encoding_upper",
Expect: func() *mockOperatorConfig {
cfg := NewConfig()
cfg.Splitter.Encoding = "UTF-16lE"
cfg.Encoding = "UTF-16lE"
return newMockOperatorConfig(cfg)
}(),
},
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestBuild(t *testing.T) {
{
"InvalidEncoding",
func(f *Config) {
f.Splitter.Encoding = "UTF-3233"
f.Encoding = "UTF-3233"
},
require.Error,
nil,
Expand Down Expand Up @@ -681,7 +681,7 @@ func TestBuildWithSplitFunc(t *testing.T) {
{
"InvalidEncoding",
func(f *Config) {
f.Splitter.Encoding = "UTF-3233"
f.Encoding = "UTF-3233"
},
require.Error,
nil,
Expand Down
6 changes: 3 additions & 3 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestReadUsingNopEncoding(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.MaxLogSize = 8
cfg.Splitter.Encoding = "nop"
cfg.Encoding = "nop"
operator, emitCalls := buildTestManager(t, cfg)

// Create a file, then start
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestNopEncodingDifferentLogSizes(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.MaxLogSize = tc.maxLogSize
cfg.Splitter.Encoding = "nop"
cfg.Encoding = "nop"
operator, emitCalls := buildTestManager(t, cfg)

// Create a file, then start
Expand Down Expand Up @@ -1289,7 +1289,7 @@ func TestEncodings(t *testing.T) {
tempDir := t.TempDir()
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.Splitter.Encoding = tc.encoding
cfg.Encoding = tc.encoding
operator, emitCalls := buildTestManager(t, cfg)

// Populate the file
Expand Down
17 changes: 14 additions & 3 deletions pkg/stanza/fileconsumer/internal/splitter/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,32 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"bufio"

"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

type multilineFactory struct {
splitterCfg tokenize.SplitterConfig
encoding encoding.Encoding
maxLogSize int
}

var _ Factory = (*multilineFactory)(nil)

func NewMultilineFactory(splitterCfg tokenize.SplitterConfig, maxLogSize int) Factory {
return &multilineFactory{splitterCfg: splitterCfg, maxLogSize: maxLogSize}
func NewMultilineFactory(
splitterCfg tokenize.SplitterConfig,
encoding encoding.Encoding,
maxLogSize int,
) Factory {
return &multilineFactory{
splitterCfg: splitterCfg,
encoding: encoding,
maxLogSize: maxLogSize,
}
}

// Build builds Multiline Splitter struct
func (f *multilineFactory) Build() (bufio.SplitFunc, error) {
return f.splitterCfg.Build(false, f.maxLogSize)
return f.splitterCfg.Build(f.encoding, false, f.maxLogSize)
}
39 changes: 12 additions & 27 deletions pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,59 +7,44 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func TestMultilineBuild(t *testing.T) {
type args struct {
maxLogSize int
}
tests := []struct {
name string
splitterConfig tokenize.SplitterConfig
args args
encoding encoding.Encoding
maxLogSize int
wantErr bool
}{
{
name: "default configuration",
splitterConfig: tokenize.NewSplitterConfig(),
args: args{
maxLogSize: 1024,
},
wantErr: false,
},
{
name: "eoncoding error",
splitterConfig: tokenize.SplitterConfig{
Encoding: "error",
Flusher: tokenize.NewFlusherConfig(),
Multiline: tokenize.NewMultilineConfig(),
},
args: args{
maxLogSize: 1024,
},
wantErr: true,
encoding: unicode.UTF8,
maxLogSize: 1024,
wantErr: false,
},
{
name: "Multiline error",
splitterConfig: tokenize.SplitterConfig{
Encoding: "utf-8",
Flusher: tokenize.NewFlusherConfig(),
Flusher: tokenize.NewFlusherConfig(),
Multiline: tokenize.MultilineConfig{
LineStartPattern: "START",
LineEndPattern: "END",
},
},
args: args{
maxLogSize: 1024,
},
wantErr: true,
encoding: unicode.UTF8,
maxLogSize: 1024,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewMultilineFactory(tt.splitterConfig, tt.args.maxLogSize)
factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
8 changes: 4 additions & 4 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand Down Expand Up @@ -177,10 +178,9 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
mlc := tokenize.NewMultilineConfig()
mlc.LineStartPattern = `\d+-\d+-\d+`
f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{
Encoding: "utf-8",
Flusher: tokenize.NewFlusherConfig(),
Multiline: mlc,
}, 15)
}, unicode.UTF8, 15)
f.readerConfig.maxLogSize = 15

temp := openTemp(t, t.TempDir())
Expand Down Expand Up @@ -239,7 +239,7 @@ func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) {

func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) {
emitChan := make(chan *emitParams, 100)
enc, err := decode.LookupEncoding(splitterConfig.Encoding)
enc, err := decode.LookupEncoding(defaultEncoding)
require.NoError(t, err)
return &readerFactory{
SugaredLogger: testutil.Logger(t),
Expand All @@ -249,7 +249,7 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte
emit: testEmitFunc(emitChan),
},
fromBeginning: true,
splitterFactory: splitter.NewMultilineFactory(splitterConfig, defaultMaxLogSize),
splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize),
encoding: enc,
}, emitChan
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/file/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
var toBody toBodyFunc = func(token []byte) interface{} {
return string(token)
}
if decode.IsNop(c.Config.Splitter.Encoding) {
if decode.IsNop(c.Config.Encoding) {
toBody = func(token []byte) interface{} {
copied := make([]byte, len(token))
copy(copied, token)
Expand Down
6 changes: 3 additions & 3 deletions pkg/stanza/operator/input/file/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.Splitter.Encoding = "utf-16le"
cfg.Encoding = "utf-16le"
return cfg
}(),
},
Expand All @@ -422,7 +422,7 @@ func TestUnmarshal(t *testing.T) {
ExpectErr: false,
Expect: func() *Config {
cfg := NewConfig()
cfg.Splitter.Encoding = "UTF-16lE"
cfg.Encoding = "UTF-16lE"
return cfg
}(),
},
Expand Down Expand Up @@ -510,7 +510,7 @@ func TestBuild(t *testing.T) {
{
"InvalidEncoding",
func(f *Config) {
f.Splitter.Encoding = "UTF-3233"
f.Encoding = "UTF-3233"
},
require.Error,
nil,
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/operator/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func TestReadUsingNopEncoding(t *testing.T) {
t.Run(tc.testName, func(t *testing.T) {
operator, logReceived, tempDir := newTestFileOperator(t, func(cfg *Config) {
cfg.MaxLogSize = 8
cfg.Splitter.Encoding = "nop"
cfg.Encoding = "nop"
})
// Create a file, then start
temp := openTemp(t, tempDir)
Expand Down
11 changes: 3 additions & 8 deletions pkg/stanza/tokenize/splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package tokenize // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"bufio"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// SplitterConfig consolidates MultilineConfig and FlusherConfig
type SplitterConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
Flusher FlusherConfig `mapstructure:",squash,omitempty"`
Multiline MultilineConfig `mapstructure:"multiline,omitempty"`
PreserveLeading bool `mapstructure:"preserve_leading_whitespaces,omitempty"`
Expand All @@ -22,18 +22,13 @@ type SplitterConfig struct {
// NewSplitterConfig returns default SplitterConfig
func NewSplitterConfig() SplitterConfig {
return SplitterConfig{
Encoding: "utf-8",
Multiline: NewMultilineConfig(),
Flusher: FlusherConfig{Period: DefaultFlushPeriod},
}
}

// Build builds bufio.SplitFunc based on the config
func (c *SplitterConfig) Build(flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
enc, err := decode.LookupEncoding(c.Encoding)
if err != nil {
return nil, err
}
func (c *SplitterConfig) Build(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
trimFunc := trim.Whitespace(c.PreserveLeading, c.PreserveTrailing)
splitFunc, err := c.Multiline.Build(enc, flushAtEOF, maxLogSize, trimFunc)
if err != nil {
Expand Down
Loading

0 comments on commit c1d70f2

Please sign in to comment.