Skip to content

Commit

Permalink
Auditbeat recursive file watches for Windows (#6893)
Browse files Browse the repository at this point in the history
* Add support for case-insensitive text search in system-tests

Allow system tests to search for text in the beat logfile using
case-insensitive search. This is necessary to match paths in
case-insensitive file systems, where the path logged may have different
capitalisation than the one used in the system test.

* Use custom version of fsnotify with recursion support

* Added system-test for the file_integrity module

* Auditbeat: Fix deadlock in fsnotify under Windows

Under Windows, directories to be watched need to be installed after the
event consumer loop is started. Otherwise there's a chance of a
potential deadlock, as fsnotify under Windows uses a single goroutine to
deliver events and install watches. If the channel it uses to deliver
events is full, it will block and won't be able to install further
watches.

* Auditbeat: Use OS-support for recursive watches

This patch enables OS-supported recursive watching in fsnotify if it is
available. Currently only supported under Windows via our custom
fsnotify fork.

Fixes #6864
  • Loading branch information
adriansr authored and andrewkroh committed Apr 19, 2018
1 parent 2a0ad4d commit 8088e53
Show file tree
Hide file tree
Showing 16 changed files with 452 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
*Auditbeat*

- Add hex decoding for the name field in audit path records. {pull}6687[6687]
- Fixed a deadlock in the file_integrity module under Windows. {issue}6864[6864]

*Filebeat*

Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: github.com/fsnotify/fsnotify
Revision: 4da3e2cfbabc9f751898f250b49f2439785783a1
Revision: c9bbe1f46f1da9904baf3916a4ba4aec7f1e9000
License type (autodetected): BSD-3-Clause
./vendor/github.com/fsnotify/fsnotify/LICENSE:
--------------------------------------------------------------------
Expand Down
12 changes: 8 additions & 4 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func NewEventReader(c Config) (EventProducer, error) {
}

func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)

// Windows implementation of fsnotify needs to have the watched paths
// installed after the event consumer is started, to avoid a potential
// deadlock. Do it on all platforms for simplicity.
for _, p := range r.config.Paths {
if err := r.watcher.Add(p); err != nil {
if err == syscall.EMFILE {
Expand All @@ -48,10 +56,6 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)
r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
Expand Down
4 changes: 3 additions & 1 deletion auditbeat/module/file_integrity/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func New(recursive bool) (Watcher, error) {
if err != nil {
return nil, err
}
if recursive {
// Use our simulated recursive watches unless the fsnotify implementation
// supports OS-provided recursive watches
if recursive && fsnotify.SetRecursive() != nil {
return newRecursiveWatcher(fsnotify), nil
}
return (*nonRecursiveWatcher)(fsnotify), nil
Expand Down
39 changes: 39 additions & 0 deletions auditbeat/tests/system/auditbeat.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import os
import shutil
import sys
import tempfile

sys.path.append(os.path.join(os.path.dirname(__file__), '../../../metricbeat/tests/system'))

if os.name == "nt":
import win32file

from metricbeat import BaseTest as MetricbeatTest


Expand All @@ -13,3 +18,37 @@ def setUpClass(self):
self.beat_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../"))
super(MetricbeatTest, self).setUpClass()

def create_file(self, path, contents):
f = open(path, 'wb')
f.write(contents)
f.close()

def check_event(self, event, expected):
for key in expected:
assert key in event, "key '{0}' not found in event".format(key)
assert event[key] == expected[key], \
"key '{0}' has value '{1}', expected '{2}'".format(key,
event[key],
expected[key])

def temp_dir(self, prefix):
# os.path.realpath resolves any symlinks in path. Necessary for macOS
# where /var is a symlink to /private/var
p = os.path.realpath(tempfile.mkdtemp(prefix))
if os.name == "nt":
# Under windows, get rid of any ~1 in path (short path)
p = str(win32file.GetLongPathName(p))
return p


class PathCleanup:
def __init__(self, paths):
self.paths = paths

def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
for path in self.paths:
shutil.rmtree(path)
6 changes: 3 additions & 3 deletions auditbeat/tests/system/config/auditbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ auditbeat.modules:
{%- endfor %}

queue.mem:
events: 4096
flush.min_events: 8
flush.timeout: 0.1s
events: 4
flush.min_events: 0
flush.timeout: 0.01s

{% include './tests/system/config/libbeat.yml.j2' %}
189 changes: 189 additions & 0 deletions auditbeat/tests/system/test_file_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import sys
import os
import shutil
import time
import unittest
from auditbeat import *
from beat.beat import INTEGRATION_TESTS


# Escapes a path to match what's printed in the logs
def escape_path(path):
return path.replace('\\', '\\\\')


def has_file(objs, path, sha1hash):
found = False
for obj in objs:
if 'file.path' in obj and 'hash.sha1' in obj \
and obj['file.path'].lower() == path.lower() and obj['hash.sha1'] == sha1hash:
found = True
break
assert found, "File '{0}' with sha1sum '{1}' not found".format(path, sha1hash)


def has_dir(objs, path):
found = False
for obj in objs:
if 'file.path' in obj and obj['file.path'].lower() == path.lower() and obj['file.type'] == "dir":
found = True
break
assert found, "Dir '{0}' not found".format(path)


def file_events(objs, path, expected):
evts = set()
for obj in objs:
if 'file.path' in obj and 'event.action' in obj and obj['file.path'].lower() == path.lower():
if type(obj['event.action']) == list:
evts = evts.union(set(obj['event.action']))
else:
evts.add(obj['event.action'])
for wanted in set(expected):
assert wanted in evts, "Event {0} for path '{1}' not found (got {2})".format(
wanted, path, evts)


def wrap_except(expr):
try:
return expr()
except IOError:
return False


class Test(BaseTest):

def wait_output(self, min_events):
self.wait_until(lambda: wrap_except(lambda: len(self.read_output()) >= min_events))
# wait for the number of lines in the file to stay constant for a second
prev_lines = -1
while True:
num_lines = self.output_lines()
if prev_lines < num_lines:
prev_lines = num_lines
time.sleep(1)
else:
break

def test_non_recursive(self):
"""
file_integrity monitors watched directories (non recursive).
"""

dirs = [self.temp_dir("auditbeat_test"),
self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False
}
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts.
# Case must be ignored under windows as capitalisation of paths
# may differ
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)

file1 = os.path.join(dirs[0], 'file.txt')
self.create_file(file1, "hello world!")

file2 = os.path.join(dirs[1], 'file2.txt')
self.create_file(file2, "Foo bar")

# wait until file1 is reported before deleting. Otherwise the hash
# might not be calculated
self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file1)), ignore_case=True)

os.unlink(file1)

subdir = os.path.join(dirs[0], "subdir")
os.mkdir(subdir)
file3 = os.path.join(subdir, "other_file.txt")
self.create_file(file3, "not reported.")

self.wait_log_contains("\"deleted\"")
self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(subdir)), ignore_case=True)
self.wait_output(3)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()

# Ensure all Beater stages are used.
assert self.log_contains("Setup Beat: auditbeat")
assert self.log_contains("auditbeat start running")
assert self.log_contains("auditbeat stopped")

objs = self.read_output()

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "d23be250530a24be33069572db67995f21244c51")
has_dir(objs, subdir)

file_events(objs, file1, ['created', 'deleted'])
file_events(objs, file2, ['created'])

# assert file inside subdir is not reported
assert self.log_contains(file3) is False

def test_recursive(self):
"""
file_integrity monitors watched directories (recursive).
"""

dirs = [self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False,
"recursive": True
}
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)
self.wait_log_contains("\"recursive\": true")

subdir = os.path.join(dirs[0], "subdir")
os.mkdir(subdir)
file1 = os.path.join(subdir, "file.txt")
self.create_file(file1, "hello world!")

subdir2 = os.path.join(subdir, "other")
os.mkdir(subdir2)
file2 = os.path.join(subdir2, "more.txt")
self.create_file(file2, "")

self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file2)), ignore_case=True)
self.wait_output(4)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()

# Ensure all Beater stages are used.
assert self.log_contains("Setup Beat: auditbeat")
assert self.log_contains("auditbeat start running")
assert self.log_contains("auditbeat stopped")

objs = self.read_output()

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "da39a3ee5e6b4b0d3255bfef95601890afd80709")
has_dir(objs, subdir)
has_dir(objs, subdir2)

file_events(objs, file1, ['created'])
file_events(objs, file2, ['created'])
15 changes: 10 additions & 5 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,27 +339,30 @@ def get_log(self, logfile=None):

def wait_log_contains(self, msg, logfile=None,
max_timeout=10, poll_interval=0.1,
name="log_contains"):
name="log_contains",
ignore_case=False):
self.wait_until(
cond=lambda: self.log_contains(msg, logfile),
cond=lambda: self.log_contains(msg, logfile, ignore_case=ignore_case),
max_timeout=max_timeout,
poll_interval=poll_interval,
name=name)

def log_contains(self, msg, logfile=None):
def log_contains(self, msg, logfile=None, ignore_case=False):
"""
Returns true if the give logfile contains the given message.
Note that the msg must be present in a single line.
"""

return self.log_contains_count(msg, logfile) > 0
return self.log_contains_count(msg, logfile, ignore_case=ignore_case) > 0

def log_contains_count(self, msg, logfile=None):
def log_contains_count(self, msg, logfile=None, ignore_case=False):
"""
Returns the number of appearances of the given string in the log file
"""

counter = 0
if ignore_case:
msg = msg.lower()

# Init defaults
if logfile is None:
Expand All @@ -368,6 +371,8 @@ def log_contains_count(self, msg, logfile=None):
try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
if ignore_case:
line = line.lower()
if line.find(msg) >= 0:
counter = counter + 1
except IOError:
Expand Down
Loading

0 comments on commit 8088e53

Please sign in to comment.