Skip to content

Commit

Permalink
[pkg/stanza] Extract flush package from tokenize package (#26517)
Browse files Browse the repository at this point in the history
  • Loading branch information
djaglowski authored Sep 8, 2023
1 parent cfba665 commit 75e29ce
Show file tree
Hide file tree
Showing 12 changed files with 191 additions and 256 deletions.
30 changes: 30 additions & 0 deletions .chloggen/pkg-stanza-flush.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/stanza

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Remove Flusher from tokenize.SplitterConfig

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26517]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
  Removes the following in favor of flush.WithPeriod
  - tokenize.DefaultFlushPeriod
  - tokenize.FlusherConfig
  - tokenize.NewFlusherConfig

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
6 changes: 4 additions & 2 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
defaultMaxLogSize = 1024 * 1024
defaultMaxConcurrentFiles = 1024
defaultEncoding = "utf-8"
defaultFlushPeriod = 500 * time.Millisecond
)

var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
Expand Down Expand Up @@ -79,6 +80,7 @@ type Config struct {
Splitter tokenize.SplitterConfig `mapstructure:",squash,omitempty"`
TrimConfig trim.Config `mapstructure:",squash,omitempty"`
Encoding string `mapstructure:"encoding,omitempty"`
FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"`
Header *HeaderConfig `mapstructure:"header,omitempty"`
}

Expand All @@ -99,7 +101,7 @@ func (c Config) Build(logger *zap.SugaredLogger, emit emit.Callback) (*Manager,
}

// Ensure that splitter is buildable
factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func())
factory := splitter.NewMultilineFactory(c.Splitter, enc, int(c.MaxLogSize), c.TrimConfig.Func(), c.FlushPeriod)
if _, err := factory.Build(); err != nil {
return nil, err
}
Expand All @@ -118,7 +120,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback
}

// Ensure that splitter is buildable
factory := splitter.NewCustomFactory(c.Splitter.Flusher, splitFunc)
factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod)
if _, err := factory.Build(); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func TestNoNewline(t *testing.T) {
cfg := NewConfig().includeDir(tempDir)
cfg.StartAt = "beginning"
cfg.Splitter = tokenize.NewSplitterConfig()
cfg.Splitter.Flusher.Period = time.Nanosecond
cfg.FlushPeriod = time.Nanosecond
operator, emitCalls := buildTestManager(t, cfg)

temp := openTemp(t, tempDir)
Expand Down
15 changes: 8 additions & 7 deletions pkg/stanza/fileconsumer/internal/splitter/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bufio"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

// customFactory builds a bufio.SplitFunc from a caller-supplied split
// function, wrapping it so that buffered partial data is force-flushed
// after a period of inactivity.
type customFactory struct {
	splitFunc   bufio.SplitFunc // tokenizes raw bytes into entries
	flushPeriod time.Duration   // inactivity period before a forced flush; <= 0 disables flushing
}

var _ Factory = (*customFactory)(nil)

func NewCustomFactory(flusherCfg tokenize.FlusherConfig, splitFunc bufio.SplitFunc) Factory {
func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory {
return &customFactory{
flusherCfg: flusherCfg,
splitFunc: splitFunc,
splitFunc: splitFunc,
flushPeriod: flushPeriod,
}
}

// Build wraps the configured split function with a periodic flusher.
// No trimming is applied (trim.Nop); the returned error is always nil.
func (f *customFactory) Build() (bufio.SplitFunc, error) {
	return flush.WithPeriod(f.splitFunc, trim.Nop, f.flushPeriod), nil
}
26 changes: 10 additions & 16 deletions pkg/stanza/fileconsumer/internal/splitter/custom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,30 @@ package splitter
import (
"bufio"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
)

func TestCustomFactory(t *testing.T) {
type fields struct {
Flusher tokenize.FlusherConfig
Splitter bufio.SplitFunc
}
tests := []struct {
name string
fields fields
wantErr bool
name string
splitter bufio.SplitFunc
flushPeriod time.Duration
wantErr bool
}{
{
name: "default configuration",
fields: fields{
Flusher: tokenize.NewFlusherConfig(),
Splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) {
return len(data), data, nil
},
splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) {
return len(data), data, nil
},
wantErr: false,
flushPeriod: 100 * time.Millisecond,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewCustomFactory(tt.fields.Flusher, tt.fields.Splitter)
factory := NewCustomFactory(tt.splitter, tt.flushPeriod)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
11 changes: 10 additions & 1 deletion pkg/stanza/fileconsumer/internal/splitter/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package splitter // import "github.com/open-telemetry/opentelemetry-collector-co

import (
"bufio"
"time"

"golang.org/x/text/encoding"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)
Expand All @@ -17,6 +19,7 @@ type multilineFactory struct {
encoding encoding.Encoding
maxLogSize int
trimFunc trim.Func
flushPeriod time.Duration
}

var _ Factory = (*multilineFactory)(nil)
Expand All @@ -26,16 +29,22 @@ func NewMultilineFactory(
encoding encoding.Encoding,
maxLogSize int,
trimFunc trim.Func,
flushPeriod time.Duration,
) Factory {
return &multilineFactory{
splitterCfg: splitterCfg,
encoding: encoding,
maxLogSize: maxLogSize,
trimFunc: trimFunc,
flushPeriod: flushPeriod,
}
}

// Build constructs the multiline split function from the splitter config
// and wraps it with a periodic flusher so that a pending partial entry is
// emitted after f.flushPeriod of inactivity.
func (f *multilineFactory) Build() (bufio.SplitFunc, error) {
	splitFunc, err := f.splitterCfg.Build(f.encoding, false, f.maxLogSize, f.trimFunc)
	if err != nil {
		return nil, err
	}
	return flush.WithPeriod(splitFunc, f.trimFunc, f.flushPeriod), nil
}
13 changes: 8 additions & 5 deletions pkg/stanza/fileconsumer/internal/splitter/multiline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package splitter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/text/encoding"
Expand All @@ -20,32 +21,34 @@ func TestMultilineBuild(t *testing.T) {
splitterConfig tokenize.SplitterConfig
encoding encoding.Encoding
maxLogSize int
flushPeriod time.Duration
wantErr bool
}{
{
name: "default configuration",
splitterConfig: tokenize.NewSplitterConfig(),
encoding: unicode.UTF8,
maxLogSize: 1024,
flushPeriod: 100 * time.Millisecond,
wantErr: false,
},
{
name: "Multiline error",
splitterConfig: tokenize.SplitterConfig{
Flusher: tokenize.NewFlusherConfig(),
Multiline: tokenize.MultilineConfig{
LineStartPattern: "START",
LineEndPattern: "END",
},
},
encoding: unicode.UTF8,
maxLogSize: 1024,
wantErr: true,
flushPeriod: 100 * time.Millisecond,
encoding: unicode.UTF8,
maxLogSize: 1024,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop)
factory := NewMultilineFactory(tt.splitterConfig, tt.encoding, tt.maxLogSize, trim.Nop, tt.flushPeriod)
got, err := factory.Build()
if (err != nil) != tt.wantErr {
t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
35 changes: 10 additions & 25 deletions pkg/stanza/fileconsumer/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/text/encoding/unicode"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
Expand All @@ -25,9 +24,7 @@ import (

func TestPersistFlusher(t *testing.T) {
flushPeriod := 100 * time.Millisecond
sCfg := tokenize.NewSplitterConfig()
sCfg.Flusher.Period = flushPeriod
f, emitChan := testReaderFactoryWithSplitter(t, sCfg)
f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), defaultMaxLogSize, flushPeriod)

temp := openTemp(t, t.TempDir())
fp, err := f.newFingerprint(temp)
Expand Down Expand Up @@ -113,7 +110,7 @@ func TestTokenization(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
f, emitChan := testReaderFactory(t)
f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), defaultMaxLogSize, defaultFlushPeriod)

temp := openTemp(t, t.TempDir())
_, err := temp.Write(tc.fileContent)
Expand Down Expand Up @@ -143,8 +140,7 @@ func TestTokenizationTooLong(t *testing.T) {
[]byte("aaa"),
}

f, emitChan := testReaderFactory(t)
f.readerConfig.maxLogSize = 10
f, emitChan := testReaderFactory(t, tokenize.NewSplitterConfig(), 10, defaultFlushPeriod)

temp := openTemp(t, t.TempDir())
_, err := temp.Write(fileContent)
Expand Down Expand Up @@ -174,15 +170,9 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
[]byte("2023-01-01 2"),
}

f, emitChan := testReaderFactory(t)

mlc := tokenize.NewMultilineConfig()
mlc.LineStartPattern = `\d+-\d+-\d+`
f.splitterFactory = splitter.NewMultilineFactory(tokenize.SplitterConfig{
Flusher: tokenize.NewFlusherConfig(),
Multiline: mlc,
}, unicode.UTF8, 15, trim.Whitespace)
f.readerConfig.maxLogSize = 15
sCfg := tokenize.NewSplitterConfig()
sCfg.Multiline.LineStartPattern = `\d+-\d+-\d+`
f, emitChan := testReaderFactory(t, sCfg, 15, defaultFlushPeriod)

temp := openTemp(t, t.TempDir())
_, err := temp.Write(fileContent)
Expand All @@ -205,8 +195,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
func TestHeaderFingerprintIncluded(t *testing.T) {
fileContent := []byte("#header-line\naaa\n")

f, _ := testReaderFactory(t)
f.readerConfig.maxLogSize = 10
f, _ := testReaderFactory(t, tokenize.NewSplitterConfig(), 10, defaultFlushPeriod)

regexConf := regex.NewConfig()
regexConf.Regex = "^#(?P<header>.*)"
Expand Down Expand Up @@ -234,11 +223,7 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes)
}

func testReaderFactory(t *testing.T) (*readerFactory, chan *emitParams) {
return testReaderFactoryWithSplitter(t, tokenize.NewSplitterConfig())
}

func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.SplitterConfig) (*readerFactory, chan *emitParams) {
func testReaderFactory(t *testing.T, sCfg tokenize.SplitterConfig, maxLogSize int, flushPeriod time.Duration) (*readerFactory, chan *emitParams) {
emitChan := make(chan *emitParams, 100)
enc, err := decode.LookupEncoding(defaultEncoding)
trimFunc := trim.Whitespace
Expand All @@ -247,11 +232,11 @@ func testReaderFactoryWithSplitter(t *testing.T, splitterConfig tokenize.Splitte
SugaredLogger: testutil.Logger(t),
readerConfig: &readerConfig{
fingerprintSize: fingerprint.DefaultSize,
maxLogSize: defaultMaxLogSize,
maxLogSize: maxLogSize,
emit: testEmitFunc(emitChan),
},
fromBeginning: true,
splitterFactory: splitter.NewMultilineFactory(splitterConfig, enc, defaultMaxLogSize, trimFunc),
splitterFactory: splitter.NewMultilineFactory(sCfg, enc, maxLogSize, trimFunc, flushPeriod),
encoding: enc,
}, emitChan
}
Expand Down
24 changes: 6 additions & 18 deletions pkg/stanza/tokenize/flusher.go → pkg/stanza/flush/flush.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package tokenize // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/tokenize"
package flush // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/flush"

import (
"bufio"
Expand All @@ -10,26 +10,14 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim"
)

const DefaultFlushPeriod = 500 * time.Millisecond

// FlusherConfig is a configuration of Flusher helper
type FlusherConfig struct {
Period time.Duration `mapstructure:"force_flush_period"`
}

// NewFlusherConfig creates a default Flusher config
func NewFlusherConfig() FlusherConfig {
return FlusherConfig{
// Empty or `0s` means that we will never force flush
Period: DefaultFlushPeriod,
}
}

// Wrap a bufio.SplitFunc with a flusher
func (c *FlusherConfig) Wrap(splitFunc bufio.SplitFunc, trimFunc trim.Func) bufio.SplitFunc {
func WithPeriod(splitFunc bufio.SplitFunc, trimFunc trim.Func, period time.Duration) bufio.SplitFunc {
if period <= 0 {
return splitFunc
}
f := &flusher{
lastDataChange: time.Now(),
forcePeriod: c.Period,
forcePeriod: period,
previousDataLength: 0,
}
return f.splitFunc(splitFunc, trimFunc)
Expand Down
Loading

0 comments on commit 75e29ce

Please sign in to comment.