Skip to content

Commit

Permalink
Update tests to use result struct + json-based validation
Browse files Browse the repository at this point in the history
Update tests that validated rules files (typically looking for
errors/warnings) to use the new result struct + json based validation:

- When validating rules files, always use json output.

- In test cases, instead of parsing stderr/stdout, use new test
  properties "validate_ok", "validate_errors",
  "validate_warnings". These parse the json output and look for
  specific tuples of (error code, error message, item type, item name)
  in the output.

- There were a few tests that were actually validation tests but using
  the -r argument to load rules. Convert them to validation tests. In
  one case, split the test into two separate tests--one for
  validation, one ensuring that the rule doesn't match anything.

- There were a couple of tests that were duplicates of existing
  validation tests, just checking for the error in a different
  way. Remove them.

Signed-off-by: Mark Stemm <mark.stemm@gmail.com>
  • Loading branch information
mstemm authored and poiana committed Aug 4, 2022
1 parent 550cdbd commit a37e225
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 456 deletions.
103 changes: 77 additions & 26 deletions test/falco_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def setUp(self):
else:
self.stderr_not_contains = [self.stderr_not_contains]

self.validate_ok = self.params.get('validate_ok', '*', default='')
self.validate_warnings = self.params.get('validate_warnings', '*', default='')
self.validate_errors = self.params.get('validate_errors', '*', default='')

self.exit_status = self.params.get('exit_status', '*', default=0)
self.should_detect = self.params.get('detect', '*', default=False)
self.check_detection_counts = self.params.get('check_detection_counts', '*', default=True)
Expand Down Expand Up @@ -105,6 +109,9 @@ def setUp(self):
if self.validate_rules_file == False:
self.validate_rules_file = []
else:
# Always enable json output when validating rules
# files. Makes parsing errors/warnings easier
self.json_output = True
if not isinstance(self.validate_rules_file, list):
self.validate_rules_file = [self.validate_rules_file]

Expand Down Expand Up @@ -153,13 +160,6 @@ def setUp(self):
detect_counts[key] = value
self.detect_counts = detect_counts

self.rules_warning = self.params.get(
'rules_warning', '*', default=False)
if self.rules_warning == False:
self.rules_warning = set()
else:
self.rules_warning = set(self.rules_warning)

# Maps from rule name to set of evttypes
self.rules_events = self.params.get('rules_events', '*', default=False)
if self.rules_events == False:
Expand Down Expand Up @@ -265,22 +265,6 @@ def tearDown(self):
if self.package != 'None':
self.uninstall_package()

def check_rules_warnings(self, res):

found_warning = set()

for match in re.finditer('Rule ([^:]+): warning \(([^)]+)\):', res.stderr.decode("utf-8")):
rule = match.group(1)
warning = match.group(2)
found_warning.add(rule)

self.log.debug("Expected warning rules: {}".format(self.rules_warning))
self.log.debug("Actual warning rules: {}".format(found_warning))

if found_warning != self.rules_warning:
self.fail("Expected rules with warnings {} does not match actual rules with warnings {}".format(
self.rules_warning, found_warning))

def check_rules_events(self, res):

found_events = {}
Expand Down Expand Up @@ -376,7 +360,68 @@ def check_outputs(self):

return True

def check_json_output(self, res):
def get_validate_json(self, res):
if self.validate_json is None:
# The first line of stdout should be the validation result as json
self.validate_json = json.loads(res.stdout.decode("utf-8").partition('\n')[0])
return self.validate_json

def check_validate_ok(self, res):
if self.validate_ok != '':
vobj = self.get_validate_json(res)
for expected in self.validate_ok:
found = False
for vres in vobj["falco_load_results"]:
if vres["successful"] and os.path.basename(vres["name"]) == expected:
found = True
break
if not found:
self.fail("Validation json did not contain a successful result for file '{}'".format(expected))

def check_validate_warnings(self, res):
if self.validate_warnings != '':
vobj = self.get_validate_json(res)
for warnobj in self.validate_warnings:
found = False
for vres in vobj["falco_load_results"]:
for warning in vres["warnings"]:
if warning["code"] == warnobj["code"]:
if ("message" in warnobj and warning["message"] == warnobj["message"]) or ("message_contains" in warnobj and warnobj["message_contains"] in warning["message"]):
for loc in warning["context"]["locations"]:
if loc["item_type"] == warnobj["item_type"] and loc["item_name"] == warnobj["item_name"]:
found = True
break
if not found:
if "message" in warnobj:
self.fail("Validation json did not contain a warning '{}' for '{}' '{}' with message '{}'".format(
warnobj["code"], warnobj["item_type"], warnobj["item_name"], warnobj["message"]))
else:
self.fail("Validation json did not contain a warning '{}' for '{}' '{}' with message containing '{}'".format(
warnobj["code"], warnobj["item_type"], warnobj["item_name"], warnobj["message_contains"]))

def check_validate_errors(self, res):
if self.validate_errors != '':
vobj = self.get_validate_json(res)
for errobj in self.validate_errors:
found = False
for vres in vobj["falco_load_results"]:
for error in vres["errors"]:
if error["code"] == errobj["code"]:
if ("message" in errobj and error["message"] == errobj["message"]) or ("message_contains" in errobj and errobj["message_contains"] in error["message"]):
for loc in error["context"]["locations"]:
if loc["item_type"] == errobj["item_type"] and loc["item_name"] == errobj["item_name"]:
found = True
break
if not found:
if "message" in errobj:
self.fail("Validation json did not contain a error '{}' for '{}' '{}' with message '{}'".format(
errobj["code"], errobj["item_type"], errobj["item_name"], errobj["message"]))
else:
self.fail("Validation json did not contain a error '{}' for '{}' '{}' with message containing '{}'".format(
errobj["code"], errobj["item_type"], errobj["item_name"], errobj["message_contains"]))


def check_json_event_output(self, res):
if self.json_output:
# Just verify that any lines starting with '{' are valid json objects.
# Doesn't do any deep inspection of the contents.
Expand Down Expand Up @@ -578,6 +623,8 @@ def test(self):
# This sets falco_binary_path as a side-effect.
self.install_package()

self.validate_json = None

trace_arg = self.trace_file

if self.trace_file:
Expand Down Expand Up @@ -644,18 +691,22 @@ def test(self):
self.error("Falco command \"{}\" exited with unexpected return value {} (!= {})".format(
cmd, res.exit_status, self.exit_status))

self.check_validate_ok(res)
self.check_validate_errors(res)
self.check_validate_warnings(res)

# No need to check any outputs if the falco process exited abnormally.
if res.exit_status != 0:
return

self.check_rules_warnings(res)
if len(self.rules_events) > 0:
self.check_rules_events(res)
if len(self.validate_rules_file) == 0 and self.check_detection_counts:
self.check_detections(res)
if len(self.detect_counts) > 0:
self.check_detections_by_rule(res)
self.check_json_output(res)
if not self.validate_rules_file:
self.check_json_event_output(res)
self.check_outputs()
self.check_output_strictly_contains(res)
self.check_grpc()
Expand Down
Loading

0 comments on commit a37e225

Please sign in to comment.