Skip to content

Commit

Permalink
Merge pull request #325 from splunk/exception_on_extra_fields
Browse files Browse the repository at this point in the history
Exception on extra fields
  • Loading branch information
pyth0n1c authored Dec 10, 2024
2 parents e5c150d + 8b86914 commit 7646c24
Show file tree
Hide file tree
Showing 34 changed files with 174 additions and 160 deletions.
167 changes: 101 additions & 66 deletions contentctl/actions/new_content.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@


from dataclasses import dataclass
import questionary
from typing import Any
Expand All @@ -11,67 +9,108 @@
import pathlib
from contentctl.objects.abstract_security_content_objects.security_content_object_abstract import SecurityContentObject_Abstract
from contentctl.output.yml_writer import YmlWriter

from contentctl.objects.enums import AssetType
from contentctl.objects.constants import SES_OBSERVABLE_TYPE_MAPPING, SES_OBSERVABLE_ROLE_MAPPING
class NewContent:
    # Sentinel prefix marking fields the user MUST fill in before the generated
    # YML is usable; YmlReader refuses to load any file still containing it.
    UPDATE_PREFIX = "__UPDATE__"

    # Template drilldown searches attached to newly generated detections.
    # The embedded $...$ tokens are Splunk drilldown field references; the
    # UPDATE_PREFIX inside them flags the risk-object names for the author
    # to replace.  NOTE(review): the quoting in the first "search" value
    # looks unbalanced — preserved verbatim; confirm against Splunk drilldown
    # syntax before changing.
    DEFAULT_DRILLDOWN_DEF = [
        {
            "name": f'View the detection results for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
            "search": f'%original_detection_search% | search "${UPDATE_PREFIX}FIRST_RISK_OBJECT = "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" second_observable_type_here = "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
            "earliest_offset": '$info_min_time$',
            "latest_offset": '$info_max_time$'
        },
        {
            "name": f'View risk events for the last 7 days for - "${UPDATE_PREFIX}FIRST_RISK_OBJECT$" and "${UPDATE_PREFIX}SECOND_RISK_OBJECT$"',
            "search": f'| from datamodel Risk.All_Risk | search normalized_risk_object IN ("${UPDATE_PREFIX}FIRST_RISK_OBJECT$", "${UPDATE_PREFIX}SECOND_RISK_OBJECT$") starthoursago=168 | stats count min(_time) as firstTime max(_time) as lastTime values(search_name) as "Search Name" values(risk_message) as "Risk Message" values(analyticstories) as "Analytic Stories" values(annotations._all) as "Annotations" values(annotations.mitre_attack.mitre_tactic) as "ATT&CK Tactics" by normalized_risk_object | `security_content_ctime(firstTime)` | `security_content_ctime(lastTime)`',
            "earliest_offset": '$info_min_time$',
            "latest_offset": '$info_max_time$'
        }
    ]


def buildDetection(self) -> tuple[dict[str, Any], str]:
    """Interactively collect answers and build a new detection definition.

    Prompts the user with the detection questionnaire, then assembles a
    dict shaped like a detection YML file, pre-populating every field the
    user did not answer with an ``__UPDATE__``-prefixed placeholder that
    must be filled in before the file can be loaded.

    Returns:
        A tuple of (yml-ready detection dict, detection kind string) —
        the kind selects the output subdirectory under ``detections/``.

    Raises:
        ValueError: if the user aborted or skipped one or more prompts.
    """
    questions = NewContentQuestions.get_questions_detection()
    answers: dict[str, str] = questionary.prompt(
        questions,
        kbi_msg="User did not answer all of the prompt questions. Exiting...",
    )
    if not answers:
        raise ValueError("User didn't answer one or more questions!")

    # Fall back to a placeholder when the user supplied no data sources.
    data_source_field = (
        answers["data_source"]
        if len(answers["data_source"]) > 0
        else [f"{NewContent.UPDATE_PREFIX} zero or more data_sources"]
    )

    # Derive a filesystem-safe, lowercase file stem from the detection name;
    # it is also used to name the detection's search filter macro.
    file_name = (
        answers["detection_name"]
        .replace(" ", "_")
        .replace("-", "_")
        .replace(".", "_")
        .replace("/", "_")
        .lower()
    )

    # Minimum length for a mitre attack id is 5 characters: T1000
    if len(answers["mitre_attack_ids"]) >= 5:
        mitre_attack_ids = [x.strip() for x in answers["mitre_attack_ids"].split(",")]
    else:
        # string was too short, so just put a placeholder
        mitre_attack_ids = [f"{NewContent.UPDATE_PREFIX} zero or more mitre_attack_ids"]

    output_file_answers: dict[str, Any] = {
        "name": answers["detection_name"],
        "id": str(uuid.uuid4()),
        "version": 1,
        "date": datetime.today().strftime("%Y-%m-%d"),
        "author": answers["detection_author"],
        "status": "production",  # start everything as production since that's what we INTEND the content to become
        "type": answers["detection_type"],
        "description": f"{NewContent.UPDATE_PREFIX} by providing a description of your search",
        "data_source": data_source_field,
        "search": f"{answers['detection_search']} | `{file_name}_filter`",
        "how_to_implement": f"{NewContent.UPDATE_PREFIX} how to implement your search",
        "known_false_positives": f"{NewContent.UPDATE_PREFIX} known false positives for your search",
        "references": [f"{NewContent.UPDATE_PREFIX} zero or more http references to provide more information about your search"],
        "drilldown_searches": NewContent.DEFAULT_DRILLDOWN_DEF,
        "tags": {
            "analytic_story": [f"{NewContent.UPDATE_PREFIX} by providing zero or more analytic stories"],
            "asset_type": f"{NewContent.UPDATE_PREFIX} by providing an asset type from {list(AssetType._value2member_map_)}",
            "confidence": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
            "impact": f"{NewContent.UPDATE_PREFIX} by providing a value between 1-100",
            "message": f"{NewContent.UPDATE_PREFIX} by providing a risk message. Fields in your search results can be referenced using $fieldName$",
            "mitre_attack_id": mitre_attack_ids,
            "observable": [
                {
                    "name": f"{NewContent.UPDATE_PREFIX} the field name of the observable. This is a field that exists in your search results.",
                    "type": f"{NewContent.UPDATE_PREFIX} the type of your observable from the list {list(SES_OBSERVABLE_TYPE_MAPPING.keys())}.",
                    "role": [f"{NewContent.UPDATE_PREFIX} the role from the list {list(SES_OBSERVABLE_ROLE_MAPPING.keys())}"],
                }
            ],
            "product": [
                "Splunk Enterprise",
                "Splunk Enterprise Security",
                "Splunk Cloud",
            ],
            "security_domain": answers["security_domain"],
            "cve": [f"{NewContent.UPDATE_PREFIX} with CVE(s) if applicable"],
        },
        "tests": [
            {
                "name": "True Positive Test",
                "attack_data": [
                    {
                        "data": f"{NewContent.UPDATE_PREFIX} the data file to replay. Go to https://github.com/splunk/contentctl/wiki for information about the format of this field",
                        "sourcetype": f"{NewContent.UPDATE_PREFIX} the sourcetype of your data file.",
                        "source": f"{NewContent.UPDATE_PREFIX} the source of your datafile",
                    }
                ],
            }
        ],
    }

    # Drilldown searches only make sense for detection types that produce
    # notables/risk; drop them for everything else (e.g. Hunting).
    if answers["detection_type"] not in ["TTP", "Anomaly", "Correlation"]:
        del output_file_answers["drilldown_searches"]

    return output_file_answers, answers["detection_kind"]

def buildStory(self) -> dict[str, Any]:
questions = NewContentQuestions.get_questions_story()
answers = questionary.prompt(
questions,
Expand All @@ -96,12 +135,11 @@ def buildStory(self)->dict[str,Any]:
del answers['usecase']
answers['tags']['cve'] = ['UPDATE WITH CVE(S) IF APPLICABLE']
return answers


def execute(self, input_dto: new) -> None:
if input_dto.type == NewContentType.detection:
content_dict = self.buildDetection()
subdirectory = pathlib.Path('detections') / content_dict.pop('detection_kind')
content_dict, detection_kind = self.buildDetection()
subdirectory = pathlib.Path('detections') / detection_kind
elif input_dto.type == NewContentType.story:
content_dict = self.buildStory()
subdirectory = pathlib.Path('stories')
Expand All @@ -111,23 +149,20 @@ def execute(self, input_dto: new) -> None:
full_output_path = input_dto.path / subdirectory / SecurityContentObject_Abstract.contentNameToFileName(content_dict.get('name'))
YmlWriter.writeYmlFile(str(full_output_path), content_dict)



def writeObjectNewContent(self, object: dict, subdirectory_name: str, type: NewContentType) -> None:
    """Write a newly created detection or story object to disk as YML.

    Args:
        object: the content dict to serialize; must contain 'name' and
            'tags.product' (used to build the output file name).
        subdirectory_name: subfolder under 'detections/' (unused for stories).
        type: whether the object is a detection or a story.

    Raises:
        Exception: if ``type`` is neither detection nor story.
    """
    if type == NewContentType.detection:
        file_path = os.path.join(self.output_path, 'detections', subdirectory_name, self.convertNameToFileName(object['name'], object['tags']['product']))
        output_folder = pathlib.Path(self.output_path)/'detections'/subdirectory_name
        # make sure the output folder exists for this detection
        output_folder.mkdir(exist_ok=True)

        YmlWriter.writeDetection(file_path, object)
        print("Successfully created detection " + file_path)

    elif type == NewContentType.story:
        file_path = os.path.join(self.output_path, 'stories', self.convertNameToFileName(object['name'], object['tags']['product']))
        YmlWriter.writeStory(file_path, object)
        print("Successfully created story " + file_path)

    else:
        # idiomatic raise (the original wrapped Exception in redundant parens)
        raise Exception(f"Object Must be Story or Detection, but is not: {object}")

2 changes: 1 addition & 1 deletion contentctl/contentctl.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def main():

else:
#The file exists, so load it up!
config_obj = YmlReader().load_file(configFile)
config_obj = YmlReader().load_file(configFile,add_fields=False)
t = test.model_validate(config_obj)
except Exception as e:
print(f"Error validating 'contentctl.yml':\n{str(e)}")
Expand Down
14 changes: 0 additions & 14 deletions contentctl/helper/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,6 @@ def validate_git_pull_request(repo_path: str, pr_number: int) -> str:

return hash

# @staticmethod
# def check_required_fields(
# thisField: str, definedFields: dict, requiredFields: list[str]
# ):
# missing_fields = [
# field for field in requiredFields if field not in definedFields
# ]
# if len(missing_fields) > 0:
# raise (
# ValueError(
# f"Could not validate - please resolve other errors resulting in missing fields {missing_fields}"
# )
# )

@staticmethod
def verify_file_exists(
file_path: str, verbose_print=False, timeout_seconds: int = 10
Expand Down
2 changes: 1 addition & 1 deletion contentctl/input/new_content_questions.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def get_questions_detection(cls) -> list[dict[str,Any]]:
"type": "text",
"message": "enter search (spl)",
"name": "detection_search",
"default": "| UPDATE_SPL",
"default": "| __UPDATE__ SPL",
},
{
"type": "text",
Expand Down
17 changes: 11 additions & 6 deletions contentctl/input/yml_reader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from typing import Dict, Any

import yaml


import sys
import pathlib

class YmlReader():

@staticmethod
def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=False) -> Dict[str,Any]:
def load_file(file_path: pathlib.Path, add_fields:bool=True, STRICT_YML_CHECKING:bool=False) -> Dict[str,Any]:
try:
file_handler = open(file_path, 'r', encoding="utf-8")

Expand All @@ -27,8 +24,16 @@ def load_file(file_path: pathlib.Path, add_fields=True, STRICT_YML_CHECKING=Fals
print(f"Error loading YML file {file_path}: {str(e)}")
sys.exit(1)
try:
#yml_obj = list(yaml.safe_load_all(file_handler))[0]
yml_obj = yaml.load(file_handler, Loader=yaml.CSafeLoader)
#Ideally we should use
# from contentctl.actions.new_content import NewContent
# and use NewContent.UPDATE_PREFIX,
# but there is a circular dependency right now which makes that difficult.
# We have instead hardcoded UPDATE_PREFIX
UPDATE_PREFIX = "__UPDATE__"
data = file_handler.read()
if UPDATE_PREFIX in data:
raise Exception(f"The file {file_path} contains the value '{UPDATE_PREFIX}'. Please fill out any unpopulated fields as required.")
yml_obj = yaml.load(data, Loader=yaml.CSafeLoader)
except yaml.YAMLError as exc:
print(exc)
sys.exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@

# TODO (#266): disable the use_enum_values configuration
class SecurityContentObject_Abstract(BaseModel, abc.ABC):
model_config = ConfigDict(use_enum_values=True,validate_default=True)

model_config = ConfigDict(use_enum_values=True,validate_default=True,extra="forbid")
name: str = Field(...,max_length=99)
author: str = Field(...,max_length=255)
date: datetime.date = Field(...)
Expand Down
3 changes: 2 additions & 1 deletion contentctl/objects/alert_action.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from pydantic import BaseModel, model_serializer
from pydantic import BaseModel, model_serializer, ConfigDict
from typing import Optional

from contentctl.objects.deployment_email import DeploymentEmail
Expand All @@ -9,6 +9,7 @@
from contentctl.objects.deployment_phantom import DeploymentPhantom

class AlertAction(BaseModel):
model_config = ConfigDict(extra="forbid")
email: Optional[DeploymentEmail] = None
notable: Optional[DeploymentNotable] = None
rba: Optional[DeploymentRBA] = DeploymentRBA()
Expand Down
1 change: 1 addition & 0 deletions contentctl/objects/atomic.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class InputArgumentType(StrEnum):
Url = "Url"

class AtomicExecutor(BaseModel):
model_config = ConfigDict(extra="forbid")
name: str
elevation_required: Optional[bool] = False #Appears to be optional
command: Optional[str] = None
Expand Down
3 changes: 2 additions & 1 deletion contentctl/objects/base_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Union
from abc import ABC, abstractmethod

from pydantic import BaseModel
from pydantic import BaseModel,ConfigDict

from contentctl.objects.base_test_result import BaseTestResult

Expand All @@ -21,6 +21,7 @@ def __str__(self) -> str:

# TODO (#224): enforce distinct test names w/in detections
class BaseTest(BaseModel, ABC):
model_config = ConfigDict(extra="forbid")
"""
A test case for a detection
"""
Expand Down
9 changes: 6 additions & 3 deletions contentctl/objects/baseline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

from __future__ import annotations
from typing import Annotated, Optional, List,Any
from pydantic import field_validator, ValidationInfo, Field, model_serializer
from typing import Annotated, List,Any
from pydantic import field_validator, ValidationInfo, Field, model_serializer, computed_field
from contentctl.objects.deployment import Deployment
from contentctl.objects.security_content_object import SecurityContentObject
from contentctl.objects.enums import DataModel
Expand All @@ -15,7 +15,6 @@
class Baseline(SecurityContentObject):
name:str = Field(...,max_length=CONTENTCTL_MAX_SEARCH_NAME_LENGTH)
type: Annotated[str,Field(pattern="^Baseline$")] = Field(...)
datamodel: Optional[List[DataModel]] = None
search: str = Field(..., min_length=4)
how_to_implement: str = Field(..., min_length=4)
known_false_positives: str = Field(..., min_length=4)
Expand All @@ -34,6 +33,10 @@ def get_conf_stanza_name(self, app:CustomApp)->str:
def getDeployment(cls, v:Any, info:ValidationInfo)->Deployment:
return Deployment.getDeployment(v,info)

@computed_field
@property
def datamodel(self) -> List[DataModel]:
return [dm for dm in DataModel if dm.value in self.search]

@model_serializer
def serialize_model(self):
Expand Down
5 changes: 2 additions & 3 deletions contentctl/objects/baseline_tags.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer
from pydantic import BaseModel, Field, field_validator, ValidationInfo, model_serializer, ConfigDict
from typing import List, Any, Union

from contentctl.objects.story import Story
Expand All @@ -12,12 +12,12 @@


class BaselineTags(BaseModel):
model_config = ConfigDict(extra="forbid")
analytic_story: list[Story] = Field(...)
#deployment: Deployment = Field('SET_IN_GET_DEPLOYMENT_FUNCTION')
# TODO (#223): can we remove str from the possible types here?
detections: List[Union[Detection,str]] = Field(...)
product: List[SecurityContentProductName] = Field(...,min_length=1)
required_fields: List[str] = Field(...,min_length=1)
security_domain: SecurityDomain = Field(...)


Expand All @@ -33,7 +33,6 @@ def serialize_model(self):
"analytic_story": [story.name for story in self.analytic_story],
"detections": [detection.name for detection in self.detections if isinstance(detection,Detection)],
"product": self.product,
"required_fields":self.required_fields,
"security_domain":self.security_domain,
"deployments": None
}
Expand Down
Loading

0 comments on commit 7646c24

Please sign in to comment.