Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load schemas dynamically #1135

Merged
merged 39 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f6df03c
Load observables dynamically
udgover Sep 14, 2024
76e4a38
Ruff format
udgover Sep 14, 2024
12a0b97
Remove end of file observables imports
udgover Sep 16, 2024
9ed714b
Add dependencies to poetry / pyproject
udgover Sep 16, 2024
b15222e
Add load_observables function and handle guess type
udgover Sep 16, 2024
888ede4
Rename GenericObservable with Generic in tests
udgover Sep 16, 2024
2eb59c4
Remove validators and replace with static method is_valid
udgover Sep 16, 2024
1c5e3a9
Implement validators for previously handled observables
udgover Sep 16, 2024
df542ff
Ruff format
udgover Sep 16, 2024
bb0faeb
Add entity loader
udgover Sep 17, 2024
b03b7a2
Move entities under entities folder
udgover Sep 17, 2024
16e6d0d
Use enum.Enum for ObservableType definition
udgover Sep 17, 2024
ce32276
Add guess ObservableType if not already part of enum
udgover Sep 17, 2024
c6f3f8d
Remove unused declarations
udgover Sep 17, 2024
2a9fc2b
Remove Field(discriminator=type) which is only relevant when subclass…
udgover Sep 17, 2024
71ecd7e
Update tests to reflect entities changes
udgover Sep 17, 2024
9a09504
Ruff format
udgover Sep 17, 2024
93a501d
Add __init__.py in entities/observables private folder
udgover Sep 17, 2024
002c3b1
Add indicators and update type literal for all objects
udgover Sep 17, 2024
a2ea66c
Ruff format
udgover Sep 17, 2024
e209466
Update analytics to import relevant entities
udgover Sep 17, 2024
bf83486
Update analytics to correctly import schemas
udgover Sep 17, 2024
120d299
Update feeds to correctly import schemas
udgover Sep 17, 2024
59260a4
global naming change
tomchop Sep 18, 2024
d99a42e
Formatting
tomchop Sep 18, 2024
bc8c401
Ruff formatting
tomchop Sep 18, 2024
580acfd
Register entities, observables and indicators object accordingly to s…
udgover Sep 18, 2024
8afe499
Revert tests to main version
udgover Sep 18, 2024
0a6c3b1
Revert analytics to main version
udgover Sep 18, 2024
0c074dc
Replace ssdeep.SsdeepHash with ssdeep.Ssdeep
udgover Sep 18, 2024
913f8e3
Revert feeds to main version
udgover Sep 18, 2024
fff01fd
Replace ssdeep.SsdeepHash with ssdeep.Ssdeep
udgover Sep 18, 2024
1c221ba
Restore apiv2 to main version
udgover Sep 18, 2024
1169561
Revert dfiq to main version
udgover Sep 18, 2024
f4ccb2c
create objects from indicator. Replace indicator variable with regex
udgover Sep 18, 2024
decbf6f
Ruff format
udgover Sep 18, 2024
0dcd89b
Remove path_validator function and return regex matches in is_valid
udgover Sep 18, 2024
d920b00
Update comments to detail where types are populated
udgover Sep 19, 2024
d809ec2
Give more details about conversion from filename to entity name
udgover Sep 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions core/schemas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import importlib
import inspect
import logging
from pathlib import Path

import aenum
udgover marked this conversation as resolved.
Show resolved Hide resolved

from core.schemas import entity, indicator, observable

logger = logging.getLogger(__name__)


def load_entities():
udgover marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Registering entities")
modules = dict()
for entity_file in Path(__file__).parent.glob("entities/**/*.py"):
if entity_file.stem == "__init__":
continue
logger.info(f"Registering entity type {entity_file.stem}")
if entity_file.parent.stem == "entities":
module_name = f"core.schemas.entities.{entity_file.stem}"
elif entity_file.parent.stem == "private":
module_name = f"core.schemas.entities.private.{entity_file.stem}"
enum_value = entity_file.stem.replace("_", "-")
udgover marked this conversation as resolved.
Show resolved Hide resolved
if entity_file.stem not in entity.EntityType.__members__:
aenum.extend_enum(entity.EntityType, entity_file.stem, enum_value)
modules[module_name] = enum_value
entity.TYPE_MAPPING = {"entity": entity.Entity, "entities": entity.Entity}
for module_name, enum_value in modules.items():
module = importlib.import_module(module_name)
for _, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, entity.Entity):
entity.TYPE_MAPPING[enum_value] = obj
setattr(entity, obj.__name__, obj)
for key in entity.TYPE_MAPPING:
if key in ["entity", "entities"]:
continue
cls = entity.TYPE_MAPPING[key]
if not entity.EntityTypes:
entity.EntityTypes = cls
else:
entity.EntityTypes |= cls


def load_indicators():
logger.info("Registering indicators")
modules = dict()
for indicator_file in Path(__file__).parent.glob("indicators/**/*.py"):
if indicator_file.stem == "__init__":
continue
logger.info(f"Registering indicator type {indicator_file.stem}")
if indicator_file.parent.stem == "indicators":
module_name = f"core.schemas.indicators.{indicator_file.stem}"
elif indicator_file.parent.stem == "private":
module_name = f"core.schemas.indicators.private.{indicator_file.stem}"
enum_value = indicator_file.stem
if indicator_file.stem not in indicator.IndicatorType.__members__:
aenum.extend_enum(indicator.IndicatorType, indicator_file.stem, enum_value)
modules[module_name] = enum_value
indicator.TYPE_MAPPING = {
"indicator": indicator.Indicator,
"indicators": indicator.Indicator,
}
for module_name, enum_value in modules.items():
module = importlib.import_module(module_name)
for _, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, indicator.Indicator):
indicator.TYPE_MAPPING[enum_value] = obj
setattr(indicator, obj.__name__, obj)
for key in indicator.TYPE_MAPPING:
if key in ["indicator", "indicators"]:
continue
cls = indicator.TYPE_MAPPING[key]
if not indicator.IndicatorTypes:
indicator.IndicatorTypes = cls
else:
indicator.IndicatorTypes |= cls


def load_observables():
logger.info("Registering observables")
modules = dict()
for observable_file in Path(__file__).parent.glob("observables/**/*.py"):
if observable_file.stem == "__init__":
continue
logger.info(f"Registering observable type {observable_file.stem}")
if observable_file.parent.stem == "observables":
module_name = f"core.schemas.observables.{observable_file.stem}"
elif observable_file.parent.stem == "private":
module_name = f"core.schemas.observables.private.{observable_file.stem}"
if observable_file.stem not in observable.ObservableType.__members__:
aenum.extend_enum(
observable.ObservableType, observable_file.stem, observable_file.stem
)
modules[module_name] = observable_file.stem
if "guess" not in observable.ObservableType.__members__:
aenum.extend_enum(observable.ObservableType, "guess", "guess")
observable.TYPE_MAPPING = {
"observable": observable.Observable,
"observables": observable.Observable,
}
for module_name, enum_value in modules.items():
module = importlib.import_module(module_name)
for _, obj in inspect.getmembers(module, inspect.isclass):
if issubclass(obj, observable.Observable):
observable.TYPE_MAPPING[enum_value] = obj
setattr(observable, obj.__name__, obj)
for key in observable.TYPE_MAPPING:
if key in ["observable", "observables"]:
continue
cls = observable.TYPE_MAPPING[key]
if not observable.ObservableTypes:
observable.ObservableTypes = cls
else:
observable.ObservableTypes |= cls


load_observables()
load_entities()
load_indicators()
Empty file.
10 changes: 10 additions & 0 deletions core/schemas/entities/attack_pattern.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import ClassVar, Literal

from core.schemas import entity


class AttackPattern(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.attack_pattern
type: Literal[entity.EntityType.attack_pattern] = entity.EntityType.attack_pattern
aliases: list[str] = []
kill_chain_phases: list[str] = []
16 changes: 16 additions & 0 deletions core/schemas/entities/campaign.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import datetime
from typing import ClassVar, Literal

from pydantic import Field

from core.helpers import now
from core.schemas import entity


class Campaign(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.campaign
type: Literal[entity.EntityType.campaign] = entity.EntityType.campaign

aliases: list[str] = []
first_seen: datetime.datetime = Field(default_factory=now)
last_seen: datetime.datetime = Field(default_factory=now)
8 changes: 8 additions & 0 deletions core/schemas/entities/company.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Company(entity.Entity):
type: Literal[entity.EntityType.company] = entity.EntityType.company
_type_filter: ClassVar[str] = entity.EntityType.company
10 changes: 10 additions & 0 deletions core/schemas/entities/course_of_action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import ClassVar, Literal

from core.schemas import entity


class CourseOfAction(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.course_of_action
type: Literal[entity.EntityType.course_of_action] = (
entity.EntityType.course_of_action
)
12 changes: 12 additions & 0 deletions core/schemas/entities/identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Identity(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.identity
type: Literal[entity.EntityType.identity] = entity.EntityType.identity

identity_class: str = ""
sectors: list[str] = []
contact_information: str = ""
16 changes: 16 additions & 0 deletions core/schemas/entities/intrusion_set.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import datetime
from typing import ClassVar, Literal

from pydantic import Field

from core.helpers import now
from core.schemas import entity


class IntrusionSet(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.intrusion_set
type: Literal[entity.EntityType.intrusion_set] = entity.EntityType.intrusion_set

aliases: list[str] = []
first_seen: datetime.datetime = Field(default_factory=now)
last_seen: datetime.datetime = Field(default_factory=now)
10 changes: 10 additions & 0 deletions core/schemas/entities/investigation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Investigation(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.investigation
type: Literal[entity.EntityType.investigation] = entity.EntityType.investigation

reference: str = ""
12 changes: 12 additions & 0 deletions core/schemas/entities/malware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Malware(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.malware
type: Literal[entity.EntityType.malware] = entity.EntityType.malware

kill_chain_phases: list[str] = []
aliases: list[str] = []
family: str = ""
8 changes: 8 additions & 0 deletions core/schemas/entities/note.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Note(entity.Entity):
type: Literal[entity.EntityType.note] = entity.EntityType.note
_type_filter: ClassVar[str] = entity.EntityType.note
8 changes: 8 additions & 0 deletions core/schemas/entities/phone.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Phone(entity.Entity):
type: Literal[entity.EntityType.phone] = entity.EntityType.phone
_type_filter: ClassVar[str] = entity.EntityType.phone
4 changes: 4 additions & 0 deletions core/schemas/entities/private/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*
!.gitignore
!README.md
!__init__.py
7 changes: 7 additions & 0 deletions core/schemas/entities/private/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### Private indicators

This directory is where you should place your private indicators. It could be
named anything else, but this one has a `.gitignore` so you don't mess things
up. ;-)

Each entity defined with a filename containing `_` will be then represented in the API and the UI with `-`. For example, if you add a file `super_new_entity.py`, this entity will be defined as `super-new-entity`.
Empty file.
17 changes: 17 additions & 0 deletions core/schemas/entities/threat_actor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import datetime
from typing import ClassVar, Literal

from pydantic import Field

from core.helpers import now
from core.schemas import entity


class ThreatActor(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.threat_actor
type: Literal[entity.EntityType.threat_actor] = entity.EntityType.threat_actor

threat_actor_types: list[str] = []
aliases: list[str] = []
first_seen: datetime.datetime = Field(default_factory=now)
last_seen: datetime.datetime = Field(default_factory=now)
12 changes: 12 additions & 0 deletions core/schemas/entities/tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import ClassVar, Literal

from core.schemas import entity


class Tool(entity.Entity):
_type_filter: ClassVar[str] = entity.EntityType.tool
type: Literal[entity.EntityType.tool] = entity.EntityType.tool

aliases: list[str] = []
kill_chain_phases: list[str] = []
tool_version: str = ""
46 changes: 46 additions & 0 deletions core/schemas/entities/vulnerability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import re
from enum import Enum
from typing import ClassVar, Literal

from pydantic import Field

from core.schemas import entity

vulnerability_matcher = re.compile(
r"(?P<pre>\W?)(?P<search>CVE-\d{4}-\d{4,7})(?P<post>\W?)"
)


class SeverityType(str, Enum):
none = "none"
low = "low"
medium = "medium"
high = "high"
critical = "critical"


class Vulnerability(entity.Entity):
"""
This class represents a vulnerability in the schema.

Attributes:
title: title of the vulnerability.
base_score : base score of the vulnerability obtained from CVSS metric
ranging from 0.0 to 10.0.
severity: represents the severity of a vulnerability. One of none, low,
medium, high, critical.
"""

_type_filter: ClassVar[str] = entity.EntityType.vulnerability
type: Literal[entity.EntityType.vulnerability] = entity.EntityType.vulnerability

title: str = ""
base_score: float = Field(ge=0.0, le=10.0, default=0.0)
severity: SeverityType = "none"
reference: str = ""

@classmethod
def is_valid(cls, ent: entity.Entity) -> bool:
if vulnerability_matcher.match(ent.name):
return True
return False
Loading
Loading