From 9a62d36cb1b2734180e415c2154b2ba96f382e5d Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Tue, 24 Sep 2024 10:25:36 +0200 Subject: [PATCH 01/10] Add Yeti Package to create several objects defined as json with relationships --- core/schemas/package.py | 218 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 core/schemas/package.py diff --git a/core/schemas/package.py b/core/schemas/package.py new file mode 100644 index 000000000..3bb9c7fc7 --- /dev/null +++ b/core/schemas/package.py @@ -0,0 +1,218 @@ +import json +from datetime import datetime, timezone +from typing import Any, ClassVar, Dict, List, Optional + +from pydantic import BaseModel, ConfigDict, model_validator +from typing_extensions import Self + +from core.schemas import entity, indicator, observable + + +class YetiPackageElement(BaseModel): + model_config = ConfigDict(extra='allow') + type: str + context: Optional[Dict[str, Any]] = {} + link_to: Optional[List[str]] = [] + link_type: Optional[str] = "observes" + + +class YetiPackage(BaseModel): + timestamp: str | int # add validator + source: str + tags: Optional[List[str]] = [] + observables: Optional[Dict[str, YetiPackageElement]] = {} + entities: Optional[Dict[str, YetiPackageElement]] = {} + indicators: Optional[Dict[str, YetiPackageElement]] = {} + + _exclude_from_model_dump: ClassVar[List[str]] = ["type", "context", "link_to", "link_type"] + _yeti_objects: ClassVar[Dict[str, observable.Observable | entity.Entity | indicator.Indicator]] = {} + _relationship_types = ClassVar[Dict[str, str]] + + def __init__(self, **data: Any): + super().__init__(**data) + self._timestamp_dt: datetime = self._convert_timestamp(self.timestamp) + + + @classmethod + def from_json(cls: Self, json_input: str) -> Self: + return cls(**json.loads(json_input)) + + # We only need to validate relationships. + @model_validator(mode="after") + def validate_elements(self) -> Self: + self._relationship_types = {} + # Should we thinkg about key collision between entities, observables and indicators? + element_keys = set(self.observables) | set(self.entities) | set(self.indicators) + for element_type in ["observables", "entities", "indicators"]: + for element_key, element in getattr(self, element_type).items(): + model = element.model_dump(exclude=self._exclude_from_model_dump) + if element_type == "entities": + model["name"] = element_key + cls = entity.TYPE_MAPPING[element.type] + elif element_type == "indicators": + model["name"] = element_key + cls = indicator.TYPE_MAPPING[element.type] + else: + model["value"] = element_key + cls = observable.TYPE_MAPPING[element.type] + cls(**model) + # validate relationships + for targeted_element in element.link_to: + if targeted_element not in element_keys: + error = f"Relationship with <{targeted_element}> defined for {element_type} {element_key} does not exist" + raise ValueError(error) + self._relationship_types[element_key] = element.link_type + return self + + def save(self) -> None: + if self.observables: + for observable_key, observable_element in self.observables.items(): + print("Saving observable ", observable_key) + self._save_observable(observable_key, observable_element) + if self.entities: + for entity_key, entity_element in self.entities.items(): + #print("Saving entity ", entity_key) + self._save_entity(entity_key, entity_element) + if self.indicators: + for indicator_key, indicator_element in self.indicators.items(): + self._save_indicator(indicator_key, indicator_element) + self._save_relationships() + + def _convert_timestamp(self, timestamp: str | int) -> datetime: + if isinstance(timestamp, int): + if timestamp > 10000000000: + return datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc) + else: + return datetime.fromtimestamp(timestamp, tz=timezone.utc) + elif isinstance(timestamp, str): + if "." in timestamp: + fmt = "%Y-%m-%dT%H:%M:%S.%f%z" + else: + fmt = "%Y-%m-%dT%H:%M:%S%z" + return datetime.strptime(timestamp, fmt) + else: + raise ValueError("Invalid timestamp format") + + def _save_entity(self, name: str, element: YetiPackageElement) -> None: + # Create or get honeypot + yeti_entity = entity.Entity.find(name=name, type=element.type) + if not yeti_entity: + model = element.model_dump(exclude=self._exclude_from_model_dump) + model["name"] = name + cls = entity.TYPE_MAPPING[element.type] + if "first_seen" not in model: + model["first_seen"] = self.timestamp + if "last_seen" not in model: + model["last_seen"] = self.timestamp + yeti_entity = cls(**model).save() + else: + yeti_entity.first_seen = ( + self._timestamp_dt + if yeti_entity.first_seen > self._timestamp_dt + else yeti_entity.first_seen + ) + yeti_entity.last_seen = ( + self._timestamp_dt + if yeti_entity.last_seen < self._timestamp_dt + else yeti_entity.last_seen + ) + yeti_entity = yeti_entity.save() + if self.tags: + yeti_entity.tag(self.tags) + yeti_entity = yeti_entity.save() + self._yeti_objects[name] = self._update_entity_context(yeti_entity) + + + def _save_indicator(self, name: str, element: YetiPackageElement) -> None: + yeti_indicator = indicator.Indicator.find(name=name, type=element.type) + if not yeti_indicator: + model = element.model_dump(exclude=self._exclude_from_model_dump) + model["name"] = name + cls = indicator.TYPE_MAPPING[element.type] + yeti_indicator = cls(**model).save() + if self.tags: + yeti_indicator.tag(self.tags) + self._yeti_objects[name] = yeti_indicator.save() + + + def _save_observable(self, value: str, element: YetiPackageElement) -> None: + yeti_observable = observable.Observable.find(value=value, type=element.type) + tags = self.tags + if not yeti_observable: + # support unknown observable type with generic and adds type as tag: type: + if element.type not in observable.TYPE_MAPPING: + cls = observable.Generic + tags.append(f"type:{element.type}") + else: + cls = observable.TYPE_MAPPING[element.type] + model = element.model_dump(exclude=self._exclude_from_model_dump) + model["value"] = value + yeti_observable = cls(**model).save() + if tags: + yeti_observable.tag(tags) + yeti_observable = yeti_observable.save() + self._yeti_objects[value] = self._update_observable_context(yeti_observable) + + def _save_relationships(self) -> None: + for element_type in ["observables", "entities", "indicators"]: + for element_key, element in getattr(self, element_type).items(): + if not element.link_to: + continue + for targeted_element in element.link_to: + source = self._yeti_objects[element_key] + target = self._yeti_objects[targeted_element] + link_type = self._relationship_types[targeted_element] + source.link_to(target, link_type, "") + + def _update_entity_context(self, yeti_entity: entity.Entity) -> entity.Entity: + found_idx = -1 + updated_context = { + "source": self.source, + "total_seen": 1, + } + for idx, context in enumerate(list(yeti_entity.context)): + if context["source"] == self.source: + found_idx = idx + break + if found_idx != -1: + # Handle previous context which were not structured as above + current_context = yeti_entity.context[found_idx] + updated_context["total_seen"] = current_context.get("total_seen", 0) + 1 + yeti_entity.context[found_idx] = updated_context + return yeti_entity.save() + else: + return yeti_entity.add_context(self.source, updated_context) + + def _update_observable_context(self, yeti_observable: observable.Observable) -> observable.Observable: + found_idx = -1 + updated_context = { + "source": self.source, + "total_seen": 1, + "first_seen": self._timestamp_dt, + "last_seen": self._timestamp_dt, + } + for idx, context in enumerate(list(yeti_observable.context)): + if context["source"] == self.source: + found_idx = idx + break + if found_idx != -1: + # Handle previous context which were not structured as above + current_context = yeti_observable.context[found_idx] + if not current_context.get("first_seen"): + first_seen = self.timestamp + else: + first_seen = self._convert_timestamp(current_context["first_seen"]) + # keep previous first_seen + if first_seen < self._timestamp_dt: + updated_context["first_seen"] = first_seen + if not current_context.get("last_seen"): + last_seen = self.timestamp + else: + last_seen = self._convert_timestamp(current_context["last_seen"]) + if last_seen > self._timestamp_dt: + updated_context["last_seen"] = last_seen + updated_context["total_seen"] = current_context.get("total_seen", 0) + 1 + yeti_observable.context[found_idx] = updated_context + return yeti_observable.save() + else: + return yeti_observable.add_context(self.source, updated_context) From 838d780039d2fdc7fe4026e2cf5b32d011cef911 Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 09:13:05 +0200 Subject: [PATCH 02/10] Refactoring of Yeti Package to handle methods to add elements --- core/schemas/package.py | 260 ++++++++++++++++++++++++---------------- 1 file changed, 159 insertions(+), 101 deletions(-) diff --git a/core/schemas/package.py b/core/schemas/package.py index 3bb9c7fc7..739eb7faf 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -1,81 +1,157 @@ import json from datetime import datetime, timezone -from typing import Any, ClassVar, Dict, List, Optional +from typing import Any, ClassVar, Dict, List, Optional, Tuple -from pydantic import BaseModel, ConfigDict, model_validator +from pydantic import BaseModel, Field, model_validator from typing_extensions import Self from core.schemas import entity, indicator, observable +from core.schemas.observable import ObservableTypes -class YetiPackageElement(BaseModel): - model_config = ConfigDict(extra='allow') - type: str - context: Optional[Dict[str, Any]] = {} - link_to: Optional[List[str]] = [] - link_type: Optional[str] = "observes" +class YetiPackageRelationship(BaseModel): + target: str + link_type: str = "observes" class YetiPackage(BaseModel): timestamp: str | int # add validator - source: str + source: str = Field(min_length=3) tags: Optional[List[str]] = [] - observables: Optional[Dict[str, YetiPackageElement]] = {} - entities: Optional[Dict[str, YetiPackageElement]] = {} - indicators: Optional[Dict[str, YetiPackageElement]] = {} - - _exclude_from_model_dump: ClassVar[List[str]] = ["type", "context", "link_to", "link_type"] - _yeti_objects: ClassVar[Dict[str, observable.Observable | entity.Entity | indicator.Indicator]] = {} - _relationship_types = ClassVar[Dict[str, str]] + observables: Optional[List[observable.ObservableTypes]] = [] + entities: Optional[List[entity.EntityTypes]] = [] + indicators: Optional[List[indicator.Indicator]] = [] + relationships: Optional[Dict[str, List[YetiPackageRelationship]]] = {} + + _observables_generic_tags: ClassVar[Dict[str, str]] = {} + _objects: ClassVar[Dict[str, Any]] = {} def __init__(self, **data: Any): super().__init__(**data) + for observable_element in self.observables: + self._objects[observable_element.value] = observable_element + for entity_element in self.entities: + self._objects[entity_element.name] = entity_element + for indicator_element in self.indicators: + self._objects[indicator_element.name] = indicator_element self._timestamp_dt: datetime = self._convert_timestamp(self.timestamp) + # Use model validator to convert unknown observable types to generic and add type as tag + @model_validator(mode="before") + @classmethod + def handle_generic_observable_types(cls, data: Any) -> Any: + YetiPackage._observables_generic_tags = {} + YetiPackage._objects = {} + if ( + isinstance(data, dict) + and "observables" in data + and isinstance(data["observables"], list) + ): + for observable_element in data["observables"]: + if "type" in observable_element: + observable_type = observable_element["type"] + if observable_type in observable.TYPE_MAPPING: + continue + observable_element["type"] = "generic" + cls._observables_generic_tags[observable_element["value"]] = ( + f"type:{observable_type}" + ) + return data @classmethod - def from_json(cls: Self, json_input: str) -> Self: - return cls(**json.loads(json_input)) - - # We only need to validate relationships. - @model_validator(mode="after") - def validate_elements(self) -> Self: - self._relationship_types = {} - # Should we thinkg about key collision between entities, observables and indicators? - element_keys = set(self.observables) | set(self.entities) | set(self.indicators) - for element_type in ["observables", "entities", "indicators"]: - for element_key, element in getattr(self, element_type).items(): - model = element.model_dump(exclude=self._exclude_from_model_dump) - if element_type == "entities": - model["name"] = element_key - cls = entity.TYPE_MAPPING[element.type] - elif element_type == "indicators": - model["name"] = element_key - cls = indicator.TYPE_MAPPING[element.type] - else: - model["value"] = element_key - cls = observable.TYPE_MAPPING[element.type] - cls(**model) - # validate relationships - for targeted_element in element.link_to: - if targeted_element not in element_keys: - error = f"Relationship with <{targeted_element}> defined for {element_type} {element_key} does not exist" - raise ValueError(error) - self._relationship_types[element_key] = element.link_type + def from_json(cls: Self, json_package: str) -> Self: + package = json.loads(json_package) + instance = cls( + timestamp=package["timestamp"], + source=package["source"], + tags=package.get("tags", []), + ) + if "observables" in package: + for observable_element in package["observables"]: + instance.add_observable(**observable_element) + if "entities" in package: + for entity_element in package["entities"]: + instance.add_entity(**entity_element) + if "indicators" in package: + for indicator_element in package["indicators"]: + instance.add_indicator(**indicator_element) + if "relationships" in package: + for source, relationships in package["relationships"].items(): + for relationship in relationships: + instance.add_relationship(source, **relationship) + return instance + + def add_observable(self, value, type, **kwargs) -> Self: + if value in self._objects: + raise ValueError(f'"{value}" already exists') + if type in observable.TYPE_MAPPING: + cls = observable.TYPE_MAPPING[type] + else: + cls = observable.TYPE_MAPPING["generic"] + self._observables_generic_tags[value] = f"type:{type}" + kwargs["value"] = value + instance = cls(**kwargs, exclude="type") + self.observables.append(instance) + self._objects[value] = instance + return self + + def add_entity(self, name, type, **kwargs) -> Self: + if name in self._objects: + raise ValueError(f'Entity "{name}" already exists') + if type not in entity.TYPE_MAPPING: + raise ValueError(f"Invalid entity type {type}") + cls = entity.TYPE_MAPPING[type] + kwargs["name"] = name + instance = cls(**kwargs, exclude="type") + self.entities.append(instance) + self._objects[name] = instance + return self + + def add_indicator(self, name, type, **kwargs) -> Self: + if name in self._objects: + raise ValueError(f'Indicator "{name}" already exists') + if type not in indicator.TYPE_MAPPING: + raise ValueError(f"Invalid indicator type: {type}") + cls = indicator.TYPE_MAPPING[type] + kwargs["name"] = name + instance = cls(**kwargs, exclude="type") + self.indicators.append(instance) + self._objects[name] = instance + return self + + # relationships validation is done at save time + def add_relationship( + self, source: str, target: str, link_type: str = "observes" + ) -> Self: + if source not in self.relationships: + self.relationships[source] = [] + for relationship in self.relationships[source]: + if relationship.target == target: + raise ValueError( + f"Relationship between {source} and {target} already exists" + ) + relationship = YetiPackageRelationship(target=target, link_type=link_type) + self.relationships[source].append(relationship) return self def save(self) -> None: - if self.observables: - for observable_key, observable_element in self.observables.items(): - print("Saving observable ", observable_key) - self._save_observable(observable_key, observable_element) - if self.entities: - for entity_key, entity_element in self.entities.items(): - #print("Saving entity ", entity_key) - self._save_entity(entity_key, entity_element) - if self.indicators: - for indicator_key, indicator_element in self.indicators.items(): - self._save_indicator(indicator_key, indicator_element) + if not self.observables and not self.entities and not self.indicators: + raise ValueError("No elements to save") + # before saving, let's check that relationships are valid + for source, relationships in self.relationships.items(): + if source not in self._objects: + raise ValueError(f'Relationship source "{source}" does not exist') + for relationship in relationships: + if relationship.target not in self._objects: + raise ValueError( + f'Relationship target "{relationship.target}" does not exist' + ) + for observable_element in self.observables: + self._save_observable(observable_element) + for entity_element in self.entities: + self._save_entity(entity_element) + for indicator_element in self.indicators: + self._save_indicator(indicator_element) self._save_relationships() def _convert_timestamp(self, timestamp: str | int) -> datetime: @@ -93,19 +169,11 @@ def _convert_timestamp(self, timestamp: str | int) -> datetime: else: raise ValueError("Invalid timestamp format") - def _save_entity(self, name: str, element: YetiPackageElement) -> None: - # Create or get honeypot - yeti_entity = entity.Entity.find(name=name, type=element.type) + def _save_entity(self, element: entity.EntityTypes) -> None: + yeti_entity = entity.Entity.find(name=element.name, type=element.type) if not yeti_entity: - model = element.model_dump(exclude=self._exclude_from_model_dump) - model["name"] = name - cls = entity.TYPE_MAPPING[element.type] - if "first_seen" not in model: - model["first_seen"] = self.timestamp - if "last_seen" not in model: - model["last_seen"] = self.timestamp - yeti_entity = cls(**model).save() - else: + yeti_entity = element.save() + if hasattr(yeti_entity, "first_seen") and hasattr(yeti_entity, "last_seen"): yeti_entity.first_seen = ( self._timestamp_dt if yeti_entity.first_seen > self._timestamp_dt @@ -119,50 +187,38 @@ def _save_entity(self, name: str, element: YetiPackageElement) -> None: yeti_entity = yeti_entity.save() if self.tags: yeti_entity.tag(self.tags) - yeti_entity = yeti_entity.save() - self._yeti_objects[name] = self._update_entity_context(yeti_entity) + yeti_entity = self._update_entity_context(yeti_entity) + self._objects[element.name] = yeti_entity.save() - - def _save_indicator(self, name: str, element: YetiPackageElement) -> None: - yeti_indicator = indicator.Indicator.find(name=name, type=element.type) + def _save_indicator(self, element: indicator.IndicatorTypes) -> None: + yeti_indicator = indicator.Indicator.find(name=element.name, type=element.type) if not yeti_indicator: - model = element.model_dump(exclude=self._exclude_from_model_dump) - model["name"] = name - cls = indicator.TYPE_MAPPING[element.type] - yeti_indicator = cls(**model).save() + yeti_indicator = element.save() if self.tags: yeti_indicator.tag(self.tags) - self._yeti_objects[name] = yeti_indicator.save() - + self._objects[element.name] = yeti_indicator.save() - def _save_observable(self, value: str, element: YetiPackageElement) -> None: - yeti_observable = observable.Observable.find(value=value, type=element.type) - tags = self.tags + def _save_observable(self, element: observable.ObservableTypes) -> None: + yeti_observable = observable.Observable.find( + value=element.value, type=element.type + ) + tags = set(self.tags) if not yeti_observable: # support unknown observable type with generic and adds type as tag: type: - if element.type not in observable.TYPE_MAPPING: - cls = observable.Generic - tags.append(f"type:{element.type}") - else: - cls = observable.TYPE_MAPPING[element.type] - model = element.model_dump(exclude=self._exclude_from_model_dump) - model["value"] = value - yeti_observable = cls(**model).save() + yeti_observable = element.save() + if element.value in self._observables_generic_tags: + tags.add(self._observables_generic_tags[element.value]) if tags: yeti_observable.tag(tags) - yeti_observable = yeti_observable.save() - self._yeti_objects[value] = self._update_observable_context(yeti_observable) + yeti_observable = self._update_observable_context(yeti_observable) + self._objects[element.value] = yeti_observable.save() def _save_relationships(self) -> None: - for element_type in ["observables", "entities", "indicators"]: - for element_key, element in getattr(self, element_type).items(): - if not element.link_to: - continue - for targeted_element in element.link_to: - source = self._yeti_objects[element_key] - target = self._yeti_objects[targeted_element] - link_type = self._relationship_types[targeted_element] - source.link_to(target, link_type, "") + for source, relationships in self.relationships.items(): + source_object = self._objects[source] + for relationship in relationships: + target_object = self._objects[relationship.target] + source_object.link_to(target_object, relationship.link_type, "") def _update_entity_context(self, yeti_entity: entity.Entity) -> entity.Entity: found_idx = -1 @@ -183,7 +239,9 @@ def _update_entity_context(self, yeti_entity: entity.Entity) -> entity.Entity: else: return yeti_entity.add_context(self.source, updated_context) - def _update_observable_context(self, yeti_observable: observable.Observable) -> observable.Observable: + def _update_observable_context( + self, yeti_observable: observable.Observable + ) -> observable.Observable: found_idx = -1 updated_context = { "source": self.source, From 125b3ae2e3fecbc8fbb8aa90be1d04bb541c6b78 Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 09:13:29 +0200 Subject: [PATCH 03/10] Add tests coverage for yeti_package --- tests/schemas/package.py | 315 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) create mode 100644 tests/schemas/package.py diff --git a/tests/schemas/package.py b/tests/schemas/package.py new file mode 100644 index 000000000..7ec00743f --- /dev/null +++ b/tests/schemas/package.py @@ -0,0 +1,315 @@ +import datetime +import unittest + +from pydantic import ValidationError + +from core import database_arango +from core.schemas import entity, indicator, observable, package + +# from core.schemas.indicator import Indicator +# from core.schemas.observable import Observable +from core.schemas.observables import hostname, ipv4 + + +class YetiPackageTest(unittest.TestCase): + def setUp(self) -> None: + database_arango.db.connect(database="yeti_test") + database_arango.db.clear() + + def tearDown(self) -> None: + database_arango.db.clear() + + def test_package_creation_from_methods(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + yeti_package.add_observable("toto.com", "hostname") + yeti_package.add_observable("192.168.1.1", "ipv4") + yeti_package.add_entity("Fresh campaign", "campaign") + yeti_package.add_indicator( + "awesome_regexp", "regex", pattern=".*", diamond="adversary" + ) + yeti_package.add_relationship("Fresh campaign", "toto.com", "contacts") + yeti_package.add_relationship( + "Fresh campaign", "192.168.1.1", "communicates_with" + ) + yeti_package.save() + + obs1 = observable.Observable.find(value="toto.com", type="hostname") + obs2 = observable.Observable.find(value="192.168.1.1", type="ipv4") + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + regex = indicator.Indicator.find(name="awesome_regexp", type="regex") + + self.assertIsNotNone(obs1) + self.assertIsNotNone(obs2) + self.assertIsNotNone(campaign) + self.assertIsNotNone(regex) + + vertices, paths, count = campaign.neighbors() + + self.assertEqual(len(paths), 2) + self.assertEqual(count, 2) + self.assertEqual(len(vertices), 2) + + for path in paths: + if path[0].target == obs1.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "contacts") + elif path[0].target == obs2.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "communicates_with") + + self.assertIn(obs1.extended_id, vertices) + neighbor = vertices[obs1.extended_id] + self.assertEqual(neighbor.id, obs1.id) + self.assertIn(obs2.extended_id, vertices) + neighbor = vertices[obs2.extended_id] + self.assertEqual(neighbor.id, obs2.id) + + def test_package_creation_from_dict(self) -> None: + observables = [ + {"value": "toto.com", "type": "hostname"}, + {"value": "192.168.1.1", "type": "ipv4"}, + ] + entities = [{"name": "Fresh campaign", "type": "campaign"}] + indicators = [ + { + "name": "awesome_regexp", + "type": "regex", + "pattern": ".*", + "diamond": "adversary", + } + ] + relationships = { + "Fresh campaign": [ + {"target": "toto.com", "link_type": "contacts"}, + {"target": "192.168.1.1", "link_type": "communicates_with"}, + ] + } + + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", + source="SecretSource", + observables=observables, + entities=entities, + indicators=indicators, + relationships=relationships, + ) + yeti_package.save() + + obs1 = observable.Observable.find(value="toto.com", type="hostname") + obs2 = observable.Observable.find(value="192.168.1.1", type="ipv4") + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + regex = indicator.Indicator.find(name="awesome_regexp", type="regex") + + self.assertIsNotNone(obs1) + self.assertIsNotNone(obs2) + self.assertIsNotNone(campaign) + self.assertIsNotNone(regex) + + vertices, paths, count = campaign.neighbors() + + self.assertEqual(len(paths), 2) + self.assertEqual(count, 2) + self.assertEqual(len(vertices), 2) + + for path in paths: + if path[0].target == obs1.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "contacts") + elif path[0].target == obs2.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "communicates_with") + + self.assertIn(obs1.extended_id, vertices) + neighbor = vertices[obs1.extended_id] + self.assertEqual(neighbor.id, obs1.id) + self.assertIn(obs2.extended_id, vertices) + neighbor = vertices[obs2.extended_id] + self.assertEqual(neighbor.id, obs2.id) + + def test_package_creation_from_objects(self) -> None: + obs1 = hostname.Hostname(value="toto.com") + obs2 = ipv4.IPv4(value="192.168.1.1") + campaign = entity.Campaign(name="Fresh campaign") + regex = indicator.Regex( + name="awesome_regexp", pattern=".*", diamond="adversary" + ) + relationships = { + "Fresh campaign": [ + package.YetiPackageRelationship( + target=obs1.value, link_type="contacts" + ), + package.YetiPackageRelationship( + target=obs2.value, link_type="communicates_with" + ), + ] + } + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", + source="SecretSource", + observables=[obs1, obs2], + entities=[campaign], + indicators=[regex], + relationships=relationships, + ) + yeti_package.save() + obs1 = observable.Observable.find(value="toto.com", type="hostname") + obs2 = observable.Observable.find(value="192.168.1.1", type="ipv4") + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + regex = indicator.Indicator.find(name="awesome_regexp", type="regex") + + self.assertIsNotNone(obs1) + self.assertIsNotNone(obs2) + self.assertIsNotNone(campaign) + self.assertIsNotNone(regex) + + vertices, paths, count = campaign.neighbors() + + self.assertEqual(len(paths), 2) + self.assertEqual(count, 2) + self.assertEqual(len(vertices), 2) + + for path in paths: + if path[0].target == obs1.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "contacts") + elif path[0].target == obs2.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "communicates_with") + + self.assertIn(obs1.extended_id, vertices) + neighbor = vertices[obs1.extended_id] + self.assertEqual(neighbor.id, obs1.id) + self.assertIn(obs2.extended_id, vertices) + neighbor = vertices[obs2.extended_id] + self.assertEqual(neighbor.id, obs2.id) + + yeti_package.save() + + def test_package_creation_from_json_string(self) -> None: + json_string = """ + {"timestamp": "2024-04-10T10:00:00Z", + "source": "SuperSecretSource", + "tags": [], + "observables": [ + {"value": "192.168.1.1", "type": "ipv4"}, + {"value": "toto.com", "type": "hostname"} + ], + "entities": [{"type": "campaign", "name": "Fresh campaign"}], + "indicators": [{"type": "regex", "name": "awesome_regexp", "pattern": ".*", "diamond": "adversary"}], + "relationships": { + "Fresh campaign": [ + {"target": "192.168.1.1", "link_type": "communicates_with"}, + {"target": "toto.com", "link_type": "contacts"} + ] + } + } + """ + yeti_package = package.YetiPackage.from_json(json_string) + yeti_package.save() + + obs1 = observable.Observable.find(value="toto.com", type="hostname") + obs2 = observable.Observable.find(value="192.168.1.1", type="ipv4") + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + regex = indicator.Indicator.find(name="awesome_regexp", type="regex") + + self.assertIsNotNone(obs1) + self.assertIsNotNone(obs2) + self.assertIsNotNone(campaign) + self.assertIsNotNone(regex) + + vertices, paths, count = campaign.neighbors() + + self.assertEqual(len(paths), 2) + self.assertEqual(count, 2) + self.assertEqual(len(vertices), 2) + + for path in paths: + if path[0].target == obs1.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "contacts") + elif path[0].target == obs2.extended_id: + self.assertEqual(path[0].source, campaign.extended_id) + self.assertEqual(path[0].description, "") + self.assertEqual(path[0].type, "communicates_with") + + self.assertIn(obs1.extended_id, vertices) + neighbor = vertices[obs1.extended_id] + self.assertEqual(neighbor.id, obs1.id) + self.assertIn(obs2.extended_id, vertices) + neighbor = vertices[obs2.extended_id] + self.assertEqual(neighbor.id, obs2.id) + + def test_generic_observable_creation(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + yeti_package.add_observable("new_observable_value", "new_observable_type") + yeti_package.save() + obs1 = observable.Observable.find(value="new_observable_value", type="generic") + tags = obs1.get_tags() + self.assertEqual("type:new_observable_type", tags[0][1].name) + + def test_package_creation_with_tags(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", + source="SecretSource", + tags=["tag1", "tag2"], + ) + yeti_package.add_observable("toto.com", "hostname") + yeti_package.add_entity("Fresh campaign", "campaign") + yeti_package.save() + obs1 = observable.Observable.find(value="toto.com", type="hostname") + for tag in obs1.get_tags(): + self.assertIn(tag[1].name, ["tag1", "tag2"]) + + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + for tag in campaign.get_tags(): + self.assertIn(tag[1].name, ["tag1", "tag2"]) + + def test_empty_package_creation(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + with self.assertRaises(ValueError): + yeti_package.save() + + def test_package_creation_with_duplicate(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + yeti_package.add_observable("toto.com", "hostname") + with self.assertRaises(ValueError): + yeti_package.add_observable("toto.com", "hostname") + + def test_package_creation_with_missing_relationship_target(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + yeti_package.add_observable("toto.com", "hostname") + yeti_package.add_relationship("toto.com", "192.168.1.1", "resolves") + with self.assertRaises(ValueError): + yeti_package.save() + + def test_package_creation_with_missing_relationship_source(self) -> None: + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="SecretSource" + ) + yeti_package.add_observable("toto.com", "hostname") + yeti_package.add_relationship("192.168.1.1", "toto.com", "resolves") + with self.assertRaises(ValueError): + yeti_package.save() + + def test_package_creation_with_small_source_string(self) -> None: + with self.assertRaises(ValidationError): + yeti_package = package.YetiPackage( + timestamp="2024-04-10T10:00:00Z", source="a" + ) From 67617534628c9c33f6cf0b5194ae69a108859b8a Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 09:28:15 +0200 Subject: [PATCH 04/10] Fix Ruff check --- tests/schemas/package.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/schemas/package.py b/tests/schemas/package.py index 7ec00743f..97372ea6a 100644 --- a/tests/schemas/package.py +++ b/tests/schemas/package.py @@ -310,6 +310,4 @@ def test_package_creation_with_missing_relationship_source(self) -> None: def test_package_creation_with_small_source_string(self) -> None: with self.assertRaises(ValidationError): - yeti_package = package.YetiPackage( - timestamp="2024-04-10T10:00:00Z", source="a" - ) + package.YetiPackage(timestamp="2024-04-10T10:00:00Z", source="a") From 1d7a5aba5ad551f0d2ce726fb5d49a6773ec5f7e Mon Sep 17 00:00:00 2001 From: udgover Date: Fri, 4 Oct 2024 12:12:14 +0200 Subject: [PATCH 05/10] Update core/schemas/package.py Co-authored-by: Thomas Chopitea --- core/schemas/package.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/schemas/package.py b/core/schemas/package.py index 739eb7faf..5b8a08753 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -121,7 +121,7 @@ def add_indicator(self, name, type, **kwargs) -> Self: # relationships validation is done at save time def add_relationship( - self, source: str, target: str, link_type: str = "observes" + self, source: str, target: str, link_type: str = "related-to" ) -> Self: if source not in self.relationships: self.relationships[source] = [] From 50c97513c8f6c20a217bca6bc296bf67a7a51c7e Mon Sep 17 00:00:00 2001 From: udgover Date: Fri, 4 Oct 2024 12:12:26 +0200 Subject: [PATCH 06/10] Update tests/schemas/package.py Co-authored-by: Thomas Chopitea --- tests/schemas/package.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/schemas/package.py b/tests/schemas/package.py index 97372ea6a..73f59ee89 100644 --- a/tests/schemas/package.py +++ b/tests/schemas/package.py @@ -68,7 +68,7 @@ def test_package_creation_from_methods(self) -> None: neighbor = vertices[obs2.extended_id] self.assertEqual(neighbor.id, obs2.id) - def test_package_creation_from_dict(self) -> None: + def test_package_creation_from_dict_objects(self) -> None: observables = [ {"value": "toto.com", "type": "hostname"}, {"value": "192.168.1.1", "type": "ipv4"}, From 2a2b136845b1fe077338c52f0464063841f3dfa1 Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 12:58:29 +0200 Subject: [PATCH 07/10] Implement tags as dict --- core/schemas/package.py | 49 ++++++++++++++++++----------- tests/schemas/package.py | 67 +++++++++++++++++++++++++++++++++------- 2 files changed, 87 insertions(+), 29 deletions(-) diff --git a/core/schemas/package.py b/core/schemas/package.py index 739eb7faf..941bb8e2b 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -17,17 +17,15 @@ class YetiPackageRelationship(BaseModel): class YetiPackage(BaseModel): timestamp: str | int # add validator source: str = Field(min_length=3) - tags: Optional[List[str]] = [] + tags: Optional[Dict[str, List[str]]] = {} observables: Optional[List[observable.ObservableTypes]] = [] entities: Optional[List[entity.EntityTypes]] = [] indicators: Optional[List[indicator.Indicator]] = [] relationships: Optional[Dict[str, List[YetiPackageRelationship]]] = {} - _observables_generic_tags: ClassVar[Dict[str, str]] = {} - _objects: ClassVar[Dict[str, Any]] = {} - def __init__(self, **data: Any): super().__init__(**data) + self._objects: Dict[str, Any] = {} for observable_element in self.observables: self._objects[observable_element.value] = observable_element for entity_element in self.entities: @@ -40,8 +38,6 @@ def __init__(self, **data: Any): @model_validator(mode="before") @classmethod def handle_generic_observable_types(cls, data: Any) -> Any: - YetiPackage._observables_generic_tags = {} - YetiPackage._objects = {} if ( isinstance(data, dict) and "observables" in data @@ -50,12 +46,13 @@ def handle_generic_observable_types(cls, data: Any) -> Any: for observable_element in data["observables"]: if "type" in observable_element: observable_type = observable_element["type"] + observable_value = observable_element["value"] if observable_type in observable.TYPE_MAPPING: continue observable_element["type"] = "generic" - cls._observables_generic_tags[observable_element["value"]] = ( - f"type:{observable_type}" - ) + if observable_value not in data["tags"]: + data["tags"][observable_value] = [] + data["tags"][observable_value].append(f"type:{observable_type}") return data @classmethod @@ -83,12 +80,16 @@ def from_json(cls: Self, json_package: str) -> Self: def add_observable(self, value, type, **kwargs) -> Self: if value in self._objects: + print(self._objects) raise ValueError(f'"{value}" already exists') if type in observable.TYPE_MAPPING: cls = observable.TYPE_MAPPING[type] else: cls = observable.TYPE_MAPPING["generic"] - self._observables_generic_tags[value] = f"type:{type}" + if value not in self.tags: + self.tags[value] = [] + self.tags[value].append(f"type:{type}") + kwargs["value"] = value instance = cls(**kwargs, exclude="type") self.observables.append(instance) @@ -185,8 +186,13 @@ def _save_entity(self, element: entity.EntityTypes) -> None: else yeti_entity.last_seen ) yeti_entity = yeti_entity.save() - if self.tags: - yeti_entity.tag(self.tags) + tags = list() + if yeti_entity.name in self.tags: + tags.extend(self.tags[yeti_entity.name]) + if "global" in self.tags: + tags.extend(self.tags["global"]) + if tags: + yeti_entity.tag(set(tags)) yeti_entity = self._update_entity_context(yeti_entity) self._objects[element.name] = yeti_entity.save() @@ -194,22 +200,29 @@ def _save_indicator(self, element: indicator.IndicatorTypes) -> None: yeti_indicator = indicator.Indicator.find(name=element.name, type=element.type) if not yeti_indicator: yeti_indicator = element.save() - if self.tags: - yeti_indicator.tag(self.tags) + tags = list() + if yeti_indicator.name in self.tags: + tags.extend(self.tags[yeti_indicator.name]) + if "global" in self.tags: + tags.extend(self.tags["global"]) + if tags: + yeti_indicator.tag(set(tags)) self._objects[element.name] = yeti_indicator.save() def _save_observable(self, element: observable.ObservableTypes) -> None: yeti_observable = observable.Observable.find( value=element.value, type=element.type ) - tags = set(self.tags) if not yeti_observable: # support unknown observable type with generic and adds type as tag: type: yeti_observable = element.save() - if element.value in self._observables_generic_tags: - tags.add(self._observables_generic_tags[element.value]) + tags = list() + if yeti_observable.value in self.tags: + tags.extend(self.tags[yeti_observable.value]) + if "global" in self.tags: + tags.extend(self.tags["global"]) if tags: - yeti_observable.tag(tags) + yeti_observable.tag(set(tags)) yeti_observable = self._update_observable_context(yeti_observable) self._objects[element.value] = yeti_observable.save() diff --git a/tests/schemas/package.py b/tests/schemas/package.py index 97372ea6a..4ce4968e1 100644 --- a/tests/schemas/package.py +++ b/tests/schemas/package.py @@ -5,9 +5,6 @@ from core import database_arango from core.schemas import entity, indicator, observable, package - -# from core.schemas.indicator import Indicator -# from core.schemas.observable import Observable from core.schemas.observables import hostname, ipv4 @@ -197,7 +194,7 @@ def test_package_creation_from_json_string(self) -> None: json_string = """ {"timestamp": "2024-04-10T10:00:00Z", "source": "SuperSecretSource", - "tags": [], + "tags": {}, "observables": [ {"value": "192.168.1.1", "type": "ipv4"}, {"value": "toto.com", "type": "hostname"} @@ -259,21 +256,69 @@ def test_generic_observable_creation(self) -> None: self.assertEqual("type:new_observable_type", tags[0][1].name) def test_package_creation_with_tags(self) -> None: + tags = { + "toto.com": ["tag1"], + "Fresh campaign": ["tag2"], + "global": ["tag3"], + } + yeti_package = package.YetiPackage( - timestamp="2024-04-10T10:00:00Z", - source="SecretSource", - tags=["tag1", "tag2"], + timestamp="2024-04-10T10:00:00Z", source="SecretSource", tags=tags ) yeti_package.add_observable("toto.com", "hostname") yeti_package.add_entity("Fresh campaign", "campaign") yeti_package.save() obs1 = observable.Observable.find(value="toto.com", type="hostname") - for tag in obs1.get_tags(): - self.assertIn(tag[1].name, ["tag1", "tag2"]) + tags = obs1.get_tags() + self.assertEqual(len(tags), 2) + for tag in tags: + self.assertIn(tag[1].name, ["tag1", "tag3"]) + + campaign = entity.Entity.find(name="Fresh campaign", type="campaign") + tags = campaign.get_tags() + self.assertEqual(len(tags), 2) + for tag in tags: + self.assertIn(tag[1].name, ["tag2", "tag3"]) + + def test_package_creation_from_json_with_tags(self) -> None: + json_string = """ + {"timestamp": "2024-04-10T10:00:00Z", + "source": "SuperSecretSource", + "tags": {"toto.com": ["tag1"], "Fresh campaign": ["tag2"], "global": ["tag3"]}, + "observables": [ + {"value": "192.168.1.1", "type": "ipv4"}, + {"value": "toto.com", "type": "new_type"} + ], + "entities": [{"type": "campaign", "name": "Fresh campaign"}], + "indicators": [{"type": "regex", "name": "awesome_regexp", "pattern": ".*", "diamond": "adversary"}], + "relationships": { + "Fresh campaign": [ + {"target": "192.168.1.1", "link_type": "communicates_with"}, + {"target": "toto.com", "link_type": "contacts"} + ] + } + } + """ + yeti_package = package.YetiPackage.from_json(json_string) + yeti_package.save() + + obs1 = observable.Observable.find(value="192.168.1.1", type="ipv4") + tags = obs1.get_tags() + self.assertEqual(len(tags), 1) + for tag in tags: + self.assertEqual(tag[1].name, "tag3") + + obs2 = observable.Observable.find(value="toto.com", type="generic") + tags = obs2.get_tags() + self.assertEqual(len(tags), 3) + for tag in tags: + self.assertIn(tag[1].name, ["tag1", "type:new_type", "tag3"]) campaign = entity.Entity.find(name="Fresh campaign", type="campaign") - for tag in campaign.get_tags(): - self.assertIn(tag[1].name, ["tag1", "tag2"]) + tags = campaign.get_tags() + self.assertEqual(len(tags), 2) + for tag in tags: + self.assertIn(tag[1].name, ["tag2", "tag3"]) def test_empty_package_creation(self) -> None: yeti_package = package.YetiPackage( From 5b682a6b1a7bde980f127a5fcd8cc6bb46a47d3f Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 13:48:04 +0200 Subject: [PATCH 08/10] Add YetiPackage docstring --- core/schemas/package.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/schemas/package.py b/core/schemas/package.py index ea8f27740..3db220fc1 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -15,6 +15,18 @@ class YetiPackageRelationship(BaseModel): class YetiPackage(BaseModel): + """YetiPackage is a generic package that can contain observables, entities, indicators and relationships. + + timestamp: str | int: timestamp of the package + source: str: source of the data that will be added. This is used to build context + tags: Dict[str, List[str]]: tags to be added to the elements. Key is the element name, + value is a list of tags to associate with. If the key is "global", the tags will be added to all elements. + observables: List[ObservableTypes]: list of observables to be added + entities: List[EntityTypes]: list of entities to be added + indicators: List[Indicator]: list of indicators to be added + relationships: Dict[str, List[YetiPackageRelationship]]: relationships between elements. + """ + timestamp: str | int # add validator source: str = Field(min_length=3) tags: Optional[Dict[str, List[str]]] = {} From ccfd48efcef7333a5118800b4dcf12ccd0372594 Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 15:11:25 +0200 Subject: [PATCH 09/10] Switch timestamp to datetime type --- core/schemas/package.py | 23 +++++++++++------------ tests/schemas/package.py | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/core/schemas/package.py b/core/schemas/package.py index 3db220fc1..8d412aee9 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -1,6 +1,6 @@ import json from datetime import datetime, timezone -from typing import Any, ClassVar, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field, model_validator from typing_extensions import Self @@ -17,7 +17,7 @@ class YetiPackageRelationship(BaseModel): class YetiPackage(BaseModel): """YetiPackage is a generic package that can contain observables, entities, indicators and relationships. - timestamp: str | int: timestamp of the package + timestamp: datetime: timestamp of the event. Can be any of https://docs.pydantic.dev/dev/api/standard_library_types/#datetime-types source: str: source of the data that will be added. This is used to build context tags: Dict[str, List[str]]: tags to be added to the elements. Key is the element name, value is a list of tags to associate with. If the key is "global", the tags will be added to all elements. @@ -27,7 +27,7 @@ class YetiPackage(BaseModel): relationships: Dict[str, List[YetiPackageRelationship]]: relationships between elements. """ - timestamp: str | int # add validator + timestamp: datetime = datetime.now() source: str = Field(min_length=3) tags: Optional[Dict[str, List[str]]] = {} observables: Optional[List[observable.ObservableTypes]] = [] @@ -44,7 +44,6 @@ def __init__(self, **data: Any): self._objects[entity_element.name] = entity_element for indicator_element in self.indicators: self._objects[indicator_element.name] = indicator_element - self._timestamp_dt: datetime = self._convert_timestamp(self.timestamp) # Use model validator to convert unknown observable types to generic and add type as tag @model_validator(mode="before") @@ -188,13 +187,13 @@ def _save_entity(self, element: entity.EntityTypes) -> None: yeti_entity = element.save() if hasattr(yeti_entity, "first_seen") and hasattr(yeti_entity, "last_seen"): yeti_entity.first_seen = ( - self._timestamp_dt - if yeti_entity.first_seen > self._timestamp_dt + self.timestamp + if yeti_entity.first_seen > self.timestamp else yeti_entity.first_seen ) yeti_entity.last_seen = ( - self._timestamp_dt - if yeti_entity.last_seen < self._timestamp_dt + self.timestamp + if yeti_entity.last_seen < self.timestamp else yeti_entity.last_seen ) yeti_entity = yeti_entity.save() @@ -271,8 +270,8 @@ def _update_observable_context( updated_context = { "source": self.source, "total_seen": 1, - "first_seen": self._timestamp_dt, - "last_seen": self._timestamp_dt, + "first_seen": self.timestamp, + "last_seen": self.timestamp, } for idx, context in enumerate(list(yeti_observable.context)): if context["source"] == self.source: @@ -286,13 +285,13 @@ def _update_observable_context( else: first_seen = self._convert_timestamp(current_context["first_seen"]) # keep previous first_seen - if first_seen < self._timestamp_dt: + if first_seen < self.timestamp: updated_context["first_seen"] = first_seen if not current_context.get("last_seen"): last_seen = self.timestamp else: last_seen = self._convert_timestamp(current_context["last_seen"]) - if last_seen > self._timestamp_dt: + if last_seen > self.timestamp: updated_context["last_seen"] = last_seen updated_context["total_seen"] = current_context.get("total_seen", 0) + 1 yeti_observable.context[found_idx] = updated_context diff --git a/tests/schemas/package.py b/tests/schemas/package.py index 8e0df5421..597b9a100 100644 --- a/tests/schemas/package.py +++ b/tests/schemas/package.py @@ -245,6 +245,25 @@ def test_package_creation_from_json_string(self) -> None: neighbor = vertices[obs2.extended_id] self.assertEqual(neighbor.id, obs2.id) + def test_package_creation_timestamps(self) -> None: + package.YetiPackage(timestamp="2024-04-10", source="SecretSource") + package.YetiPackage(timestamp="2024-04-10T00:00:00", source="SecretSource") + package.YetiPackage(timestamp="2024-04-10T10:00:00Z", source="SecretSource") + package.YetiPackage(timestamp="2024-04-10T10:00:00.400+00:00", source="Secret") + package.YetiPackage( + timestamp=datetime.datetime(2024, 4, 10, 10, 0, 0), source="SecretSource" + ) + package.YetiPackage(timestamp=1704067200, source="SecretSource") + package.YetiPackage(timestamp=1704067200.0, source="SecretSource") + + def test_package_creation_bad_timestamps(self) -> None: + with self.assertRaises(ValidationError): + package.YetiPackage(timestamp="2024-04-10T10", source="SecretSource") + with self.assertRaises(ValidationError): + package.YetiPackage(timestamp="10-04-2024", source="SecretSource") + with self.assertRaises(ValidationError): + package.YetiPackage(timestamp=-99999999999999999, source="SecretSource") + def test_generic_observable_creation(self) -> None: yeti_package = package.YetiPackage( timestamp="2024-04-10T10:00:00Z", source="SecretSource" From 9e847998ef3c192cb56a90dd3c0d9afd3ac566de Mon Sep 17 00:00:00 2001 From: Fred Baguelin Date: Fri, 4 Oct 2024 16:26:39 +0200 Subject: [PATCH 10/10] Add more details about unknown observable types handling --- core/schemas/package.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/schemas/package.py b/core/schemas/package.py index 8d412aee9..9d71e31a7 100644 --- a/core/schemas/package.py +++ b/core/schemas/package.py @@ -21,7 +21,9 @@ class YetiPackage(BaseModel): source: str: source of the data that will be added. This is used to build context tags: Dict[str, List[str]]: tags to be added to the elements. Key is the element name, value is a list of tags to associate with. If the key is "global", the tags will be added to all elements. - observables: List[ObservableTypes]: list of observables to be added + observables: List[ObservableTypes]: list of observables to be added. When adding an unknown observable type, + the type will be automatically reset to "generic" observable type and a tag will be added with the type following + this format: type:. entities: List[EntityTypes]: list of entities to be added indicators: List[Indicator]: list of indicators to be added relationships: Dict[str, List[YetiPackageRelationship]]: relationships between elements.