diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d297377ce0c..5cb24e52e0d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -186,6 +186,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fixed data types for roles and indices fields in `elasticsearch/audit` fileset {pull}10307[10307] - Ensure `source.address` is always populated by the nginx module (ECS). {pull}10418[10418] - Cover empty request data, url and version in Apache2 module{pull}10730[10730] +- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747] +- Improve detection of file deletion on Windows. {pull}10747[10747] *Heartbeat* diff --git a/filebeat/harvester/source.go b/filebeat/harvester/source.go index 6c739ccacce..f7e3c0c6405 100644 --- a/filebeat/harvester/source.go +++ b/filebeat/harvester/source.go @@ -25,6 +25,7 @@ import ( type Source interface { io.ReadCloser Name() string + Removed() bool // check if source has been removed Stat() (os.FileInfo, error) Continuable() bool // can we continue processing after EOF? HasState() bool // does this source have a state? 
diff --git a/filebeat/input/log/file.go b/filebeat/input/log/file.go index 8c513e0e9d8..59d408c87ad 100644 --- a/filebeat/input/log/file.go +++ b/filebeat/input/log/file.go @@ -17,7 +17,11 @@ package log -import "os" +import ( + "os" + + "github.com/elastic/beats/libbeat/common/file" +) type File struct { *os.File @@ -25,3 +29,4 @@ type File struct { func (File) Continuable() bool { return true } func (File) HasState() bool { return true } +func (f File) Removed() bool { return file.IsRemoved(f.File) } diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 07408ffe316..9197793c718 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -391,12 +391,12 @@ func (h *Harvester) SendStateUpdate() { return } - logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) - h.states.Update(h.state) - d := util.NewData() d.SetState(h.state) h.publishState(d) + + logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset) + h.states.Update(h.state) } // shouldExportLine decides if the line is exported or not based on diff --git a/filebeat/input/log/log.go b/filebeat/input/log/log.go index ba13d91bbda..d0c14c45b81 100644 --- a/filebeat/input/log/log.go +++ b/filebeat/input/log/log.go @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error { if f.config.CloseRemoved { // Check if the file name exists. See https://github.com/elastic/filebeat/issues/93 - _, statErr := os.Stat(f.fs.Name()) - - // Error means file does not exist. 
- if statErr != nil { + if f.fs.Removed() { return ErrRemoved } } diff --git a/filebeat/input/log/stdin.go b/filebeat/input/log/stdin.go index aff8c2b43d7..07aeb008adb 100644 --- a/filebeat/input/log/stdin.go +++ b/filebeat/input/log/stdin.go @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() } func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() } func (p Pipe) Continuable() bool { return false } func (p Pipe) HasState() bool { return false } +func (p Pipe) Removed() bool { return false } diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index f128107b5fc..22fdd1b6066 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -408,7 +408,7 @@ func (r *Registrar) writeRegistry() error { } func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) { - logp.Debug("registrar", "Write registry file: %s", baseName) + logp.Debug("registrar", "Write registry file: %s (%v)", baseName, len(states)) tempfile := baseName + ".new" f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm) diff --git a/filebeat/tests/system/filebeat.py b/filebeat/tests/system/filebeat.py index df2b8483855..39c14518476 100644 --- a/filebeat/tests/system/filebeat.py +++ b/filebeat/tests/system/filebeat.py @@ -6,7 +6,7 @@ curdir = os.path.dirname(__file__) sys.path.append(os.path.join(curdir, '../../../libbeat/tests/system')) -from beat.beat import TestCase +from beat.beat import TestCase, TimeoutError, REGEXP_TYPE default_registry_file = 'registry/filebeat/data.json' @@ -22,27 +22,33 @@ def setUpClass(self): super(BaseTest, self).setUpClass() - def has_registry(self, name=None, data_path=None): - if not name: - name = default_registry_file - if not data_path: - data_path = self.working_dir + @property + def registry(self): + return self.access_registry() - dotFilebeat = os.path.join(data_path, name) - return os.path.isfile(dotFilebeat) + @property + def 
input_logs(self): + return InputLogs(os.path.join(self.working_dir, "log")) - def get_registry(self, name=None, data_path=None): - if not name: - name = default_registry_file - if not data_path: - data_path = self.working_dir + @property + def logs(self): + return self.log_access() - # Returns content of the registry file - dotFilebeat = os.path.join(data_path, name) - self.wait_until(cond=lambda: os.path.isfile(dotFilebeat)) + def access_registry(self, name=None, data_path=None): + data_path = data_path if data_path else self.working_dir + return Registry(data_path, name) - with open(dotFilebeat) as file: - return json.load(file) + def log_access(self, file=None): + file = file if file else self.beat_name + ".log" + return LogState(os.path.join(self.working_dir, file)) + + def has_registry(self, name=None, data_path=None): + return self.access_registry(name, data_path).exists() + + def get_registry(self, name=None, data_path=None, filter=None): + reg = self.access_registry(name, data_path) + self.wait_until(reg.exists) + return reg.load(filter=filter) def get_registry_entry_by_path(self, path): """ @@ -51,21 +57,112 @@ def get_registry_entry_by_path(self, path): If a path exists multiple times (which is possible because of file rotation) the most recent version is returned """ - registry = self.get_registry() - tmp_entry = None + def hasPath(entry): + return entry["source"] == path - # Checks all entries and returns the most recent one - for entry in registry: - if entry["source"] == path: - if tmp_entry == None: - tmp_entry = entry - else: - if tmp_entry["timestamp"] < entry["timestamp"]: - tmp_entry = entry + entries = self.get_registry(filter=hasPath) + entries.sort(key=lambda x: x["timestamp"]) - return tmp_entry + # return entry with largest timestamp + return None if len(entries) == 0 else entries[-1] def file_permissions(self, path): full_path = os.path.join(self.working_dir, path) return oct(stat.S_IMODE(os.lstat(full_path).st_mode)) + + +class InputLogs: 
+ """ InputLogs is used to write and append to files which are read by filebeat. """ + + def __init__(self, home): + self.home = home + if not os.path.isdir(self.home): + os.mkdir(self.home) + + def write(self, name, contents): + self._write_to(name, 'w', contents) + + def append(self, name, contents): + self._write_to(name, 'a', contents) + + def size(self, name): + return os.path.getsize(self.path_of(name)) + + def _write_to(self, name, mode, contents): + with open(self.path_of(name), mode) as f: + f.write(contents) + + def remove(self, name): + os.remove(self.path_of(name)) + + def path_of(self, name): + return os.path.join(self.home, name) + + +class Registry: + """ Registry provides access to the registry used by filebeat to store its progress """ + + def __init__(self, home, name=None): + if not name: + name = default_registry_file + self.path = os.path.join(home, name) + + def exists(self): + return os.path.isfile(self.path) + + def load(self, filter=None): + with open(self.path) as f: + entries = json.load(f) + + if filter: + entries = [x for x in entries if filter(x)] + return entries + + def count(self, filter=None): + if not self.exists(): + return 0 + return len(self.load(filter=filter)) + + +class LogState: + def __init__(self, path): + self.path = path + self.off = 0 + + def checkpoint(self): + self.off = os.path.getsize(self.path) + + def lines(self, filter=None): + if not filter: + def filter(x): return True + with open(self.path, "r") as f: + f.seek(self.off) + return [l for l in f if filter(l)] + + def contains(self, msg, ignore_case=False, count=1): + if ignore_case: + msg = msg.lower() + + if type(msg) == REGEXP_TYPE: + def match(x): return msg.search(x) is not None + else: + def match(x): return x.find(msg) >= 0 + + pred = match + if ignore_case: + def pred(x): return match(x.lower()) + + return len(self.lines(filter=pred)) >= count + + def next(self, msg, ignore_case=False, count=1): + ok = self.contains(msg, ignore_case, count) + if ok: + 
self.checkpoint() + return ok + + def nextCheck(self, msg, ignore_case=False, count=1): + return lambda: self.next(msg, ignore_case, count) + + def check(self, msg, ignore_case=False, count=1): + return lambda: self.contains(msg, ignore_case, count) diff --git a/filebeat/tests/system/test_registrar.py b/filebeat/tests/system/test_registrar.py index fc1579745fc..f988867c3d4 100644 --- a/filebeat/tests/system/test_registrar.py +++ b/filebeat/tests/system/test_registrar.py @@ -3,9 +3,10 @@ import os import platform -import time +import re import shutil import stat +import time import unittest from filebeat import BaseTest @@ -778,73 +779,46 @@ def test_clean_inactive(self): """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/input*", - clean_inactive="4s", + clean_inactive="3s", ignore_older="2s", close_inactive="0.2s", scan_frequency="0.1s" ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" - testfile_path3 = self.working_dir + "/log/input3" - - with open(testfile_path1, 'w') as testfile1: - testfile1.write("first file\n") + file1 = "input1" + file2 = "input2" + file3 = "input3" - with open(testfile_path2, 'w') as testfile2: - testfile2.write("second file\n") + self.input_logs.write(file1, "first file\n") + self.input_logs.write(file2, "second file\n") filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created - self.wait_until( - lambda: self.log_contains_count("Registry file updated") > 1, - max_timeout=15) - - # Syncing file on disk is always susceptible to timing issues. 
- time.sleep(1) - - data = self.get_registry() - assert len(data) == 2 + self.wait_until(lambda: self.registry.exists(), max_timeout=15) + assert self.registry.count() == 2 # Wait until states are removed from inputs - self.wait_until( - lambda: self.log_contains_count( - "State removed for") == 2, - max_timeout=15) - - with open(testfile_path3, 'w') as testfile3: - testfile3.write("2\n") + self.wait_until(self.logs.nextCheck("State removed for", count=2), max_timeout=15) # Write new file to make sure registrar is flushed again - self.wait_until( - lambda: self.output_has(lines=3), - max_timeout=30) + self.input_logs.write(file3, "third file\n") + self.wait_until(lambda: self.output_has(lines=3), max_timeout=30) - # Wait until states are removed from inputs - self.wait_until( - lambda: self.log_contains_count( - "State removed for") >= 3, - max_timeout=15) + # Wait until state of new file is removed + self.wait_until(self.logs.nextCheck("State removed for"), max_timeout=15) filebeat.check_kill_and_wait() # Check that the first two files were removed from the registry - data = self.get_registry() + data = self.registry.load() assert len(data) == 1, "Expected a single file but got: %s" % data # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == 3 - else: - assert data[0]["offset"] == 2 + assert data[0]["offset"] == self.input_logs.size(file3) - @unittest.skip("Skipped as flaky: https://github.com/elastic/beats/issues/7690") def test_clean_removed(self): """ Checks that files which were removed, the state is removed @@ -857,58 +831,43 @@ def test_clean_removed(self): close_removed=True ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" + file1 = "input1" + file2 = "input2" - with open(testfile_path1, 'w') as testfile1: - testfile1.write("file to be removed\n") - - with open(testfile_path2, 
'w') as testfile2: - testfile2.write("2\n") + self.input_logs.write(file1, "file to be removed\n") + self.input_logs.write(file2, "2\n") filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created - self.wait_until(lambda: self.log_contains_count("Registry file updated") > 1) + self.wait_until(self.registry.exists) # Wait until registry is updated - self.wait_until(lambda: len(self.get_registry()) == 2) - assert len(self.get_registry()) == 2 + self.wait_until(lambda: self.registry.count() == 2) - os.remove(testfile_path1) + self.input_logs.remove(file1) # Wait until states are removed from inputs - self.wait_until(lambda: self.log_contains("Remove state for file as file removed")) + self.wait_until(self.logs.check("Remove state for file as file removed")) # Add one more line to make sure registry is written - with open(testfile_path2, 'a') as testfile2: - testfile2.write("make sure registry is written\n") - - self.wait_until( - lambda: self.output_has(lines=3), - max_timeout=10) + self.input_logs.append(file2, "make sure registry is written\n") + self.wait_until(lambda: self.output_has(lines=3), max_timeout=10) # Make sure all states are cleaned up - self.wait_until(lambda: self.log_contains("Before: 1, After: 1, Pending: 0")) + self.wait_until(self.logs.nextCheck(re.compile("Registrar.*After: 1"))) filebeat.check_kill_and_wait() # Check that the first to files were removed from the registry - data = self.get_registry() + data = self.registry.load() assert len(data) == 1 # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2 - else: - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + assert data[0]["offset"] == self.input_logs.size(file2) - 
@unittest.skip('flaky test https://github.com/elastic/beats/issues/9215') def test_clean_removed_with_clean_inactive(self): """ Checks that files which were removed, the state is removed @@ -918,59 +877,58 @@ def test_clean_removed_with_clean_inactive(self): path=os.path.abspath(self.working_dir) + "/log/input*", scan_frequency="0.1s", clean_removed=True, - clean_inactive="20s", + clean_inactive="60s", ignore_older="15s", close_removed=True ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/input1" - testfile_path2 = self.working_dir + "/log/input2" - - with open(testfile_path1, 'w') as testfile1: - testfile1.write("file to be removed\n") - - with open(testfile_path2, 'w') as testfile2: - testfile2.write("2\n") + file1 = "input1" + file2 = "input2" + contents2 = [ + "2\n", + "make sure registry is written\n", + ] + self.input_logs.write(file1, "file to be removed\n") + self.input_logs.write(file2, contents2[0]) filebeat = self.start_beat() - self.wait_until( - lambda: self.output_has(lines=2), - max_timeout=10) + self.wait_until(lambda: self.output_has(lines=2), max_timeout=10) # Wait until registry file is created self.wait_until( - lambda: self.log_contains_count("Registry file updated. 2 states written.") > 0, + self.logs.nextCheck("Registry file updated. 
2 states written."), max_timeout=15) - data = self.get_registry() - assert len(data) == 2 + count = self.registry.count() + print("registry size: {}".format(count)) + assert count == 2 - os.remove(testfile_path1) + self.input_logs.remove(file1) # Wait until states are removed from inputs - self.wait_until(lambda: self.log_contains("Remove state for file as file removed")) + self.wait_until(self.logs.nextCheck("Remove state for file as file removed")) # Add one more line to make sure registry is written - with open(testfile_path2, 'a') as testfile2: - testfile2.write("make sure registry is written\n") + self.input_logs.append(file2, contents2[1]) self.wait_until(lambda: self.output_has(lines=3)) - # Check is > as the same log line might happen before but afterwards it is repeated - self.wait_until(lambda: self.log_contains_count("Before: 1, After: 1, Pending: 1") > 5) + + # wait until next gc and until registry file has been updated + self.wait_until(self.logs.check("Before: 1, After: 1, Pending: 1")) + self.wait_until(self.logs.nextCheck("Registry file updated. 
1 states written.")) + count = self.registry.count() + print("registry size after remove: {}".format(count)) + assert count == 1 filebeat.check_kill_and_wait() - # Check that the first to files were removed from the registry - data = self.get_registry() + # Check that the first two files were removed from the registry + data = self.registry.load() assert len(data) == 1 # Make sure the last file in the registry is the correct one and has the correct offset - if os.name == "nt": - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + 2 - else: - assert data[0]["offset"] == len("make sure registry is written\n" + "2\n") + assert data[0]["offset"] == self.input_logs.size(file2) def test_symlink_failure(self): """ @@ -1044,56 +1002,44 @@ def test_restart_state(self): self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/*", - close_inactive="1s", - ignore_older="3s", + close_inactive="200ms", + ignore_older="2000ms", ) - os.mkdir(self.working_dir + "/log/") - testfile_path1 = self.working_dir + "/log/test1.log" - testfile_path2 = self.working_dir + "/log/test2.log" - testfile_path3 = self.working_dir + "/log/test3.log" - testfile_path4 = self.working_dir + "/log/test4.log" + init_files = ["test"+str(i)+".log" for i in range(3)] + restart_files = ["test"+str(i+3)+".log" for i in range(1)] - with open(testfile_path1, 'w') as testfile1: - testfile1.write("Hello World\n") - with open(testfile_path2, 'w') as testfile2: - testfile2.write("Hello World\n") - with open(testfile_path3, 'w') as testfile3: - testfile3.write("Hello World\n") + for name in init_files: + self.input_logs.write(name, "Hello World\n") filebeat = self.start_beat() # Make sure states written appears one more time self.wait_until( - lambda: self.log_contains("Ignore file because ignore_older"), + self.logs.check("Ignore file because ignore_older"), max_timeout=10) filebeat.check_kill_and_wait() self.render_config_template( path=os.path.abspath(self.working_dir) + 
"/log/*", - close_inactive="1s", - ignore_older="3s", - clean_inactive="5s", + close_inactive="200ms", + ignore_older="2000ms", + clean_inactive="3s", ) filebeat = self.start_beat(output="filebeat2.log") + logs = self.log_access("filebeat2.log") # Write additional file - with open(testfile_path4, 'w') as testfile4: - testfile4.write("Hello World\n") + for name in restart_files: + self.input_logs.write(name, "Hello World\n") # Make sure all 4 states are persisted - self.wait_until( - lambda: self.log_contains( - "input states cleaned up. Before: 4, After: 4", logfile="filebeat2.log"), - max_timeout=10) + self.wait_until(logs.nextCheck("input states cleaned up. Before: 4, After: 4")) # Wait until registry file is cleaned - self.wait_until( - lambda: self.log_contains( - "input states cleaned up. Before: 0, After: 0", logfile="filebeat2.log"), - max_timeout=10) + self.wait_until(logs.nextCheck("input states cleaned up. Before: 0, After: 0")) filebeat.check_kill_and_wait() diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 1ac4ecb2cd4..894183cee32 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -62,3 +62,9 @@ func ReadOpen(path string) (*os.File, error) { perm := os.FileMode(0) return os.OpenFile(path, flag, perm) } + +// IsRemoved checks wheter the file held by f is removed. 
+func IsRemoved(f *os.File) bool { + _, err := os.Stat(f.Name()) + return err != nil +} diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 6572fa78862..55f6ef4e1b2 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -23,6 +23,9 @@ import ( "reflect" "strconv" "syscall" + "unsafe" + + "golang.org/x/sys/windows" ) type StateOS struct { @@ -31,6 +34,12 @@ type StateOS struct { Vol uint64 `json:"vol,"` } +var ( + modkernel32 = windows.NewLazyDLL("kernel32.dll") + + procGetFileInformationByHandleEx = modkernel32.NewProc("GetFileInformationByHandleEx") +) + // GetOSState returns the platform specific StateOS func GetOSState(info os.FileInfo) StateOS { // os.SameFile must be called to populate the id fields. Otherwise in case for example @@ -107,3 +116,33 @@ func ReadOpen(path string) (*os.File, error) { return os.NewFile(uintptr(handle), path), nil } + +// IsRemoved checks whether the file held by f is removed. +// On Windows IsRemoved reads the DeletePending flag using GetFileInformationByHandleEx. +// A file is not removed/unlinked as long as at least one process still owns a +// file handle. A deleted file is only marked as deleted, and file attributes +// can still be read. Only opening a file marked with 'DeletePending' will +// fail. +func IsRemoved(f *os.File) bool { + hdl := f.Fd() + if hdl == uintptr(syscall.InvalidHandle) { + return false + } + + info := struct { + AllocationSize int64 + EndOfFile int64 + NumberOfLinks int32 + DeletePending bool + Directory bool + }{} + infoSz := unsafe.Sizeof(info) + + const class = 1 // FileStandardInfo + r1, _, _ := syscall.Syscall6( + procGetFileInformationByHandleEx.Addr(), 4, uintptr(hdl), class, uintptr(unsafe.Pointer(&info)), infoSz, 0, 0) + if r1 == 0 { + return true // assume file is removed if syscall errors + } + return info.DeletePending +}