Formatting
slincoln-aiq committed Nov 7, 2024
1 parent ded519f commit 50f9421
Showing 12 changed files with 133 additions and 112 deletions.
2 changes: 2 additions & 0 deletions sigmaiq/backends/crowdstrike/crowdstrike.py
@@ -6,6 +6,7 @@
class SigmAIQCrowdstrikeSplunkBackend(AbstractGenericSigmAIQBackendClass, SplunkBackend):
"""SigmAIQ backend interface for the pySigma Splunk Backend library to translate a SigmaRule object
to a Splunk search query with the Crowdstrike FDR format"""

custom_formats = {}
associated_pipelines = ["crowdstrike_fdr"]
default_pipeline = "crowdstrike_fdr"
@@ -15,6 +16,7 @@ class SigmAIQCrowdstrikeSplunkBackend(AbstractGenericSigmAIQBackendClass, Splunk
class SigmAIQCrowdstrikeLogscaleBackend(AbstractGenericSigmAIQBackendClass, LogScaleBackend):
"""SigmAIQ backend interface for the pySigma Logscale Backend library to translate a SigmaRule object
to a Logscale search query with the Crowdstrike Falcon format"""

custom_formats = {}
associated_pipelines = ["crowdstrike_falcon"]
default_pipeline = "crowdstrike_falcon"
1 change: 1 addition & 0 deletions sigmaiq/backends/elasticsearch/elasticsearch.py
@@ -5,6 +5,7 @@
class SigmAIQElasticsearchBackend(AbstractGenericSigmAIQBackendClass, LuceneBackend):
"""SigmAIQ backend interface for the pySigma Elasticsearch Backend library to translate a SigmaRule object
to an Elasticsearch search query"""

custom_formats = {}
associated_pipelines = [
"ecs_windows",
3 changes: 3 additions & 0 deletions sigmaiq/backends/kusto/kusto.py
@@ -5,6 +5,7 @@
class SigmAIQDefenderXDRBackend(AbstractGenericSigmAIQBackendClass, KustoBackend):
"""SigmAIQ backend interface for the pySigma Kusto Backend library to translate a SigmaRule object
to a Kusto search query with the Microsoft Defender XDR format"""

custom_formats = {}
associated_pipelines = ["microsoft_xdr"]
default_pipeline = "microsoft_xdr"
@@ -13,6 +14,7 @@ class SigmAIQDefenderXDRBackend(AbstractGenericSigmAIQBackendClass, KustoBackend
class SigmAIQSentinelASIMBackend(AbstractGenericSigmAIQBackendClass, KustoBackend):
"""SigmAIQ backend interface for the pySigma Kusto Backend library to translate a SigmaRule object
to a Kusto search query with the Microsoft Sentinel ASIM format"""

custom_formats = {}
associated_pipelines = ["sentinel_asim"]
default_pipeline = "sentinel_asim"
@@ -21,6 +23,7 @@ class SigmAIQSentinelASIMBackend(AbstractGenericSigmAIQBackendClass, KustoBacken
class SigmAIQAzureMonitorBackend(AbstractGenericSigmAIQBackendClass, KustoBackend):
"""SigmAIQ backend interface for the pySigma Kusto Backend library to translate a SigmaRule object
to a Kusto search query with the Microsoft Azure Monitor format"""

custom_formats = {}
associated_pipelines = ["azure_monitor"]
default_pipeline = "azure_monitor"
2 changes: 1 addition & 1 deletion sigmaiq/llm/base.py
@@ -36,7 +36,7 @@ def __init__(
rule_dir: str = None,
vector_store_dir: str = None,
embedding_model: OpenAIEmbeddings = None,
embedding_function: Type[Embeddings] = OpenAIEmbeddings, #TODO RS : Consolidate this with embedding_model
embedding_function: Type[Embeddings] = OpenAIEmbeddings, # TODO RS : Consolidate this with embedding_model
vector_store: Type[VectorStore] = FAISS,
rule_loader: Type[BaseLoader] = DirectoryLoader,
rule_splitter: Type[BaseDocumentTransformer] = CharacterTextSplitter,
42 changes: 30 additions & 12 deletions sigmaiq/llm/toolkits/base.py
@@ -8,16 +8,21 @@
from langchain.agents.format_scratchpad import format_to_openai_function_messages
from langchain.agents.output_parsers import OpenAIFunctionsAgentOutputParser
from langchain.prompts import ChatPromptTemplate

# langchain typing
from langchain.schema import (AgentAction, AgentFinish, OutputParserException, AIMessage, BaseMessage)
from langchain.schema import AgentAction, AgentFinish, OutputParserException, AIMessage, BaseMessage
from langchain.schema.agent import AgentActionMessageLog
from langchain.schema.language_model import BaseLanguageModel
from langchain.schema.messages import (AIMessage, BaseMessage, )
from langchain.schema.messages import (
AIMessage,
BaseMessage,
)
from langchain.schema.vectorstore import VectorStore
from langchain_core.utils.function_calling import convert_to_openai_function
from langchain_openai import ChatOpenAI

from sigmaiq.llm.toolkits.prompts import SIGMA_AGENT_PROMPT

# sigmaiq
from sigmaiq.llm.toolkits.sigma_toolkit import SigmaToolkit

@@ -31,7 +36,7 @@ def create_sigma_agent(
) -> AgentExecutor:
if sigma_vectorstore is None:
raise ValueError("sigma_vectorstore must be provided")

if rule_creation_llm is None:
rule_creation_llm = ChatOpenAI(model="gpt-4o")

@@ -40,20 +45,32 @@ def create_sigma_agent(

# Assert if any of the tools does not have arun
for tool in tools:
assert hasattr(tool, 'arun'), f"Tool {tool.name} does not have an 'arun' method"
assert hasattr(tool, "arun"), f"Tool {tool.name} does not have an 'arun' method"

# Create OpenAI Function for each tool for the agent LLM, so we can create an OpenAI Function AgentExecutor
llm_with_tools = rule_creation_llm.bind(functions=[convert_to_openai_function(t) for t in tools])

# Create the agent
prompt = SIGMA_AGENT_PROMPT
agent = ({"input": lambda x: x["input"], "agent_scratchpad": lambda x: format_to_openai_function_messages(
x["intermediate_steps"]), } | prompt | llm_with_tools | CustomOpenAIFunctionsAgentOutputParser())
agent = (
{
"input": lambda x: x["input"],
"agent_scratchpad": lambda x: format_to_openai_function_messages(x["intermediate_steps"]),
}
| prompt
| llm_with_tools
| CustomOpenAIFunctionsAgentOutputParser()
)

# Create and return the AgentExecutor
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=verbose,
return_intermediate_steps=return_intermediate_steps, handle_parsing_errors=True,
**(agent_executor_kwargs or {}))
agent_executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=verbose,
return_intermediate_steps=return_intermediate_steps,
handle_parsing_errors=True,
**(agent_executor_kwargs or {}),
)

return agent_executor

@@ -70,7 +87,7 @@ def parse(self, message: Union[str, BaseMessage]) -> Union[AgentAction, AgentFin
raise ValueError("Expected an AIMessage object, got a string")
if not isinstance(message, AIMessage):
raise TypeError(f"Expected an AI message got {type(message)}")

return self._parse_ai_message(message)

@staticmethod
@@ -83,8 +100,9 @@ def _parse_ai_message(message: AIMessage) -> Union[AgentAction, AgentFinish]:
try:
_tool_input = json.loads(function_call["arguments"].strip(), strict=False) # HACK
except JSONDecodeError:
raise OutputParserException(f"Could not parse tool input: {function_call} because "
f"the `arguments` is not valid JSON.")
raise OutputParserException(
f"Could not parse tool input: {function_call} because " f"the `arguments` is not valid JSON."
)

# HACK HACK HACK:
# The code that encodes tool input into Open AI uses a special variable
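For context, a rough usage sketch of create_sigma_agent as reformatted above. Only the function itself comes from this file; the FAISS rule vector store, its on-disk path, and a configured OpenAI API key are assumptions rather than anything this commit provides:

```python
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings

from sigmaiq.llm.toolkits.base import create_sigma_agent

# Hypothetical: a previously built vector store of Sigma rules saved to disk.
sigma_vectorstore = FAISS.load_local(
    "sigma_rule_vectorstore",
    OpenAIEmbeddings(),
    allow_dangerous_deserialization=True,
)

agent_executor = create_sigma_agent(sigma_vectorstore=sigma_vectorstore)

# AgentExecutor is invoked with the "input" key wired up in the agent definition above.
result = agent_executor.invoke({"input": "Create a Sigma rule for LSASS memory dumps via procdump"})
print(result["output"])
```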
2 changes: 1 addition & 1 deletion sigmaiq/llm/toolkits/prompts.py
@@ -12,7 +12,7 @@
"3. create_sigma_rule_vectorstore: Creates new Sigma Rule from the users input, as well as rules in a sigma rule vectorstore to use as context based on the users question. If the user's question already contains a query, use 'query_to_sigma_rule' instead."
"4. query_to_sigma_rule: Converts/translates a product/SIEM/backend query or search from the query language into a YAML Sigma Rule."
"Do not use 'translate_sigma_rule' unless the user explicitly asks for a Sigma Rule to be converted or translated "
"into a query for a specific backend, pipeline, and/or output format."
"into a query for a specific backend, pipeline, and/or output format.",
),
("user", "{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad"),
1 change: 1 addition & 0 deletions sigmaiq/llm/tools/find_sigma_rule.py
@@ -36,6 +36,7 @@ class FindSigmaRuleTool(BaseTool):

class Config:
"""Configuration for this pydantic object."""

extra = Extra.forbid

def _run(self, query: Union[str, dict]) -> str:
5 changes: 4 additions & 1 deletion sigmaiq/sigmaiq_backend_factory.py
@@ -63,7 +63,10 @@ class SigmAIQBackend:
"""

def __init__(
self, backend: str, processing_pipeline: Optional[Union[str, list, ProcessingPipeline]] = None, output_format: Optional[str] = None
self,
backend: str,
processing_pipeline: Optional[Union[str, list, ProcessingPipeline]] = None,
output_format: Optional[str] = None,
):
"""Initialize instance attributes.
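A usage sketch matching the constructor signature reformatted above. The import path mirrors sigmaiq/sigmaiq_backend_factory.py; the "splunk" backend key, the example rule, and the create_backend()/translate() calls follow the library's documented pattern rather than anything in this diff, so treat them as assumptions:

```python
from sigmaiq.sigmaiq_backend_factory import SigmAIQBackend

# Invented minimal rule for illustration only.
SIGMA_RULE = """
title: Example Rule
status: test
logsource:
    category: process_creation
    product: windows
detection:
    sel:
        CommandLine: valueA
    condition: sel
"""

# "splunk" is an assumed backend key; processing_pipeline and output_format are
# optional per the signature above and left at their defaults here.
factory = SigmAIQBackend(backend="splunk")

# create_backend()/translate() follow SigmAIQ's documented usage; they are not
# shown in this diff.
backend = factory.create_backend()
print(backend.translate(SIGMA_RULE))
```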
75 changes: 39 additions & 36 deletions sigmaiq/utils/sigmaiq/sigmaiq_utils.py
@@ -9,70 +9,72 @@ def _is_v1_schema(rule_data: dict) -> bool:
"""Check if the rule uses v1 schema patterns."""
if not isinstance(rule_data, dict):
return False

# Check date format
date_str = rule_data.get('date')
if date_str and '/' in date_str:
date_str = rule_data.get("date")
if date_str and "/" in date_str:
return True

# Check modified format
modified_str = rule_data.get('modified')
if modified_str and '/' in modified_str:
modified_str = rule_data.get("modified")
if modified_str and "/" in modified_str:
return True

# Check tags format
tags = rule_data.get('tags', [])
tags = rule_data.get("tags", [])
for tag in tags:
if any(ns in tag for ns in ['attack-', 'attack_', 'cve-', 'detection-']):
if any(ns in tag for ns in ["attack-", "attack_", "cve-", "detection-"]):
return True

# Check related field
related = rule_data.get('related', [])
related = rule_data.get("related", [])
for rel in related:
if rel.get('type') == 'obsoletes':
if rel.get("type") == "obsoletes":
return True

return False


def _convert_to_v2_schema(rule_data: dict) -> dict:
"""Convert v1 schema rule to v2 schema."""
rule_data = rule_data.copy()

# Convert date and modified format
if 'date' in rule_data and '/' in rule_data['date']:
if "date" in rule_data and "/" in rule_data["date"]:
try:
date_obj = datetime.strptime(rule_data['date'], '%Y/%m/%d')
rule_data['date'] = date_obj.strftime('%Y-%m-%d')
date_obj = datetime.strptime(rule_data["date"], "%Y/%m/%d")
rule_data["date"] = date_obj.strftime("%Y-%m-%d")
except ValueError:
pass
if 'modified' in rule_data and '/' in rule_data['modified']:

if "modified" in rule_data and "/" in rule_data["modified"]:
try:
date_obj = datetime.strptime(rule_data['modified'], '%Y/%m/%d')
rule_data['modified'] = date_obj.strftime('%Y-%m-%d')
date_obj = datetime.strptime(rule_data["modified"], "%Y/%m/%d")
rule_data["modified"] = date_obj.strftime("%Y-%m-%d")
except ValueError:
pass

# Convert tags
if 'tags' in rule_data:
if "tags" in rule_data:
new_tags = []
for tag in rule_data['tags']:
for tag in rule_data["tags"]:
# Convert common namespace patterns
tag = tag.replace('attack-', 'attack.')
tag = tag.replace('attack_', 'attack.')
tag = tag.replace('cve-', 'cve.')
tag = tag.replace('detection-', 'detection.')
tag = tag.replace("attack-", "attack.")
tag = tag.replace("attack_", "attack.")
tag = tag.replace("cve-", "cve.")
tag = tag.replace("detection-", "detection.")
new_tags.append(tag)
rule_data['tags'] = new_tags
rule_data["tags"] = new_tags

# Convert related field
if 'related' in rule_data:
for rel in rule_data['related']:
if rel.get('type') == 'obsoletes':
rel['type'] = 'obsolete'
if "related" in rule_data:
for rel in rule_data["related"]:
if rel.get("type") == "obsoletes":
rel["type"] = "obsolete"

return rule_data


def create_sigma_rule_obj(sigma_rule: Union[SigmaRule, SigmaCollection, dict, str, list]):
"""Checks sigma_rule to ensure it's a SigmaRule or SigmaCollection object. It can also be a valid Sigma rule
representation in a dict or yaml str (or list of valid dicts/yaml strs) that can be used with SigmaRule class methods to
@@ -102,13 +104,14 @@ def create_sigma_rule_obj(sigma_rule: Union[SigmaRule, SigmaCollection, dict, st
if isinstance(sigma_rule, dict):
# Check and convert v1 schema if needed
if _is_v1_schema(sigma_rule):

sigma_rule = _convert_to_v2_schema(sigma_rule)
return SigmaRule.from_dict(sigma_rule)
if isinstance(sigma_rule, str):
# For YAML strings, we need to parse to dict first
try:
import yaml

rule_dict = yaml.safe_load(sigma_rule)
if _is_v1_schema(rule_dict):
rule_dict = _convert_to_v2_schema(rule_dict)
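To make the v1-to-v2 handling above concrete, a small sketch; the import path mirrors sigmaiq/utils/sigmaiq/sigmaiq_utils.py and the rule content is invented for illustration:

```python
from sigmaiq.utils.sigmaiq.sigmaiq_utils import create_sigma_rule_obj

# An invented rule written in the old v1 style: slashed dates, dashed tag
# namespaces, and the deprecated "obsoletes" related type.
v1_style_rule = {
    "title": "Example Rule",
    "id": "c0ffee00-0000-4000-8000-000000000000",
    "status": "test",
    "date": "2023/01/01",
    "modified": "2023/01/02",
    "tags": ["attack-t1003", "attack-credential_access"],
    "related": [{"id": "deadbeef-0000-4000-8000-000000000000", "type": "obsoletes"}],
    "logsource": {"category": "process_creation", "product": "windows"},
    "detection": {"sel": {"CommandLine": "valueA"}, "condition": "sel"},
    "level": "high",
}

# _is_v1_schema() spots the old patterns, _convert_to_v2_schema() rewrites the
# dates to YYYY-MM-DD, the tags to dotted namespaces, and "obsoletes" to
# "obsolete", and the result is handed to SigmaRule.from_dict().
rule = create_sigma_rule_obj(v1_style_rule)
print(rule.date, rule.tags)
```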
20 changes: 4 additions & 16 deletions tests/test_backend_factory.py
@@ -50,23 +50,11 @@ def sigma_rule_dict():
"author": "AttackIQ",
"date": "2023-01-01",
"modified": "2023-01-02",
"tags": [
"attack.t1003",
"attack.t1003.001",
"attack.credential_access"
],
"logsource": {
"category": "process_creation",
"product": "windows"
},
"detection": {
"sel": {
"CommandLine": "valueA"
},
"condition": "sel"
},
"tags": ["attack.t1003", "attack.t1003.001", "attack.credential_access"],
"logsource": {"category": "process_creation", "product": "windows"},
"detection": {"sel": {"CommandLine": "valueA"}, "condition": "sel"},
"falsepositives": ["None"],
"level": "high"
"level": "high",
}

