Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Auditbeat recursive file watches for Windows #6893

Merged
merged 5 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
*Auditbeat*

- Add hex decoding for the name field in audit path records. {pull}6687[6687]
- Fixed a deadlock in the file_integrity module under Windows. {issue}6864[6864]

*Filebeat*

Expand Down
2 changes: 1 addition & 1 deletion NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: github.com/fsnotify/fsnotify
Revision: 4da3e2cfbabc9f751898f250b49f2439785783a1
Revision: c9bbe1f46f1da9904baf3916a4ba4aec7f1e9000
License type (autodetected): BSD-3-Clause
./vendor/github.com/fsnotify/fsnotify/LICENSE:
--------------------------------------------------------------------
Expand Down
12 changes: 8 additions & 4 deletions auditbeat/module/file_integrity/eventreader_fsnotify.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ func NewEventReader(c Config) (EventProducer, error) {
}

func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)

// Windows implementation of fsnotify needs to have the watched paths
// installed after the event consumer is started, to avoid a potential
// deadlock. Do it on all platforms for simplicity.
for _, p := range r.config.Paths {
if err := r.watcher.Add(p); err != nil {
if err == syscall.EMFILE {
Expand All @@ -48,10 +56,6 @@ func (r *reader) Start(done <-chan struct{}) (<-chan Event, error) {
}
}

if err := r.watcher.Start(); err != nil {
return nil, errors.Wrap(err, "unable to start watcher")
}
go r.consumeEvents(done)
r.log.Infow("Started fsnotify watcher",
"file_path", r.config.Paths,
"recursive", r.config.Recursive)
Expand Down
4 changes: 3 additions & 1 deletion auditbeat/module/file_integrity/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ func New(recursive bool) (Watcher, error) {
if err != nil {
return nil, err
}
if recursive {
// Use our simulated recursive watches unless the fsnotify implementation
// supports OS-provided recursive watches
if recursive && fsnotify.SetRecursive() != nil {
return newRecursiveWatcher(fsnotify), nil
}
return (*nonRecursiveWatcher)(fsnotify), nil
Expand Down
39 changes: 39 additions & 0 deletions auditbeat/tests/system/auditbeat.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import os
import shutil
import sys
import tempfile

sys.path.append(os.path.join(os.path.dirname(__file__), '../../../metricbeat/tests/system'))

if os.name == "nt":
import win32file

from metricbeat import BaseTest as MetricbeatTest


Expand All @@ -13,3 +18,37 @@ def setUpClass(self):
self.beat_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../"))
super(MetricbeatTest, self).setUpClass()

def create_file(self, path, contents):
f = open(path, 'wb')
f.write(contents)
f.close()

def check_event(self, event, expected):
for key in expected:
assert key in event, "key '{0}' not found in event".format(key)
assert event[key] == expected[key], \
"key '{0}' has value '{1}', expected '{2}'".format(key,
event[key],
expected[key])

def temp_dir(self, prefix):
# os.path.realpath resolves any symlinks in path. Necessary for macOS
# where /var is a symlink to /private/var
p = os.path.realpath(tempfile.mkdtemp(prefix))
if os.name == "nt":
# Under windows, get rid of any ~1 in path (short path)
p = str(win32file.GetLongPathName(p))
return p


class PathCleanup:
def __init__(self, paths):
self.paths = paths

def __enter__(self):
pass

def __exit__(self, exc_type, exc_val, exc_tb):
for path in self.paths:
shutil.rmtree(path)
6 changes: 3 additions & 3 deletions auditbeat/tests/system/config/auditbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ auditbeat.modules:
{%- endfor %}

queue.mem:
events: 4096
flush.min_events: 8
flush.timeout: 0.1s
events: 4
flush.min_events: 0
flush.timeout: 0.01s

{% include './tests/system/config/libbeat.yml.j2' %}
189 changes: 189 additions & 0 deletions auditbeat/tests/system/test_file_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import sys
import os
import shutil
import time
import unittest
from auditbeat import *
from beat.beat import INTEGRATION_TESTS


# Escapes a path to match what's printed in the logs
def escape_path(path):
return path.replace('\\', '\\\\')


def has_file(objs, path, sha1hash):
found = False
for obj in objs:
if 'file.path' in obj and 'hash.sha1' in obj \
and obj['file.path'].lower() == path.lower() and obj['hash.sha1'] == sha1hash:
found = True
break
assert found, "File '{0}' with sha1sum '{1}' not found".format(path, sha1hash)


def has_dir(objs, path):
found = False
for obj in objs:
if 'file.path' in obj and obj['file.path'].lower() == path.lower() and obj['file.type'] == "dir":
found = True
break
assert found, "Dir '{0}' not found".format(path)


def file_events(objs, path, expected):
evts = set()
for obj in objs:
if 'file.path' in obj and 'event.action' in obj and obj['file.path'].lower() == path.lower():
if type(obj['event.action']) == list:
evts = evts.union(set(obj['event.action']))
else:
evts.add(obj['event.action'])
for wanted in set(expected):
assert wanted in evts, "Event {0} for path '{1}' not found (got {2})".format(
wanted, path, evts)


def wrap_except(expr):
try:
return expr()
except IOError:
return False


class Test(BaseTest):

def wait_output(self, min_events):
self.wait_until(lambda: wrap_except(lambda: len(self.read_output()) >= min_events))
# wait for the number of lines in the file to stay constant for a second
prev_lines = -1
while True:
num_lines = self.output_lines()
if prev_lines < num_lines:
prev_lines = num_lines
time.sleep(1)
else:
break

def test_non_recursive(self):
"""
file_integrity monitors watched directories (non recursive).
"""

dirs = [self.temp_dir("auditbeat_test"),
self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False
}
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts.
# Case must be ignored under windows as capitalisation of paths
# may differ
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)

file1 = os.path.join(dirs[0], 'file.txt')
self.create_file(file1, "hello world!")

file2 = os.path.join(dirs[1], 'file2.txt')
self.create_file(file2, "Foo bar")

# wait until file1 is reported before deleting. Otherwise the hash
# might not be calculated
self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file1)), ignore_case=True)

os.unlink(file1)

subdir = os.path.join(dirs[0], "subdir")
os.mkdir(subdir)
file3 = os.path.join(subdir, "other_file.txt")
self.create_file(file3, "not reported.")

self.wait_log_contains("\"deleted\"")
self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(subdir)), ignore_case=True)
self.wait_output(3)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()

# Ensure all Beater stages are used.
assert self.log_contains("Setup Beat: auditbeat")
assert self.log_contains("auditbeat start running")
assert self.log_contains("auditbeat stopped")

objs = self.read_output()

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "d23be250530a24be33069572db67995f21244c51")
has_dir(objs, subdir)

file_events(objs, file1, ['created', 'deleted'])
file_events(objs, file2, ['created'])

# assert file inside subdir is not reported
assert self.log_contains(file3) is False

def test_recursive(self):
"""
file_integrity monitors watched directories (recursive).
"""

dirs = [self.temp_dir("auditbeat_test")]

with PathCleanup(dirs):
self.render_config_template(
modules=[{
"name": "file_integrity",
"extras": {
"paths": dirs,
"scan_at_start": False,
"recursive": True
}
}],
)
proc = self.start_beat()

# wait until the directories to watch are printed in the logs
# this happens when the file_integrity module starts
self.wait_log_contains(escape_path(dirs[0]), max_timeout=30, ignore_case=True)
self.wait_log_contains("\"recursive\": true")

subdir = os.path.join(dirs[0], "subdir")
os.mkdir(subdir)
file1 = os.path.join(subdir, "file.txt")
self.create_file(file1, "hello world!")

subdir2 = os.path.join(subdir, "other")
os.mkdir(subdir2)
file2 = os.path.join(subdir2, "more.txt")
self.create_file(file2, "")

self.wait_log_contains("\"path\": \"{0}\"".format(escape_path(file2)), ignore_case=True)
self.wait_output(4)

proc.check_kill_and_wait()
self.assert_no_logged_warnings()

# Ensure all Beater stages are used.
assert self.log_contains("Setup Beat: auditbeat")
assert self.log_contains("auditbeat start running")
assert self.log_contains("auditbeat stopped")

objs = self.read_output()

has_file(objs, file1, "430ce34d020724ed75a196dfc2ad67c77772d169")
has_file(objs, file2, "da39a3ee5e6b4b0d3255bfef95601890afd80709")
has_dir(objs, subdir)
has_dir(objs, subdir2)

file_events(objs, file1, ['created'])
file_events(objs, file2, ['created'])
15 changes: 10 additions & 5 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,27 +339,30 @@ def get_log(self, logfile=None):

def wait_log_contains(self, msg, logfile=None,
max_timeout=10, poll_interval=0.1,
name="log_contains"):
name="log_contains",
ignore_case=False):
self.wait_until(
cond=lambda: self.log_contains(msg, logfile),
cond=lambda: self.log_contains(msg, logfile, ignore_case=ignore_case),
max_timeout=max_timeout,
poll_interval=poll_interval,
name=name)

def log_contains(self, msg, logfile=None):
def log_contains(self, msg, logfile=None, ignore_case=False):
"""
Returns true if the give logfile contains the given message.
Note that the msg must be present in a single line.
"""

return self.log_contains_count(msg, logfile) > 0
return self.log_contains_count(msg, logfile, ignore_case=ignore_case) > 0

def log_contains_count(self, msg, logfile=None):
def log_contains_count(self, msg, logfile=None, ignore_case=False):
"""
Returns the number of appearances of the given string in the log file
"""

counter = 0
if ignore_case:
msg = msg.lower()

# Init defaults
if logfile is None:
Expand All @@ -368,6 +371,8 @@ def log_contains_count(self, msg, logfile=None):
try:
with open(os.path.join(self.working_dir, logfile), "r") as f:
for line in f:
if ignore_case:
line = line.lower()
if line.find(msg) >= 0:
counter = counter + 1
except IOError:
Expand Down
Loading