From 676e8365dd280e5a11dee5f0322d62b9d95b508c Mon Sep 17 00:00:00 2001 From: ruflin Date: Tue, 9 May 2017 16:00:19 +0200 Subject: [PATCH] Move prospector log to its own package with log harvester This is the last step in reorganising the packages related to the prospector and harvester. Follow-up PRs will mainly focus on abstracting out common functionality, standardising naming, and adding proper interfaces. * Merge the log harvester and log prospector configs into one config * Rename Log to Prospector as part of the new package structure * Move the log harvester logic to the log prospector package * Keep common harvester logic in its own package * The stdin harvester still heavily depends on the log harvester; it needs to be split up and simplified at a later stage * Further cleanup of the Prospector interface. `Wait()` only exists as a temporary solution. --- filebeat/harvester/util_test.go | 21 ++ filebeat/prospector/config.go | 44 +---- .../{harvester => prospector/log}/config.go | 73 +++++-- filebeat/prospector/{ => log}/config_test.go | 12 +- .../log}/harvester.go | 16 +- .../log}/harvester_test.go | 2 +- .../log}/json_test.go | 2 +- filebeat/{harvester => prospector/log}/log.go | 11 +- .../{harvester => prospector/log}/log_file.go | 6 +- .../{harvester => prospector/log}/log_test.go | 27 +-- .../{prospector_log.go => log/prospector.go} | 187 ++++++++++-------- .../prospector_other_test.go} | 16 +- .../prospector_test.go} | 10 +- .../prospector_windows_test.go} | 8 +- filebeat/prospector/{ => log}/registry.go | 15 +- .../{harvester => prospector/log}/stdin.go | 2 +- filebeat/prospector/prospector.go | 23 +-- filebeat/prospector/stdin/prospector.go | 40 ++-- 18 files changed, 268 insertions(+), 247 deletions(-) rename filebeat/{harvester => prospector/log}/config.go (57%) rename filebeat/prospector/{ => log}/config_test.go (84%) rename filebeat/{harvester => prospector/log}/harvester.go (92%) rename filebeat/{harvester => prospector/log}/harvester_test.go (57%) rename filebeat/{harvester => prospector/log}/json_test.go (99%) rename filebeat/{harvester => prospector/log}/log.go (97%) rename filebeat/{harvester => prospector/log}/log_file.go (98%) rename filebeat/{harvester => prospector/log}/log_test.go (80%) rename filebeat/prospector/{prospector_log.go => log/prospector.go} (73%) rename filebeat/prospector/{prospector_log_other_test.go => log/prospector_other_test.go} (92%) rename filebeat/prospector/{prospector_log_test.go => log/prospector_test.go} (93%) rename filebeat/prospector/{prospector_log_windows_test.go => log/prospector_windows_test.go} (90%) rename filebeat/prospector/{ => log}/registry.go (73%) rename filebeat/{harvester => prospector/log}/stdin.go (94%) diff --git a/filebeat/harvester/util_test.go b/filebeat/harvester/util_test.go index 7900c4541a5a..9c095908ff3f 100644 --- a/filebeat/harvester/util_test.go +++ b/filebeat/harvester/util_test.go @@ -32,3 +32,24 @@ func TestMatchAnyRegexps(t *testing.T) { assert.Equal(t, MatchAny(matchers, "/var/log/log.gz"), true) } + +func TestExcludeLine(t *testing.T) { + regexp, err := InitMatchers("^DBG") + assert.Nil(t, err) + assert.True(t, MatchAny(regexp, "DBG: a debug message")) + assert.False(t, MatchAny(regexp, "ERR: an error message")) +} + +func TestIncludeLine(t *testing.T) { + regexp, err := InitMatchers("^ERR", "^WARN") + + assert.Nil(t, err) + assert.False(t, MatchAny(regexp, "DBG: a debug message")) + assert.True(t, MatchAny(regexp, "ERR: an error message")) + assert.True(t, MatchAny(regexp, "WARNING: a simple warning message")) +} + +func 
TestInitRegexp(t *testing.T) { + _, err := InitMatchers("(((((") + assert.NotNil(t, err) +} diff --git a/filebeat/prospector/config.go b/filebeat/prospector/config.go index 190a5a569517..3658c77fa8e4 100644 --- a/filebeat/prospector/config.go +++ b/filebeat/prospector/config.go @@ -1,55 +1,19 @@ package prospector import ( - "fmt" "time" cfg "github.com/elastic/beats/filebeat/config" - "github.com/elastic/beats/libbeat/common/match" ) var ( defaultConfig = prospectorConfig{ - Enabled: true, - IgnoreOlder: 0, - ScanFrequency: 10 * time.Second, - InputType: cfg.DefaultInputType, - CleanInactive: 0, - CleanRemoved: true, - HarvesterLimit: 0, - Symlinks: false, - TailFiles: false, + ScanFrequency: 10 * time.Second, + InputType: cfg.DefaultInputType, } ) type prospectorConfig struct { - Enabled bool `config:"enabled"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - InputType string `config:"input_type"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - recursiveGlob bool `config:"recursive_glob.enabled"` -} - -func (config *prospectorConfig) Validate() error { - - if config.InputType == cfg.LogInputType && len(config.Paths) == 0 { - return fmt.Errorf("No paths were defined for prospector") - } - - if config.CleanInactive != 0 && config.IgnoreOlder == 0 { - return fmt.Errorf("ignore_older must be enabled when clean_inactive is used") - } - - if config.CleanInactive != 0 && config.CleanInactive <= config.IgnoreOlder+config.ScanFrequency { - return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed") - } - - return nil + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + InputType string `config:"input_type"` } diff --git a/filebeat/harvester/config.go b/filebeat/prospector/log/config.go similarity index 57% rename from filebeat/harvester/config.go rename to filebeat/prospector/log/config.go index f96533802639..f040f82ef466 100644 --- a/filebeat/harvester/config.go +++ b/filebeat/prospector/log/config.go @@ -1,22 +1,34 @@ -package harvester +package log import ( "fmt" "time" + "github.com/dustin/go-humanize" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/reader" - - "github.com/dustin/go-humanize" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/match" "github.com/elastic/beats/libbeat/processors" ) var ( - defaultConfig = harvesterConfig{ - BufferSize: 16 * humanize.KiByte, + defaultConfig = config{ + // Common InputType: cfg.DefaultInputType, + CleanInactive: 0, + + // Prospector + Enabled: true, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, + CleanRemoved: true, + HarvesterLimit: 0, + Symlinks: false, + TailFiles: false, + + // Harvester + BufferSize: 16 * humanize.KiByte, Backoff: 1 * time.Second, BackoffFactor: 2, MaxBackoff: 10 * time.Second, @@ -26,15 +38,31 @@ var ( CloseRenamed: false, CloseEOF: false, CloseTimeout: 0, - CleanInactive: 0, } ) -type harvesterConfig struct { +type config struct { + + // Common + InputType string `config:"input_type"` + CleanInactive time.Duration `config:"clean_inactive" 
validate:"min=0"` + + // Prospector + Enabled bool `config:"enabled"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + recursiveGlob bool `config:"recursive_glob.enabled"` + + // Harvester common.EventMetadata `config:",inline"` // Fields and tags to add to events. BufferSize int `config:"harvester_buffer_size"` Encoding string `config:"encoding"` - InputType string `config:"input_type"` Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` BackoffFactor int `config:"backoff_factor" validate:"min=1"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` @@ -48,27 +76,40 @@ type harvesterConfig struct { MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` Multiline *reader.MultilineConfig `config:"multiline"` JSON *reader.JSONConfig `config:"json"` - CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` Pipeline string `config:"pipeline"` Module string `config:"_module_name"` // hidden option to set the module name Fileset string `config:"_fileset_name"` // hidden option to set the fileset name Processors processors.PluginConfig `config:"processors"` } -func (config *harvesterConfig) Validate() error { +func (c *config) Validate() error { + + // Prospector + if c.InputType == cfg.LogInputType && len(c.Paths) == 0 { + return fmt.Errorf("No paths were defined for prospector") + } + + if c.CleanInactive != 0 && c.IgnoreOlder == 0 { + return fmt.Errorf("ignore_older must be enabled when clean_inactive is used") + } + + if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency { + return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed") + } + // Harvester // Check input type - if _, ok := cfg.ValidInputType[config.InputType]; !ok { - return fmt.Errorf("Invalid input type: %v", config.InputType) + if _, ok := cfg.ValidInputType[c.InputType]; !ok { + return fmt.Errorf("Invalid input type: %v", c.InputType) } - if config.JSON != nil && len(config.JSON.MessageKey) == 0 && - config.Multiline != nil { + if c.JSON != nil && len(c.JSON.MessageKey) == 0 && + c.Multiline != nil { return fmt.Errorf("When using the JSON decoder and multiline together, you need to specify a message_key value") } - if config.JSON != nil && len(config.JSON.MessageKey) == 0 && - (len(config.IncludeLines) > 0 || len(config.ExcludeLines) > 0) { + if c.JSON != nil && len(c.JSON.MessageKey) == 0 && + (len(c.IncludeLines) > 0 || len(c.ExcludeLines) > 0) { return fmt.Errorf("When using the JSON decoder and line filtering together, you need to specify a message_key value") } diff --git a/filebeat/prospector/config_test.go b/filebeat/prospector/log/config_test.go similarity index 84% rename from filebeat/prospector/config_test.go rename to filebeat/prospector/log/config_test.go index b07e850c43d5..c79a0d7361bc 100644 --- a/filebeat/prospector/config_test.go +++ b/filebeat/prospector/log/config_test.go @@ -1,6 +1,6 @@ // +build !integration -package prospector +package log import ( "testing" @@ -12,7 +12,7 @@ import ( func TestCleanOlderError(t *testing.T) { - config := prospectorConfig{ + config := config{ CleanInactive: 
10 * time.Hour, } @@ -22,7 +22,7 @@ func TestCleanOlderError(t *testing.T) { func TestCleanOlderIgnoreOlderError(t *testing.T) { - config := prospectorConfig{ + config := config{ CleanInactive: 10 * time.Hour, IgnoreOlder: 15 * time.Hour, } @@ -33,7 +33,7 @@ func TestCleanOlderIgnoreOlderError(t *testing.T) { func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) { - config := prospectorConfig{ + config := config{ CleanInactive: 10 * time.Hour, IgnoreOlder: 10 * time.Hour, } @@ -44,9 +44,11 @@ func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) { func TestCleanOlderIgnoreOlder(t *testing.T) { - config := prospectorConfig{ + config := config{ CleanInactive: 10*time.Hour + defaultConfig.ScanFrequency + 1*time.Second, IgnoreOlder: 10 * time.Hour, + InputType: "log", + Paths: []string{"hello"}, } err := config.Validate() diff --git a/filebeat/harvester/harvester.go b/filebeat/prospector/log/harvester.go similarity index 92% rename from filebeat/harvester/harvester.go rename to filebeat/prospector/log/harvester.go index 34bc6d3fd7fc..89369f2b7002 100644 --- a/filebeat/harvester/harvester.go +++ b/filebeat/prospector/log/harvester.go @@ -9,7 +9,7 @@ // line. As soon as the line is completed, it is read and returned. // // The stdin harvesters reads data from stdin. -package harvester +package log import ( "errors" @@ -18,7 +18,7 @@ import ( "github.com/satori/go.uuid" - "github.com/elastic/beats/filebeat/config" + cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/harvester/encoding" "github.com/elastic/beats/filebeat/harvester/source" "github.com/elastic/beats/filebeat/input/file" @@ -43,7 +43,7 @@ type Outlet interface { } type Harvester struct { - config harvesterConfig + config config state file.State states *file.States file source.FileSource /* the file being watched */ @@ -59,7 +59,7 @@ type Harvester struct { } func NewHarvester( - cfg *common.Config, + config *common.Config, state file.State, states *file.States, outlet Outlet, @@ -75,7 +75,7 @@ func NewHarvester( ID: uuid.NewV4(), } - if err := cfg.Unpack(&h.config); err != nil { + if err := config.Unpack(&h.config); err != nil { return nil, err } @@ -107,12 +107,12 @@ func NewHarvester( func (h *Harvester) open() error { switch h.config.InputType { - case config.StdinInputType: + case cfg.StdinInputType: return h.openStdin() - case config.LogInputType: + case cfg.LogInputType: return h.openFile() default: - return fmt.Errorf("Invalid input type") + return fmt.Errorf("Invalid harvester type: %+v", h.config) } } diff --git a/filebeat/harvester/harvester_test.go b/filebeat/prospector/log/harvester_test.go similarity index 57% rename from filebeat/harvester/harvester_test.go rename to filebeat/prospector/log/harvester_test.go index c7c01637140f..67689942d451 100644 --- a/filebeat/harvester/harvester_test.go +++ b/filebeat/prospector/log/harvester_test.go @@ -1,3 +1,3 @@ // +build !integration -package harvester +package log diff --git a/filebeat/harvester/json_test.go b/filebeat/prospector/log/json_test.go similarity index 99% rename from filebeat/harvester/json_test.go rename to filebeat/prospector/log/json_test.go index 19026b0b0fdf..e57c050f6c63 100644 --- a/filebeat/harvester/json_test.go +++ b/filebeat/prospector/log/json_test.go @@ -1,4 +1,4 @@ -package harvester +package log import ( "testing" diff --git a/filebeat/harvester/log.go b/filebeat/prospector/log/log.go similarity index 97% rename from filebeat/harvester/log.go rename to filebeat/prospector/log/log.go index 6e115effcaac..ee487c689d14 
100644 --- a/filebeat/harvester/log.go +++ b/filebeat/prospector/log/log.go @@ -1,4 +1,4 @@ -package harvester +package log import ( "bytes" @@ -8,7 +8,7 @@ import ( "os" "time" - "github.com/elastic/beats/filebeat/config" + "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/harvester/reader" "github.com/elastic/beats/filebeat/harvester/source" "github.com/elastic/beats/filebeat/input/file" @@ -28,7 +28,6 @@ var ( harvesterClosed = monitoring.NewInt(harvesterMetrics, "closed") harvesterRunning = monitoring.NewInt(harvesterMetrics, "running") harvesterOpenFiles = monitoring.NewInt(harvesterMetrics, "open_files") - filesTruncated = monitoring.NewInt(harvesterMetrics, "files.truncated") ) // Setup opens the file handler and creates the reader for the harvester @@ -226,14 +225,14 @@ func (h *Harvester) SendStateUpdate() { // the include_lines and exclude_lines options. func (h *Harvester) shouldExportLine(line string) bool { if len(h.config.IncludeLines) > 0 { - if !MatchAny(h.config.IncludeLines, line) { + if !harvester.MatchAny(h.config.IncludeLines, line) { // drop line logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line) return false } } if len(h.config.ExcludeLines) > 0 { - if MatchAny(h.config.ExcludeLines, line) { + if harvester.MatchAny(h.config.ExcludeLines, line) { // drop line logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line) return false @@ -324,7 +323,7 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) { // getState returns an updated copy of the harvester state func (h *Harvester) getState() file.State { - if h.config.InputType == config.StdinInputType { + if !h.file.HasState() { return file.State{} } diff --git a/filebeat/harvester/log_file.go b/filebeat/prospector/log/log_file.go similarity index 98% rename from filebeat/harvester/log_file.go rename to filebeat/prospector/log/log_file.go index d7629a5bfeee..dd37dcbdddb4 100644 --- a/filebeat/harvester/log_file.go +++ b/filebeat/prospector/log/log_file.go @@ -1,4 +1,4 @@ -package harvester +package log import ( "io" @@ -13,7 +13,7 @@ import ( type LogFile struct { fs source.FileSource offset int64 - config harvesterConfig + config config lastTimeRead time.Time backoff time.Duration done chan struct{} @@ -21,7 +21,7 @@ type LogFile struct { func NewLogFile( fs source.FileSource, - config harvesterConfig, + config config, ) (*LogFile, error) { var offset int64 if seeker, ok := fs.(io.Seeker); ok { diff --git a/filebeat/harvester/log_test.go b/filebeat/prospector/log/log_test.go similarity index 80% rename from filebeat/harvester/log_test.go rename to filebeat/prospector/log/log_test.go index 5604f4a591d0..927663985c7f 100644 --- a/filebeat/harvester/log_test.go +++ b/filebeat/prospector/log/log_test.go @@ -1,6 +1,6 @@ // +build !integration -package harvester +package log import ( "fmt" @@ -20,7 +20,7 @@ import ( func TestReadLine(t *testing.T) { - absPath, err := filepath.Abs("../tests/files/logs/") + absPath, err := filepath.Abs("../../tests/files/logs/") // All files starting with tmp are ignored logFile := absPath + "/tmp" + strconv.Itoa(rand.Int()) + ".log" @@ -59,7 +59,7 @@ func TestReadLine(t *testing.T) { f := source.File{File: readFile} h := Harvester{ - config: harvesterConfig{ + config: config{ CloseInactive: 500 * time.Millisecond, Backoff: 100 * time.Millisecond, MaxBackoff: 1 * time.Second, @@ -102,27 +102,6 @@ func TestReadLine(t *testing.T) { assert.Equal(t, err, ErrInactive) } -func 
TestExcludeLine(t *testing.T) { - regexp, err := InitMatchers("^DBG") - assert.Nil(t, err) - assert.True(t, MatchAny(regexp, "DBG: a debug message")) - assert.False(t, MatchAny(regexp, "ERR: an error message")) -} - -func TestIncludeLine(t *testing.T) { - regexp, err := InitMatchers("^ERR", "^WARN") - - assert.Nil(t, err) - assert.False(t, MatchAny(regexp, "DBG: a debug message")) - assert.True(t, MatchAny(regexp, "ERR: an error message")) - assert.True(t, MatchAny(regexp, "WARNING: a simple warning message")) -} - -func TestInitRegexp(t *testing.T) { - _, err := InitMatchers("(((((") - assert.NotNil(t, err) -} - // readLine reads a full line into buffer and returns it. // In case of partial lines, readLine does return an error and an empty string // This could potentially be improved / replaced by https://github.com/elastic/beats/libbeat/tree/master/common/streambuf diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/log/prospector.go similarity index 73% rename from filebeat/prospector/prospector_log.go rename to filebeat/prospector/log/prospector.go index d65c43d2c925..526b4928460c 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/log/prospector.go @@ -1,4 +1,4 @@ -package prospector +package log import ( "errors" @@ -21,14 +21,15 @@ const ( ) var ( - filesRenamed = monitoring.NewInt(nil, "filebeat.prospector.log.files.renamed") - filesTruncated = monitoring.NewInt(nil, "filebeat.prospector.log.files.truncated") + filesRenamed = monitoring.NewInt(nil, "filebeat.prospector.log.files.renamed") + filesTruncated = monitoring.NewInt(nil, "filebeat.prospector.log.files.truncated") + harvesterSkipped = monitoring.NewInt(nil, "filebeat.harvester.skipped") ) -// Log contains the prospector and its config -type Log struct { +// Prospector contains the prospector and its config +type Prospector struct { cfg *common.Config - config prospectorConfig + config config states *file.States registry *harvesterRegistry outlet channel.Outleter @@ -36,50 +37,51 @@ type Log struct { } // NewLog instantiates a new Log -func NewLog(cfg *common.Config, states []file.State, registry *harvesterRegistry, outlet channel.Outleter, done chan struct{}) (*Log, error) { +func NewProspector(cfg *common.Config, states []file.State, outlet channel.Outleter, done chan struct{}) (*Prospector, error) { - prospectorer := &Log{ + p := &Prospector{ + config: defaultConfig, cfg: cfg, - registry: registry, + registry: newHarvesterRegistry(), outlet: outlet, states: &file.States{}, done: done, } - if err := cfg.Unpack(&prospectorer.config); err != nil { + if err := cfg.Unpack(&p.config); err != nil { return nil, err } // Create empty harvester to check if configs are fine // TODO: Do config validation instead - _, err := prospectorer.createHarvester(file.State{}) + _, err := p.createHarvester(file.State{}) if err != nil { return nil, err } - if len(prospectorer.config.Paths) == 0 { + if len(p.config.Paths) == 0 { return nil, fmt.Errorf("each prospector must have at least one path defined") } - err = prospectorer.loadStates(states) + err = p.loadStates(states) if err != nil { return nil, err } - logp.Debug("prospector", "File Configs: %v", prospectorer.config.Paths) + logp.Debug("prospector", "File Configs: %v", p.config.Paths) - return prospectorer, nil + return p, nil } // LoadStates loads states into prospector // It goes through all states coming from the registry. Only the states which match the glob patterns of // the prospector will be loaded and updated. 
All other states will not be touched. -func (l *Log) loadStates(states []file.State) error { - logp.Debug("prospector", "exclude_files: %s", l.config.ExcludeFiles) +func (p *Prospector) loadStates(states []file.State) error { + logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles) for _, state := range states { // Check if state source belongs to this prospector. If yes, update the state. - if l.matchesFile(state.Source) { + if p.matchesFile(state.Source) { state.TTL = -1 // In case a prospector is tried to be started with an unfinished state matching the glob pattern @@ -88,7 +90,7 @@ func (l *Log) loadStates(states []file.State) error { } // Update prospector states and send new states to registry - err := l.updateState(state) + err := p.updateState(state) if err != nil { logp.Err("Problem putting initial state: %+v", err) return err @@ -96,53 +98,53 @@ func (l *Log) loadStates(states []file.State) error { } } - logp.Info("Prospector with previous states loaded: %v", l.states.Count()) + logp.Info("Prospector with previous states loaded: %v", p.states.Count()) return nil } // Run runs the prospector -func (l *Log) Run() { +func (p *Prospector) Run() { logp.Debug("prospector", "Start next scan") // TailFiles is like ignore_older = 1ns and only on startup - if l.config.TailFiles { - ignoreOlder := l.config.IgnoreOlder + if p.config.TailFiles { + ignoreOlder := p.config.IgnoreOlder // Overwrite ignore_older for the first scan - l.config.IgnoreOlder = 1 + p.config.IgnoreOlder = 1 defer func() { // Reset ignore_older after first run - l.config.IgnoreOlder = ignoreOlder + p.config.IgnoreOlder = ignoreOlder // Disable tail_files after the first run - l.config.TailFiles = false + p.config.TailFiles = false }() } - l.scan() + p.scan() // It is important that a first scan is run before cleanup to make sure all new states are read first - if l.config.CleanInactive > 0 || l.config.CleanRemoved { - beforeCount := l.states.Count() - cleanedStates := l.states.Cleanup() + if p.config.CleanInactive > 0 || p.config.CleanRemoved { + beforeCount := p.states.Count() + cleanedStates := p.states.Cleanup() logp.Debug("prospector", "Prospector states cleaned up. Before: %d, After: %d", beforeCount, beforeCount-cleanedStates) } // Marking removed files to be cleaned up. Cleanup happens after next scan to make sure all states are updated first - if l.config.CleanRemoved { - for _, state := range l.states.GetStates() { + if p.config.CleanRemoved { + for _, state := range p.states.GetStates() { // os.Stat will return an error in case the file does not exist stat, err := os.Stat(state.Source) if err != nil { if os.IsNotExist(err) { - l.removeState(state) + p.removeState(state) logp.Debug("prospector", "Remove state for file as file removed: %s", state.Source) } else { logp.Err("Prospector state for %s was not removed: %s", state.Source, err) } } else { // Check if existing source on disk and state are the same. Remove if not the case. 
- newState := file.NewState(stat, state.Source, l.config.InputType) + newState := file.NewState(stat, state.Source, p.config.InputType) if !newState.FileStateOS.IsSame(state.FileStateOS) { - l.removeState(state) + p.removeState(state) logp.Debug("prospector", "Remove state for file as file removed or renamed: %s", state.Source) } } @@ -150,7 +152,7 @@ func (l *Log) Run() { } } -func (l *Log) removeState(state file.State) { +func (p *Prospector) removeState(state file.State) { // Only clean up files where state is Finished if !state.Finished { @@ -159,7 +161,7 @@ func (l *Log) removeState(state file.State) { } state.TTL = 0 - err := l.updateState(state) + err := p.updateState(state) if err != nil { logp.Err("File cleanup state update error: %s", err) } @@ -168,13 +170,13 @@ func (l *Log) removeState(state file.State) { // getFiles returns all files which have to be harvested // All globs are expanded and then directory and excluded files are removed -func (l *Log) getFiles() map[string]os.FileInfo { +func (p *Prospector) getFiles() map[string]os.FileInfo { paths := map[string]os.FileInfo{} - for _, path := range l.config.Paths { + for _, path := range p.config.Paths { depth := uint8(0) - if l.config.recursiveGlob { + if p.config.recursiveGlob { depth = recursiveGlobDepth } matches, err := file.Glob(path, depth) @@ -188,7 +190,7 @@ func (l *Log) getFiles() map[string]os.FileInfo { for _, file := range matches { // check if the file is in the exclude_files list - if l.isFileExcluded(file) { + if p.isFileExcluded(file) { logp.Debug("prospector", "Exclude file: %s", file) continue } @@ -206,7 +208,7 @@ func (l *Log) getFiles() map[string]os.FileInfo { } isSymlink := fileInfo.Mode()&os.ModeSymlink > 0 - if isSymlink && !l.config.Symlinks { + if isSymlink && !p.config.Symlinks { logp.Debug("prospector", "File %s skipped as it is a symlink.", file) continue } @@ -220,10 +222,10 @@ func (l *Log) getFiles() map[string]os.FileInfo { // If symlink is enabled, it is checked that original is not part of same prospector // It original is harvested by other prospector, states will potentially overwrite each other - if l.config.Symlinks { + if p.config.Symlinks { for _, finfo := range paths { if os.SameFile(finfo, fileInfo) { - logp.Info("Same file found as symlink and original. Skipping file: %s", file) + logp.Info("Same file found as symlink and originap. 
Skipping file: %s", file) continue OUTER } } @@ -237,12 +239,12 @@ func (l *Log) getFiles() map[string]os.FileInfo { } // matchesFile returns true in case the given filePath is part of this prospector, means matches its glob patterns -func (l *Log) matchesFile(filePath string) bool { +func (p *Prospector) matchesFile(filePath string) bool { // Path is cleaned to ensure we always compare clean paths filePath = filepath.Clean(filePath) - for _, glob := range l.config.Paths { + for _, glob := range p.config.Paths { // Glob is cleaned to ensure we always compare clean paths glob = filepath.Clean(glob) @@ -255,7 +257,7 @@ func (l *Log) matchesFile(filePath string) bool { } // Check if file is not excluded - if match && !l.isFileExcluded(filePath) { + if match && !p.isFileExcluded(filePath) { return true } } @@ -263,12 +265,12 @@ func (l *Log) matchesFile(filePath string) bool { } // Scan starts a scanGlob for each provided path/glob -func (l *Log) scan() { +func (p *Prospector) scan() { - for path, info := range l.getFiles() { + for path, info := range p.getFiles() { select { - case <-l.done: + case <-p.done: logp.Info("Scan aborted because prospector stopped.") return default: @@ -282,14 +284,14 @@ func (l *Log) scan() { logp.Debug("prospector", "Check file for harvesting: %s", path) // Create new state for comparison - newState := file.NewState(info, path, l.config.InputType) + newState := file.NewState(info, path, p.config.InputType) // Load last state - lastState := l.states.FindPrevious(newState) + lastState := p.states.FindPrevious(newState) // Ignores all files which fall under ignore_older - if l.isIgnoreOlder(newState) { - err := l.handleIgnoreOlder(lastState, newState) + if p.isIgnoreOlder(newState) { + err := p.handleIgnoreOlder(lastState, newState) if err != nil { logp.Err("Updating ignore_older state error: %s", err) } @@ -299,18 +301,18 @@ func (l *Log) scan() { // Decides if previous state exists if lastState.IsEmpty() { logp.Debug("prospector", "Start harvester for new file: %s", newState.Source) - err := l.startHarvester(newState, 0) + err := p.startHarvester(newState, 0) if err != nil { logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err) } } else { - l.harvestExistingFile(newState, lastState) + p.harvestExistingFile(newState, lastState) } } } // harvestExistingFile continues harvesting a file with a known state if needed -func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { +func (p *Prospector) harvestExistingFile(newState file.State, oldState file.State) { logp.Debug("prospector", "Update existing file for harvesting: %s, offset: %v", newState.Source, oldState.Offset) @@ -322,7 +324,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { // This could also be an issue with force_close_older that a new harvester is started after each scan but not needed? 
// One problem with comparing modTime is that it is in seconds, and scans can happen more then once a second logp.Debug("prospector", "Resuming harvesting of file: %s, offset: %v", newState.Source, oldState.Offset) - err := l.startHarvester(newState, oldState.Offset) + err := p.startHarvester(newState, oldState.Offset) if err != nil { logp.Err("Harvester could not be started on existing file: %s, Err: %s", newState.Source, err) } @@ -332,7 +334,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { // File size was reduced -> truncated file if oldState.Finished && newState.Fileinfo.Size() < oldState.Offset { logp.Debug("prospector", "Old file was truncated. Starting from the beginning: %s", newState.Source) - err := l.startHarvester(newState, 0) + err := p.startHarvester(newState, 0) if err != nil { logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err) } @@ -351,7 +353,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { logp.Debug("prospector", "Updating state for renamed file: %s -> %s, Current offset: %v", oldState.Source, newState.Source, oldState.Offset) // Update state because of file rotation oldState.Source = newState.Source - err := l.updateState(oldState) + err := p.updateState(oldState) if err != nil { logp.Err("File rotation state update error: %s", err) } @@ -372,7 +374,7 @@ func (l *Log) harvestExistingFile(newState file.State, oldState file.State) { // handleIgnoreOlder handles states which fall under ignore older // Based on the state information it is decided if the state information has to be updated or not -func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { +func (p *Prospector) handleIgnoreOlder(lastState, newState file.State) error { logp.Debug("prospector", "Ignore file because ignore_older reached: %s", newState.Source) if !lastState.IsEmpty() { @@ -384,7 +386,7 @@ func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { } // Make sure file is not falling under clean_inactive yet - if l.isCleanInactive(newState) { + if p.isCleanInactive(newState) { logp.Debug("prospector", "Do not write state for ignore_older because clean_inactive reached") return nil } @@ -395,7 +397,7 @@ func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { // Write state for ignore_older file as none exists yet newState.Finished = true - err := l.updateState(newState) + err := p.updateState(newState) if err != nil { return err } @@ -404,21 +406,21 @@ func (l *Log) handleIgnoreOlder(lastState, newState file.State) error { } // isFileExcluded checks if the given path should be excluded -func (l *Log) isFileExcluded(file string) bool { - patterns := l.config.ExcludeFiles +func (p *Prospector) isFileExcluded(file string) bool { + patterns := p.config.ExcludeFiles return len(patterns) > 0 && harvester.MatchAny(patterns, file) } // isIgnoreOlder checks if the given state reached ignore_older -func (l *Log) isIgnoreOlder(state file.State) bool { +func (p *Prospector) isIgnoreOlder(state file.State) bool { // ignore_older is disable - if l.config.IgnoreOlder == 0 { + if p.config.IgnoreOlder == 0 { return false } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > l.config.IgnoreOlder { + if time.Since(modTime) > p.config.IgnoreOlder { return true } @@ -426,15 +428,15 @@ func (l *Log) isIgnoreOlder(state file.State) bool { } // isCleanInactive checks if the given state false under clean_inactive -func (l *Log) isCleanInactive(state file.State) 
bool { +func (p *Prospector) isCleanInactive(state file.State) bool { // clean_inactive is disable - if l.config.CleanInactive <= 0 { + if p.config.CleanInactive <= 0 { return false } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > l.config.CleanInactive { + if time.Since(modTime) > p.config.CleanInactive { return true } @@ -442,14 +444,14 @@ func (l *Log) isCleanInactive(state file.State) bool { } // createHarvester creates a new harvester instance from the given state -func (l *Log) createHarvester(state file.State) (*harvester.Harvester, error) { +func (p *Prospector) createHarvester(state file.State) (*Harvester, error) { // Each harvester gets its own copy of the outlet - outlet := l.outlet.Copy() - h, err := harvester.NewHarvester( - l.cfg, + outlet := p.outlet.Copy() + h, err := NewHarvester( + p.cfg, state, - l.states, + p.states, outlet, ) @@ -458,9 +460,9 @@ func (l *Log) createHarvester(state file.State) (*harvester.Harvester, error) { // startHarvester starts a new harvester with the given offset // In case the HarvesterLimit is reached, an error is returned -func (l *Log) startHarvester(state file.State, offset int64) error { +func (p *Prospector) startHarvester(state file.State, offset int64) error { - if l.config.HarvesterLimit > 0 && l.registry.len() >= l.config.HarvesterLimit { + if p.config.HarvesterLimit > 0 && p.registry.len() >= p.config.HarvesterLimit { harvesterSkipped.Add(1) return fmt.Errorf("Harvester limit reached") } @@ -470,7 +472,7 @@ func (l *Log) startHarvester(state file.State, offset int64) error { state.Offset = offset // Create harvester with state - h, err := l.createHarvester(state) + h, err := p.createHarvester(state) if err != nil { return err } @@ -480,27 +482,27 @@ func (l *Log) startHarvester(state file.State, offset int64) error { return fmt.Errorf("Error setting up harvester: %s", err) } - l.registry.start(h, reader) + p.registry.start(h, reader) return nil } // updateState updates the prospector state and forwards the event to the spooler // All state updates done by the prospector itself are synchronous to make sure not states are overwritten -func (l *Log) updateState(state file.State) error { +func (p *Prospector) updateState(state file.State) error { // Add ttl if cleanOlder is enabled and TTL is not already 0 - if l.config.CleanInactive > 0 && state.TTL != 0 { - state.TTL = l.config.CleanInactive + if p.config.CleanInactive > 0 && state.TTL != 0 { + state.TTL = p.config.CleanInactive } // Update first internal state - l.states.Update(state) + p.states.Update(state) data := util.NewData() data.SetState(state) - ok := l.outlet.OnEvent(data) + ok := p.outlet.OnEvent(data) if !ok { logp.Info("Prospector outlet closed") @@ -509,3 +511,18 @@ func (l *Log) updateState(state file.State) error { return nil } + +// Wait waits for the all harvesters to complete and only then call stop +func (p *Prospector) Wait() { + p.registry.waitForCompletion() + p.Stop() +} + +// Stop stops all harvesters and then stops the prospector +func (p *Prospector) Stop() { + + // Stop all harvesters + // In case the beatDone channel is closed, this will not wait for completion + // Otherwise Stop will wait until output is complete + p.registry.Stop() +} diff --git a/filebeat/prospector/prospector_log_other_test.go b/filebeat/prospector/log/prospector_other_test.go similarity index 92% rename from filebeat/prospector/prospector_log_other_test.go rename to filebeat/prospector/log/prospector_other_test.go index 5036bf160a5c..3642fd750b4c 100644 --- 
a/filebeat/prospector/prospector_log_other_test.go +++ b/filebeat/prospector/log/prospector_other_test.go @@ -1,6 +1,6 @@ // +build !windows -package prospector +package log import ( "testing" @@ -61,14 +61,14 @@ func TestMatchFile(t *testing.T) { for _, test := range matchTests { - l := Log{ - config: prospectorConfig{ + p := Prospector{ + config: config{ Paths: test.paths, ExcludeFiles: test.excludeFiles, }, } - assert.Equal(t, test.result, l.matchesFile(test.file)) + assert.Equal(t, test.result, p.matchesFile(test.file)) } } @@ -130,8 +130,8 @@ var initStateTests = []struct { func TestInit(t *testing.T) { for _, test := range initStateTests { - l := Log{ - config: prospectorConfig{ + p := Prospector{ + config: config{ Paths: test.paths, }, states: &file.States{}, @@ -144,9 +144,9 @@ func TestInit(t *testing.T) { test.states[i] = state } - err := l.loadStates(test.states) + err := p.loadStates(test.states) assert.NoError(t, err) - assert.Equal(t, test.count, l.states.Count()) + assert.Equal(t, test.count, p.states.Count()) } } diff --git a/filebeat/prospector/prospector_log_test.go b/filebeat/prospector/log/prospector_test.go similarity index 93% rename from filebeat/prospector/prospector_log_test.go rename to filebeat/prospector/log/prospector_test.go index 498d1ca864f4..5793c9bcec3b 100644 --- a/filebeat/prospector/prospector_log_test.go +++ b/filebeat/prospector/log/prospector_test.go @@ -1,6 +1,6 @@ // +build !integration -package prospector +package log import ( "os" @@ -14,8 +14,8 @@ import ( func TestProspectorFileExclude(t *testing.T) { - p := Log{ - config: prospectorConfig{ + p := Prospector{ + config: config{ ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)}, }, } @@ -50,8 +50,8 @@ func TestIsCleanInactive(t *testing.T) { for _, test := range cleanInactiveTests { - l := Log{ - config: prospectorConfig{ + l := Prospector{ + config: config{ CleanInactive: test.cleanInactive, }, } diff --git a/filebeat/prospector/prospector_log_windows_test.go b/filebeat/prospector/log/prospector_windows_test.go similarity index 90% rename from filebeat/prospector/prospector_log_windows_test.go rename to filebeat/prospector/log/prospector_windows_test.go index 37b4351437bf..0a0ab607a524 100644 --- a/filebeat/prospector/prospector_log_windows_test.go +++ b/filebeat/prospector/log/prospector_windows_test.go @@ -1,6 +1,6 @@ // +build !integration -package prospector +package log import ( "testing" @@ -59,13 +59,13 @@ func TestMatchFileWindows(t *testing.T) { for _, test := range matchTestsWindows { - l := Log{ - config: prospectorConfig{ + p := Prospector{ + config: config{ Paths: test.paths, ExcludeFiles: test.excludeFiles, }, } - assert.Equal(t, test.result, l.matchesFile(test.file)) + assert.Equal(t, test.result, p.matchesFile(test.file)) } } diff --git a/filebeat/prospector/registry.go b/filebeat/prospector/log/registry.go similarity index 73% rename from filebeat/prospector/registry.go rename to filebeat/prospector/log/registry.go index 0f600f5d9f86..383377d126f0 100644 --- a/filebeat/prospector/registry.go +++ b/filebeat/prospector/log/registry.go @@ -1,32 +1,31 @@ -package prospector +package log import ( "sync" - "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/harvester/reader" uuid "github.com/satori/go.uuid" ) type harvesterRegistry struct { sync.Mutex - harvesters map[uuid.UUID]*harvester.Harvester + harvesters map[uuid.UUID]*Harvester wg sync.WaitGroup } func newHarvesterRegistry() *harvesterRegistry { return &harvesterRegistry{ - harvesters: 
map[uuid.UUID]*harvester.Harvester{}, + harvesters: map[uuid.UUID]*Harvester{}, } } -func (hr *harvesterRegistry) add(h *harvester.Harvester) { +func (hr *harvesterRegistry) add(h *Harvester) { hr.Lock() defer hr.Unlock() hr.harvesters[h.ID] = h } -func (hr *harvesterRegistry) remove(h *harvester.Harvester) { +func (hr *harvesterRegistry) remove(h *Harvester) { hr.Lock() defer hr.Unlock() delete(hr.harvesters, h.ID) @@ -36,7 +35,7 @@ func (hr *harvesterRegistry) Stop() { hr.Lock() for _, hv := range hr.harvesters { hr.wg.Add(1) - go func(h *harvester.Harvester) { + go func(h *Harvester) { hr.wg.Done() h.Stop() }(hv) @@ -49,7 +48,7 @@ func (hr *harvesterRegistry) waitForCompletion() { hr.wg.Wait() } -func (hr *harvesterRegistry) start(h *harvester.Harvester, r reader.Reader) { +func (hr *harvesterRegistry) start(h *Harvester, r reader.Reader) { hr.wg.Add(1) hr.add(h) diff --git a/filebeat/harvester/stdin.go b/filebeat/prospector/log/stdin.go similarity index 94% rename from filebeat/harvester/stdin.go rename to filebeat/prospector/log/stdin.go index 70df5b214f13..730ca0ae93c7 100644 --- a/filebeat/harvester/stdin.go +++ b/filebeat/prospector/log/stdin.go @@ -1,4 +1,4 @@ -package harvester +package log import ( "os" diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index e808896d350d..c405ceeb7595 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -10,14 +10,10 @@ import ( "github.com/elastic/beats/filebeat/channel" cfg "github.com/elastic/beats/filebeat/config" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/prospector/log" "github.com/elastic/beats/filebeat/prospector/stdin" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" -) - -var ( - harvesterSkipped = monitoring.NewInt(nil, "filebeat.harvester.skipped") ) // Prospector contains the prospector @@ -28,13 +24,14 @@ type Prospector struct { wg *sync.WaitGroup id uint64 Once bool - registry *harvesterRegistry beatDone chan struct{} } // Prospectorer is the interface common to all prospectors type Prospectorer interface { Run() + Stop() + Wait() } // NewProspector instantiates a new prospector @@ -44,7 +41,6 @@ func NewProspector(conf *common.Config, outlet channel.Outleter, beatDone chan s wg: &sync.WaitGroup{}, done: make(chan struct{}), Once: false, - registry: newHarvesterRegistry(), beatDone: beatDone, } @@ -77,9 +73,9 @@ func (p *Prospector) initProspectorer(outlet channel.Outleter, states []file.Sta case cfg.StdinInputType: prospectorer, err = stdin.NewProspector(config, outlet) case cfg.LogInputType: - prospectorer, err = NewLog(config, states, p.registry, outlet, p.done) + prospectorer, err = log.NewProspector(config, states, outlet, p.done) default: - return fmt.Errorf("Invalid input type: %v", p.config.InputType) + return fmt.Errorf("invalid prospector type: %v. 
Change input_type", p.config.InputType) } if err != nil { @@ -155,11 +151,8 @@ func (p *Prospector) stop() { // In case of once, it will be waited until harvesters close itself if p.Once { - p.registry.waitForCompletion() + p.prospectorer.Wait() + } else { + p.prospectorer.Stop() } - - // Stop all harvesters - // In case the beatDone channel is closed, this will not wait for completion - // Otherwise Stop will wait until output is complete - p.registry.Stop() } diff --git a/filebeat/prospector/stdin/prospector.go b/filebeat/prospector/stdin/prospector.go index 77dc0e908415..6abf149e64d7 100644 --- a/filebeat/prospector/stdin/prospector.go +++ b/filebeat/prospector/stdin/prospector.go @@ -4,15 +4,15 @@ import ( "fmt" "github.com/elastic/beats/filebeat/channel" - "github.com/elastic/beats/filebeat/harvester" "github.com/elastic/beats/filebeat/input/file" + "github.com/elastic/beats/filebeat/prospector/log" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" ) -// Stdin is a prospector for stdin -type Stdin struct { - harvester *harvester.Harvester +// Prospector is a prospector for stdin +type Prospector struct { + harvester *log.Harvester started bool cfg *common.Config outlet channel.Outleter @@ -20,9 +20,9 @@ type Stdin struct { // NewStdin creates a new stdin prospector // This prospector contains one harvester which is reading from stdin -func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Stdin, error) { +func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Prospector, error) { - prospectorer := &Stdin{ + p := &Prospector{ started: false, cfg: cfg, outlet: outlet, @@ -30,36 +30,36 @@ func NewProspector(cfg *common.Config, outlet channel.Outleter) (*Stdin, error) var err error - prospectorer.harvester, err = prospectorer.createHarvester(file.State{Source: "-"}) + p.harvester, err = p.createHarvester(file.State{Source: "-"}) if err != nil { return nil, fmt.Errorf("Error initializing stdin harvester: %v", err) } - return prospectorer, nil + return p, nil } // Run runs the prospector -func (s *Stdin) Run() { +func (p *Prospector) Run() { // Make sure stdin harvester is only started once - if !s.started { - reader, err := s.harvester.Setup() + if !p.started { + reader, err := p.harvester.Setup() if err != nil { logp.Err("Error starting stdin harvester: %s", err) return } - go s.harvester.Harvest(reader) - s.started = true + go p.harvester.Harvest(reader) + p.started = true } } // createHarvester creates a new harvester instance from the given state -func (s *Stdin) createHarvester(state file.State) (*harvester.Harvester, error) { +func (p *Prospector) createHarvester(state file.State) (*log.Harvester, error) { // Each harvester gets its own copy of the outlet - outlet := s.outlet.Copy() - h, err := harvester.NewHarvester( - s.cfg, + outlet := p.outlet.Copy() + h, err := log.NewHarvester( + p.cfg, state, nil, outlet, @@ -67,3 +67,9 @@ func (s *Stdin) createHarvester(state file.State) (*harvester.Harvester, error) return h, err } + +// Wait waits for completion of the prospector. +func (p *Prospector) Wait() {} + +// Stop stops the prospector. +func (p *Prospector) Stop() {}
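
Note for reviewers: this patch folds the old prospectorConfig validation into the merged config in filebeat/prospector/log/config.go. The sketch below isolates the three prospector-side rules so they are easy to eyeball in one place; the cfg struct and main function are illustrative stand-ins only, not the types from the patch (the real config also carries all harvester options and go-ucfg struct tags).

```go
package main

import (
	"fmt"
	"time"
)

// cfg is an illustrative subset of the merged log prospector/harvester config.
type cfg struct {
	Paths         []string
	InputType     string
	IgnoreOlder   time.Duration
	ScanFrequency time.Duration
	CleanInactive time.Duration
}

// Validate mirrors the prospector-side checks that now live in config.Validate.
func (c *cfg) Validate() error {
	if c.InputType == "log" && len(c.Paths) == 0 {
		return fmt.Errorf("No paths were defined for prospector")
	}
	if c.CleanInactive != 0 && c.IgnoreOlder == 0 {
		return fmt.Errorf("ignore_older must be enabled when clean_inactive is used")
	}
	if c.CleanInactive != 0 && c.CleanInactive <= c.IgnoreOlder+c.ScanFrequency {
		return fmt.Errorf("clean_inactive must be > ignore_older + scan_frequency to make sure only files which are not monitored anymore are removed")
	}
	return nil
}

func main() {
	c := cfg{
		Paths:         []string{"/var/log/*.log"},
		InputType:     "log",
		IgnoreOlder:   10 * time.Hour,
		ScanFrequency: 10 * time.Second,
		// Too small: clean_inactive must exceed ignore_older + scan_frequency.
		CleanInactive: 10 * time.Hour,
	}
	fmt.Println(c.Validate()) // prints the clean_inactive error
}
```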
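Note for reviewers: after this patch the Prospectorer interface in filebeat/prospector/prospector.go requires Run(), Stop() and Wait(), and Prospector.stop() calls Wait() only in the run-once case. The standalone sketch below mirrors that shutdown split under simplified assumptions; fakeProspector, stopProspector and main are illustrative scaffolding, not code from the patch.

```go
package main

import (
	"fmt"
	"sync"
)

// Prospectorer mirrors the interface from filebeat/prospector/prospector.go
// after this change: every concrete prospector exposes Run, Stop and Wait.
type Prospectorer interface {
	Run()
	Stop()
	Wait()
}

// fakeProspector is an illustrative stand-in for log.Prospector: it tracks
// running harvesters with a WaitGroup, roughly like the harvesterRegistry.
type fakeProspector struct {
	wg sync.WaitGroup
}

// Run pretends to start one harvester per scan.
func (p *fakeProspector) Run() {
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		fmt.Println("harvesting...")
	}()
}

// Wait blocks until all harvesters are done, then stops (run-once mode).
func (p *fakeProspector) Wait() {
	p.wg.Wait()
	p.Stop()
}

// Stop shuts down without waiting for harvester completion.
func (p *fakeProspector) Stop() {
	fmt.Println("prospector stopped")
}

// stopProspector mirrors Prospector.stop() from this patch: run-once setups
// wait for harvesters to drain, long-running setups stop immediately.
func stopProspector(p Prospectorer, once bool) {
	if once {
		p.Wait()
	} else {
		p.Stop()
	}
}

func main() {
	var p Prospectorer = &fakeProspector{}
	p.Run()
	stopProspector(p, true) // prints "harvesting..." and then "prospector stopped"
}
```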