Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Registrar not removing files when clean_removed is configured #10747

Merged
merged 10 commits into from
Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed data types for roles and indices fields in `elasticsearch/audit` fileset {pull}10307[10307]
- Ensure `source.address` is always populated by the nginx module (ECS). {pull}10418[10418]
- Cover empty request data, url and version in Apache2 module{pull}10730[10730]
- Fix registry entries not being cleaned due to race conditions. {pull}10747[10747]
- Improve detection of file deletion on Windows. {pull}10747[10747]

*Heartbeat*

Expand Down
1 change: 1 addition & 0 deletions filebeat/harvester/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Source interface {
io.ReadCloser
Name() string
Removed() bool // check if source has been removed
Stat() (os.FileInfo, error)
Continuable() bool // can we continue processing after EOF?
HasState() bool // does this source have a state?
Expand Down
7 changes: 6 additions & 1 deletion filebeat/input/log/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

package log

import "os"
import (
"os"

"github.com/elastic/beats/libbeat/common/file"
)

type File struct {
*os.File
}

func (File) Continuable() bool { return true }
func (File) HasState() bool { return true }
func (f File) Removed() bool { return file.IsRemoved(f.File) }
6 changes: 3 additions & 3 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,12 +391,12 @@ func (h *Harvester) SendStateUpdate() {
return
}

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.states.Update(h.state)

d := util.NewData()
d.SetState(h.state)
h.publishState(d)

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.states.Update(h.state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on this move? I would assume the harvester itself should be the first one to know about state updates?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add my findings to the PR description.

The Update here is no to the harvester, but to the prospector. Thanks to shutdown racing with scan phase we sometimes send finish and remove events out of order if publishState is not called before states.Update.

}

// shouldExportLine decides if the line is exported or not based on
Expand Down
5 changes: 1 addition & 4 deletions filebeat/input/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ func (f *Log) errorChecks(err error) error {

if f.config.CloseRemoved {
// Check if the file name exists. See https://github.com/elastic/filebeat/issues/93
_, statErr := os.Stat(f.fs.Name())

// Error means file does not exist.
if statErr != nil {
if f.fs.Removed() {
return ErrRemoved
}
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/log/stdin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ func (p Pipe) Name() string { return p.File.Name() }
func (p Pipe) Stat() (os.FileInfo, error) { return p.File.Stat() }
func (p Pipe) Continuable() bool { return false }
func (p Pipe) HasState() bool { return false }
func (p Pipe) Removed() bool { return false }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exported method Pipe.Removed should have comment or be unexported

2 changes: 1 addition & 1 deletion filebeat/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ func (r *Registrar) writeRegistry() error {
}

func writeTmpFile(baseName string, perm os.FileMode, states []file.State) (string, error) {
logp.Debug("registrar", "Write registry file: %s", baseName)
logp.Debug("registrar", "Write registry file: %s (%v)", baseName, len(states))

tempfile := baseName + ".new"
f, err := os.OpenFile(tempfile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_SYNC, perm)
Expand Down
155 changes: 126 additions & 29 deletions filebeat/tests/system/filebeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
curdir = os.path.dirname(__file__)
sys.path.append(os.path.join(curdir, '../../../libbeat/tests/system'))

from beat.beat import TestCase
from beat.beat import TestCase, TimeoutError, REGEXP_TYPE

default_registry_file = 'registry/filebeat/data.json'

Expand All @@ -22,27 +22,33 @@ def setUpClass(self):

super(BaseTest, self).setUpClass()

def has_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir
@property
def registry(self):
return self.access_registry()

dotFilebeat = os.path.join(data_path, name)
return os.path.isfile(dotFilebeat)
@property
def input_logs(self):
return InputLogs(os.path.join(self.working_dir, "log"))

def get_registry(self, name=None, data_path=None):
if not name:
name = default_registry_file
if not data_path:
data_path = self.working_dir
@property
def logs(self):
return self.log_access()

# Returns content of the registry file
dotFilebeat = os.path.join(data_path, name)
self.wait_until(cond=lambda: os.path.isfile(dotFilebeat))
def access_registry(self, name=None, data_path=None):
data_path = data_path if data_path else self.working_dir
return Registry(data_path, name)

with open(dotFilebeat) as file:
return json.load(file)
def log_access(self, file=None):
file = file if file else self.beat_name + ".log"
return LogState(os.path.join(self.working_dir, file))

def has_registry(self, name=None, data_path=None):
return self.access_registry(name, data_path).exists()

def get_registry(self, name=None, data_path=None, filter=None):
reg = self.access_registry(name, data_path)
self.wait_until(reg.exists)
return reg.load(filter=filter)

def get_registry_entry_by_path(self, path):
"""
Expand All @@ -51,21 +57,112 @@ def get_registry_entry_by_path(self, path):
If a path exists multiple times (which is possible because of file rotation)
the most recent version is returned
"""
registry = self.get_registry()

tmp_entry = None
def hasPath(entry):
return entry["source"] == path

# Checks all entries and returns the most recent one
for entry in registry:
if entry["source"] == path:
if tmp_entry == None:
tmp_entry = entry
else:
if tmp_entry["timestamp"] < entry["timestamp"]:
tmp_entry = entry
entries = self.get_registry(filter=hasPath)
entries.sort(key=lambda x: x["timestamp"])

return tmp_entry
# return entry with largest timestamp
return None if len(entries) == 0 else entries[-1]

def file_permissions(self, path):
full_path = os.path.join(self.working_dir, path)
return oct(stat.S_IMODE(os.lstat(full_path).st_mode))


class InputLogs:
""" InputLogs is used to write and append to files which are read by filebeat. """

def __init__(self, home):
self.home = home
if not os.path.isdir(self.home):
os.mkdir(self.home)

def write(self, name, contents):
self._write_to(name, 'w', contents)

def append(self, name, contents):
self._write_to(name, 'a', contents)

def size(self, name):
return os.path.getsize(self.path_of(name))

def _write_to(self, name, mode, contents):
with open(self.path_of(name), mode) as f:
f.write(contents)

def remove(self, name):
os.remove(self.path_of(name))

def path_of(self, name):
return os.path.join(self.home, name)


class Registry:
""" Registry provides access to the registry used by filebeat to store its progress """

def __init__(self, home, name=None):
if not name:
name = default_registry_file
self.path = os.path.join(home, name)

def exists(self):
return os.path.isfile(self.path)

def load(self, filter=None):
with open(self.path) as f:
entries = json.load(f)

if filter:
entries = [x for x in entries if filter(x)]
return entries

def count(self, filter=None):
if not self.exists():
return 0
return len(self.load(filter=filter))


class LogState:
def __init__(self, path):
self.path = path
self.off = 0

def checkpoint(self):
self.off = os.path.getsize(self.path)

def lines(self, filter=None):
if not filter:
def filter(x): return True
with open(self.path, "r") as f:
f.seek(self.off)
return [l for l in f if filter(l)]

def contains(self, msg, ignore_case=False, count=1):
if ignore_case:
msg = msg.lower()

if type(msg) == REGEXP_TYPE:
def match(x): return msg.search(x) is not None
else:
def match(x): return x.find(msg) >= 0

pred = match
if ignore_case:
def pred(x): return match(x.lower())

return len(self.lines(filter=pred)) >= count

def next(self, msg, ignore_case=False, count=1):
ok = self.contains(msg, ignore_case, count)
if ok:
self.checkpoint()
return ok

def nextCheck(self, msg, ignore_case=False, count=1):
return lambda: self.next(msg, ignore_case, count)

def check(self, msg, ignore_case=False, count=1):
return lambda: self.contains(msg, ignore_case, count)
Loading