Skip to content

Commit

Permalink
[chore][pkg/stanza] Move reader and reader factory into internal package (#27416)
Browse files Browse the repository at this point in the history

Follows #27396 

This PR creates an internal `reader` package and moves directly related
structs into it.

I intend to clean up this codebase substantially from here. This is just
a first step that creates a crude boundary between concerns. There are
many exported fields which can later be abstracted, but currently the
codebase has many direct interactions. Tests remain in the
`fileconsumer` package for now but will be migrated once there are
cleaner interfaces to test.
  • Loading branch information
djaglowski authored Oct 4, 2023
1 parent e94a0f7 commit 5dba9aa
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 173 deletions.
29 changes: 15 additions & 14 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
Expand Down Expand Up @@ -163,29 +164,29 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
return &Manager{
SugaredLogger: logger.With("component", "fileconsumer"),
cancel: func() {},
readerFactory: readerFactory{
readerFactory: reader.Factory{
SugaredLogger: logger.With("component", "fileconsumer"),
readerConfig: &readerConfig{
fingerprintSize: int(c.FingerprintSize),
maxLogSize: int(c.MaxLogSize),
emit: emit,
includeFileName: c.IncludeFileName,
includeFilePath: c.IncludeFilePath,
includeFileNameResolved: c.IncludeFileNameResolved,
includeFilePathResolved: c.IncludeFilePathResolved,
Config: &reader.Config{
FingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
Emit: emit,
IncludeFileName: c.IncludeFileName,
IncludeFilePath: c.IncludeFilePath,
IncludeFileNameResolved: c.IncludeFileNameResolved,
IncludeFilePathResolved: c.IncludeFilePathResolved,
},
fromBeginning: startAtBeginning,
splitterFactory: factory,
encoding: enc,
headerConfig: hCfg,
FromBeginning: startAtBeginning,
SplitterFactory: factory,
Encoding: enc,
HeaderConfig: hCfg,
},
fileMatcher: fileMatcher,
roller: newRoller(),
pollInterval: c.PollInterval,
maxBatchFiles: c.MaxConcurrentFiles / 2,
maxBatches: c.MaxBatches,
deleteAfterRead: c.DeleteAfterRead,
knownFiles: make([]*reader, 0, 10),
knownFiles: make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
seenPaths: make(map[string]struct{}, 100),
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ func TestBuildWithHeader(t *testing.T) {
},
require.NoError,
func(t *testing.T, m *Manager) {
require.NotNil(t, m.readerFactory.headerConfig.SplitFunc)
require.NotNil(t, m.readerFactory.HeaderConfig.SplitFunc)
},
},
}
Expand Down
67 changes: 26 additions & 41 deletions pkg/stanza/fileconsumer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
)
Expand All @@ -24,7 +25,7 @@ type Manager struct {
wg sync.WaitGroup
cancel context.CancelFunc

readerFactory readerFactory
readerFactory reader.Factory
fileMatcher *matcher.Matcher
roller roller
persister operator.Persister
Expand All @@ -34,7 +35,7 @@ type Manager struct {
maxBatchFiles int
deleteAfterRead bool

knownFiles []*reader
knownFiles []*reader.Reader
seenPaths map[string]struct{}

currentFps []*fingerprint.Fingerprint
Expand Down Expand Up @@ -68,7 +69,6 @@ func (m *Manager) Stop() error {
for _, reader := range m.knownFiles {
reader.Close()
}
m.knownFiles = nil
m.cancel = nil
return nil
}
Expand Down Expand Up @@ -96,12 +96,6 @@ func (m *Manager) startPoller(ctx context.Context) {

// poll checks all the watched paths for new entries
func (m *Manager) poll(ctx context.Context) {
// Increment the generation on all known readers
// This is done here because the next generation is about to start
for i := 0; i < len(m.knownFiles); i++ {
m.knownFiles[i].generation++
}

// Used to keep track of the number of batches processed in this poll cycle
batchesProcessed := 0

Expand Down Expand Up @@ -129,7 +123,7 @@ func (m *Manager) poll(ctx context.Context) {

func (m *Manager) consume(ctx context.Context, paths []string) {
m.Debug("Consuming files")
readers := make([]*reader, 0, len(paths))
readers := make([]*reader.Reader, 0, len(paths))
for _, path := range paths {
r := m.makeReader(path)
if r != nil {
Expand All @@ -145,11 +139,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
var wg sync.WaitGroup
for _, r := range readers {
wg.Add(1)
go func(r *reader) {
go func(r *reader.Reader) {
defer wg.Done()
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
if m.deleteAfterRead && r.eof {
if m.deleteAfterRead && r.EOF {
r.Delete()
}
}(r)
Expand All @@ -158,9 +152,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

// Save off any files that were not fully read
if m.deleteAfterRead {
unfinished := make([]*reader, 0, len(readers))
unfinished := make([]*reader.Reader, 0, len(readers))
for _, r := range readers {
if !r.eof {
if !r.EOF {
unfinished = append(unfinished, r)
}
}
Expand All @@ -173,7 +167,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
}

// Any new files that appear should be consumed entirely
m.readerFactory.fromBeginning = true
m.readerFactory.FromBeginning = true

m.roller.roll(ctx, readers)
m.saveCurrent(readers)
Expand All @@ -183,7 +177,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
if _, ok := m.seenPaths[path]; !ok {
if m.readerFactory.fromBeginning {
if m.readerFactory.FromBeginning {
m.Infow("Started watching file", "path", path)
} else {
m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
Expand All @@ -196,7 +190,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
return nil, nil
}

fp, err := m.readerFactory.newFingerprint(file)
fp, err := m.readerFactory.NewFingerprint(file)
if err != nil {
if err = file.Close(); err != nil {
m.Debugw("problem closing file", zap.Error(err))
Expand Down Expand Up @@ -226,7 +220,7 @@ func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
// makeReader take a file path, then creates reader,
// discarding any that have a duplicate fingerprint to other files that have already
// been read this polling interval
func (m *Manager) makeReader(path string) *reader {
func (m *Manager) makeReader(path string) *reader.Reader {
// Open the files first to minimize the time between listing and opening
fp, file := m.makeFingerprint(path)
if fp == nil {
Expand Down Expand Up @@ -258,33 +252,26 @@ func (m *Manager) clearCurrentFingerprints() {
// saveCurrent adds the readers from this polling interval to this list of
// known files, then increments the generation of all tracked old readers
// before clearing out readers that have existed for 3 generations.
func (m *Manager) saveCurrent(readers []*reader) {
// Add readers from the current, completed poll interval to the list of known files
m.knownFiles = append(m.knownFiles, readers...)

// Clear out old readers. They are sorted such that they are oldest first,
// so we can just find the first reader whose generation is less than our
// max, and keep every reader after that
for i := 0; i < len(m.knownFiles); i++ {
reader := m.knownFiles[i]
if reader.generation <= 3 {
m.knownFiles = m.knownFiles[i:]
break
}
func (m *Manager) saveCurrent(readers []*reader.Reader) {
forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
if forgetNum > 0 {
m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
return
}
m.knownFiles = append(m.knownFiles, readers...)
}

func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
// Check if the new path has the same fingerprint as an old path
if oldReader, ok := m.findFingerprintMatch(fp); ok {
return m.readerFactory.copy(oldReader, file)
return m.readerFactory.Copy(oldReader, file)
}

// If we don't match any previously known files, create a new reader from scratch
return m.readerFactory.newReader(file, fp)
return m.readerFactory.NewReader(file, fp)
}

func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader, bool) {
func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
// Iterate backwards to match newest first
for i := len(m.knownFiles) - 1; i >= 0; i-- {
oldReader := m.knownFiles[i]
Expand Down Expand Up @@ -313,7 +300,7 @@ func (m *Manager) syncLastPollFiles(ctx context.Context) {

// Encode each known file
for _, fileReader := range m.knownFiles {
if err := enc.Encode(fileReader.readerMetadata); err != nil {
if err := enc.Encode(fileReader.Metadata); err != nil {
m.Errorw("Failed to encode known files", zap.Error(err))
}
}
Expand All @@ -331,7 +318,6 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
}

if encoded == nil {
m.knownFiles = make([]*reader, 0, 10)
return nil
}

Expand All @@ -345,13 +331,12 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {

if knownFileCount > 0 {
m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
m.readerFactory.fromBeginning = true
m.readerFactory.FromBeginning = true
}

// Decode each of the known files
m.knownFiles = make([]*reader, 0, knownFileCount)
for i := 0; i < knownFileCount; i++ {
rmd := &readerMetadata{}
rmd := new(reader.Metadata)
if err = dec.Decode(rmd); err != nil {
return err
}
Expand All @@ -371,7 +356,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
}

// This reader won't be used for anything other than metadata reference, so just wrap the metadata
m.knownFiles = append(m.knownFiles, &reader{readerMetadata: rmd})
m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
}

return nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/stanza/fileconsumer/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1109,10 +1109,10 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {

temp := openTemp(t, tempDir)
tempCopy := openFile(t, temp.Name())
fp, err := operator.readerFactory.newFingerprint(temp)
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)

reader, err := operator.readerFactory.newReader(tempCopy, fp)
reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -1151,11 +1151,11 @@ func TestFingerprintGrowsAndStops(t *testing.T) {

temp := openTemp(t, tempDir)
tempCopy := openFile(t, temp.Name())
fp, err := operator.readerFactory.newFingerprint(temp)
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.readerFactory.newReader(tempCopy, fp)
reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -1214,11 +1214,11 @@ func TestFingerprintChangeSize(t *testing.T) {

temp := openTemp(t, tempDir)
tempCopy := openFile(t, temp.Name())
fp, err := operator.readerFactory.newFingerprint(temp)
fp, err := operator.readerFactory.NewFingerprint(temp)
require.NoError(t, err)
require.Equal(t, []byte(""), fp.FirstBytes)

reader, err := operator.readerFactory.newReader(tempCopy, fp)
reader, err := operator.readerFactory.NewReader(tempCopy, fp)
require.NoError(t, err)
defer reader.Close()

Expand Down Expand Up @@ -1247,15 +1247,15 @@ func TestFingerprintChangeSize(t *testing.T) {
// Change fingerprint and try to read file again
// We do not expect fingerprint change
// We test both increasing and decreasing fingerprint size
reader.readerConfig.fingerprintSize = maxFP * (lineLen / 3)
reader.Config.FingerprintSize = maxFP * (lineLen / 3)
line := string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

writeString(t, temp, line)
reader.ReadToEnd(context.Background())
require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)

reader.readerConfig.fingerprintSize = maxFP / 2
reader.Config.FingerprintSize = maxFP / 2
line = string(tokenWithLength(lineLen-1)) + "\n"
fileContent = append(fileContent, []byte(line)...)

Expand Down
Loading

0 comments on commit 5dba9aa

Please sign in to comment.