Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): load external tasks from template dir #373

Merged
merged 5 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion secator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,5 +1052,5 @@ def integration(tasks, workflows, scans, test, debug):
@test.command()
def coverage():
"""Run coverage report."""
cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*'
cmd = f'{sys.executable} -m coverage report -m --omit=*/site-packages/*,*/tests/*,*/templates/*'
run_test(cmd, 'coverage')
6 changes: 0 additions & 6 deletions secator/output_types/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,5 @@ def __repr__(self):
s = f'[dim]{s}[/]'
return rich_to_ansi(s)

# def __gt__(self, other):
# # favor httpx over other url info tools
# if self._source == 'httpx' and other._source != 'httpx':
# return True
# return super().__gt__(other)

def __str__(self):
return self.matched_at + ' -> ' + self.name
10 changes: 4 additions & 6 deletions secator/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
from secator.utils import discover_internal_tasks, discover_external_tasks
INTERNAL_TASKS = discover_internal_tasks()
EXTERNAL_TASKS = discover_external_tasks()
ALL_TASKS = INTERNAL_TASKS + EXTERNAL_TASKS
from secator.utils import discover_tasks
TASKS = discover_tasks()
__all__ = [
cls.__name__
for cls in ALL_TASKS
for cls in TASKS
]
for cls in INTERNAL_TASKS:
for cls in TASKS:
exec(f'from .{cls.__name__} import {cls.__name__}')
42 changes: 29 additions & 13 deletions secator/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import inspect
import importlib
import itertools
import logging
import operator
Expand All @@ -8,7 +9,7 @@
import sys
import warnings
from datetime import datetime
from importlib import import_module

from inspect import isclass
from pathlib import Path
from pkgutil import iter_modules
Expand Down Expand Up @@ -138,7 +139,7 @@ def discover_internal_tasks():
if module_name.startswith('_'):
continue
try:
module = import_module(f'secator.tasks.{module_name}')
module = importlib.import_module(f'secator.tasks.{module_name}')
except ImportError as e:
console.print(f'[bold red]Could not import secator.tasks.{module_name}:[/]')
console.print(f'\t[bold red]{type(e).__name__}[/]: {str(e)}')
Expand All @@ -160,17 +161,32 @@ def discover_internal_tasks():

def discover_external_tasks():
"""Find external secator tasks."""
if not os.path.exists('config.secator'):
return []
with open('config.secator', 'r') as f:
classes = f.read().splitlines()
output = []
for cls_path in classes:
cls = import_dynamic(cls_path, cls_root='Command')
if not cls:
continue
# logger.warning(f'Added external tool {cls_path}')
output.append(cls)
sys.dont_write_bytecode = True
for path in CONFIG.dirs.templates.glob('**/*.py'):
try:
task_name = path.stem
module_name = f'secator.tasks.{task_name}'

# console.print(f'Importing module {module_name} from {path}')
spec = importlib.util.spec_from_file_location(module_name, path)
module = importlib.util.module_from_spec(spec)
# console.print(f'Adding module "{module_name}" to sys path')
sys.modules[module_name] = module

# console.print(f'Executing module "{module}"')
spec.loader.exec_module(module)

# console.print(f'Checking that {module} contains task {task_name}')
if not hasattr(module, task_name):
console.print(f'[bold orange1]Could not load external task "{task_name}" from module {path.name}[/] ({path})')
continue
cls = getattr(module, task_name)
console.print(f'[bold green]Successfully loaded external task "{task_name}"[/] ({path})')
output.append(cls)
except Exception as e:
console.print(f'[bold red]Could not load external module {path.name}. Reason: {str(e)}.[/] ({path})')
sys.dont_write_bytecode = False
return output


Expand All @@ -194,7 +210,7 @@ def import_dynamic(cls_path, cls_root='Command'):
"""
try:
package, name = cls_path.rsplit(".", maxsplit=1)
cls = getattr(import_module(package), name)
cls = getattr(importlib.import_module(package), name)
root_cls = inspect.getmro(cls)[-2]
if root_cls.__name__ == cls_root:
return cls
Expand Down
13 changes: 13 additions & 0 deletions secator/utils_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import json
import os
import sys
import unittest.mock

from fp.fp import FreeProxy
Expand Down Expand Up @@ -182,3 +183,15 @@ def _test_task_output(
raise

console.print('[bold green] ok[/]')


def clear_modules():
"""Clear all secator modules imports.
See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context.
"""
keys_to_delete = []
for k, _ in sys.modules.items():
if k.startswith('secator'):
keys_to_delete.append(k)
for k in keys_to_delete:
del sys.modules[k]
36 changes: 36 additions & 0 deletions tests/fixtures/ls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from secator.runners import Command
from secator.decorators import task
from secator.output_types import Vulnerability


@task()
class ls(Command):
cmd = 'ls -al'
output_types = [Vulnerability]
output_map = {
Vulnerability: {}
}

@staticmethod
def item_loader(self, line):
fields = ['permissions', 'link_count', 'owner', 'group', 'size', 'month', 'day', 'hour', 'path']
result = [c for c in line.split(' ') if c]
if len(result) != len(fields):
return None
data = {}
for ix, value in enumerate(result):
data[fields[ix]] = value

# Output vulnerabilities
permissions = data['permissions']
path = data['path']
full_path = f'{self.input}/{path}'
if permissions[-2] == 'w': # found a vulnerability !
yield Vulnerability(
name='World-writeable path',
severity='high',
confidence='high',
provider='ls',
matched_at=full_path,
extra_data={k: v for k, v in data.items() if k != 'path'}
)
4 changes: 4 additions & 0 deletions tests/fixtures/ls.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type: workflow
name: ls
tasks:
ls:
11 changes: 3 additions & 8 deletions tests/unit/test_offline.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import os
import sys
import unittest

from secator.utils_test import clear_modules


class TestOffline(unittest.TestCase):
def setUp(self):
try:
# This allows to drop the secator module loaded from other tests in order to reload the config with modified
# environment variables.
# See https://stackoverflow.com/questions/7460363/re-import-module-under-test-to-lose-context for context.
del sys.modules['secator']
except KeyError:
pass
clear_modules()
os.environ['SECATOR_OFFLINE_MODE'] = '1'

def test_offline_cve_lookup(self):
Expand Down
53 changes: 53 additions & 0 deletions tests/unit/test_template.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import unittest
from secator.config import CONFIG
from secator.output_types import Vulnerability
from secator.utils_test import FIXTURES_DIR, clear_modules
import os

import shutil


class TestTemplate(unittest.TestCase):
def setUp(self):
self.template_dir = CONFIG.dirs.templates
self.custom_task_path = self.template_dir / 'ls.py'
self.writeable_file = self.template_dir / 'test.txt'
self.custom_workflow_path = self.template_dir / 'ls.yml'
shutil.copy(f'{FIXTURES_DIR}/ls.py', self.custom_task_path)
shutil.copy(f'{FIXTURES_DIR}/ls.yml', self.custom_workflow_path)
self.writeable_file.touch()
os.chmod(self.writeable_file, 0o007)
self.expected_vuln = Vulnerability(
name='World-writeable path',
severity='high',
confidence='high',
provider='ls',
matched_at=f'{str(self.writeable_file)}',
_source='ls',
)
clear_modules()
self.maxDiff = None

def tearDown(self):
self.custom_task_path.unlink()
self.custom_workflow_path.unlink()
self.writeable_file.unlink()

def test_external_task(self):
from secator.tasks import ls
results = ls(str(self.template_dir)).run()
self.assertEqual(len(results), 1)
self.assertTrue(self.expected_vuln == Vulnerability.load(results[0].toDict()))

def test_external_workflow(self):
from secator.cli import ALL_WORKFLOWS
from secator.runners import Workflow
ls_workflow = None
for w in ALL_WORKFLOWS:
if w.name == 'ls':
ls_workflow = w
self.assertIsNotNone(ls_workflow)
results = Workflow(ls_workflow, targets=[str(self.template_dir)]).run()
self.assertEqual(len(results), 2)
self.assertTrue(self.expected_vuln == Vulnerability.load(results[1].toDict()))

Loading