Skip to content

Commit

Permalink
Add close_removed and close_renamed, deprecate force_close_files
Browse files Browse the repository at this point in the history
force_close_files is replaced by the two option close_removed and close_renamed. Force_close_files is deprecated. In case it is enabled, it sets close_removed and close_renamed to true.

This is part of elastic#1600
  • Loading branch information
ruflin committed Jun 28, 2016
1 parent f9d028e commit 4576f47
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 84 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d

*Filebeat*

- Stop following symlink. Symlinks are now ignored: {pull}1686[1686]
- Deprecate force_close_files option and replace it with close_removed and close_renamed {issue}1600[1600]

*Winlogbeat*


Expand Down Expand Up @@ -50,6 +53,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha3...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introdce close_removed and close_renamed harvester options {issue}1600[1600]


*Winlogbeat*

Expand Down
20 changes: 10 additions & 10 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ filebeat.prospectors:
# The backoff value will be multiplied each time with the backoff_factor until max_backoff is reached
#backoff_factor: 2

# This option closes a file, as soon as the file name changes.
# This config option is recommended on windows only. Filebeat keeps the files it's reading open. This can cause
# issues when the file is removed, as the file will not be fully removed until also Filebeat closes
# the reading. Filebeat closes the file handler after ignore_older. During this time no new file with the
# same name can be created. Turning this feature on the other hand can lead to loss of data
# on rotate files. It can happen that after file rotation the beginning of the new
# file is skipped, as the reading starts at the end. We recommend to leave this option on false
# but lower the ignore_older value to release files faster.
#force_close_files: false
# Close renamed means a file handler is close when a file is renamed / rotated. In case the harvester was
# not finished reading the roated file, the file will be picked up again after scan_frequency in case it
# also matches the prospector patterns.
#close_renamed: false

# When enabling this option, a file handler is closed immidiately in case a file can't be found
# any more. In case the file shows up again later, harvesting will continue at the last position
# after scan_frequency
#close_removed: false

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
#- input_type: stdin

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

Expand Down
14 changes: 13 additions & 1 deletion filebeat/harvester/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/libbeat/common"

"github.com/dustin/go-humanize"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand All @@ -22,8 +23,10 @@ var (
BackoffFactor: 2,
MaxBackoff: 10 * time.Second,
CloseOlder: 1 * time.Hour,
ForceCloseFiles: false,
MaxBytes: 10 * (1 << 20), // 10MB
CloseRemoved: false,
CloseRenamed: false,
ForceCloseFiles: false,
}
)

Expand All @@ -38,6 +41,8 @@ type harvesterConfig struct {
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
CloseOlder time.Duration `config:"close_older"`
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
Expand All @@ -48,6 +53,13 @@ type harvesterConfig struct {

func (config *harvesterConfig) Validate() error {

// TODO: remove in 7.0
if config.ForceCloseFiles {
config.CloseRemoved = true
config.CloseRenamed = true
logp.Warn("DEPRECATED: force_close_files was set to true. Use close_removed + close_rename")
}

// Check input type
if _, ok := cfg.ValidInputType[config.InputType]; !ok {
return fmt.Errorf("Invalid input type: %v", config.InputType)
Expand Down
22 changes: 22 additions & 0 deletions filebeat/harvester/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package harvester

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestForceCloseFiles(t *testing.T) {

config := defaultConfig
assert.False(t, config.ForceCloseFiles)
assert.False(t, config.CloseRemoved)
assert.False(t, config.CloseRenamed)

config.ForceCloseFiles = true
config.Validate()

assert.True(t, config.ForceCloseFiles)
assert.True(t, config.CloseRemoved)
assert.True(t, config.CloseRenamed)
}
14 changes: 13 additions & 1 deletion filebeat/harvester/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (h *Harvester) Harvest() {
// don't require 'complicated' logic.
cfg := h.Config
readerConfig := reader.LogFileReaderConfig{
ForceClose: cfg.ForceCloseFiles,
CloseRemoved: cfg.CloseRemoved,
CloseRenamed: cfg.CloseRenamed,
CloseOlder: cfg.CloseOlder,
BackoffDuration: cfg.Backoff,
MaxBackoffDuration: cfg.MaxBackoff,
Expand Down Expand Up @@ -68,12 +69,23 @@ func (h *Harvester) Harvest() {
// Partial lines return error and are only read on completion
ts, text, bytesRead, jsonFields, err := readLine(processor)
if err != nil {

if err == reader.ErrFileTruncate {
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.Path)
h.SetOffset(0)
return
}

if err == reader.ErrRemoved {
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.Path)
return
}

if err == reader.ErrRenamed {
logp.Info("File was renamed: %s Closing because close_renamed is enabled.", h.Path)
return
}

logp.Info("Read line error: %s", err)
return
}
Expand Down
28 changes: 17 additions & 11 deletions filebeat/harvester/reader/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (

var (
ErrFileTruncate = errors.New("detected file being truncated")
ErrForceClose = errors.New("file must be closed")
ErrRenamed = errors.New("file was renamed")
ErrRemoved = errors.New("file was removed")
ErrInactive = errors.New("file inactive")
)

Expand All @@ -27,11 +28,12 @@ type logFileReader struct {
}

type LogFileReaderConfig struct {
ForceClose bool
CloseOlder time.Duration
BackoffDuration time.Duration
MaxBackoffDuration time.Duration
BackoffFactor int
CloseRenamed bool
CloseRemoved bool
}

func NewLogFileReader(
Expand Down Expand Up @@ -72,8 +74,9 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
r.offset += int64(n)
r.lastTimeRead = time.Now()
}

// reset backoff
if err == nil {
// reset backoff
r.backoff = r.config.BackoffDuration
return n, nil
}
Expand Down Expand Up @@ -113,16 +116,19 @@ func (r *logFileReader) Read(buf []byte) (int, error) {
return n, ErrInactive
}

if r.config.ForceClose {
// Check if the file name exists (see #93)
if r.config.CloseRenamed {
if !file.IsSameFile(r.fs.Name(), info) {
return n, ErrRenamed
}
}

if r.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(r.fs.Name())

// Error means file does not exist. If no error, check if same file. If
// not close as rotated.
if statErr != nil || !file.IsSameFile(r.fs.Name(), info) {
logp.Info("Force close file: %s; error: %s", r.fs.Name(), statErr)
// Return directly on windows -> file is closing
return n, ErrForceClose
// Error means file does not exist.
if statErr != nil {
return n, ErrRemoved
}
}

Expand Down
3 changes: 2 additions & 1 deletion filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ filebeat.prospectors:
backoff: 0.1s
backoff_factor: 1
max_backoff: 0.1s
force_close_files: {{force_close_files}}
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}

{% if fields %}
fields:
Expand Down
62 changes: 2 additions & 60 deletions filebeat/tests/system/test_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def test_file_disappear_appear(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*.log",
force_close_files="true",
close_removed="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")
Expand Down Expand Up @@ -301,7 +301,7 @@ def test_file_disappear_appear(self):
# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Force close file"),
"Closing because close_removed is enabled"),
max_timeout=15)

# Move file to old file name
Expand Down Expand Up @@ -332,64 +332,6 @@ def test_file_disappear_appear(self):
output = self.read_output()
assert len(output) == 5 + 6

def test_force_close(self):
"""
Checks that a file is closed in case it is rotated
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
force_close_files="true",
scan_frequency="0.1s"
)
os.mkdir(self.working_dir + "/log/")

testfile1 = self.working_dir + "/log/test.log"
testfile2 = self.working_dir + "/log/test.log.rotated"
file = open(testfile1, 'w')

iterations1 = 5
for n in range(0, iterations1):
file.write("rotation file")
file.write("\n")

file.close()

filebeat = self.start_beat()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1), max_timeout=10)

os.rename(testfile1, testfile2)

file = open(testfile1, 'w', 0)
file.write("Hello World\n")
file.close()

# Wait until error shows up on windows
self.wait_until(
lambda: self.log_contains(
"Force close file"),
max_timeout=15)

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations1 + 1), max_timeout=10)

filebeat.check_kill_and_wait()

data = self.get_registry()

# Make sure new file was picked up. As it has the same file name,
# one entry for the new and one for the old should exist
assert len(data) == 2

# Make sure output has 11 entries, the new file was started
# from scratch
output = self.read_output()
#assert len(output) == 5 + 6

def test_new_line_on_existing_file(self):
"""
Checks that filebeat follows future writes to the same
Expand Down
Loading

0 comments on commit 4576f47

Please sign in to comment.