diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c8b400a05b5..803700f303d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -107,6 +107,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Possible values for Netflow's locality fields (source.locality, destination.locality and flow.locality) are now `internal` and `external`, instead of `private` and `public`. {issue}24272[24272] {pull}24295[24295] - Add User Agent Parser for Azure Sign In Logs Ingest Pipeline {pull}23201[23201] - Changes filebeat httpjson input's append transform to create a list even with only a single value{pull}25074[25074] +- Change logging in logs input to structured logging. Some log message formats have changed. {pull}25299[25299] - All url.* fields apart from url.original in the Apache, Nginx, IIS, Traefik, S3Access, Cisco, F5, Fortinet, Google Workspace, Imperva, Microsoft, Netscout, O365, Sophos, Squid, Suricata, Zeek, Zia, Zoom, and ZScaler modules are now url unescaped due to using the Elasticsearch uri_parts processor. 
{pull}24699[24699] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 0d4e6d6b539..34bf1e221d6 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -77,6 +77,8 @@ type OutletFactory func() channel.Outleter // Harvester contains all harvester related data type Harvester struct { + logger *logp.Logger + id uuid.UUID config config source harvester.Source // the source being watched @@ -120,6 +122,7 @@ type harvesterProgressMetrics struct { // NewHarvester creates a new harvester func NewHarvester( + logger *logp.Logger, config *common.Config, state file.State, states *file.States, @@ -132,7 +135,9 @@ func NewHarvester( return nil, err } + logger = logger.Named("harvester").With("harvester_id", id) h := &Harvester{ + logger: logger, config: defaultConfig(), state: state, states: states, @@ -204,7 +209,7 @@ func (h *Harvester) Setup() error { return err } - logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator) + h.logger.Debugf("Harvester setup successful. 
Line terminator: %d", h.config.LineTerminator) return nil } @@ -234,6 +239,8 @@ func (h *Harvester) updateCurrentSize() error { // Run start the harvester and reads files line by line and sends events to the defined output func (h *Harvester) Run() error { + logger := h.logger + // Allow for some cleanup on termination if h.onTerminate != nil { defer h.onTerminate() @@ -287,7 +294,7 @@ func (h *Harvester) Run() error { select { // Applies when timeout is reached case <-closeTimeout: - logp.Info("Closing harvester because close_timeout was reached: %s", source) + logger.Infof("Closing harvester because close_timeout was reached: %s", source) // Required when reader loop returns and reader finished case <-h.done: } @@ -295,11 +302,11 @@ func (h *Harvester) Run() error { h.stop() err := h.reader.Close() if err != nil { - logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err) + logger.Errorf("Failed to stop harvester for file: %v", err) } }(h.state.Source) - logp.Info("Harvester started for file: %s", h.state.Source) + logger.Info("Harvester started for file.") h.doneWg.Add(1) go func() { @@ -318,21 +325,21 @@ func (h *Harvester) Run() error { if err != nil { switch err { case ErrFileTruncate: - logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source) + logger.Info("File was truncated. Begin reading file from offset 0.") h.state.Offset = 0 filesTruncated.Add(1) case ErrRemoved: - logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source) + logger.Info("File was removed. Closing because close_removed is enabled.") case ErrRenamed: - logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source) + logger.Info("File was renamed. Closing because close_renamed is enabled.") case ErrClosed: - logp.Info("Reader was closed: %s. Closing.", h.state.Source) + logger.Info("Reader was closed. Closing.") case io.EOF: - logp.Info("End of file reached: %s. 
Closing because close_eof is enabled.", h.state.Source) + logger.Info("End of file reached. Closing because close_eof is enabled.") case ErrInactive: - logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive) + logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive) default: - logp.Err("Read line error: %v; File: %v", err, h.state.Source) + logger.Errorf("Read line error: %v", err) } return nil } @@ -370,7 +377,7 @@ func (h *Harvester) monitorFileSize() { case <-ticker.C: err := h.updateCurrentSize() if err != nil { - logp.Err("Error updating file size: %v; File: %v", err, h.state.Source) + h.logger.Errorf("Error updating file size: %v", err) } } } @@ -481,7 +488,7 @@ func (h *Harvester) SendStateUpdate() { h.publishState(h.state) - logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + h.logger.Debugf("Update state (offset: %v).", h.state.Offset) h.states.Update(h.state) } @@ -491,14 +498,14 @@ func (h *Harvester) shouldExportLine(line string) bool { if len(h.config.IncludeLines) > 0 { if !harvester.MatchAny(h.config.IncludeLines, line) { // drop line - logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line) + h.logger.Debugf("Drop line as it does not match any of the include patterns %s", line) return false } } if len(h.config.ExcludeLines) > 0 { if harvester.MatchAny(h.config.ExcludeLines, line) { // drop line - logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line) + h.logger.Debugf("Drop line as it does match one of the exclude patterns%s", line) return false } } @@ -539,6 +546,8 @@ func (h *Harvester) openFile() error { } func (h *Harvester) validateFile(f *os.File) error { + logger := h.logger + info, err := f.Stat() if err != nil { return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err) @@ -557,9 +566,9 @@ func (h 
*Harvester) validateFile(f *os.File) error { if err != nil { if err == transform.ErrShortSrc { - logp.Info("Initialising encoding for '%v' failed due to file being too short", f) + logger.Infof("Initialising encoding for '%v' failed due to file being too short", f) } else { - logp.Err("Initialising encoding for '%v' failed: %v", f, err) + logger.Errorf("Initialising encoding for '%v' failed: %v", f, err) } return err } @@ -570,7 +579,7 @@ func (h *Harvester) validateFile(f *os.File) error { return err } - logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset) + logger.Debugf("Setting offset: %d ", offset) h.state.Offset = offset return nil @@ -579,12 +588,12 @@ func (h *Harvester) validateFile(f *os.File) error { func (h *Harvester) initFileOffset(file *os.File) (int64, error) { // continue from last known offset if h.state.Offset > 0 { - logp.Debug("harvester", "Set previous offset for file: %s. Offset: %d ", h.state.Source, h.state.Offset) + h.logger.Debugf("Set previous offset: %d ", h.state.Offset) return file.Seek(h.state.Offset, os.SEEK_SET) } // get offset from file in case of encoding factory was required to read some data. 
- logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source) + h.logger.Debug("Setting offset to: 0") return file.Seek(0, os.SEEK_CUR) } @@ -605,8 +614,8 @@ func (h *Harvester) cleanup() { // Mark harvester as finished h.state.Finished = true - logp.Debug("harvester", "Stopping harvester for file: %s", h.state.Source) - defer logp.Debug("harvester", "harvester cleanup finished for file: %s", h.state.Source) + h.logger.Debugf("Stopping harvester.") + defer h.logger.Debugf("harvester cleanup finished.") // Make sure file is closed as soon as harvester exits // If file was never opened, it can't be closed @@ -615,14 +624,14 @@ func (h *Harvester) cleanup() { // close file handler h.source.Close() - logp.Debug("harvester", "Closing file: %s", h.state.Source) + h.logger.Debugf("Closing file") harvesterOpenFiles.Add(-1) // On completion, push offset so we can continue where we left off if we relaunch on the same file // Only send offset if file object was created successfully h.SendStateUpdate() } else { - logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source) + h.logger.Warn("Stopping harvester, NOT closing file as file info not available.") } harvesterClosed.Add(1) @@ -645,12 +654,12 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { var r reader.Reader var err error - logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes) + h.logger.Debugf("newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes) // TODO: NewLineReader uses additional buffering to deal with encoding and testing // for new lines in input stream. Simple 8-bit based encodings, or plain // don't require 'complicated' logic. 
- h.log, err = NewLog(h.source, h.config.LogConfig) + h.log, err = NewLog(h.logger, h.source, h.config.LogConfig) if err != nil { return nil, err } diff --git a/filebeat/input/log/harvester_test.go b/filebeat/input/log/harvester_test.go index 537851979ca..dba61b5d0ea 100644 --- a/filebeat/input/log/harvester_test.go +++ b/filebeat/input/log/harvester_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" @@ -75,6 +76,7 @@ func TestReadLine(t *testing.T) { source := File{File: readFile} h := Harvester{ + logger: logp.NewLogger("harvester"), config: config{ LogConfig: LogConfig{ CloseInactive: 500 * time.Millisecond, diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 1b203adcf5e..b5a5cc7543f 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -27,6 +27,8 @@ import ( "sync" "time" + "github.com/gofrs/uuid" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" @@ -61,6 +63,7 @@ func init() { // Input contains the input and its config type Input struct { cfg *common.Config + logger *logp.Logger config config states *file.States harvesters *harvester.Registry @@ -130,7 +133,11 @@ func NewInput( meta = nil } + uuid, _ := uuid.NewV4() + logger := logp.NewLogger("input").With("input_id", uuid) + p := &Input{ + logger: logger, config: inputConfig, cfg: cfg, harvesters: harvester.NewRegistry(), @@ -144,7 +151,7 @@ func NewInput( // Create empty harvester to check if configs are fine // TODO: Do config validation instead - _, err = p.createHarvester(file.State{}, nil) + _, err = p.createHarvester(logger, file.State{}, nil) if err != nil { return nil, err } @@ -154,7 +161,7 
@@ func NewInput( return nil, err } - logp.Info("Configured paths: %v", p.config.Paths) + logger.Infof("Configured paths: %v", p.config.Paths) cleanupNeeded = false go p.stopWhenDone() @@ -166,7 +173,9 @@ func NewInput( // It goes through all states coming from the registry. Only the states which match the glob patterns of // the input will be loaded and updated. All other states will not be touched. func (p *Input) loadStates(states []file.State) error { - logp.Debug("input", "exclude_files: %s. Number of states: %d", p.config.ExcludeFiles, len(states)) + logger := p.logger + + logger.Debugf("exclude_files: %s. Number of states: %d", p.config.ExcludeFiles, len(states)) for _, state := range states { // Check if state source belongs to this input. If yes, update the state. @@ -190,19 +199,20 @@ func (p *Input) loadStates(states []file.State) error { // Update input states and send new states to registry err := p.updateState(state) if err != nil { - logp.Err("Problem putting initial state: %+v", err) + logger.Errorf("Problem putting initial state: %+v", err) return err } } } - logp.Debug("input", "input with previous states loaded: %v", p.states.Count()) + logger.Debugf("input with previous states loaded: %v", p.states.Count()) return nil } // Run runs the input func (p *Input) Run() { - logp.Debug("input", "Start next scan") + logger := p.logger + logger.Debug("Start next scan") // TailFiles is like ignore_older = 1ns and only on startup if p.config.TailFiles { @@ -223,61 +233,64 @@ func (p *Input) Run() { if p.config.CleanInactive > 0 || p.config.CleanRemoved { beforeCount := p.states.Count() cleanedStates, pendingClean := p.states.Cleanup() - logp.Debug("input", "input states cleaned up. Before: %d, After: %d, Pending: %d", + logger.Debugf("input states cleaned up. Before: %d, After: %d, Pending: %d", beforeCount, beforeCount-cleanedStates, pendingClean) } // Marking removed files to be cleaned up. 
Cleanup happens after next scan to make sure all states are updated first if p.config.CleanRemoved { for _, state := range p.states.GetStates() { + stateLogger := loggerWithState(logger, state) + // os.Stat will return an error in case the file does not exist stat, err := os.Stat(state.Source) if err != nil { if os.IsNotExist(err) { - p.removeState(state) - logp.Debug("input", "Remove state for file as file removed: %s", state.Source) + p.removeState(stateLogger, state) + stateLogger.Debugf("Remove state for file as file removed: %s", state.Source) } else { - logp.Err("input state for %s was not removed: %s", state.Source, err) + stateLogger.Errorf("input state for %s was not removed: %s", state.Source, err) } } else { // Check if existing source on disk and state are the same. Remove if not the case. newState := file.NewState(stat, state.Source, p.config.Type, p.meta, p.fileStateIdentifier) if state.IdentifierName != newState.IdentifierName { - logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) + stateLogger.Debugf("file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) } if !state.IsEqual(&newState) { - p.removeState(state) - logp.Debug("input", "Remove state of file as its identity has changed: %s", state.Source) + p.removeState(stateLogger, state) + stateLogger.Debugf("Remove state of file as its identity has changed: %s", state.Source) } } } } } -func (p *Input) removeState(state file.State) { +func (p *Input) removeState(logger *logp.Logger, state file.State) { // Only clean up files where state is Finished if !state.Finished { - logp.Debug("input", "State for file not removed because harvester not finished: %s", state.Source) + logger.Debugf("State for file not removed because harvester not finished: %s", 
state.Source) return } state.TTL = 0 err := p.updateState(state) if err != nil { - logp.Err("File cleanup state update error: %s", err) + logger.Errorf("File cleanup state update error: %s", err) } } // getFiles returns all files which have to be harvested // All globs are expanded and then directory and excluded files are removed func (p *Input) getFiles() map[string]os.FileInfo { + logger := p.logger paths := map[string]os.FileInfo{} for _, path := range p.config.Paths { matches, err := filepath.Glob(path) if err != nil { - logp.Err("glob(%s) failed: %v", path, err) + logger.Errorf("glob(%s) failed: %v", path, err) continue } @@ -287,32 +300,32 @@ func (p *Input) getFiles() map[string]os.FileInfo { // check if the file is in the exclude_files list if p.isFileExcluded(file) { - logp.Debug("input", "Exclude file: %s", file) + logger.Debugf("Exclude file: %s", file) continue } // Fetch Lstat File info to detected also symlinks fileInfo, err := os.Lstat(file) if err != nil { - logp.Debug("input", "lstat(%s) failed: %s", file, err) + logger.Debugf("lstat(%s) failed: %s", file, err) continue } if fileInfo.IsDir() { - logp.Debug("input", "Skipping directory: %s", file) + logger.Debugf("Skipping directory: %s", file) continue } isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 if isSymlink && !p.config.Symlinks { - logp.Debug("input", "File %s skipped as it is a symlink.", file) + logger.Debugf("File %s skipped as it is a symlink.", file) continue } // Fetch Stat file info which fetches the inode. In case of a symlink, the original inode is fetched fileInfo, err = os.Stat(file) if err != nil { - logp.Debug("input", "stat(%s) failed: %s", file, err) + logger.Debugf("stat(%s) failed: %s", file, err) continue } @@ -321,7 +334,7 @@ func (p *Input) getFiles() map[string]os.FileInfo { if p.config.Symlinks { for _, finfo := range paths { if os.SameFile(finfo, fileInfo) { - logp.Info("Same file found as symlink and original. 
Skipping file: %s (as it same as %s)", file, finfo.Name()) + logger.Infof("Same file found as symlink and original. Skipping file: %s (as it same as %s)", file, finfo.Name()) continue OUTER } } @@ -347,7 +360,7 @@ func (p *Input) matchesFile(filePath string) bool { // Evaluate if glob matches filePath match, err := filepath.Match(glob, filePath) if err != nil { - logp.Debug("input", "Error matching glob: %s", err) + p.logger.Debugf("Error matching glob: %s", err) continue } @@ -436,7 +449,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { if err != nil { return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err) } - logp.Debug("input", "Check file for harvesting: %s", absolutePath) + p.logger.Debugf("Check file for harvesting: %s", absolutePath) // Create new state for comparison newState := file.NewState(info, absolutePath, p.config.Type, p.meta, p.fileStateIdentifier) return newState, nil @@ -452,6 +465,8 @@ func getKeys(paths map[string]os.FileInfo) []string { // Scan starts a scanGlob for each provided path/glob func (p *Input) scan() { + logger := p.logger + var sortInfos []FileSortInfo var files []string @@ -462,7 +477,7 @@ func (p *Input) scan() { if p.config.ScanSort != "" { sortInfos, err = getSortedFiles(p.config.ScanOrder, p.config.ScanSort, getSortInfos(paths)) if err != nil { - logp.Err("Failed to sort files during scan due to error %s", err) + logger.Errorf("Failed to sort files during scan due to error %s", err) } } @@ -471,6 +486,7 @@ func (p *Input) scan() { } for i := 0; i < len(paths); i++ { + logger = p.logger // reset logger on each loop var path string var info os.FileInfo @@ -485,49 +501,53 @@ func (p *Input) scan() { select { case <-p.done: - logp.Info("Scan aborted because input stopped.") + logger.Info("Scan aborted because input stopped.") return default: } newState, err := getFileState(path, info, p) if err != nil { - logp.Err("Skipping file %s due to error %s", path, 
err) + logger.Errorf("Skipping file %s due to error %s", path, err) } + logger = loggerWithState(logger, newState) + // Load last state isNewState := p.states.IsNew(newState) // Ignores all files which fall under ignore_older if p.isIgnoreOlder(newState) { - err := p.handleIgnoreOlder(isNewState, newState) + err := p.handleIgnoreOlder(logger, isNewState, newState) if err != nil { - logp.Err("Updating ignore_older state error: %s", err) + logger.Errorf("Updating ignore_older state error: %s", err) } continue } // Decides if previous state exists if isNewState { - logp.Debug("input", "Start harvester for new file: %s", newState.Source) - err := p.startHarvester(newState, 0) + logger.Debugf("Start harvester for new file: %s", newState.Source) + err := p.startHarvester(logger, newState, 0) if err == errHarvesterLimit { - logp.Debug("input", harvesterErrMsg, newState.Source, err) + logger.Debugf(harvesterErrMsg, newState.Source, err) continue } if err != nil { - logp.Err(harvesterErrMsg, newState.Source, err) + logger.Errorf(harvesterErrMsg, newState.Source, err) } } else { lastState := p.states.FindPrevious(newState) - p.harvestExistingFile(newState, lastState) + p.harvestExistingFile(logger, newState, lastState) } } } // harvestExistingFile continues harvesting a file with a known state if needed -func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { - logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) +func (p *Input) harvestExistingFile(logger *logp.Logger, newState file.State, oldState file.State) { + logger = loggerWithOldState(logger, oldState) + + logger.Debugf("Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) // No harvester is running for the file, start a new harvester // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows @@ -536,20 +556,20 @@ func (p *Input) 
harvestExistingFile(newState file.State, oldState file.State) { // Resume harvesting of an old file we've stopped harvesting from // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed? // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second - logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) - err := p.startHarvester(newState, oldState.Offset) + logger.Debugf("Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) + err := p.startHarvester(logger, newState, oldState.Offset) if err != nil { - logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) + logger.Errorf("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } return } // File size was reduced -> truncated file if oldState.Finished && newState.Fileinfo.Size() < oldState.Offset { - logp.Debug("input", "Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size()) - err := p.startHarvester(newState, 0) + logger.Debugf("Old file was truncated. 
Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Offset, newState.Fileinfo.Size()) + err := p.startHarvester(logger, newState, 0) if err != nil { - logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) + logger.Errorf("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) } filesTruncated.Add(1) @@ -560,41 +580,41 @@ func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { if oldState.Source != "" && oldState.Source != newState.Source { // This does not start a new harvester as it is assume that the older harvester is still running // or no new lines were detected. It sends only an event status update to make sure the new name is persisted. - logp.Debug("input", "File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) + logger.Debugf("File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) if oldState.Finished { - logp.Debug("input", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) + logger.Debugf("Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source oldState.TTL = newState.TTL err := p.updateState(oldState) if err != nil { - logp.Err("File rotation state update error: %s", err) + logger.Errorf("File rotation state update error: %s", err) } filesRenamed.Add(1) } else { - logp.Debug("input", "File rename detected but harvester not finished yet.") + logger.Debugf("File rename detected but harvester not finished yet.") } } if !oldState.Finished { // Nothing to do. 
Harvester is still running and file was not renamed - logp.Debug("input", "Harvester for file is still running: %s", newState.Source) + logger.Debugf("Harvester for file is still running: %s", newState.Source) } else { - logp.Debug("input", "File didn't change: %s", newState.Source) + logger.Debugf("File didn't change: %s", newState.Source) } } // handleIgnoreOlder handles states which fall under ignore older // Based on the state information it is decided if the state information has to be updated or not -func (p *Input) handleIgnoreOlder(isNewState bool, newState file.State) error { - logp.Debug("input", "Ignore file because ignore_older reached: %s", newState.Source) +func (p *Input) handleIgnoreOlder(logger *logp.Logger, isNewState bool, newState file.State) error { + logger.Debugf("Ignore file because ignore_older reached: %s", newState.Source) if !isNewState { lastState := p.states.FindPrevious(newState) if !lastState.Finished { - logp.Info("File is falling under ignore_older before harvesting is finished. Adjust your close_* settings: %s", newState.Source) + logger.Infof("File is falling under ignore_older before harvesting is finished. 
Adjust your close_* settings: %s", newState.Source) } // Old state exist, no need to update it return nil @@ -602,7 +622,7 @@ func (p *Input) handleIgnoreOlder(isNewState bool, newState file.State) error { // Make sure file is not falling under clean_inactive yet if p.isCleanInactive(newState) { - logp.Debug("input", "Do not write state for ignore_older because clean_inactive reached") + logger.Debugf("Do not write state for ignore_older because clean_inactive reached") return nil } @@ -669,9 +689,10 @@ func subOutletWrap(outlet channel.Outleter) func() channel.Outleter { } // createHarvester creates a new harvester instance from the given state -func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { +func (p *Input) createHarvester(logger *logp.Logger, state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually h, err := NewHarvester( + logger, p.cfg, state, p.states, @@ -688,7 +709,7 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste // startHarvester starts a new harvester with the given offset // In case the HarvesterLimit is reached, an error is returned -func (p *Input) startHarvester(state file.State, offset int64) error { +func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int64) error { if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 { p.numHarvesters.Dec() harvesterSkipped.Add(1) @@ -699,7 +720,7 @@ func (p *Input) startHarvester(state file.State, offset int64) error { state.Offset = offset // Create harvester with state - h, err := p.createHarvester(state, func() { p.numHarvesters.Dec() }) + h, err := p.createHarvester(logger, state, func() { p.numHarvesters.Dec() }) if err != nil { p.numHarvesters.Dec() return err @@ -758,7 +779,7 @@ func (p *Input) doUpdate(state file.State) error { Private: state, }) if !ok { - logp.Info("input outlet closed") + 
p.logger.Info("input outlet closed") return errors.New("input outlet closed") } return nil diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index 9324e7f2d04..552979af1b9 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/filebeat/input/inputtest" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/logp" ) var matchTests = []struct { @@ -145,6 +146,7 @@ var initStateTests = []struct { func TestInit(t *testing.T) { for _, test := range initStateTests { p := Input{ + logger: logp.NewLogger("harvester"), config: config{ Paths: test.paths, }, diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index 60728143764..19f50d7f256 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -29,6 +29,8 @@ import ( // Log contains all log related data type Log struct { + logger *logp.Logger + fs harvester.Source offset int64 config LogConfig @@ -39,6 +41,7 @@ type Log struct { // NewLog creates a new log instance to read log sources func NewLog( + logger *logp.Logger, fs harvester.Source, config LogConfig, ) (*Log, error) { @@ -52,6 +55,7 @@ func NewLog( } return &Log{ + logger: logger, fs: fs, offset: offset, config: config, @@ -104,7 +108,7 @@ func (f *Log) Read(buf []byte) (int, error) { return totalN, err } - logp.Debug("harvester", "End of file reached: %s; Backoff now.", f.fs.Name()) + f.logger.Debugf("End of file reached: %s; Backoff now.", f.fs.Name()) f.wait() } } @@ -113,7 +117,7 @@ func (f *Log) Read(buf []byte) (int, error) { // based on the config options. 
func (f *Log) errorChecks(err error) error { if err != io.EOF { - logp.Err("Unexpected state reading from %s; error: %s", f.fs.Name(), err) + f.logger.Errorf("Unexpected state reading from %s; error: %s", f.fs.Name(), err) return err } @@ -121,7 +125,7 @@ func (f *Log) errorChecks(err error) error { // Stdin is not continuable if !f.fs.Continuable() { - logp.Debug("harvester", "Source is not continuable: %s", f.fs.Name()) + f.logger.Debugf("Source is not continuable: %s", f.fs.Name()) return err } @@ -134,14 +138,13 @@ func (f *Log) errorChecks(err error) error { // calling the stat function info, statErr := f.fs.Stat() if statErr != nil { - logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) + f.logger.Errorf("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) return statErr } // check if file was truncated if info.Size() < f.offset { - logp.Debug("harvester", - "File was truncated as offset (%d) > size (%d): %s", f.offset, info.Size(), f.fs.Name()) + f.logger.Debugf("File was truncated as offset (%d) > size (%d): %s", f.offset, info.Size(), f.fs.Name()) return ErrFileTruncate } @@ -167,14 +170,14 @@ func (f *Log) checkFileDisappearedErrors() error { // calling the stat function info, statErr := f.fs.Stat() if statErr != nil { - logp.Err("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) + f.logger.Errorf("Unexpected error reading from %s; error: %s", f.fs.Name(), statErr) return statErr } if f.config.CloseRenamed { // Check if the file can still be found under the same path if !file.IsSameFile(f.fs.Name(), info) { - logp.Debug("harvester", "close_renamed is enabled and file %s has been renamed", f.fs.Name()) + f.logger.Debugf("close_renamed is enabled and file %s has been renamed", f.fs.Name()) return ErrRenamed } } @@ -182,7 +185,7 @@ func (f *Log) checkFileDisappearedErrors() error { if f.config.CloseRemoved { // Check if the file name exists. 
See https://github.com/elastic/filebeat/issues/93 if f.fs.Removed() { - logp.Debug("harvester", "close_removed is enabled and file %s has been removed", f.fs.Name()) + f.logger.Debugf("close_removed is enabled and file %s has been removed", f.fs.Name()) return ErrRemoved } } diff --git a/filebeat/input/log/logger.go b/filebeat/input/log/logger.go new file mode 100644 index 00000000000..f48cae94ce6 --- /dev/null +++ b/filebeat/input/log/logger.go @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package log + +import ( + "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/logp" +) + +func loggerWithState(logger *logp.Logger, state file.State) *logp.Logger { + return logger.With( + "source", state.Source, + "state_id", state.Id, + "finished", state.Finished, + "os_id", state.FileStateOS, + ) +} + +func loggerWithOldState(logger *logp.Logger, oldState file.State) *logp.Logger { + return logger.With( + "old_source", oldState.Source, + "old_finished", oldState.Finished, + "old_os_id", oldState.FileStateOS, + ) +} diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 0e8fbd0fc10..04dfe12ba39 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -88,6 +88,7 @@ func (p *Input) Run() { func (p *Input) createHarvester(state file.State) (*log.Harvester, error) { // Each harvester gets its own copy of the outlet h, err := log.NewHarvester( + logp.NewLogger("stdin"), p.cfg, state, nil, nil, func() channel.Outleter { diff --git a/filebeat/tests/system/test_harvester.py b/filebeat/tests/system/test_harvester.py index 659253c4ec6..f56a51d0fe0 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -370,7 +370,7 @@ def test_truncated_file_closed(self): # Wait until harvester is closed self.wait_until( lambda: self.log_contains( - "Stopping harvester for file"), + "Stopping harvester."), max_timeout=15) # Write 1 line -> truncation diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 7192fd956d5..f9dbd138a2e 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -178,8 +178,7 @@ def test_rotating_close_inactive_low_write_rate(self): # wait for file to be closed due to close_inactive self.wait_until( - lambda: self.log_contains( - "Closing file: {}\n".format(os.path.abspath(testfile))), + lambda: self.log_contains("Closing file"), max_timeout=10) # wait a 
bit longer (on 1.0.1 this would cause the harvester @@ -305,8 +304,7 @@ def test_close_inactive(self): # wait for file to be closed due to close_inactive self.wait_until( - lambda: self.log_contains( - "Closing file: {}\n".format(os.path.abspath(testfile))), + lambda: self.log_contains("Closing file"), max_timeout=10) # write second line @@ -359,8 +357,7 @@ def test_close_inactive_file_removal(self): # wait for file to be closed due to close_inactive self.wait_until( - lambda: self.log_contains( - "Closing file: {}\n".format(os.path.abspath(testfile))), + lambda: self.log_contains("Closing file"), max_timeout=10) filebeat.check_kill_and_wait() @@ -407,7 +404,7 @@ def test_close_inactive_file_rotation_and_removal(self): self.wait_until( lambda: self.log_contains( # Still checking for old file name as filename does not change in harvester - "Closing file: "), + "Closing file"), max_timeout=10) filebeat.check_kill_and_wait() @@ -467,7 +464,7 @@ def test_close_inactive_file_rotation_and_removal2(self): self.wait_until( lambda: self.log_contains_count( # Checking if two files were closed - "Closing file: ") == 2, + "Closing file") == 2, max_timeout=10) filebeat.check_kill_and_wait() diff --git a/filebeat/tests/system/test_stdin.py b/filebeat/tests/system/test_stdin.py index 6d701fdfbf5..a8283925ab8 100644 --- a/filebeat/tests/system/test_stdin.py +++ b/filebeat/tests/system/test_stdin.py @@ -24,7 +24,7 @@ def test_stdin(self): self.wait_until( lambda: self.log_contains( - "Harvester started for file: -"), + "Harvester started for file."), max_timeout=10) iterations1 = 5