Skip to content

Commit

Permalink
Add nerd observable analyzer
Browse files Browse the repository at this point in the history
Co-authored-by: Michalius <xmatejm00@vutbr.cz>
Co-authored-by: standa4 <stanislavb@seznam.cz>
  • Loading branch information
2 people authored and vaclavbartos committed Oct 14, 2024
1 parent fd2ac9f commit 5c2cc80
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {'python_module': {'health_check_schedule': None, 'update_schedule': None, 'module': 'nerd.NERD', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'NERD_analyzer', 'description': 'scan an IP address against NERD database.\r\nBefore using you must set your api_key and nerd_analysis.\r\nYou can get your api_key on nerd.cesnet.cz.\r\nSet nerd_analysis to:\r\n- basic - returns basic information\r\n- full - returns all information in DB\r\n- fmp - returns only FMP score\r\n- rep - returns only the reputation score', 'disabled': False, 'soft_time_limit': 60, 'routing_key': 'default', 'health_check_status': True, 'type': 'observable', 'docker_based': False, 'maximum_tlp': 'AMBER', 'observable_supported': ['ip'], 'supported_filetypes': [], 'run_hash': False, 'run_hash_type': '', 'not_supported_filetypes': [], 'model': 'analyzers_manager.AnalyzerConfig'}

params = [{'python_module': {'module': 'nerd.NERD', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'api_key_name', 'type': 'str', 'description': 'Set your api_key before running. You can get one on nerd.cesnet.cz', 'is_secret': True, 'required': True}, {'python_module': {'module': 'nerd.NERD', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'nerd_analysis', 'type': 'str', 'description': 'Set analysis type to basic, full, rep or fmp', 'is_secret': False, 'required': True}]

values = [{'parameter': {'python_module': {'module': 'nerd.NERD', 'base_path': 'api_app.analyzers_manager.observable_analyzers'}, 'name': 'nerd_analysis', 'type': 'str', 'description': 'Set analysis type to basic, full, rep or fmp', 'is_secret': False, 'required': True}, 'analyzer_config': 'NERD_analyzer', 'connector_config': None, 'visualizer_config': None, 'ingestor_config': None, 'pivot_config': None, 'for_organization': False, 'value': 'basic', 'updated_at': '2024-10-11T14:00:47.545904Z', 'owner': None}]


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value

def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True

def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)



def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()



class Migration(migrations.Migration):
atomic = False
dependencies = [
('api_app', '0062_alter_parameter_python_module'),
('analyzers_manager', '0122_alter_soft_time_limit'),
]

operations = [
migrations.RunPython(
migrate, reverse_migrate
)
]


68 changes: 68 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/nerd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.

import requests
from requests.exceptions import HTTPError

from api_app.analyzers_manager.classes import ObservableAnalyzer
from api_app.analyzers_manager.exceptions import (
AnalyzerConfigurationException,
AnalyzerRunException,
)
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

class NERD(ObservableAnalyzer):
url: str = "https://nerd.cesnet.cz/nerd/api/v1"

_api_key_name: str
nerd_analysis: str

def run(self):
if self.nerd_analysis == "basic":
headers = {"Authorization": self._api_key_name, "Accept": "application/json"}
uri = f"/ip/{self.observable_name}"
elif self.nerd_analysis == "full":
headers = {"Authorization": self._api_key_name, "Accept": "application/json"}
uri = f"/ip/{self.observable_name}/full"
elif self.nerd_analysis == "rep":
headers = {"Authorization": self._api_key_name, "Accept": "application/json"}
uri = f"/ip/{self.observable_name}/rep"
elif self.nerd_analysis == "fmp":
headers = {"Authorization": self._api_key_name, "Accept": "application/json"}
uri = f"/ip/{self.observable_name}/fmp"
else:
raise AnalyzerConfigurationException(
f"analysis type: '{self.nerd_analysis}' not supported."
"Supported are: 'basic', 'full', 'rep', 'fmp'."
)

try:
response = requests.get(self.url + uri, headers=headers)
response.raise_for_status()
except requests.RequestException as e:
try:
result = response.json()
except ValueError:
raise AnalyzerRunException(e)
if isinstance(e, HTTPError) and e.response.status_code == 404 and "NOT FOUND" in str(e):
return {"status": "NO DATA"}
else:
raise AnalyzerRunException(e)

result = response.json()

return result


@classmethod
def _monkeypatch(cls):

patches = [
if_mock_connections(
patch(
"requests.get",
return_value=MockUpResponse({}, 200),
),
)
]
return super()._monkeypatch(patches=patches)

0 comments on commit 5c2cc80

Please sign in to comment.