Skip to content
This repository has been archived by the owner on Nov 2, 2024. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
crt_sh

Passive_DNS playbook and visualizer (intelowlproject#2374)

* created 'passive_dns' playbook and visualizer

* dnsdb

* validin

* changes

* refactor

* changes

* refactor + tests

* changes

* changes

Add create user docs (intelowlproject#2381)

* docs for test user creation

docs for test user creation

* typo :"(

---------

Co-authored-by: g4ze <bhaiyajionline@gmail.com>

fixed capesandbox short analysis time limit (intelowlproject#2364)

* fixed capesandbox short analysis time limit

* added url to soft time limit error

* fixed code doctor

* added update method

added info installation process

Orkl_search analyzer, closes intelowlproject#1274 (intelowlproject#2380)

* orkl search

* docs

* migrations

* free to use

* typo

---------

Co-authored-by: g4ze <bhaiyajionline@gmail.com>

Frontend - no more required analyzer in scan form (intelowlproject#2397)

* no more requried analyzer in scan form

* fix test

docs, migrations and corrections

ci

Co-authored-by: g4ze <bhaiyajionline@gmail.com>
Co-authored-by: Matteo Lodi <30625432+mlodic@users.noreply.github.com>
  • Loading branch information
3 people authored and Michalsus committed Oct 11, 2024
1 parent 1f82f7d commit 3ecd85f
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 0 deletions.
117 changes: 117 additions & 0 deletions api_app/analyzers_manager/migrations/0098_analyzer_config_crt_sh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "crt_sh.Crt_sh",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "Crt_sh",
"description": "[Crt_sh](https://crt.sh/) lets you get certificates info about a domain.",
"disabled": False,
"soft_time_limit": 500,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["domain"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = []

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0097_analyzer_config_orklsearch"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
51 changes: 51 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/crt_sh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import logging

import requests

from api_app.analyzers_manager import classes
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

logger = logging.getLogger(__name__)


class Crt_sh(classes.ObservableAnalyzer):
"""
Wrapper of crt.sh
"""

url = "https://crt.sh"

def update(self):
pass

def run(self):
headers = {"accept": "application/json"}
response = requests.get(
f"{self.url}/?q={self.observable_name}", headers=headers
)
response.raise_for_status()
response = response.json()
return response

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch(
"requests.get",
return_value=MockUpResponse(
{
"issuer_ca_id": 16418,
"issuer_name": """C=US, O=Let's Encrypt,
CN=Let's Encrypt Authority X3""",
"name_value": "hatch.uber.com",
"min_cert_id": 325717795,
"min_entry_timestamp": "2018-02-08T16:47:39.089",
"not_before": "2018-02-08T15:47:39",
},
200,
),
),
)
]
return super()._monkeypatch(patches=patches)
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.add(AnalyzerConfig.objects.get(name="OrklSearch").id)
pc.full_clean()
pc.save()


def reverse_migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")

pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="OrklSearch").id)
pc.full_clean()
pc.save()


class Migration(migrations.Migration):
dependencies = [
(
"playbooks_manager",
"0045_playbook_config_passive_dns",
),
("analyzers_manager", "0097_analyzer_config_orklsearch"),
]

operations = [
migrations.RunPython(migrate, reverse_migrate),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.


from django.db import migrations


def migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.add(AnalyzerConfig.objects.get(name="Crt_sh").id)
pc.full_clean()
pc.save()


def reverse_migrate(apps, schema_editor):
playbook_config = apps.get_model("playbooks_manager", "PlaybookConfig")
AnalyzerConfig = apps.get_model("analyzers_manager", "AnalyzerConfig")
pc = playbook_config.objects.get(name="FREE_TO_USE_ANALYZERS")
pc.analyzers.remove(AnalyzerConfig.objects.get(name="Crt_sh").id)
pc.full_clean()
pc.save()


class Migration(migrations.Migration):
dependencies = [
("playbooks_manager", "0046_add_orkl_to_free_to_use"),
("analyzers_manager", "0098_analyzer_config_crt_sh"),
]

operations = [
migrations.RunPython(migrate, reverse_migrate),
]
1 change: 1 addition & 0 deletions docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `AILTypoSquatting`:[AILTypoSquatting](https://github.com/typosquatter/ail-typo-squatting) is a Python library to generate list of potential typo squatting domains with domain name permutation engine to feed AIL and other systems.
* `MalprobSearch`:[Malprob](https://malprob.io/) is a leading malware detection and identification service, powered by cutting-edge AI technology.
* `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
* `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.

##### Generic analyzers (email, phone number, etc.; anything really)

Expand Down

0 comments on commit 3ecd85f

Please sign in to comment.