Skip to content

Commit

Permalink
MATI - Supporting multiple inputs for generic enrichment commands (#3…
Browse files Browse the repository at this point in the history
…0940) (#31334)

* Supporting multiple inputs for generic enrichment commands

* Return list of CommandResults

* Re-adding rawJSON

* Bumping docker version

* Relesase Notes

* Tests

* Tests

* Adding details to contexts

* Fixing tests

* Bumping docker

* Bumping docker

* Fixing spacing

* Fixing spacing

* Fixing fetch

---------

Co-authored-by: Christopher Hultin <chrishultin@google.com>
Co-authored-by: MLainer1 <93524335+MLainer1@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 6, 2023
1 parent cafde7f commit 5138c0c
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ def last_updated_filter(start_time: datetime):
)


def get_verdict(mscore: Optional[str]) -> int:
def get_verdict(mscore: Optional[int]) -> int:
"""
Convert mscore to dbot score
Args:
Expand Down Expand Up @@ -322,14 +322,23 @@ def get_dbot_score(indicator: dict, indicator_type: str = None) -> dict:
return {
"Indicator": indicator.get("value"),
"Type": indicator_type,
"Vendor": "Mandiant",
"Vendor": "Mandiant Advantage Threat Intelligence",
"Score": get_verdict(indicator.get("mscore", 0)),
"Reliability": demisto.params().get(
"feedReliability", DBotScoreReliability.A_PLUS
),
}


def get_dbot_score_obj(dbot_score: dict) -> Common.DBotScore:
return Common.DBotScore(
indicator=dbot_score["Indicator"],
indicator_type=dbot_score["Type"],
score=dbot_score["Score"],
reliability=dbot_score["Reliability"]
)


def get_indicator_relationships(
raw_indicator: dict,
indicator_field: str,
Expand Down Expand Up @@ -373,7 +382,7 @@ def get_indicator_relationships(
return relationships


def create_malware_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_malware_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[None, dict]:
"""
Creates a malware indicator
Args:
Expand Down Expand Up @@ -461,7 +470,7 @@ def create_malware_indicator(client: MandiantClient, raw_indicator: dict) -> dic
"score": get_verdict(raw_indicator.get("mscore")),
}

return indicator_obj
return None, indicator_obj


def create_campaign_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
Expand Down Expand Up @@ -532,7 +541,7 @@ def create_campaign_indicator(client: MandiantClient, raw_indicator: dict) -> di
return indicator_obj


def create_actor_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_actor_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[None, dict]:
"""
Create indicator
Args:
Expand Down Expand Up @@ -638,7 +647,8 @@ def create_actor_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
"fields": fields,
"relationships": relationships,
}
return indicator_obj

return None, indicator_obj


def parse_cvss(cve: dict) -> dict:
Expand All @@ -656,6 +666,7 @@ def parse_cvss(cve: dict) -> dict:
cvss = {
"cvss": "v3.1",
"cvssvector": cve_details.get("vector_string"),
"cvssscore": cve_details.get("base_score", 0),
"cvss3": [
{
"metric": camel_case_to_underscore(k).replace("_", " ").title(),
Expand All @@ -669,6 +680,7 @@ def parse_cvss(cve: dict) -> dict:
cvss = {
"cvss": "v2.0",
"cvssvector": cve_details.get("vector_string"),
"cvssscore": cve_details.get("base_score", 0),
"cvss2": [
{
"metric": camel_case_to_underscore(k).replace("_", " ").title(),
Expand All @@ -681,7 +693,7 @@ def parse_cvss(cve: dict) -> dict:
return cvss


def create_cve_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_cve_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.CVE, dict]:
"""
Create CVE indicator
Args:
Expand All @@ -697,10 +709,18 @@ def create_cve_indicator(client: MandiantClient, raw_indicator: dict) -> dict:

indicator_obj["fields"] = indicator_obj["fields"] | additional_fields

return indicator_obj
indicator = Common.CVE(
id=additional_fields["id"],
cvss=str(cvss_data["cvssscore"]),
published=indicator_obj["rawJSON"]["publish_date"],
modified=indicator_obj["rawJSON"]["last_modified_date"],
description=indicator_obj["rawJSON"]["title"]
)

return indicator, indicator_obj


def create_file_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_file_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.File, dict]:
"""
Args:
client: MandiantClient
Expand Down Expand Up @@ -738,10 +758,17 @@ def create_file_indicator(client: MandiantClient, raw_indicator: dict) -> dict:

indicator_obj["fields"] = indicator_obj["fields"] | additional_fields

return indicator_obj
indicator = Common.File(
dbot_score=get_dbot_score_obj(indicator_obj["fields"]["dbotscore"]),
md5=additional_fields.get("md5"),
sha1=additional_fields.get("sha1"),
sha256=additional_fields.get("sha256"),
)

return indicator, indicator_obj

def create_ip_indicator(client: MandiantClient, raw_indicator: dict) -> dict:

def create_ip_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.IP, dict]:
"""
Args:
client: MandiantClient
Expand All @@ -754,11 +781,15 @@ def create_ip_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
}

indicator_obj["fields"] = indicator_obj["fields"] | additional_fields
indicator = Common.IP(
dbot_score=get_dbot_score_obj(indicator_obj["fields"]["dbotscore"]),
ip=additional_fields["ip"]
)

return indicator_obj
return indicator, indicator_obj


def create_fqdn_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_fqdn_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.Domain, dict]:
"""
Args:
client: MandiantClient
Expand All @@ -773,10 +804,16 @@ def create_fqdn_indicator(client: MandiantClient, raw_indicator: dict) -> dict:

indicator_obj["fields"] = indicator_obj["fields"] | additional_fields

return indicator_obj
indicator = Common.Domain(
domain=raw_indicator.get("value"),
dns=raw_indicator.get("value"),
dbot_score=get_dbot_score_obj(indicator_obj["fields"]["dbotscore"])
)

return indicator, indicator_obj


def create_url_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_url_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.URL, dict]:
"""
Args:
client: MandiantClient
Expand All @@ -791,10 +828,15 @@ def create_url_indicator(client: MandiantClient, raw_indicator: dict) -> dict:

indicator_obj["fields"] = indicator_obj["fields"] | additional_fields

return indicator_obj
indicator = Common.URL(
url=additional_fields["url"],
dbot_score=get_dbot_score_obj(indicator_obj["fields"]["dbotscore"])
)

return indicator, indicator_obj


def create_indicator(client: MandiantClient, raw_indicator: dict) -> dict:
def create_indicator(client: MandiantClient, raw_indicator: dict) -> tuple[Common.Indicator, dict]:
"""
Create indicator
Args:
Expand Down Expand Up @@ -848,7 +890,7 @@ def create_base_indicator(
"stixid": raw_indicator.get("id"),
"trafficlightprotocol": tlp_color,
"publications": generate_publications(raw_indicator.get("publications", [])),
"DBotScore": get_dbot_score(raw_indicator),
"dbotscore": get_dbot_score(raw_indicator),
"tags": client.tags
}

Expand All @@ -857,8 +899,9 @@ def create_base_indicator(
} # filter none and redacted values
indicator_obj = {
"value": raw_indicator.get("value"),
"rawJSON": raw_indicator,
"score": get_verdict(raw_indicator.get("mscore")),
# "DBotScore": get_dbot_score(raw_indicator),
"rawJSON": raw_indicator,
"type": indicator_type,
"fields": fields,
"relationships": campaign_relationships,
Expand Down Expand Up @@ -1129,6 +1172,8 @@ def fetch_indicators(client: MandiantClient, args: dict = None) -> tuple[List, d
result = []
last_run_dict = demisto.getLastRun()

demisto.debug("fetching indicators")

for indicator_type in types:
indicators_list, new_last_updated = get_indicator_list(client, limit, first_fetch, indicator_type)

Expand All @@ -1140,9 +1185,9 @@ def fetch_indicators(client: MandiantClient, args: dict = None) -> tuple[List, d
)
for indicator in indicators_list
]

demisto.debug("getting indicators")
indicators = [
MAP_INDICATORS_FUNCTIONS[indicator_type](client, indicator)
MAP_INDICATORS_FUNCTIONS[indicator_type](client, indicator)[1]
for indicator in indicators_list
]
if enrichment and indicator_type != "Indicators":
Expand Down Expand Up @@ -1203,15 +1248,15 @@ def fetch_indicator_by_value(client: MandiantClient, args: dict = None):
]

for indicator in indicators:
indicator["value"] = indicators_value_to_clickable([indicator["value"]])
indicator[1]["value"] = indicators_value_to_clickable([indicator[1]["value"]])

if indicators:
table = {
'Value': indicators[0]["rawJSON"]['value'],
'MScore': indicators[0]["rawJSON"]['mscore'],
'Last Seen': indicators[0]["rawJSON"]["last_seen"]
'Value': indicators[0][1]["rawJSON"]['value'],
'MScore': indicators[0][1]["rawJSON"]['mscore'],
'Last Seen': indicators[0][1]["rawJSON"]["last_seen"]
}
indicator_type = indicators[0]["rawJSON"]["type"].lower()
indicator_type = indicators[0][1]["rawJSON"]["type"].lower()

markdown = tableToMarkdown(f'Mandiant Advantage Threat Intelligence information for {table["Value"]}\n'
f'[View on Mandiant Advantage](https://advantage.mandiant.com/indicator/'
Expand All @@ -1222,7 +1267,7 @@ def fetch_indicator_by_value(client: MandiantClient, args: dict = None):
readable_output=markdown,
content_format=formats["json"],
outputs_prefix=f"MANDIANTTI.{INDICATOR_TYPE_MAP[indicators_list[0]['type']].upper()}",
outputs=indicators,
outputs=[i[1] for i in indicators],
outputs_key_field="name",
ignore_auto_extract=True,
)
Expand All @@ -1237,7 +1282,7 @@ def fetch_threat_actor(client: MandiantClient, args: dict = None):
indicator_obj: dict = client.get_indicator_info(
identifier=actor_name, indicator_type="Actors"
)
indicator = [create_actor_indicator(client, indicator_obj)]
indicator = [create_actor_indicator(client, indicator_obj)[1]]

if client.enrichment:
enrich_indicators(client, indicator, "Actors")
Expand All @@ -1262,7 +1307,7 @@ def fetch_malware_family(client: MandiantClient, args: dict = None):
indicator = client.get_indicator_info(
identifier=malware_name, indicator_type="Malware"
)
indicator_list = [create_malware_indicator(client, indicator)]
indicator_list = [create_malware_indicator(client, indicator)[1]]
if client.enrichment:
enrich_indicators(client, indicator_list, "Malware")

Expand Down Expand Up @@ -1305,42 +1350,47 @@ def fetch_campaign(client: MandiantClient, args: dict = None):
def fetch_reputation(client: MandiantClient, args: dict = None):
args = args if args else {}
input_type: str = demisto.command()
indicator_value: str = str(args.get(input_type))
indicator_values: list[str] = argToList(str(args.get(input_type)))

if input_type == "cve":
indicators_list = [client.get_indicator_info(indicator_value, "Vulnerability")]
indicators_list = [client.get_indicator_info(i, "Vulnerability") for i in indicator_values]
else:
indicators_list = client.get_indicators_by_value(
indicator_value=indicator_value
)
indicators_list = []
for i in indicator_values:
indicators_list.extend(client.get_indicators_by_value(i))

indicators = [
MAP_INDICATORS_FUNCTIONS[input_type](client, indicator)
for indicator in indicators_list
]

demisto.createIndicators(indicators)
demisto.createIndicators([i[1] for i in indicators])

if indicators:
table = {
'Value': indicators[0]['value'],
'MScore': indicators[0]["rawJSON"].get("mscore", ''),
'Last Seen': indicators[0]["rawJSON"].get("last_seen", '')
}
indicator_type = indicators[0]["rawJSON"]["type"].lower()

markdown = tableToMarkdown(f'Mandiant Advantage Threat Intelligence information for {indicators[0]["value"]}\n'
f'[View on Mandiant Advantage](https://advantage.mandiant.com/indicator/'
f'{indicator_type}/{indicators[0]["value"]})',
table)

return CommandResults(
readable_output=markdown,
outputs=indicators,
outputs_prefix=f"MANDIANTTI.{input_type.upper()}",
ignore_auto_extract=True)
output = []
for indicator_obj, indicator in indicators:
demisto.debug(json.dumps(indicator))
table = {
'Value': indicator['value'],
'MScore': indicator["rawJSON"].get("mscore", ''),
'Last Seen': indicator["rawJSON"].get("last_seen", '')
}
indicator_type = indicator["rawJSON"]["type"].lower()

markdown = tableToMarkdown(f'Mandiant Advantage Threat Intelligence information for {indicator["value"]}\n'
f'[View on Mandiant Advantage](https://advantage.mandiant.com/indicator/'
f'{indicator_type}/{indicator["value"]})',
table)

output.append(CommandResults(
readable_output=markdown,
outputs_prefix=f"MANDIANTTI.{input_type.upper()}",
outputs=indicator,
indicator=indicator_obj,
ignore_auto_extract=True))
return output
else:
return f"No indicators found matching value {indicator_value}"
return f"No indicators found matching value {indicator_values}"


""" COMMAND FUNCTIONS """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ script:
- Actors
required: true
description: Get Mandiant Indicators.
dockerimage: demisto/python3:3.10.13.80014
dockerimage: demisto/python3:3.10.13.82467
feed: true
runonce: false
script: '-'
Expand Down
Loading

0 comments on commit 5138c0c

Please sign in to comment.