diff --git a/filebeat/input/docker/prospector.go b/filebeat/input/docker/prospector.go index 76a3b4d0c7d..1b26b7f4b20 100644 --- a/filebeat/input/docker/prospector.go +++ b/filebeat/input/docker/prospector.go @@ -45,7 +45,7 @@ func NewProspector(cfg *common.Config, outletFactory channel.Factory, context pr if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil { return nil, errors.Wrap(err, "update prospector config") } - return log.NewProspector(cfg, outletFactory, context) + return log.NewInput(cfg, outletFactory, context) } func checkStream(val string) error { diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index e0a7284e40e..b7a5f46ce40 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -24,7 +24,7 @@ var ( }, CleanInactive: 0, - // Prospector + // Input Enabled: true, IgnoreOlder: 0, ScanFrequency: 10 * time.Second, @@ -60,7 +60,7 @@ type config struct { InputType string `config:"input_type"` CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - // Prospector + // Input Enabled bool `config:"enabled"` ExcludeFiles []match.Matcher `config:"exclude_files"` IgnoreOlder time.Duration `config:"ignore_older"` @@ -84,7 +84,7 @@ type config struct { Multiline *reader.MultilineConfig `config:"multiline"` JSON *reader.JSONConfig `config:"json"` - // Hidden on purpose, used by the docker prospector: + // Hidden on purpose, used by the docker input: DockerJSON string `config:"docker-json"` } @@ -127,9 +127,9 @@ func (c *config) Validate() error { c.Type = c.InputType } - // Prospector + // Input if c.Type == harvester.LogType && len(c.Paths) == 0 { - return fmt.Errorf("No paths were defined for prospector") + return fmt.Errorf("No paths were defined for input") } if c.CleanInactive != 0 && c.IgnoreOlder == 0 { @@ -171,11 +171,11 @@ func (c *config) Validate() error { // resolveRecursiveGlobs expands `**` from the globs in multiple patterns func (c *config) resolveRecursiveGlobs() error { if !c.RecursiveGlob { - logp.Debug("prospector", "recursive glob disabled") + logp.Debug("input", "recursive glob disabled") return nil } - logp.Debug("prospector", "recursive glob enabled") + logp.Debug("input", "recursive glob enabled") var paths []string for _, path := range c.Paths { patterns, err := file.GlobPatterns(path, recursiveGlobDepth) @@ -183,7 +183,7 @@ func (c *config) resolveRecursiveGlobs() error { return err } if len(patterns) > 1 { - logp.Debug("prospector", "%q expanded to %#v", path, patterns) + logp.Debug("input", "%q expanded to %#v", path, patterns) } paths = append(paths, patterns...) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 6634f5f9c5e..bf247346660 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -408,7 +408,7 @@ func (h *Harvester) validateFile(f *os.File) error { return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name()) } - // Compares the stat of the opened file to the state given by the prospector. Abort if not match. + // Compares the stat of the opened file to the state given by the input. Abort if not match. if !os.SameFile(h.state.Fileinfo, info) { return errors.New("file info is not identical with opened file. Aborting harvesting and retrying file later again") } diff --git a/filebeat/input/log/prospector.go b/filebeat/input/log/input.go similarity index 74% rename from filebeat/input/log/prospector.go rename to filebeat/input/log/input.go index 0ae11eedc00..35bbc1b1c0b 100644 --- a/filebeat/input/log/prospector.go +++ b/filebeat/input/log/input.go @@ -11,8 +11,8 @@ import ( "github.com/elastic/beats/filebeat/channel" "github.com/elastic/beats/filebeat/harvester" + "github.com/elastic/beats/filebeat/input" "github.com/elastic/beats/filebeat/input/file" - "github.com/elastic/beats/filebeat/prospector" "github.com/elastic/beats/filebeat/util" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/atomic" @@ -25,20 +25,20 @@ const ( ) var ( - filesRenamed = monitoring.NewInt(nil, "filebeat.prospector.log.files.renamed") - filesTruncated = monitoring.NewInt(nil, "filebeat.prospector.log.files.truncated") + filesRenamed = monitoring.NewInt(nil, "filebeat.input.log.files.renamed") + filesTruncated = monitoring.NewInt(nil, "filebeat.input.log.files.truncated") harvesterSkipped = monitoring.NewInt(nil, "filebeat.harvester.skipped") ) func init() { - err := prospector.Register("log", NewProspector) + err := input.Register("log", NewInput) if err != nil { panic(err) } } -// Prospector contains the prospector and its config -type Prospector struct { +// Input contains the input and its config +type Input struct { cfg *common.Config config config states *file.States @@ -49,15 +49,15 @@ type Prospector struct { numHarvesters atomic.Uint32 } -// NewProspector instantiates a new Log -func NewProspector( +// NewInput instantiates a new Log +func NewInput( cfg *common.Config, outlet channel.Factory, - context prospector.Context, -) (prospector.Prospectorer, error) { + context input.Context, +) (input.Input, error) { // Note: underlying output. - // The prospector and harvester do have different requirements + // The input and harvester do have different requirements // on the timings the outlets must be closed/unblocked. // The outlet generated here is the underlying outlet, only closed // once all workers have been shut down. @@ -72,7 +72,7 @@ func NewProspector( // can be forwarded correctly to the registrar. stateOut := channel.CloseOnSignal(channel.SubOutlet(out), context.BeatDone) - p := &Prospector{ + p := &Input{ config: defaultConfig, cfg: cfg, harvesters: harvester.NewRegistry(), @@ -100,7 +100,7 @@ func NewProspector( } if len(p.config.Paths) == 0 { - return nil, fmt.Errorf("each prospector must have at least one path defined") + return nil, fmt.Errorf("each input must have at least one path defined") } err = p.loadStates(context.States) @@ -113,23 +113,23 @@ func NewProspector( return p, nil } -// LoadStates loads states into prospector +// LoadStates loads states into input // It goes through all states coming from the registry. Only the states which match the glob patterns of -// the prospector will be loaded and updated. All other states will not be touched. -func (p *Prospector) loadStates(states []file.State) error { - logp.Debug("prospector", "exclude_files: %s. Number of stats: %d", p.config.ExcludeFiles, len(states)) +// the input will be loaded and updated. All other states will not be touched. +func (p *Input) loadStates(states []file.State) error { + logp.Debug("input", "exclude_files: %s. Number of stats: %d", p.config.ExcludeFiles, len(states)) for _, state := range states { - // Check if state source belongs to this prospector. If yes, update the state. + // Check if state source belongs to this input. If yes, update the state. if p.matchesFile(state.Source) { state.TTL = -1 - // In case a prospector is tried to be started with an unfinished state matching the glob pattern + // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return fmt.Errorf("Can only start a prospector when all related states are finished: %+v", state) + return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) } - // Update prospector states and send new states to registry + // Update input states and send new states to registry err := p.updateState(state) if err != nil { logp.Err("Problem putting initial state: %+v", err) @@ -138,13 +138,13 @@ func (p *Prospector) loadStates(states []file.State) error { } } - logp.Debug("prospector", "Prospector with previous states loaded: %v", p.states.Count()) + logp.Debug("input", "input with previous states loaded: %v", p.states.Count()) return nil } -// Run runs the prospector -func (p *Prospector) Run() { - logp.Debug("prospector", "Start next scan") +// Run runs the input +func (p *Input) Run() { + logp.Debug("input", "Start next scan") // TailFiles is like ignore_older = 1ns and only on startup if p.config.TailFiles { @@ -165,7 +165,7 @@ func (p *Prospector) Run() { if p.config.CleanInactive > 0 || p.config.CleanRemoved { beforeCount := p.states.Count() cleanedStates := p.states.Cleanup() - logp.Debug("prospector", "Prospector states cleaned up. Before: %d, After: %d", beforeCount, beforeCount-cleanedStates) + logp.Debug("input", "input states cleaned up. Before: %d, After: %d", beforeCount, beforeCount-cleanedStates) } // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first @@ -176,26 +176,26 @@ func (p *Prospector) Run() { if err != nil { if os.IsNotExist(err) { p.removeState(state) - logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source) + logp.Debug("input", "Remove state for file as file removed: %s", state.Source) } else { - logp.Err("Prospector state for %s was not removed: %s", state.Source, err) + logp.Err("input state for %s was not removed: %s", state.Source, err) } } else { // Check if existing source on disk and state are the same. Remove if not the case. newState := file.NewState(stat, state.Source, p.config.Type) if !newState.FileStateOS.IsSame(state.FileStateOS) { p.removeState(state) - logp.Debug("prospector", "Remove state for file as file removed or renamed: %s", state.Source) + logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source) } } } } } -func (p *Prospector) removeState(state file.State) { +func (p *Input) removeState(state file.State) { // Only clean up files where state is Finished if !state.Finished { - logp.Debug("prospector", "State for file not removed because harvester not finished: %s", state.Source) + logp.Debug("input", "State for file not removed because harvester not finished: %s", state.Source) return } @@ -208,7 +208,7 @@ func (p *Prospector) removeState(state file.State) { // getFiles returns all files which have to be harvested // All globs are expanded and then directory and excluded files are removed -func (p *Prospector) getFiles() map[string]os.FileInfo { +func (p *Input) getFiles() map[string]os.FileInfo { paths := map[string]os.FileInfo{} for _, path := range p.config.Paths { @@ -224,37 +224,37 @@ func (p *Prospector) getFiles() map[string]os.FileInfo { // check if the file is in the exclude_files list if p.isFileExcluded(file) { - logp.Debug("prospector", "Exclude file: %s", file) + logp.Debug("input", "Exclude file: %s", file) continue } // Fetch Lstat File info to detected also symlinks fileInfo, err := os.Lstat(file) if err != nil { - logp.Debug("prospector", "lstat(%s) failed: %s", file, err) + logp.Debug("input", "lstat(%s) failed: %s", file, err) continue } if fileInfo.IsDir() { - logp.Debug("prospector", "Skipping directory: %s", file) + logp.Debug("input", "Skipping directory: %s", file) continue } isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 if isSymlink && !p.config.Symlinks { - logp.Debug("prospector", "File %s skipped as it is a symlink.", file) + logp.Debug("input", "File %s skipped as it is a symlink.", file) continue } // Fetch Stat file info which fetches the inode. In case of a symlink, the original inode is fetched fileInfo, err = os.Stat(file) if err != nil { - logp.Debug("prospector", "stat(%s) failed: %s", file, err) + logp.Debug("input", "stat(%s) failed: %s", file, err) continue } - // If symlink is enabled, it is checked that original is not part of same prospector - // It original is harvested by other prospector, states will potentially overwrite each other + // If symlink is enabled, it is checked that original is not part of same input + // It original is harvested by other input, states will potentially overwrite each other if p.config.Symlinks { for _, finfo := range paths { if os.SameFile(finfo, fileInfo) { @@ -271,8 +271,8 @@ func (p *Prospector) getFiles() map[string]os.FileInfo { return paths } -// matchesFile returns true in case the given filePath is part of this prospector, means matches its glob patterns -func (p *Prospector) matchesFile(filePath string) bool { +// matchesFile returns true in case the given filePath is part of this input, means matches its glob patterns +func (p *Input) matchesFile(filePath string) bool { // Path is cleaned to ensure we always compare clean paths filePath = filepath.Clean(filePath) @@ -284,7 +284,7 @@ func (p *Prospector) matchesFile(filePath string) bool { // Evaluate if glob matches filePath match, err := filepath.Match(glob, filePath) if err != nil { - logp.Debug("prospector", "Error matching glob: %s", err) + logp.Debug("input", "Error matching glob: %s", err) continue } @@ -351,14 +351,14 @@ func getSortedFiles(scanOrder string, scanSort string, sortInfos []FileSortInfo) return sortInfos, nil } -func getFileState(path string, info os.FileInfo, p *Prospector) (file.State, error) { +func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { var err error var absolutePath string absolutePath, err = filepath.Abs(path) if err != nil { return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err) } - logp.Debug("prospector", "Check file for harvesting: %s", absolutePath) + logp.Debug("input", "Check file for harvesting: %s", absolutePath) // Create new state for comparison newState := file.NewState(info, absolutePath, p.config.Type) return newState, nil @@ -373,7 +373,7 @@ func getKeys(paths map[string]os.FileInfo) []string { } // Scan starts a scanGlob for each provided path/glob -func (p *Prospector) scan() { +func (p *Input) scan() { var sortInfos []FileSortInfo var files []string @@ -407,7 +407,7 @@ func (p *Prospector) scan() { select { case <-p.done: - logp.Info("Scan aborted because prospector stopped.") + logp.Info("Scan aborted because input stopped.") return default: } @@ -431,7 +431,7 @@ func (p *Prospector) scan() { // Decides if previous state exists if lastState.IsEmpty() { - logp.Debug("prospector", "Start harvester for new file: %s", newState.Source) + logp.Debug("input", "Start harvester for new file: %s", newState.Source) err := p.startHarvester(newState, 0) if err != nil { logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err) @@ -443,8 +443,8 @@ func (p *Prospector) scan() { } // harvestExistingFile continues harvesting a file with a known state if needed -func (p *Prospector) harvestExistingFile(newState file.State, oldState file.State) { - logp.Debug("prospector", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) +func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { + logp.Debug("input", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) // No harvester is running for the file, start a new harvester // It is important here that only the size is checked and not modification time, as modification time could be incorrect on windows @@ -453,7 +453,7 @@ func (p *Prospector) harvestExistingFile(newState file.State, oldState file.Stat // Resume harvesting of an old file we've stopped harvesting from // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed? // One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second - logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) + logp.Debug("input", "Resuming harvesting of file: %s, offset: %d, new size: %d", newState.Source, oldState.Offset, newState.Fileinfo.Size()) err := p.startHarvester(newState, oldState.Offset) if err != nil { logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) @@ -463,7 +463,7 @@ func (p *Prospector) harvestExistingFile(newState file.State, oldState file.Stat // File size was reduced -> truncated file if oldState.Finished && newState.Fileinfo.Size() < oldState.Offset { - logp.Debug("prospector", "Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Fileinfo.Size()) + logp.Debug("input", "Old file was truncated. Starting from the beginning: %s, offset: %d, new size: %d ", newState.Source, newState.Fileinfo.Size()) err := p.startHarvester(newState, 0) if err != nil { logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) @@ -477,10 +477,10 @@ func (p *Prospector) harvestExistingFile(newState file.State, oldState file.Stat if oldState.Source != "" && oldState.Source != newState.Source { // This does not start a new harvester as it is assume that the older harvester is still running // or no new lines were detected. It sends only an event status update to make sure the new name is persisted. - logp.Debug("prospector", "File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) + logp.Debug("input", "File rename was detected: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) if oldState.Finished { - logp.Debug("prospector", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) + logp.Debug("input", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source err := p.updateState(oldState) @@ -490,22 +490,22 @@ func (p *Prospector) harvestExistingFile(newState file.State, oldState file.Stat filesRenamed.Add(1) } else { - logp.Debug("prospector", "File rename detected but harvester not finished yet.") + logp.Debug("input", "File rename detected but harvester not finished yet.") } } if !oldState.Finished { // Nothing to do. Harvester is still running and file was not renamed - logp.Debug("prospector", "Harvester for file is still running: %s", newState.Source) + logp.Debug("input", "Harvester for file is still running: %s", newState.Source) } else { - logp.Debug("prospector", "File didn't change: %s", newState.Source) + logp.Debug("input", "File didn't change: %s", newState.Source) } } // handleIgnoreOlder handles states which fall under ignore older // Based on the state information it is decided if the state information has to be updated or not -func (p *Prospector) handleIgnoreOlder(lastState, newState file.State) error { - logp.Debug("prospector", "Ignore file because ignore_older reached: %s", newState.Source) +func (p *Input) handleIgnoreOlder(lastState, newState file.State) error { + logp.Debug("input", "Ignore file because ignore_older reached: %s", newState.Source) if !lastState.IsEmpty() { if !lastState.Finished { @@ -517,7 +517,7 @@ func (p *Prospector) handleIgnoreOlder(lastState, newState file.State) error { // Make sure file is not falling under clean_inactive yet if p.isCleanInactive(newState) { - logp.Debug("prospector", "Do not write state for ignore_older because clean_inactive reached") + logp.Debug("input", "Do not write state for ignore_older because clean_inactive reached") return nil } @@ -536,13 +536,13 @@ func (p *Prospector) handleIgnoreOlder(lastState, newState file.State) error { } // isFileExcluded checks if the given path should be excluded -func (p *Prospector) isFileExcluded(file string) bool { +func (p *Input) isFileExcluded(file string) bool { patterns := p.config.ExcludeFiles return len(patterns) > 0 && harvester.MatchAny(patterns, file) } // isIgnoreOlder checks if the given state reached ignore_older -func (p *Prospector) isIgnoreOlder(state file.State) bool { +func (p *Input) isIgnoreOlder(state file.State) bool { // ignore_older is disable if p.config.IgnoreOlder == 0 { return false @@ -557,7 +557,7 @@ func (p *Prospector) isIgnoreOlder(state file.State) bool { } // isCleanInactive checks if the given state false under clean_inactive -func (p *Prospector) isCleanInactive(state file.State) bool { +func (p *Input) isCleanInactive(state file.State) bool { // clean_inactive is disable if p.config.CleanInactive <= 0 { return false @@ -572,7 +572,7 @@ func (p *Prospector) isCleanInactive(state file.State) bool { } // createHarvester creates a new harvester instance from the given state -func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { +func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually outlet := channel.SubOutlet(p.outlet) h, err := NewHarvester( @@ -590,7 +590,7 @@ func (p *Prospector) createHarvester(state file.State, onTerminate func()) (*Har // startHarvester starts a new harvester with the given offset // In case the HarvesterLimit is reached, an error is returned -func (p *Prospector) startHarvester(state file.State, offset int64) error { +func (p *Input) startHarvester(state file.State, offset int64) error { if p.numHarvesters.Inc() > p.config.HarvesterLimit && p.config.HarvesterLimit > 0 { p.numHarvesters.Dec() harvesterSkipped.Add(1) @@ -624,9 +624,9 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { return err } -// updateState updates the prospector state and forwards the event to the spooler -// All state updates done by the prospector itself are synchronous to make sure not states are overwritten -func (p *Prospector) updateState(state file.State) error { +// updateState updates the input state and forwards the event to the spooler +// All state updates done by the input itself are synchronous to make sure not states are overwritten +func (p *Input) updateState(state file.State) error { // Add ttl if cleanOlder is enabled and TTL is not already 0 if p.config.CleanInactive > 0 && state.TTL != 0 { state.TTL = p.config.CleanInactive @@ -639,21 +639,21 @@ func (p *Prospector) updateState(state file.State) error { data.SetState(state) ok := p.outlet.OnEvent(data) if !ok { - logp.Info("Prospector outlet closed") - return errors.New("prospector outlet closed") + logp.Info("input outlet closed") + return errors.New("input outlet closed") } return nil } // Wait waits for the all harvesters to complete and only then call stop -func (p *Prospector) Wait() { +func (p *Input) Wait() { p.harvesters.WaitForCompletion() p.Stop() } -// Stop stops all harvesters and then stops the prospector -func (p *Prospector) Stop() { +// Stop stops all harvesters and then stops the input +func (p *Input) Stop() { // Stop all harvesters // In case the beatDone channel is closed, this will not wait for completion // Otherwise Stop will wait until output is complete diff --git a/filebeat/input/log/prospector_other_test.go b/filebeat/input/log/input_other_test.go similarity index 92% rename from filebeat/input/log/prospector_other_test.go rename to filebeat/input/log/input_other_test.go index 813b1b70f00..fb976565df4 100644 --- a/filebeat/input/log/prospector_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -59,7 +59,7 @@ var matchTests = []struct { func TestMatchFile(t *testing.T) { for _, test := range matchTests { - p := Prospector{ + p := Input{ config: config{ Paths: test.paths, ExcludeFiles: test.excludeFiles, @@ -72,8 +72,8 @@ func TestMatchFile(t *testing.T) { var initStateTests = []struct { states []file.State // list of states - paths []string // prospector glob - count int // expected states in prospector + paths []string // input glob + count int // expected states in input }{ { []file.State{ @@ -123,11 +123,11 @@ var initStateTests = []struct { }, } -// TestInit checks that the correct states are in a prospector after the init phase +// TestInit checks that the correct states are in an input after the init phase // This means only the ones that match the glob and not exclude files func TestInit(t *testing.T) { for _, test := range initStateTests { - p := Prospector{ + p := Input{ config: config{ Paths: test.paths, }, diff --git a/filebeat/input/log/prospector_test.go b/filebeat/input/log/input_test.go similarity index 94% rename from filebeat/input/log/prospector_test.go rename to filebeat/input/log/input_test.go index 6a4c81c655b..383c9e84dd0 100644 --- a/filebeat/input/log/prospector_test.go +++ b/filebeat/input/log/input_test.go @@ -13,8 +13,8 @@ import ( "github.com/elastic/beats/libbeat/common/match" ) -func TestProspectorFileExclude(t *testing.T) { - p := Prospector{ +func TestInputFileExclude(t *testing.T) { + p := Input{ config: config{ ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)}, }, @@ -49,7 +49,7 @@ var cleanInactiveTests = []struct { func TestIsCleanInactive(t *testing.T) { for _, test := range cleanInactiveTests { - l := Prospector{ + l := Input{ config: config{ CleanInactive: test.cleanInactive, }, diff --git a/filebeat/input/log/prospector_windows_test.go b/filebeat/input/log/prospector_windows_test.go index b990125aa12..8971c8f96fa 100644 --- a/filebeat/input/log/prospector_windows_test.go +++ b/filebeat/input/log/prospector_windows_test.go @@ -59,7 +59,7 @@ var matchTestsWindows = []struct { func TestMatchFileWindows(t *testing.T) { for _, test := range matchTestsWindows { - p := Prospector{ + p := Input{ config: config{ Paths: test.paths, ExcludeFiles: test.excludeFiles, diff --git a/filebeat/tests/system/test_prospector.py b/filebeat/tests/system/test_prospector.py index a22151adcb0..fead8cb7690 100644 --- a/filebeat/tests/system/test_prospector.py +++ b/filebeat/tests/system/test_prospector.py @@ -299,7 +299,7 @@ def test_no_paths_defined(self): # wait for first "Start next scan" log message self.wait_until( lambda: self.log_contains( - "No paths were defined for prospector"), + "No paths were defined for input"), max_timeout=10) self.wait_until( diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index 531767aae3e..9d26d69c456 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -980,13 +980,13 @@ def test_restart_state(self): # Make sure all 4 states are persisted self.wait_until( lambda: self.log_contains( - "Prospector states cleaned up. Before: 4, After: 4", logfile="filebeat2.log"), + "input states cleaned up. Before: 4, After: 4", logfile="filebeat2.log"), max_timeout=10) # Wait until registry file is cleaned self.wait_until( lambda: self.log_contains( - "Prospector states cleaned up. Before: 0, After: 0", logfile="filebeat2.log"), + "input states cleaned up. Before: 0, After: 0", logfile="filebeat2.log"), max_timeout=10) filebeat.check_kill_and_wait() diff --git a/filebeat/tests/system/test_reload_modules.py b/filebeat/tests/system/test_reload_modules.py index d0341503720..a757acf6326 100644 --- a/filebeat/tests/system/test_reload_modules.py +++ b/filebeat/tests/system/test_reload_modules.py @@ -239,7 +239,7 @@ def test_wrong_module_no_reload(self): # Wait until offset for new line is updated self.wait_until( - lambda: self.log_contains("No paths were defined for prospector accessing"), + lambda: self.log_contains("No paths were defined for input accessing"), max_timeout=10) assert exit_code == 1 diff --git a/filebeat/tests/system/test_reload_prospectors.py b/filebeat/tests/system/test_reload_prospectors.py index d9db94c8c50..b46215d7718 100644 --- a/filebeat/tests/system/test_reload_prospectors.py +++ b/filebeat/tests/system/test_reload_prospectors.py @@ -283,7 +283,7 @@ def test_reload_same_config(self): # Make sure error shows up in log file self.wait_until( - lambda: self.log_contains("Can only start a prospector when all related states are finished"), + lambda: self.log_contains("Can only start an input when all related states are finished"), max_timeout=15) # Wait until old runner is stopped diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index 5d6ea3c694b..4bdcfadcbe7 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -172,7 +172,7 @@ def test_stopping_empty_path(self): time.sleep(2) # Wait until first flush - msg = "No paths were defined for prospector" + msg = "No paths were defined for input" self.wait_until( lambda: self.log_contains_count(msg) >= 1, max_timeout=5)