Skip to content

Commit

Permalink
Fix Registrar not removing files when clean_removed is configured (#1…
Browse files Browse the repository at this point in the history
…0747)

This PR includes some changes that also did help to clean/stabilize the `text_clean_removed*` tests. Often times the tests have been stopped to early, due to races between the input file states, registrar states, and test code.

After stabilizing the tests, I reliably (still needs a few runs) can trigger a data-synchronisation issue between the prospectors state and the registry. This is clearly a bug and results in state not being removed from the registry file at all.

After some more testing (yay, heisenbug :/) I found a race condition between harvester shutdown, prospector state cleaning (e.g. if clean_removed is set) and registry updates. The sequence of events goes like this:
```
1. harvester closes file
2. harvester updates prospectors local file states (marks file as finished)
3. prospector starts GC, cleaning old states
4. prospector sends 'remove state event' to the registrar (TTL=0, Finished=True). Note: state is not removed by prospector until next scan completes.
5. harvestar sends 'finished state event' to the registrar (TTL=-1, Finished=True)
6. registrar reads applies state updates:
  1. change state to TTL=0, Finished=True
  2. change state to TTL=-1, Finished=True
  3. clean local state -> file is not removed from registry, due to TTL=-1
  4. write registry
```

The change proposed changes the order the harvester cleanup propagates updates by sending the 'finished event' first before updating the prospectors state. This way we postpone the file being cleaned by the prospector and guarantee the 'finished event' and 'remove event' order (The prospector can not remove the state yet, because Finished is not set until after the 'finished event' has been published).


Additions:
- Add Registry class (accessed via `self.registry()`). This class is used to check/read the registry
- Add InputLogs class for creating/apppending/removing log files used to drive the tests. This reduces path manipulations and reduces repetitive `with open(...) as f:` blocks.
- Add LogState class. This class keeps an offset to start reading from. The offset can be advanced via `checkpoint` or is automatically advanced if `next` succeeds. Using `LogState` we can wait for a particular message to appear in the log, by ignoring old messages. This removes the need to have a count on actual messages.
- Thanks to `LogState` and some minor adjustments to timings I did shave of like 10s from test_registrar.py on my machine.

FB success count:
- test_clean_removed: 5
- test_clean_removed_with_clean_inactive: 5
  • Loading branch information
Steffen Siering authored Feb 18, 2019
1 parent e1a9f54 commit 1438323
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 166 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed data types for roles and indices fields in `elasticsearch/audit` fileset {pull}10307[10307]
- Ensure `source.address` is always populated by the nginx module (ECS). {pull}10418[10418]
- Cover empty request data, url and version in Apache2 module{pull}10730[10730]
- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747]
- Improve detection of file deletion on Windows. {pull}10747[10747]

*Heartbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/harvester/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Source interface {
io.ReadCloser
Name() string
Removed() bool // check if source has been removed
Stat() (os.FileInfo, error)
Continuable() bool // can we continue processing after EOF?
HasState() bool // does this source have a state?
Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package log

import "os"
import (
"os"

"github.com/elastic/beats/libbeat/common/file"
)

type File struct {
*os.File
}

func (File) Continuable() bool { return true }
func (File) HasState() bool { return true }
func (f File) Removed() bool { return file.IsRemoved(f.File) }
6 changes: 3 additions & 3 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,12 @@ func (h *Harvester) SendStateUpdate() {
return
}

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.states.Update(h.state)

d := util.NewData()
d.SetState(h.state)
h.publishState(d)

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.states.Update(h.state)
}

// shouldExportLine decides if the line is exported or not based on
Expand Down
5 changes: 1 addition & 4 deletions filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error {

if f.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(f.fs.Name())

// Error means file does not exist.
if statErr != nil {
if f.fs.Removed() {
return ErrRemoved
}
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() }
func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() }
func (p Pipe) Continuable() bool { return false }
func (p Pipe) HasState() bool { return false }
func (p Pipe) Removed() bool { return false }
2 changes: 1 addition & 1 deletion filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (r *Registrar) writeRegistry() error {
}

func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)
logp.Debug("registrar", "Write registry file: %s (%v)", baseName, len(states))

tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
Expand Down
155 changes: 126 additions & 29 deletions filebeat/tests/system/filebeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
curdir = os.path.dirname(__file__)
sys.path.append(os.path.join(curdir, '../../../libbeat/tests/system'))

from beat.beat import TestCase
from beat.beat import TestCase, TimeoutError, REGEXP_TYPE

default_registry_file = 'registry/filebeat/data.json'

Expand All @@ -22,27 +22,33 @@ def setUpClass(self):

super(BaseTest, self).setUpClass()

def has_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir
@property
def registry(self):
return self.access_registry()

dotFilebeat = os.path.join(data_path, name)
return os.path.isfile(dotFilebeat)
@property
def input_logs(self):
return InputLogs(os.path.join(self.working_dir, "log"))

def get_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir
@property
def logs(self):
return self.log_access()

# Returns content of the registry file
dotFilebeat = os.path.join(data_path, name)
self.wait_until(cond=lambda: os.path.isfile(dotFilebeat))
def access_registry(self, name=None, data_path=None):
data_path = data_path if data_path else self.working_dir
return Registry(data_path, name)

with open(dotFilebeat) as file:
return json.load(file)
def log_access(self, file=None):
file = file if file else self.beat_name + ".log"
return LogState(os.path.join(self.working_dir, file))

def has_registry(self, name=None, data_path=None):
return self.access_registry(name, data_path).exists()

def get_registry(self, name=None, data_path=None, filter=None):
reg = self.access_registry(name, data_path)
self.wait_until(reg.exists)
return reg.load(filter=filter)

def get_registry_entry_by_path(self, path):
"""
Expand All @@ -51,21 +57,112 @@ def get_registry_entry_by_path(self, path):
If a path exists multiple times (which is possible because of file rotation)
the most recent version is returned
"""
registry = self.get_registry()

tmp_entry = None
def hasPath(entry):
return entry["source"] == path

# Checks all entries and returns the most recent one
for entry in registry:
if entry["source"] == path:
if tmp_entry == None:
tmp_entry = entry
else:
if tmp_entry["timestamp"] < entry["timestamp"]:
tmp_entry = entry
entries = self.get_registry(filter=hasPath)
entries.sort(key=lambda x: x["timestamp"])

return tmp_entry
# return entry with largest timestamp
return None if len(entries) == 0 else entries[-1]

def file_permissions(self, path):
full_path = os.path.join(self.working_dir, path)
return oct(stat.S_IMODE(os.lstat(full_path).st_mode))


class InputLogs:
""" InputLogs is used to write and append to files which are read by filebeat. """

def __init__(self, home):
self.home = home
if not os.path.isdir(self.home):
os.mkdir(self.home)

def write(self, name, contents):
self._write_to(name, 'w', contents)

def append(self, name, contents):
self._write_to(name, 'a', contents)

def size(self, name):
return os.path.getsize(self.path_of(name))

def _write_to(self, name, mode, contents):
with open(self.path_of(name), mode) as f:
f.write(contents)

def remove(self, name):
os.remove(self.path_of(name))

def path_of(self, name):
return os.path.join(self.home, name)


class Registry:
""" Registry provides access to the registry used by filebeat to store its progress """

def __init__(self, home, name=None):
if not name:
name = default_registry_file
self.path = os.path.join(home, name)

def exists(self):
return os.path.isfile(self.path)

def load(self, filter=None):
with open(self.path) as f:
entries = json.load(f)

if filter:
entries = [x for x in entries if filter(x)]
return entries

def count(self, filter=None):
if not self.exists():
return 0
return len(self.load(filter=filter))


class LogState:
def __init__(self, path):
self.path = path
self.off = 0

def checkpoint(self):
self.off = os.path.getsize(self.path)

def lines(self, filter=None):
if not filter:
def filter(x): return True
with open(self.path, "r") as f:
f.seek(self.off)
return [l for l in f if filter(l)]

def contains(self, msg, ignore_case=False, count=1):
if ignore_case:
msg = msg.lower()

if type(msg) == REGEXP_TYPE:
def match(x): return msg.search(x) is not None
else:
def match(x): return x.find(msg) >= 0

pred = match
if ignore_case:
def pred(x): return match(x.lower())

return len(self.lines(filter=pred)) >= count

def next(self, msg, ignore_case=False, count=1):
ok = self.contains(msg, ignore_case, count)
if ok:
self.checkpoint()
return ok

def nextCheck(self, msg, ignore_case=False, count=1):
return lambda: self.next(msg, ignore_case, count)

def check(self, msg, ignore_case=False, count=1):
return lambda: self.contains(msg, ignore_case, count)
Loading

0 comments on commit 1438323

Please sign in to comment.