Add close_timeout option #1926

Merged · 1 commit · Aug 31, 2016
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
@@ -94,6 +94,8 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d
*Topbeat*

*Filebeat*
- Introduce close_timeout harvester option {issue}1600[1600]


- Add harvester_limit option {pull}2417[2417]

11 changes: 11 additions & 0 deletions filebeat/docs/reference/configuration/filebeat-options.asciidoc
@@ -233,6 +233,17 @@ WARNING: Only use this option if you understand that data loss is a potential s

`close_eof` closes a file as soon as the end of the file is reached. This is useful when your files are only written once and not updated from time to time, for example when every single log event is written to a new file.

===== close_timeout

WARNING: Only use this option if you understand that it can lead to data loss. In addition, multiline events may not be sent completely when the timeout is reached.

`close_timeout` gives every harvester a predefined lifetime: regardless of where the reader is in the file, reading stops after `close_timeout` elapses. This option can be useful if only a predefined amount of time should be spent on older log files. Setting `ignore_older` equal to `close_timeout` means the file is not picked up again if it wasn't modified in the meantime, which normally leads to data loss because the complete file is not sent.

If `close_timeout` is used in combination with multiline events, the harvester may be stopped in the middle of a multiline event, which means that only part of the event is sent. If the harvester is started again later and the file still exists, only the second part of the event is sent.

`close_timeout` does not apply if your output is blocked and no further events can be sent: at least one event must be sent after `close_timeout` kicks in so that the harvester can be closed afterwards.
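
A minimal prospector sketch of the `close_timeout`/`ignore_older` combination described above (the path and values are illustrative, not defaults):

```yaml
filebeat.prospectors:
  - paths:
      - /var/log/app/*.log
    # Stop each harvester after 5 minutes, regardless of read position.
    close_timeout: 5m
    # Setting ignore_older equal to close_timeout risks data loss (see warning above).
    ignore_older: 5m
```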


[[clean-options]]
===== clean_*

3 changes: 2 additions & 1 deletion filebeat/docs/troubleshooting.asciidoc
@@ -22,8 +22,9 @@ There are 4 more configuration options which can be used to close file handlers,
* close_renamed
* close_removed
* close_eof
* close_timeout

`close_renamed` and `close_removed` can be useful on Windows and for issues related to file rotation, see <<windows-file-rotation>>. `close_eof` can be useful in environments with a large number of files that contain only very few entries. `close_timeout` is useful in environments where it is more important to close file handlers than to send all log lines. More details can be found in the config options, see <<configuration-filebeat-options>>.

Before using any of these variables, make sure to study the documentation on each.
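
For reference, a sketch of how these options can appear together in a prospector section (illustrative values; which of them to enable depends on your environment):

```yaml
filebeat.prospectors:
  - paths:
      - /var/log/app/*.log
    close_renamed: true
    close_removed: true
    close_eof: true
    close_timeout: 5m
```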

5 changes: 5 additions & 0 deletions filebeat/etc/beat.full.yml
@@ -196,6 +196,11 @@ filebeat.prospectors:
# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: false

# Close timeout closes the harvester after the predefined time.
# This applies regardless of whether the harvester has finished reading the file or not.
# By default this option is disabled.
# Note: Potential data loss. Make sure to read and understand the docs for this option.
#close_timeout: 0

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
5 changes: 5 additions & 0 deletions filebeat/filebeat.full.yml
@@ -196,6 +196,11 @@ filebeat.prospectors:
# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: false

# Close timeout closes the harvester after the predefined time.
# This applies regardless of whether the harvester has finished reading the file or not.
# By default this option is disabled.
# Note: Potential data loss. Make sure to read and understand the docs for this option.
#close_timeout: 0

#----------------------------- Stdin prospector -------------------------------
# Configuration to use stdin input
2 changes: 2 additions & 0 deletions filebeat/harvester/config.go
@@ -27,6 +27,7 @@ var (
CloseRemoved: false,
CloseRenamed: false,
CloseEOF: false,
CloseTimeout: 0,
ForceCloseFiles: false,
}
)
@@ -46,6 +47,7 @@ type harvesterConfig struct {
CloseRemoved bool `config:"close_removed"`
CloseRenamed bool `config:"close_renamed"`
CloseEOF bool `config:"close_eof"`
CloseTimeout time.Duration `config:"close_timeout" validate:"min=0"`
ForceCloseFiles bool `config:"force_close_files"`
ExcludeLines []*regexp.Regexp `config:"exclude_lines"`
IncludeLines []*regexp.Regexp `config:"include_lines"`
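
The `close_timeout` value is given as a string such as `1s` or `5m` and unpacked into a `time.Duration`; in Beats this goes through the go-ucfg config library, which also enforces the `validate:"min=0"` tag. A minimal standard-library sketch of the underlying conversion and check (not the actual unpacking code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A config string such as "5m" parses into a time.Duration.
	d, err := time.ParseDuration("5m")
	if err != nil {
		panic(err)
	}
	// validate:"min=0" rejects negative values; 0 leaves the option
	// disabled, matching the default above.
	if d < 0 {
		panic("close_timeout must be >= 0")
	}
	fmt.Println("close_timeout:", d)
}
```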
3 changes: 2 additions & 1 deletion filebeat/harvester/harvester.go
@@ -38,9 +38,10 @@ type Harvester struct {
state file.State
prospectorChan chan *input.Event
file source.FileSource /* the file being watched */
fileReader *LogFile
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
done chan struct{}
}

func NewHarvester(
21 changes: 19 additions & 2 deletions filebeat/harvester/log.go
@@ -5,6 +5,7 @@ import (
"expvar"
"io"
"os"
"time"

"golang.org/x/text/transform"

@@ -260,6 +261,7 @@ func (h *Harvester) close() {
if h.file != nil {

h.file.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
harvesterOpenFiles.Add(-1)

@@ -294,12 +296,27 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
h.fileReader, err = NewLogFile(h.file, h.config)
if err != nil {
return nil, err
}

// Closes reader after timeout or when done channel is closed
go func() {
var closeTimeout <-chan time.Time
if h.config.CloseTimeout > 0 {
closeTimeout = time.After(h.config.CloseTimeout)
}

select {
case <-h.done:
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", h.state.Source)
}
h.fileReader.Close()
}()

r, err = reader.NewEncode(h.fileReader, h.encoding, h.config.BufferSize)
if err != nil {
return nil, err
}
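
The goroutine above leans on a small Go idiom: receiving from a nil channel blocks forever, so when `close_timeout` is disabled the `time.After` case simply never fires and only `h.done` can end the `select`. A runnable sketch of the same pattern with illustrative names (not Filebeat's types):

```go
package main

import (
	"fmt"
	"time"
)

// watch closes a hypothetical resource when either the done channel
// is closed or the optional timeout expires.
func watch(done <-chan struct{}, timeout time.Duration) {
	var expired <-chan time.Time // nil: this case blocks forever
	if timeout > 0 {
		expired = time.After(timeout)
	}
	select {
	case <-done:
		fmt.Println("closing: done channel closed")
	case <-expired:
		fmt.Println("closing: timeout reached")
	}
}

func main() {
	done := make(chan struct{})
	go watch(done, 100*time.Millisecond)
	time.Sleep(200 * time.Millisecond) // let the timeout case win
}
```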
13 changes: 11 additions & 2 deletions filebeat/harvester/log_file.go
@@ -3,6 +3,7 @@ package harvester
import (
"io"
"os"
"sync"
"time"

"github.com/elastic/beats/filebeat/harvester/source"
@@ -17,12 +18,12 @@ type LogFile struct {
lastTimeRead time.Time
backoff time.Duration
done chan struct{}
singleClose sync.Once
}

func NewLogFile(
fs source.FileSource,
config harvesterConfig,
) (*LogFile, error) {
var offset int64
if seeker, ok := fs.(io.Seeker); ok {
@@ -39,7 +40,7 @@
config: config,
lastTimeRead: time.Now(),
backoff: config.Backoff,
done: make(chan struct{}),
}, nil
}

@@ -164,3 +165,11 @@ func (r *LogFile) wait() {
}
}
}

func (r *LogFile) Close() {
// Make sure reader is only closed once
r.singleClose.Do(func() {
close(r.done)
> Review comment: as the underlying reader might block on a syscall, the file must be closed right after closing the channel.
>
> Author reply: done

// Note: File reader is not closed here because that leads to race conditions
})
}
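
The `sync.Once` guard makes `Close` safe to call from both the shutdown path and the timeout goroutine: whichever caller arrives first closes the channel, and a second call is a no-op rather than a double-close panic. A standalone sketch of the same idempotent-close pattern (illustrative type, not Filebeat's):

```go
package main

import (
	"fmt"
	"sync"
)

type resource struct {
	done      chan struct{}
	closeOnce sync.Once
}

// Close may be called concurrently from several goroutines; only the
// first call closes the channel.
func (r *resource) Close() {
	r.closeOnce.Do(func() {
		close(r.done)
		fmt.Println("closed")
	})
}

func main() {
	r := &resource{done: make(chan struct{})}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.Close() // prints "closed" exactly once
		}()
	}
	wg.Wait()
}
```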
1 change: 1 addition & 0 deletions filebeat/prospector/prospector.go
@@ -112,6 +112,7 @@ func (p *Prospector) Run() {
if p.config.CleanInactive > 0 {
event.State.TTL = p.config.CleanInactive
}

select {
case <-p.done:
logp.Info("Prospector channel stopped")
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -22,6 +22,7 @@ filebeat.prospectors:
close_removed: {{close_removed}}
close_renamed: {{close_renamed}}
close_eof: {{close_eof}}
close_timeout: {{close_timeout}}
force_close_files: {{force_close_files}}
clean_inactive: {{clean_inactive}}
clean_removed: {{clean_removed}}
41 changes: 41 additions & 0 deletions filebeat/tests/system/test_harvester.py
@@ -366,3 +366,44 @@ def test_truncated_file_closed(self):
max_timeout=15)

filebeat.check_kill_and_wait()

def test_close_timeout(self):
"""
Checks that a file is closed after close_timeout
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/test.log",
close_timeout="1s",
scan_frequency="1s"
)
os.mkdir(self.working_dir + "/log/")

filebeat = self.start_beat()

testfile1 = self.working_dir + "/log/test.log"
file = open(testfile1, 'w')

# Write 1000 lines with a sleep between each line to make sure it takes more than 1s to complete
iterations1 = 1000
for n in range(0, iterations1):
file.write("example data")
file.write("\n")
time.sleep(0.001)

file.close()

# Wait until the harvester is closed because of close_timeout
self.wait_until(
lambda: self.log_contains(
"Closing harvester because close_timeout was reached"),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 1

# Check that some, but not all, lines were read
assert self.output_lines() < 1000
assert self.output_lines() > 0

59 changes: 56 additions & 3 deletions filebeat/tests/system/test_multiline.py
@@ -1,5 +1,6 @@
from filebeat import BaseTest
import os
import time

"""
Tests for the multiline log messages
@@ -115,9 +116,6 @@ def test_rabbitmq_multiline_log(self):
# Check that output file has the same number of lines as the log file
assert 3 == len(output)




def test_max_lines(self):
"""
Test the maximum number of lines that is sent by multiline
@@ -238,3 +236,58 @@ def test_max_bytes(self):

# Check that output file has the same number of lines as the log file
assert 20 == len(output)

def test_close_timeout_with_multiline(self):
"""
Test if multiline events are split up with close_timeout
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
multiline=True,
pattern="^\[",
negate="true",
match="after",
close_timeout="1s",
)

os.mkdir(self.working_dir + "/log/")

testfile = self.working_dir + "/log/test.log"

with open(testfile, 'w', 0) as file:
file.write("[2015] hello world")
file.write("\n")
file.write(" First Line\n")
file.write(" Second Line\n")

proc = self.start_beat()

# Wait until harvester is closed because of timeout
# This causes the partial event above to be sent
self.wait_until(
lambda: self.log_contains(
"Closing harvester because close_timeout was reached"),
max_timeout=15)

# Because of the timeout, the following two lines should be put together
with open(testfile, 'a', 0) as file:
file.write(" This should not be third\n")
file.write(" This should not be fourth\n")
# This starts a new pattern
file.write("[2016] Hello world\n")
# This line should be appended
file.write(" First line again\n")

self.wait_until(
lambda: self.output_has(lines=3),
max_timeout=10)
proc.check_kill_and_wait()

# close_timeout must have closed the reader exactly twice
self.wait_until(
lambda: self.log_contains_count(
"Closing harvester because close_timeout was reached") == 2,
max_timeout=15)

output = self.read_output()
assert 3 == len(output)