diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index be597be1f563..0823b93ef3e6 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -17,6 +17,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
@@ -163,21 +164,21 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
 	return &Manager{
 		SugaredLogger: logger.With("component", "fileconsumer"),
 		cancel:        func() {},
-		readerFactory: readerFactory{
+		readerFactory: reader.Factory{
 			SugaredLogger: logger.With("component", "fileconsumer"),
-			readerConfig: &readerConfig{
-				fingerprintSize:         int(c.FingerprintSize),
-				maxLogSize:              int(c.MaxLogSize),
-				emit:                    emit,
-				includeFileName:         c.IncludeFileName,
-				includeFilePath:         c.IncludeFilePath,
-				includeFileNameResolved: c.IncludeFileNameResolved,
-				includeFilePathResolved: c.IncludeFilePathResolved,
+			Config: &reader.Config{
+				FingerprintSize:         int(c.FingerprintSize),
+				MaxLogSize:              int(c.MaxLogSize),
+				Emit:                    emit,
+				IncludeFileName:         c.IncludeFileName,
+				IncludeFilePath:         c.IncludeFilePath,
+				IncludeFileNameResolved: c.IncludeFileNameResolved,
+				IncludeFilePathResolved: c.IncludeFilePathResolved,
 			},
-			fromBeginning:   startAtBeginning,
-			splitterFactory: factory,
-			encoding:        enc,
-			headerConfig:    hCfg,
+			FromBeginning:   startAtBeginning,
+			SplitterFactory: factory,
+			Encoding:        enc,
+			HeaderConfig:    hCfg,
 		},
 		fileMatcher: fileMatcher,
 		roller:      newRoller(),
@@ -185,7 +186,7 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, fact
 		maxBatchFiles:   c.MaxConcurrentFiles / 2,
 		maxBatches:      c.MaxBatches,
 		deleteAfterRead: c.DeleteAfterRead,
-		knownFiles:      make([]*reader, 0, 10),
+		knownFiles:      make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles),
 		seenPaths:       make(map[string]struct{}, 100),
 	}, nil
 }
diff --git a/pkg/stanza/fileconsumer/config_test.go b/pkg/stanza/fileconsumer/config_test.go
index b613955e9a46..9d83118aa4bd 100644
--- a/pkg/stanza/fileconsumer/config_test.go
+++ b/pkg/stanza/fileconsumer/config_test.go
@@ -767,7 +767,7 @@ func TestBuildWithHeader(t *testing.T) {
 			},
 			require.NoError,
 			func(t *testing.T, m *Manager) {
-				require.NotNil(t, m.readerFactory.headerConfig.SplitFunc)
+				require.NotNil(t, m.readerFactory.HeaderConfig.SplitFunc)
 			},
 		},
 	}
diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index 692fbde4212d..8c218f34139d 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -15,6 +15,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 )
@@ -24,7 +25,7 @@
 type Manager struct {
 	wg     sync.WaitGroup
 	cancel context.CancelFunc
-	readerFactory readerFactory
+	readerFactory reader.Factory
 	fileMatcher   *matcher.Matcher
 	roller        roller
 	persister     operator.Persister
@@ -34,7 +35,7 @@ type Manager struct {
 	maxBatchFiles   int
 	deleteAfterRead bool

-	knownFiles []*reader
+	knownFiles []*reader.Reader
 	seenPaths  map[string]struct{}

 	currentFps []*fingerprint.Fingerprint
@@ -68,7 +69,6 @@ func (m *Manager) Stop() error {
 	for _, reader := range m.knownFiles {
 		reader.Close()
 	}
-	m.knownFiles = nil
 	m.cancel = nil
 	return nil
 }
@@ -96,12 +96,6 @@ func (m *Manager) startPoller(ctx context.Context) {

 // poll checks all the watched paths for new entries
 func (m *Manager) poll(ctx context.Context) {
-	// Increment the generation on all known readers
-	// This is done here because the next generation is about to start
-	for i := 0; i < len(m.knownFiles); i++ {
-		m.knownFiles[i].generation++
-	}
-
 	// Used to keep track of the number of batches processed in this poll cycle
 	batchesProcessed := 0
@@ -129,7 +123,7 @@ func (m *Manager) poll(ctx context.Context) {
 func (m *Manager) consume(ctx context.Context, paths []string) {
 	m.Debug("Consuming files")

-	readers := make([]*reader, 0, len(paths))
+	readers := make([]*reader.Reader, 0, len(paths))
 	for _, path := range paths {
 		r := m.makeReader(path)
 		if r != nil {
@@ -145,11 +139,11 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	var wg sync.WaitGroup
 	for _, r := range readers {
 		wg.Add(1)
-		go func(r *reader) {
+		go func(r *reader.Reader) {
 			defer wg.Done()
 			r.ReadToEnd(ctx)
 			// Delete a file if deleteAfterRead is enabled and we reached the end of the file
-			if m.deleteAfterRead && r.eof {
+			if m.deleteAfterRead && r.EOF {
 				r.Delete()
 			}
 		}(r)
@@ -158,9 +152,9 @@ func (m *Manager) consume(ctx context.Context, paths []string) {

 	// Save off any files that were not fully read
 	if m.deleteAfterRead {
-		unfinished := make([]*reader, 0, len(readers))
+		unfinished := make([]*reader.Reader, 0, len(readers))
 		for _, r := range readers {
-			if !r.eof {
+			if !r.EOF {
 				unfinished = append(unfinished, r)
 			}
 		}
@@ -173,7 +167,7 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
 	}

 	// Any new files that appear should be consumed entirely
-	m.readerFactory.fromBeginning = true
+	m.readerFactory.FromBeginning = true

 	m.roller.roll(ctx, readers)
 	m.saveCurrent(readers)
@@ -183,7 +177,7 @@

 func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.File) {
 	if _, ok := m.seenPaths[path]; !ok {
-		if m.readerFactory.fromBeginning {
+		if m.readerFactory.FromBeginning {
 			m.Infow("Started watching file", "path", path)
 		} else {
 			m.Infow("Started watching file from end. To read preexisting logs, configure the argument 'start_at' to 'beginning'", "path", path)
@@ -196,7 +190,7 @@ func (m *Manager) makeFingerprint(path string) (*fingerprint.Fingerprint, *os.Fi
 		return nil, nil
 	}

-	fp, err := m.readerFactory.newFingerprint(file)
+	fp, err := m.readerFactory.NewFingerprint(file)
 	if err != nil {
 		if err = file.Close(); err != nil {
 			m.Debugw("problem closing file", zap.Error(err))
@@ -226,7 +220,7 @@ func (m *Manager) checkDuplicates(fp *fingerprint.Fingerprint) bool {
-// makeReader take a file path, then creates reader,
+// makeReader takes a file path, then creates a reader,
 // discarding any that have a duplicate fingerprint to other files that have already
 // been read this polling interval
-func (m *Manager) makeReader(path string) *reader {
+func (m *Manager) makeReader(path string) *reader.Reader {
 	// Open the files first to minimize the time between listing and opening
 	fp, file := m.makeFingerprint(path)
 	if fp == nil {
@@ -258,33 +252,26 @@ func (m *Manager) clearCurrentFingerprints() {
 // saveCurrent adds the readers from this polling interval to this list of
-// known files, then increments the generation of all tracked old readers
-// before clearing out readers that have existed for 3 generations.
-func (m *Manager) saveCurrent(readers []*reader) {
-	// Add readers from the current, completed poll interval to the list of known files
-	m.knownFiles = append(m.knownFiles, readers...)
-
-	// Clear out old readers. They are sorted such that they are oldest first,
-	// so we can just find the first reader whose generation is less than our
-	// max, and keep every reader after that
-	for i := 0; i < len(m.knownFiles); i++ {
-		reader := m.knownFiles[i]
-		if reader.generation <= 3 {
-			m.knownFiles = m.knownFiles[i:]
-			break
-		}
+// known files. If the addition would grow the list beyond its capacity,
+// the oldest readers are forgotten first.
+func (m *Manager) saveCurrent(readers []*reader.Reader) {
+	forgetNum := len(m.knownFiles) + len(readers) - cap(m.knownFiles)
+	if forgetNum > 0 {
+		m.knownFiles = append(m.knownFiles[forgetNum:], readers...)
+		return
 	}
+	m.knownFiles = append(m.knownFiles, readers...)
 }

-func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
+func (m *Manager) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader.Reader, error) {
 	// Check if the new path has the same fingerprint as an old path
 	if oldReader, ok := m.findFingerprintMatch(fp); ok {
-		return m.readerFactory.copy(oldReader, file)
+		return m.readerFactory.Copy(oldReader, file)
 	}

 	// If we don't match any previously known files, create a new reader from scratch
-	return m.readerFactory.newReader(file, fp)
+	return m.readerFactory.NewReader(file, fp)
 }

-func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader, bool) {
+func (m *Manager) findFingerprintMatch(fp *fingerprint.Fingerprint) (*reader.Reader, bool) {
 	// Iterate backwards to match newest first
 	for i := len(m.knownFiles) - 1; i >= 0; i-- {
 		oldReader := m.knownFiles[i]
@@ -313,7 +300,7 @@ func (m *Manager) syncLastPollFiles(ctx context.Context) {

 	// Encode each known file
 	for _, fileReader := range m.knownFiles {
-		if err := enc.Encode(fileReader.readerMetadata); err != nil {
+		if err := enc.Encode(fileReader.Metadata); err != nil {
 			m.Errorw("Failed to encode known files", zap.Error(err))
 		}
 	}
@@ -331,7 +318,6 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 	}

 	if encoded == nil {
-		m.knownFiles = make([]*reader, 0, 10)
 		return nil
 	}
@@ -345,13 +331,12 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 	if knownFileCount > 0 {
 		m.Infow("Resuming from previously known offset(s). 'start_at' setting is not applicable.")
-		m.readerFactory.fromBeginning = true
+		m.readerFactory.FromBeginning = true
 	}

 	// Decode each of the known files
-	m.knownFiles = make([]*reader, 0, knownFileCount)
 	for i := 0; i < knownFileCount; i++ {
-		rmd := &readerMetadata{}
+		rmd := new(reader.Metadata)
 		if err = dec.Decode(rmd); err != nil {
 			return err
 		}
@@ -371,7 +356,7 @@ func (m *Manager) loadLastPollFiles(ctx context.Context) error {
 		}

 		// This reader won't be used for anything other than metadata reference, so just wrap the metadata
-		m.knownFiles = append(m.knownFiles, &reader{readerMetadata: rmd})
+		m.knownFiles = append(m.knownFiles, &reader.Reader{Metadata: rmd})
 	}

 	return nil
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index ff78d6bc2103..c663923b0e76 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -1109,10 +1109,10 @@ func TestFileReader_FingerprintUpdated(t *testing.T) {
 	temp := openTemp(t, tempDir)
 	tempCopy := openFile(t, temp.Name())

-	fp, err := operator.readerFactory.newFingerprint(temp)
+	fp, err := operator.readerFactory.NewFingerprint(temp)
 	require.NoError(t, err)

-	reader, err := operator.readerFactory.newReader(tempCopy, fp)
+	reader, err := operator.readerFactory.NewReader(tempCopy, fp)
 	require.NoError(t, err)
 	defer reader.Close()
@@ -1151,11 +1151,11 @@ func TestFingerprintGrowsAndStops(t *testing.T) {
 	temp := openTemp(t, tempDir)
 	tempCopy := openFile(t, temp.Name())

-	fp, err := operator.readerFactory.newFingerprint(temp)
+	fp, err := operator.readerFactory.NewFingerprint(temp)
 	require.NoError(t, err)
 	require.Equal(t, []byte(""), fp.FirstBytes)

-	reader, err := operator.readerFactory.newReader(tempCopy, fp)
+	reader, err := operator.readerFactory.NewReader(tempCopy, fp)
 	require.NoError(t, err)
 	defer reader.Close()
@@ -1214,11 +1214,11 @@ func TestFingerprintChangeSize(t *testing.T) {
 	temp := openTemp(t, tempDir)
 	tempCopy := openFile(t, temp.Name())

-	fp, err := operator.readerFactory.newFingerprint(temp)
+	fp, err := operator.readerFactory.NewFingerprint(temp)
 	require.NoError(t, err)
 	require.Equal(t, []byte(""), fp.FirstBytes)

-	reader, err := operator.readerFactory.newReader(tempCopy, fp)
+	reader, err := operator.readerFactory.NewReader(tempCopy, fp)
 	require.NoError(t, err)
 	defer reader.Close()
@@ -1247,7 +1247,7 @@ func TestFingerprintChangeSize(t *testing.T) {
 	// Change fingerprint and try to read file again
 	// We do not expect fingerprint change
 	// We test both increasing and decreasing fingerprint size
-	reader.readerConfig.fingerprintSize = maxFP * (lineLen / 3)
+	reader.Config.FingerprintSize = maxFP * (lineLen / 3)
 	line := string(tokenWithLength(lineLen-1)) + "\n"
 	fileContent = append(fileContent, []byte(line)...)
@@ -1255,7 +1255,7 @@ func TestFingerprintChangeSize(t *testing.T) {
 	reader.ReadToEnd(context.Background())
 	require.Equal(t, fileContent[:expectedFP], reader.Fingerprint.FirstBytes)

-	reader.readerConfig.fingerprintSize = maxFP / 2
+	reader.Config.FingerprintSize = maxFP / 2
 	line = string(tokenWithLength(lineLen-1)) + "\n"
 	fileContent = append(fileContent, []byte(line)...)
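Note on the `saveCurrent` rewrite above: the generation counter is gone, and `m.knownFiles` now behaves as a fixed-capacity, oldest-first buffer whose bound is set once in `buildManager` (`10*c.MaxConcurrentFiles`). Below is a minimal, self-contained sketch of that forgetting logic, using `[]string` in place of `[]*reader.Reader`; the names and values are illustrative, not part of this change:

```go
package main

import "fmt"

// saveCurrent mirrors Manager.saveCurrent: when appending this poll cycle's
// readers would exceed the slice's capacity, drop the oldest entries first.
func saveCurrent(known, readers []string) []string {
	forgetNum := len(known) + len(readers) - cap(known)
	if forgetNum > 0 {
		return append(known[forgetNum:], readers...)
	}
	return append(known, readers...)
}

func main() {
	// Stands in for make([]*reader.Reader, 0, 10*c.MaxConcurrentFiles).
	known := make([]string, 0, 4)
	for _, batch := range [][]string{{"a", "b"}, {"c", "d"}, {"e"}} {
		known = saveCurrent(known, batch)
		fmt.Println(known)
	}
	// Output:
	// [a b]
	// [a b c d]
	// [b c d e]
}
```

One subtlety worth noting in review: slicing off the front with `known[forgetNum:]` reduces the slice's capacity, so the subsequent `append` can reallocate and `cap(m.knownFiles)` may drift above its initial value over time. The sketch only demonstrates the intended oldest-first eviction.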
diff --git a/pkg/stanza/fileconsumer/reader_factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go
similarity index 57%
rename from pkg/stanza/fileconsumer/reader_factory.go
rename to pkg/stanza/fileconsumer/internal/reader/factory.go
index 36f55377fdbe..5a95fd568ecd 100644
--- a/pkg/stanza/fileconsumer/reader_factory.go
+++ b/pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -1,7 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0

-package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
+package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

 import (
 	"bufio"
@@ -20,29 +20,29 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/util"
 )

-type readerFactory struct {
+type Factory struct {
 	*zap.SugaredLogger
-	readerConfig    *readerConfig
-	fromBeginning   bool
-	splitterFactory splitter.Factory
-	encoding        encoding.Encoding
-	headerConfig    *header.Config
+	Config          *Config
+	FromBeginning   bool
+	SplitterFactory splitter.Factory
+	Encoding        encoding.Encoding
+	HeaderConfig    *header.Config
 }

-func (f *readerFactory) newReader(file *os.File, fp *fingerprint.Fingerprint) (*reader, error) {
-	return f.build(file, &readerMetadata{
+func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) {
+	return f.build(file, &Metadata{
 		Fingerprint:    fp,
 		FileAttributes: map[string]any{},
-	}, f.splitterFactory.SplitFunc())
+	}, f.SplitterFactory.SplitFunc())
 }

-// copy creates a deep copy of a reader
-func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
+// Copy creates a deep copy of a reader
+func (f *Factory) Copy(old *Reader, newFile *os.File) (*Reader, error) {
 	lineSplitFunc := old.lineSplitFunc
 	if lineSplitFunc == nil {
-		lineSplitFunc = f.splitterFactory.SplitFunc()
+		lineSplitFunc = f.SplitterFactory.SplitFunc()
 	}
-	return f.build(newFile, &readerMetadata{
+	return f.build(newFile, &Metadata{
 		Fingerprint:    old.Fingerprint.Copy(),
 		Offset:         old.Offset,
 		FileAttributes: util.MapCopy(old.FileAttributes),
@@ -50,33 +50,33 @@ func (f *readerFactory) copy(old *reader, newFile *os.File) (*reader, error) {
 	}, lineSplitFunc)
 }

-func (f *readerFactory) newFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
-	return fingerprint.New(file, f.readerConfig.fingerprintSize)
+func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
+	return fingerprint.New(file, f.Config.FingerprintSize)
 }

-func (f *readerFactory) build(file *os.File, m *readerMetadata, lineSplitFunc bufio.SplitFunc) (r *reader, err error) {
-	r = &reader{
-		readerConfig:   f.readerConfig,
-		readerMetadata: m,
-		file:           file,
-		fileName:       file.Name(),
-		SugaredLogger:  f.SugaredLogger.With("path", file.Name()),
-		decoder:        decode.New(f.encoding),
-		lineSplitFunc:  lineSplitFunc,
+func (f *Factory) build(file *os.File, m *Metadata, lineSplitFunc bufio.SplitFunc) (r *Reader, err error) {
+	r = &Reader{
+		Config:        f.Config,
+		Metadata:      m,
+		file:          file,
+		FileName:      file.Name(),
+		SugaredLogger: f.SugaredLogger.With("path", file.Name()),
+		decoder:       decode.New(f.Encoding),
+		lineSplitFunc: lineSplitFunc,
 	}

-	if !f.fromBeginning {
+	if !f.FromBeginning {
 		if err = r.offsetToEnd(); err != nil {
 			return nil, err
 		}
 	}

-	if f.headerConfig == nil || m.HeaderFinalized {
+	if f.HeaderConfig == nil || m.HeaderFinalized {
 		r.splitFunc = r.lineSplitFunc
-		r.processFunc = f.readerConfig.emit
+		r.processFunc = f.Config.Emit
 	} else {
-		r.splitFunc = f.headerConfig.SplitFunc
-		r.headerReader, err = header.NewReader(f.SugaredLogger, *f.headerConfig)
+		r.splitFunc = f.HeaderConfig.SplitFunc
+		r.headerReader, err = header.NewReader(f.SugaredLogger, *f.HeaderConfig)
 		if err != nil {
 			return nil, err
 		}
@@ -84,12 +84,12 @@ func (f *readerFactory) build(file *os.File, m *readerMetadata, lineSplitFunc bu
 	}

 	// Resolve file name and path attributes
-	resolved := r.fileName
+	resolved := r.FileName

 	// Dirty solution, waiting for this permanent fix https://github.com/golang/go/issues/39786
 	// EvalSymlinks on windows is partially working depending on the way you use Symlinks and Junctions
 	if runtime.GOOS != "windows" {
-		resolved, err = filepath.EvalSymlinks(r.fileName)
+		resolved, err = filepath.EvalSymlinks(r.FileName)
 		if err != nil {
 			f.Errorf("resolve symlinks: %w", err)
 		}
@@ -99,22 +99,22 @@
 		f.Errorf("resolve abs: %w", err)
 	}

-	if f.readerConfig.includeFileName {
-		r.FileAttributes[attrs.LogFileName] = filepath.Base(r.fileName)
+	if f.Config.IncludeFileName {
+		r.FileAttributes[attrs.LogFileName] = filepath.Base(r.FileName)
 	} else if r.FileAttributes[attrs.LogFileName] != nil {
 		delete(r.FileAttributes, attrs.LogFileName)
 	}
-	if f.readerConfig.includeFilePath {
-		r.FileAttributes[attrs.LogFilePath] = r.fileName
+	if f.Config.IncludeFilePath {
+		r.FileAttributes[attrs.LogFilePath] = r.FileName
 	} else if r.FileAttributes[attrs.LogFilePath] != nil {
 		delete(r.FileAttributes, attrs.LogFilePath)
 	}
-	if f.readerConfig.includeFileNameResolved {
+	if f.Config.IncludeFileNameResolved {
 		r.FileAttributes[attrs.LogFileNameResolved] = filepath.Base(abs)
 	} else if r.FileAttributes[attrs.LogFileNameResolved] != nil {
 		delete(r.FileAttributes, attrs.LogFileNameResolved)
 	}
-	if f.readerConfig.includeFilePathResolved {
+	if f.Config.IncludeFilePathResolved {
 		r.FileAttributes[attrs.LogFilePathResolved] = abs
 	} else if r.FileAttributes[attrs.LogFilePathResolved] != nil {
 		delete(r.FileAttributes, attrs.LogFilePathResolved)
diff --git a/pkg/stanza/fileconsumer/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go
similarity index 74%
rename from pkg/stanza/fileconsumer/reader.go
rename to pkg/stanza/fileconsumer/internal/reader/reader.go
index 5a1f622181a1..c597c615eb8f 100644
--- a/pkg/stanza/fileconsumer/reader.go
+++ b/pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -1,9 +1,10 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0

-package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"
+package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

 import (
 	"bufio"
 	"context"
+	"errors"
 	"fmt"
@@ -19,41 +19,40 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/scanner"
 )

-type readerConfig struct {
-	fingerprintSize         int
-	maxLogSize              int
-	emit                    emit.Callback
-	includeFileName         bool
-	includeFilePath         bool
-	includeFileNameResolved bool
-	includeFilePathResolved bool
+type Config struct {
+	FingerprintSize         int
+	MaxLogSize              int
+	Emit                    emit.Callback
+	IncludeFileName         bool
+	IncludeFilePath         bool
+	IncludeFileNameResolved bool
+	IncludeFilePathResolved bool
 }

-type readerMetadata struct {
+type Metadata struct {
 	Fingerprint     *fingerprint.Fingerprint
 	Offset          int64
 	FileAttributes  map[string]any
 	HeaderFinalized bool
 }

-// reader manages a single file
-type reader struct {
+// Reader manages a single file
+type Reader struct {
 	*zap.SugaredLogger
-	*readerConfig
-	*readerMetadata
+	*Config
+	*Metadata
+	FileName      string
+	EOF           bool
 	file          *os.File
-	fileName      string
 	lineSplitFunc bufio.SplitFunc
 	splitFunc     bufio.SplitFunc
 	decoder       *decode.Decoder
 	headerReader  *header.Reader
 	processFunc   emit.Callback
-	generation    int
-	eof           bool
 }

 // offsetToEnd sets the starting offset
-func (r *reader) offsetToEnd() error {
+func (r *Reader) offsetToEnd() error {
 	info, err := r.file.Stat()
 	if err != nil {
 		return fmt.Errorf("stat: %w", err)
@@ -62,14 +61,21 @@
 	return nil
 }

+func (r *Reader) NewFingerprintFromFile() (*fingerprint.Fingerprint, error) {
+	if r.file == nil {
+		return nil, errors.New("file is nil")
+	}
+	return fingerprint.New(r.file, r.FingerprintSize)
+}
+
 // ReadToEnd will read until the end of the file
-func (r *reader) ReadToEnd(ctx context.Context) {
+func (r *Reader) ReadToEnd(ctx context.Context) {
 	if _, err := r.file.Seek(r.Offset, 0); err != nil {
 		r.Errorw("Failed to seek", zap.Error(err))
 		return
 	}

-	s := scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
+	s := scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)

 	// Iterate over the tokenized file, emitting entries as we go
 	for {
@@ -81,10 +87,10 @@ func (r *reader) ReadToEnd(ctx context.Context) {
 		ok := s.Scan()
 		if !ok {
-			r.eof = true
+			r.EOF = true
 			if err := s.Error(); err != nil {
 				// If Scan returned an error then we are not guaranteed to be at the end of the file
-				r.eof = false
+				r.EOF = false
 				r.Errorw("Failed during scan", zap.Error(err))
 			}
 			break
@@ -102,12 +108,12 @@
 				// Do not use the updated offset from the old scanner, as the most recent token
 				// could be split differently with the new splitter.
 				r.splitFunc = r.lineSplitFunc
-				r.processFunc = r.emit
+				r.processFunc = r.Emit
 				if _, err = r.file.Seek(r.Offset, 0); err != nil {
 					r.Errorw("Failed to seek post-header", zap.Error(err))
 					return
 				}
-				s = scanner.New(r, r.maxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
+				s = scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
 			} else {
 				r.Errorw("process: %w", zap.Error(err))
 			}
@@ -117,7 +123,7 @@
 	}
 }

-func (r *reader) finalizeHeader() {
+func (r *Reader) finalizeHeader() {
 	if err := r.headerReader.Stop(); err != nil {
 		r.Errorw("Failed to stop header pipeline during finalization", zap.Error(err))
 	}
@@ -126,18 +132,18 @@
 }

 // Delete will close and delete the file
-func (r *reader) Delete() {
+func (r *Reader) Delete() {
 	if r.file == nil {
 		return
 	}
 	r.Close()
-	if err := os.Remove(r.fileName); err != nil {
-		r.Errorf("could not delete %s", r.fileName)
+	if err := os.Remove(r.FileName); err != nil {
+		r.Errorf("could not delete %s", r.FileName)
 	}
 }

 // Close will close the file
-func (r *reader) Close() {
+func (r *Reader) Close() {
 	if r.file != nil {
 		if err := r.file.Close(); err != nil {
 			r.Debugw("Problem closing reader", zap.Error(err))
@@ -153,15 +159,15 @@
 }

 // Read from the file and update the fingerprint if necessary
-func (r *reader) Read(dst []byte) (int, error) {
+func (r *Reader) Read(dst []byte) (int, error) {
 	// Skip if fingerprint is already built
 	// or if fingerprint is behind Offset
-	if len(r.Fingerprint.FirstBytes) == r.fingerprintSize || int(r.Offset) > len(r.Fingerprint.FirstBytes) {
+	if len(r.Fingerprint.FirstBytes) == r.FingerprintSize || int(r.Offset) > len(r.Fingerprint.FirstBytes) {
 		return r.file.Read(dst)
 	}
 	n, err := r.file.Read(dst)
-	appendCount := min0(n, r.fingerprintSize-int(r.Offset))
-	// return for n == 0 or r.Offset >= r.fileInput.fingerprintSize
+	appendCount := min0(n, r.FingerprintSize-int(r.Offset))
+	// return for n == 0 or r.Offset >= r.FingerprintSize
 	if appendCount == 0 {
 		return n, err
 	}
@@ -189,11 +195,11 @@ func min0(a, b int) int {
 // reader will automatically close the file and drop the handle.
 //
 // The function returns true if the file handle is still valid, false otherwise.
-func (r *reader) validateFingerprint() bool {
+func (r *Reader) ValidateFingerprint() bool {
 	if r.file == nil {
 		return false
 	}
-	refreshedFingerprint, err := fingerprint.New(r.file, r.fingerprintSize)
+	refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize)
 	if err != nil {
 		r.Debugw("Failed to create fingerprint", zap.Error(err))
 		return false
diff --git a/pkg/stanza/fileconsumer/reader_test.go b/pkg/stanza/fileconsumer/reader_test.go
index 79f719ed2bcb..dde98a91dc4e 100644
--- a/pkg/stanza/fileconsumer/reader_test.go
+++ b/pkg/stanza/fileconsumer/reader_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/splitter"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
@@ -27,10 +28,10 @@ func TestPersistFlusher(t *testing.T) {
 	f, emitChan := testReaderFactory(t, split.Config{}, defaultMaxLogSize, flushPeriod)

 	temp := openTemp(t, t.TempDir())
-	fp, err := f.newFingerprint(temp)
+	fp, err := f.NewFingerprint(temp)
 	require.NoError(t, err)

-	r, err := f.newReader(temp, fp)
+	r, err := f.NewReader(temp, fp)
 	require.NoError(t, err)

 	_, err = temp.WriteString("log with newline\nlog without newline")
@@ -45,7 +46,7 @@ func TestPersistFlusher(t *testing.T) {
 	expectNoTokensUntil(t, emitChan, 2*flushPeriod)

 	// A copy of the reader should remember that we last emitted about 200ms ago.
-	copyReader, err := f.copy(r, temp)
+	copyReader, err := f.Copy(r, temp)
 	assert.NoError(t, err)

 	// This time, the flusher will kick in and we should emit the unfinished log.
@@ -116,10 +117,10 @@ func TestTokenization(t *testing.T) {
 			_, err := temp.Write(tc.fileContent)
 			require.NoError(t, err)

-			fp, err := f.newFingerprint(temp)
+			fp, err := f.NewFingerprint(temp)
 			require.NoError(t, err)

-			r, err := f.newReader(temp, fp)
+			r, err := f.NewReader(temp, fp)
 			require.NoError(t, err)

 			r.ReadToEnd(context.Background())
@@ -146,10 +147,10 @@ func TestTokenizationTooLong(t *testing.T) {
 	_, err := temp.Write(fileContent)
 	require.NoError(t, err)

-	fp, err := f.newFingerprint(temp)
+	fp, err := f.NewFingerprint(temp)
 	require.NoError(t, err)

-	r, err := f.newReader(temp, fp)
+	r, err := f.NewReader(temp, fp)
 	require.NoError(t, err)

 	r.ReadToEnd(context.Background())
@@ -178,14 +179,14 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
 	_, err := temp.Write(fileContent)
 	require.NoError(t, err)

-	fp, err := f.newFingerprint(temp)
+	fp, err := f.NewFingerprint(temp)
 	require.NoError(t, err)

-	r, err := f.newReader(temp, fp)
+	r, err := f.NewReader(temp, fp)
 	require.NoError(t, err)

 	r.ReadToEnd(context.Background())
-	require.True(t, r.eof)
+	require.True(t, r.EOF)

 	for _, expected := range expected {
 		require.Equal(t, expected, readToken(t, emitChan))
@@ -205,14 +206,14 @@ func TestHeaderFingerprintIncluded(t *testing.T) {
 	h, err := header.NewConfig("^#", []operator.Config{{Builder: regexConf}}, enc)
 	require.NoError(t, err)
-	f.headerConfig = h
+	f.HeaderConfig = h

 	temp := openTemp(t, t.TempDir())

-	fp, err := f.newFingerprint(temp)
+	fp, err := f.NewFingerprint(temp)
 	require.NoError(t, err)

-	r, err := f.newReader(temp, fp)
+	r, err := f.NewReader(temp, fp)
 	require.NoError(t, err)

 	_, err = temp.Write(fileContent)
@@ -223,7 +224,7 @@
 	require.Equal(t, []byte("#header-line\naaa\n"), r.Fingerprint.FirstBytes)
 }

-func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*readerFactory, chan *emitParams) {
+func testReaderFactory(t *testing.T, sCfg split.Config, maxLogSize int, flushPeriod time.Duration) (*reader.Factory, chan *emitParams) {
 	emitChan := make(chan *emitParams, 100)
 	enc, err := decode.LookupEncoding(defaultEncoding)
 	require.NoError(t, err)
@@ -231,16 +232,16 @@
 	splitFunc, err := sCfg.Func(enc, false, maxLogSize)
 	require.NoError(t, err)

-	return &readerFactory{
+	return &reader.Factory{
 		SugaredLogger: testutil.Logger(t),
-		readerConfig: &readerConfig{
-			fingerprintSize: fingerprint.DefaultSize,
-			maxLogSize:      maxLogSize,
-			emit:            testEmitFunc(emitChan),
+		Config: &reader.Config{
+			FingerprintSize: fingerprint.DefaultSize,
+			MaxLogSize:      maxLogSize,
+			Emit:            testEmitFunc(emitChan),
 		},
-		fromBeginning:   true,
-		splitterFactory: splitter.NewFactory(splitFunc, trim.Whitespace, flushPeriod, maxLogSize),
-		encoding:        enc,
+		FromBeginning:   true,
+		SplitterFactory: splitter.NewFactory(splitFunc, trim.Whitespace, flushPeriod, maxLogSize),
+		Encoding:        enc,
 	}, emitChan
 }
diff --git a/pkg/stanza/fileconsumer/roller.go b/pkg/stanza/fileconsumer/roller.go
index 1bdd6dd8a6c7..044f192aeb54 100644
--- a/pkg/stanza/fileconsumer/roller.go
+++ b/pkg/stanza/fileconsumer/roller.go
@@ -3,10 +3,14 @@

 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

-import "context"
+import (
+	"context"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
+)

 type roller interface {
-	readLostFiles(context.Context, []*reader)
-	roll(context.Context, []*reader)
+	readLostFiles(context.Context, []*reader.Reader)
+	roll(context.Context, []*reader.Reader)
 	cleanup()
 }
diff --git a/pkg/stanza/fileconsumer/roller_other.go b/pkg/stanza/fileconsumer/roller_other.go
index 04d6edaf886f..efda6b3a2731 100644
--- a/pkg/stanza/fileconsumer/roller_other.go
+++ b/pkg/stanza/fileconsumer/roller_other.go
@@ -9,31 +9,33 @@ package fileconsumer // import "github.com/open-telemetry/opentelemetry-collecto

 import (
 	"context"
 	"sync"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
 )

 type detectLostFiles struct {
-	oldReaders []*reader
+	oldReaders []*reader.Reader
 }

 func newRoller() roller {
-	return &detectLostFiles{oldReaders: []*reader{}}
+	return &detectLostFiles{oldReaders: []*reader.Reader{}}
 }

-func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader) {
+func (r *detectLostFiles) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
 	// Detect files that have been rotated out of matching pattern
-	lostReaders := make([]*reader, 0, len(r.oldReaders))
+	lostReaders := make([]*reader.Reader, 0, len(r.oldReaders))
 OUTER:
 	for _, oldReader := range r.oldReaders {
 		for _, newReader := range newReaders {
 			if newReader.Fingerprint.StartsWith(oldReader.Fingerprint) {
 				continue OUTER
 			}

-			if oldReader.fileName == newReader.fileName {
+			if oldReader.FileName == newReader.FileName {
 				// At this point, we know that the file has been rotated. However, we do not know
 				// if it was moved or truncated. If truncated, then both handles point to the same
 				// file, in which case we should only read from it using the new reader. We can use
-				// the validateFingerprint method to establish that the file has not been truncated.
-				if !oldReader.validateFingerprint() {
+				// the ValidateFingerprint method to establish that the file has not been truncated.
+				if !oldReader.ValidateFingerprint() {
 					continue OUTER
 				}
 			}
@@ -44,7 +46,7 @@ OUTER:
 	var lostWG sync.WaitGroup
 	for _, lostReader := range lostReaders {
 		lostWG.Add(1)
-		go func(r *reader) {
+		go func(r *reader.Reader) {
 			defer lostWG.Done()
 			r.ReadToEnd(ctx)
 		}(lostReader)
@@ -52,7 +54,7 @@ OUTER:
 	lostWG.Wait()
 }

-func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader) {
+func (r *detectLostFiles) roll(_ context.Context, newReaders []*reader.Reader) {
 	for _, oldReader := range r.oldReaders {
 		oldReader.Close()
 	}
diff --git a/pkg/stanza/fileconsumer/roller_windows.go b/pkg/stanza/fileconsumer/roller_windows.go
index e9d1594ea881..7fbc47a35d1f 100644
--- a/pkg/stanza/fileconsumer/roller_windows.go
+++ b/pkg/stanza/fileconsumer/roller_windows.go
@@ -6,19 +6,23 @@

 package fileconsumer // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer"

-import "context"
+import (
+	"context"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
+)

 type closeImmediately struct{}

-func newRoller(_ int) roller {
+func newRoller() roller {
 	return &closeImmediately{}
 }

-func (r *closeImmediately) readLostFiles(ctx context.Context, newReaders []*reader) {
+func (r *closeImmediately) readLostFiles(ctx context.Context, newReaders []*reader.Reader) {
 	return
 }

-func (r *closeImmediately) roll(_ context.Context, newReaders []*reader) {
+func (r *closeImmediately) roll(_ context.Context, newReaders []*reader.Reader) {
 	for _, newReader := range newReaders {
 		newReader.Close()
 	}
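For orientation, here is how the surface exported by the new `internal/reader` package composes, based only on identifiers visible in this diff (`Factory`, `Config`, `NewFingerprint`, `NewReader`, `ReadToEnd`, `EOF`). `consumeOnce` is a hypothetical helper, not part of this change, and since the package lives under `internal/`, only code within `pkg/stanza/fileconsumer` can import it:

```go
package fileconsumer

import (
	"context"
	"os"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"
)

// consumeOnce fingerprints an open file, builds a reader for it, and reads
// to the end, reporting whether EOF was reached. The Factory is assumed to
// be fully populated (SugaredLogger, Config, SplitterFactory, Encoding).
func consumeOnce(ctx context.Context, f *reader.Factory, file *os.File) (bool, error) {
	fp, err := f.NewFingerprint(file)
	if err != nil {
		return false, err
	}
	r, err := f.NewReader(file, fp)
	if err != nil {
		return false, err
	}
	defer r.Close()
	r.ReadToEnd(ctx)
	return r.EOF, nil // EOF replaces the previously unexported eof field
}
```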