diff --git a/secator/cli.py b/secator/cli.py index 2939c9a8..db412da0 100644 --- a/secator/cli.py +++ b/secator/cli.py @@ -1052,5 +1052,5 @@ def integration(tasks, workflows, scans, test, debug): @test.command() def coverage(): """Run coverage report.""" - cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*' + cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*,*/templates/*' run_test(cmd, 'coverage') diff --git a/secator/output_types/vulnerability.py b/secator/output_types/vulnerability.py index 05c06da1..74c89344 100644 --- a/secator/output_types/vulnerability.py +++ b/secator/output_types/vulnerability.py @@ -89,11 +89,5 @@ def __repr__(self): s = f'[dim]{s}[/]' return rich_to_ansi(s) - # def __gt__(self, other): - # # favor httpx over other url info tools - # if self._source == 'httpx' and other._source != 'httpx': - # return True - # return super().__gt__(other) - def __str__(self): return self.matched_at + ' -> ' + self.name diff --git a/secator/tasks/__init__.py b/secator/tasks/__init__.py index 2606bc2e..867bbdf3 100644 --- a/secator/tasks/__init__.py +++ b/secator/tasks/__init__.py @@ -1,10 +1,8 @@ -from secator.utils import discover_internal_tasks, discover_external_tasks -INTERNAL_TASKS = discover_internal_tasks() -EXTERNAL_TASKS = discover_external_tasks() -ALL_TASKS = INTERNAL_TASKS + EXTERNAL_TASKS +from secator.utils import discover_tasks +TASKS = discover_tasks() __all__ = [ cls.__name__ - for cls in ALL_TASKS + for cls in TASKS ] -for cls in INTERNAL_TASKS: +for cls in TASKS: exec(f'from .{cls.__name__} import {cls.__name__}') diff --git a/secator/utils.py b/secator/utils.py index b12a1c9c..abb9df12 100644 --- a/secator/utils.py +++ b/secator/utils.py @@ -1,4 +1,5 @@ import inspect +import importlib import itertools import logging import operator @@ -8,7 +9,7 @@ import sys import warnings from datetime import datetime -from importlib import import_module + from inspect import isclass from pathlib import Path from pkgutil import iter_modules @@ -138,7 +139,7 @@ def discover_internal_tasks(): if module_name.startswith('_'): continue try: - module = import_module(f'secator.tasks.{module_name}') + module = importlib.import_module(f'secator.tasks.{module_name}') except ImportError as e: console.print(f'[bold red]Could not import secator.tasks.{module_name}:[/]') console.print(f'\t[bold red]{type(e).__name__}[/]: {str(e)}') @@ -160,17 +161,32 @@ def discover_internal_tasks(): def discover_external_tasks(): """Find external secator tasks.""" - if not os.path.exists('config.secator'): - return [] - with open('config.secator', 'r') as f: - classes = f.read().splitlines() output = [] - for cls_path in classes: - cls = import_dynamic(cls_path, cls_root='Command') - if not cls: - continue - # logger.warning(f'Added external tool {cls_path}') - output.append(cls) + sys.dont_write_bytecode = True + for path in CONFIG.dirs.templates.glob('**/*.py'): + try: + task_name = path.stem + module_name = f'secator.tasks.{task_name}' + + # console.print(f'Importing module {module_name} from {path}') + spec = importlib.util.spec_from_file_location(module_name, path) + module = importlib.util.module_from_spec(spec) + # console.print(f'Adding module "{module_name}" to sys path') + sys.modules[module_name] = module + + # console.print(f'Executing module "{module}"') + spec.loader.exec_module(module) + + # console.print(f'Checking that {module} contains task {task_name}') + if not hasattr(module, task_name): + console.print(f'[bold orange1]Could not load external task "{task_name}" from module {path.name}[/] ({path})') + continue + cls = getattr(module, task_name) + console.print(f'[bold green]Successfully loaded external task "{task_name}"[/] ({path})') + output.append(cls) + except Exception as e: + console.print(f'[bold red]Could not load external module {path.name}. Reason: {str(e)}.[/] ({path})') + sys.dont_write_bytecode = False return output @@ -194,7 +210,7 @@ def import_dynamic(cls_path, cls_root='Command'): """ try: package, name = cls_path.rsplit(".", maxsplit=1) - cls = getattr(import_module(package), name) + cls = getattr(importlib.import_module(package), name) root_cls = inspect.getmro(cls)[-2] if root_cls.__name__ == cls_root: return cls diff --git a/secator/utils_test.py b/secator/utils_test.py index 753e15d2..27c0ef2c 100644 --- a/secator/utils_test.py +++ b/secator/utils_test.py @@ -1,6 +1,7 @@ import contextlib import json import os +import sys import unittest.mock from fp.fp import FreeProxy @@ -182,3 +183,15 @@ def _test_task_output( raise console.print('[bold green] ok[/]') + + +def clear_modules(): + """Clear all secator modules imports. + See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context. + """ + keys_to_delete = [] + for k, _ in sys.modules.items(): + if k.startswith('secator'): + keys_to_delete.append(k) + for k in keys_to_delete: + del sys.modules[k] diff --git a/tests/fixtures/ls.py b/tests/fixtures/ls.py new file mode 100644 index 00000000..0a5fbcce --- /dev/null +++ b/tests/fixtures/ls.py @@ -0,0 +1,36 @@ +from secator.runners import Command +from secator.decorators import task +from secator.output_types import Vulnerability + + +@task() +class ls(Command): + cmd = 'ls -al' + output_types = [Vulnerability] + output_map = { + Vulnerability: {} + } + + @staticmethod + def item_loader(self, line): + fields = ['permissions', 'link_count', 'owner', 'group', 'size', 'month', 'day', 'hour', 'path'] + result = [c for c in line.split(' ') if c] + if len(result) != len(fields): + return None + data = {} + for ix, value in enumerate(result): + data[fields[ix]] = value + + # Output vulnerabilities + permissions = data['permissions'] + path = data['path'] + full_path = f'{self.input}/{path}' + if permissions[-2] == 'w': # found a vulnerability ! + yield Vulnerability( + name='World-writeable path', + severity='high', + confidence='high', + provider='ls', + matched_at=full_path, + extra_data={k: v for k, v in data.items() if k != 'path'} + ) diff --git a/tests/fixtures/ls.yml b/tests/fixtures/ls.yml new file mode 100644 index 00000000..b515f50d --- /dev/null +++ b/tests/fixtures/ls.yml @@ -0,0 +1,4 @@ +type: workflow +name: ls +tasks: + ls: \ No newline at end of file diff --git a/tests/unit/test_offline.py b/tests/unit/test_offline.py index e77103cf..9377bb02 100644 --- a/tests/unit/test_offline.py +++ b/tests/unit/test_offline.py @@ -1,17 +1,12 @@ import os -import sys import unittest +from secator.utils_test import clear_modules + class TestOffline(unittest.TestCase): def setUp(self): - try: - # This allows to drop the secator module loaded from other tests in order to reload the config with modified - # environment variables. - # See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context. - del sys.modules['secator'] - except KeyError: - pass + clear_modules() os.environ['SECATOR_OFFLINE_MODE'] = '1' def test_offline_cve_lookup(self): diff --git a/tests/unit/test_template.py b/tests/unit/test_template.py new file mode 100644 index 00000000..db1deb98 --- /dev/null +++ b/tests/unit/test_template.py @@ -0,0 +1,53 @@ +import unittest +from secator.config import CONFIG +from secator.output_types import Vulnerability +from secator.utils_test import FIXTURES_DIR, clear_modules +import os + +import shutil + + +class TestTemplate(unittest.TestCase): + def setUp(self): + self.template_dir = CONFIG.dirs.templates + self.custom_task_path = self.template_dir / 'ls.py' + self.writeable_file = self.template_dir / 'test.txt' + self.custom_workflow_path = self.template_dir / 'ls.yml' + shutil.copy(f'{FIXTURES_DIR}/ls.py', self.custom_task_path) + shutil.copy(f'{FIXTURES_DIR}/ls.yml', self.custom_workflow_path) + self.writeable_file.touch() + os.chmod(self.writeable_file, 0o007) + self.expected_vuln = Vulnerability( + name='World-writeable path', + severity='high', + confidence='high', + provider='ls', + matched_at=f'{str(self.writeable_file)}', + _source='ls', + ) + clear_modules() + self.maxDiff = None + + def tearDown(self): + self.custom_task_path.unlink() + self.custom_workflow_path.unlink() + self.writeable_file.unlink() + + def test_external_task(self): + from secator.tasks import ls + results = ls(str(self.template_dir)).run() + self.assertEqual(len(results), 1) + self.assertTrue(self.expected_vuln == Vulnerability.load(results[0].toDict())) + + def test_external_workflow(self): + from secator.cli import ALL_WORKFLOWS + from secator.runners import Workflow + ls_workflow = None + for w in ALL_WORKFLOWS: + if w.name == 'ls': + ls_workflow = w + self.assertIsNotNone(ls_workflow) + results = Workflow(ls_workflow, targets=[str(self.template_dir)]).run() + self.assertEqual(len(results), 2) + self.assertTrue(self.expected_vuln == Vulnerability.load(results[1].toDict())) +