Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move tail_files to prospector level #2932

Merged
merged 1 commit into from
Nov 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ https://github.com/elastic/beats/compare/v5.0.0...master[Check the HEAD diff]
- If a file is falling under ignore_older during startup, offset is now set to end of file instead of 0.
With the previous logic the whole file was sent in case a line was added and it was inconsitent with
files which were harvested previously. {pull}2907[2907]
- tail_files is now only applied on the first scan and not for all new files. {pull}2932[2932]

*Winlogbeat*

Expand Down
2 changes: 0 additions & 2 deletions filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var (
BufferSize: 16 * humanize.KiByte,
DocumentType: "log",
InputType: cfg.DefaultInputType,
TailFiles: false,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
Expand All @@ -38,7 +37,6 @@ type harvesterConfig struct {
DocumentType string `config:"document_type"`
Encoding string `config:"encoding"`
InputType string `config:"input_type"`
TailFiles bool `config:"tail_files"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Expand Down
6 changes: 0 additions & 6 deletions filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,6 @@ func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
return file.Seek(h.state.Offset, os.SEEK_SET)
}

// tail file if file is new and tail_files config is set
if h.config.TailFiles {
logp.Debug("harvester", "Setting offset for tailing file: %s.", h.state.Source)
return file.Seek(0, os.SEEK_END)
}

// get offset from file in case of encoding factory was required to read some data.
logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source)
return file.Seek(0, os.SEEK_CUR)
Expand Down
2 changes: 2 additions & 0 deletions filebeat/prospector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
CleanRemoved: true,
HarvesterLimit: 0,
Symlinks: false,
TailFiles: false,
}
)

Expand All @@ -30,6 +31,7 @@ type prospectorConfig struct {
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint64 `config:"harvester_limit" validate:"min=0"`
Symlinks bool `config:"symlinks"`
TailFiles bool `config:"tail_files"`
}

func (config *prospectorConfig) Validate() error {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Prospector struct {
}

type Prospectorer interface {
Init(states []file.State) error
Init(states file.States) error
Run()
}

Expand All @@ -61,7 +61,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
return nil, err
}

err := prospector.Init(states.GetStates())
err := prospector.Init(states)
if err != nil {
return nil, err
}
Expand All @@ -72,7 +72,7 @@ func NewProspector(cfg *common.Config, states file.States, outlet Outlet) (*Pros
}

// Init sets up default config for prospector
func (p *Prospector) Init(states []file.State) error {
func (p *Prospector) Init(states file.States) error {

var prospectorer Prospectorer
var err error
Expand Down
24 changes: 19 additions & 5 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
)

var (
filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed")
filesTrucated = expvar.NewInt("filebeat.prospector.log.files.truncated")
filesRenamed = expvar.NewInt("filebeat.prospector.log.files.renamed")
filesTruncated = expvar.NewInt("filebeat.prospector.log.files.truncated")
)

type ProspectorLog struct {
Expand All @@ -37,13 +37,14 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) {
// Init sets up the prospector
// It goes through all states coming from the registry. Only the states which match the glob patterns of
// the prospector will be loaded and updated. All other states will not be touched.
func (p *ProspectorLog) Init(states []file.State) error {
func (p *ProspectorLog) Init(states file.States) error {
logp.Debug("prospector", "exclude_files: %s", p.config.ExcludeFiles)

for _, state := range states {
for _, state := range states.GetStates() {
// Check if state source belongs to this prospector. If yes, update the state.
if p.matchesFile(state.Source) {
state.TTL = -1

// Update prospector states and send new states to registry
err := p.Prospector.updateState(input.NewEvent(state))
if err != nil {
Expand All @@ -60,6 +61,19 @@ func (p *ProspectorLog) Init(states []file.State) error {
func (p *ProspectorLog) Run() {
logp.Debug("prospector", "Start next scan")

// TailFiles is like ignore_older = 1ns and only on startup
if p.config.TailFiles {
ignoreOlder := p.config.IgnoreOlder

// Overwrite ignore_older for the first scan
p.config.IgnoreOlder = 1
defer func() {
// Reset ignore_older after first run
p.config.IgnoreOlder = ignoreOlder
// Disable tail_files after the first run
p.config.TailFiles = false
}()
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about:

if p.config.TailFiles {
  defer func(old time.Duration) {
    p.config.IgnoreOlder = old
  }(p.config.IgnoreOlder)

  p.config.TailFiles = false // disable tail_files in future calls
  p.config.IgnoreOlder = 1 * time.Nanosecond
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also work. The advantage of the first version is, that in case TailFiles is also used somewhere inside the first scan (which is not the case), it would still work.

p.scan()

// It is important that a first scan is run before cleanup to make sure all new states are read first
Expand Down Expand Up @@ -246,7 +260,7 @@ func (p *ProspectorLog) harvestExistingFile(newState file.State, oldState file.S
logp.Err("Harvester could not be started on truncated file: %s, Err: %s", newState.Source, err)
}

filesTrucated.Add(1)
filesTruncated.Add(1)
return
}

Expand Down
4 changes: 3 additions & 1 deletion filebeat/prospector/prospector_log_other_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ func TestInit(t *testing.T) {
Paths: test.paths,
},
}
err := p.Init(test.states)
states := file.NewStates()
states.SetStates(test.states)
err := p.Init(*states)
assert.NoError(t, err)
assert.Equal(t, test.count, p.Prospector.states.Count())
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector_stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func NewProspectorStdin(p *Prospector) (*ProspectorStdin, error) {
return prospectorer, nil
}

func (p *ProspectorStdin) Init(states []file.State) error {
func (p *ProspectorStdin) Init(states file.States) error {
p.started = false
return nil
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ func TestProspectorInitInputTypeLogError(t *testing.T) {
config: prospectorConfig{},
}

err := prospector.Init([]file.State{})
states := file.NewStates()
states.SetStates([]file.State{})
err := prospector.Init(*states)
// Error should be returned because no path is set
assert.Error(t, err)
}
Expand Down