Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Free Form Queries for Cortex XDR #113

Merged
merged 8 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Surveyor is a Python utility that queries Endpoint Detection and Response (EDR)
products and summarizes the results. Security and IT teams can use Surveyor to
baseline their environments and identify abnormal activity.

## Current Version: 2.4.1
## Current Version: 2.5.0

Version 2.0 introduced breaking changes to the command line interface and support for SentinelOne.
If you are looking for the prior version of Surveyor, see [past releases](https://github.com/redcanaryco/surveyor/releases).
Expand Down
50 changes: 23 additions & 27 deletions products/cortex_xdr.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ def build_query(self, filters: dict) -> Tuple[str, int]:
elif key == 'minutes':
relative_time_ms = value * 60 * 1000
elif key == 'hostname':
query_base += f' | filter lowercase(agent_hostname) contains "{value.lower()}"'
query_base += f' | filter agent_hostname contains "{value}"'
elif key == 'username':
# Need to look at both actor and action in case action is actually a filemod,netconn,regmod rather than proc
query_base += f' | filter lowercase(action_process_username) contains "{value.lower()}" or lowercase(actor_primary_username) contains "{value.lower()}"'
query_base += f' | filter action_process_username contains "{value}" or actor_primary_username contains "{value}"'
else:
self._echo(f'Query filter {key} is not supported by product {self.product}', logging.WARNING)

Expand All @@ -187,45 +187,44 @@ def process_search(self, tag: Tag, base_query: dict, query: str) -> None:
if tag not in self._queries:
self._queries[tag] = list()

full_query = Query(relative_time_ms, None, None, None, f'dataset=xdr_data {query}')
full_query = Query(relative_time_ms, None, None, None, query)
self._queries[tag].append(full_query)

def nested_process_search(self, tag: Tag, criteria: dict, base_query: dict) -> None:
self._base_query, relative_time_ms = self.build_query(base_query)

try:
for search_field, terms in criteria.items():
if tag not in self._queries:
self._queries[tag] = list()

if search_field == 'query':
operator = 'raw'
parameter = 'query'
if isinstance(terms, list):
if len(terms) > 1:
search_value = ' '.join(terms)
for term in terms:
self._queries[tag].append(Query(relative_time_ms, None, None, None, term))
else:
search_value = terms[0]
self._queries[tag].append(Query(relative_time_ms, None, None, None, terms[0]))
else:
search_value = terms
self._queries[tag].append(Query(relative_time_ms, None, None, None, terms))
else:
all_terms = ', '.join((f'"*{term}*"').replace("**", "*") for term in terms)

if search_field not in PARAMETER_MAPPING:
self._echo(f'Query filter {search_field} is not supported by product {self.product}',
logging.WARNING)
continue

parameter = PARAMETER_MAPPING[search_field]
search_value = all_terms

if len(terms) > 1:
# there isn't an operator for `in contains` so we have to use wildcards instead
all_terms = ', '.join((f'"*{term}*"').replace("**", "*") for term in terms)
search_value = f'({all_terms})'
operator = 'in'
else:
operator = 'contains'
search_value = f'"{terms[0]}"'

if tag not in self._queries:
self._queries[tag] = list()

self._queries[tag].append(Query(relative_time_ms, parameter, operator, search_value))
self._queries[tag].append(Query(relative_time_ms, parameter, operator, search_value))
except KeyboardInterrupt:
self._echo("Caught CTRL-C. Returning what we have...")

Expand Down Expand Up @@ -262,17 +261,10 @@ def _process_queries(self) -> None:
if query.full_query is not None:
query_string = query.full_query
else:
query_string = 'dataset=xdr_data'

if query.operator in ('contains', 'in'):
# Fix the query to be case-insensitive if using `contains`
query_string += f' | filter lowercase({query.parameter}) {query.operator} {str(query.search_value).lower()}'
elif query.operator == 'raw':
query_string += f' {query.search_value}'
else:
query_string += f' | filter {query.parameter} {query.operator} {query.search_value}'
query_string = f'dataset=xdr_data | filter {query.parameter} {query.operator} {str(query.search_value)}'

query_string += f' {self._base_query} | fields agent_hostname, action_process_image_path, action_process_username, action_process_image_command_line, actor_process_image_path, actor_primary_username, actor_process_command_line, event_id'
query_string += f' {self._base_query}' if self._base_query != '' else ''
query_string += f' | fields agent_hostname, action_process_image_path, action_process_username, action_process_image_command_line, actor_process_image_path, actor_primary_username, actor_process_command_line, event_id'

# Run that query!
params = self._get_default_body()
Expand All @@ -293,14 +285,18 @@ def _process_queries(self) -> None:
if 'reply' not in body:
raise ValueError(f'Cortex encountered an error and could not process query "{query_string}"')

self.log.debug(query_response.json())
self.log.debug(body)
query_response.raise_for_status()

query_id = body['reply']
self.log.info(f'Query ID is {query_id}')

events, count = self._get_xql_results(query_id)
self.log.debug(f'Got {count} events')
if count == 1000:
self.log.info(f'Maximum limit of results (1000) was reached')
else:
self.log.debug(f'Got {count} event(s)')

self._results[tag] = list()
for event in events:
hostname = event['agent_hostname'] if 'agent_hostname' in event else ''
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def find_scripts():
packages=find_packages(),
scripts=find_scripts(),
description='Extracts summarized process data from EDR platforms',
version='2.4.1',
version='2.5.0',
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
Expand Down
6 changes: 4 additions & 2 deletions surveyor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help", "-what-am-i-doing"])

# Application version
current_version = "2.4.1"
current_version = "2.5.0"


def _list_products(ctx, _, value) -> None:
Expand Down Expand Up @@ -322,7 +322,7 @@ def survey(ctx, product_str: str = 'cbr') -> None:
if not opt.no_file:
# determine output file name
if opt.output and opt.prefix:
log_echo("Output arg takes precendence so prefix arg will be ignored", log)
log.debug("Output arg takes precendence so prefix arg will be ignored")
if opt.output:
file_name = opt.output
elif opt.prefix:
Expand Down Expand Up @@ -429,6 +429,8 @@ def survey(ctx, product_str: str = 'cbr') -> None:
# if there's sigma rules to be processed
if len(sigma_rules) > 0:
translated_rules = sigma_translation(product_str, sigma_rules)
if len(translated_rules['queries']) != len(sigma_rules):
log.warning(f"Only {len(translated_rules['queries'])} out of {len(sigma_rules)} were able to be translated.")
for rule in tqdm(translated_rules['queries'], desc="Processing sigma rules", disable=opt.no_progress):
program = f"{rule['title']} - {rule['id']}"
source = 'Sigma Rule'
Expand Down
25 changes: 25 additions & 0 deletions tests/data/cortex_surveyor_testing.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"field_translation":{
"process_name":["cmd.exe"],
"ipaddr":["8.8.8.8"],
"cmdline":["grep"],
"digsig_publisher":["Microsoft Corporation"],
"modload":["asdf.dll"],
"filemod":["helloworld.txt"],
"regmod":["HKCU"],
"md5": ["asdfasdfasdf"],
"sha256":["qwerqwerqwer"],
"ipport":["80"],
"filewrite_md5":["zxcvzxcvzxcv"],
"filewrite_sha256":["poiupoiupoiu"]
},
"multiple_values":{
"process_name":["svchost.exe", "services.exe"]
},
"single_query":{
"query":["FieldA=ValueB"]
},
"multiple_query":{
"query":["FieldA=ValueB", "FieldC=ValueD"]
}
}
201 changes: 201 additions & 0 deletions tests/test_cortex_xdr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import pytest
import sys
import os
import logging
import json
from unittest.mock import patch
sys.path.append(os.getcwd())
from products.cortex_xdr import CortexXDR, Query
from common import Tag


def test_init_lower_limit_option(tmpdir, mocker):
mocker.patch.object(CortexXDR, '_authenticate')
cred_file_path = tmpdir.mkdir('test_dir').join('test_creds.ini')
cred_file_path.write("asdfasdfasdf")
cortex_product = CortexXDR(profile='default',creds_file = cred_file_path, limit = -1)
assert cortex_product._limit == 1000


def test_init_upper_limit_option(tmpdir, mocker):
mocker.patch.object(CortexXDR, '_authenticate')
cred_file_path = tmpdir.mkdir('test_dir').join('test_creds.ini')
cred_file_path.write("asdfasdfasdf")
cortex_product = CortexXDR(profile='default',creds_file = cred_file_path, limit = 1001)
assert cortex_product._limit == 1000


def test_init_limit_option(tmpdir, mocker):
mocker.patch.object(CortexXDR, '_authenticate')
cred_file_path = tmpdir.mkdir('test_dir').join('test_creds.ini')
cred_file_path.write("asdfasdfasdf")
cortex_product = CortexXDR(profile='default',creds_file = cred_file_path, limit = 11)
assert cortex_product._limit == 11

@pytest.fixture
def cortex_product():
with patch.object(CortexXDR, "__init__", lambda x, y: None):
return CortexXDR(None)

def test_build_query_with_supported_field(cortex_product : CortexXDR):
filters = {
'hostname': 'workstation1',
'username': 'admin'
}

result, timestamp = cortex_product.build_query(filters)

assert result == ' | filter agent_hostname contains "workstation1" | filter action_process_username contains "admin" or actor_primary_username contains "admin"'

def test_build_query_with_days(cortex_product : CortexXDR):
filters = {
'days': 7
}

result, timestamp = cortex_product.build_query(filters)

assert timestamp == 7 * 24 * 60 * 60 * 1000

def test_build_query_with_min(cortex_product : CortexXDR):
filters = {
'minutes': 5
}

result, timestamp = cortex_product.build_query(filters)

assert timestamp == 5 * 60 * 1000

def test_build_query_with_unsupported_field(cortex_product : CortexXDR):
filters = {
"useless key": "asdfasdasdf"
}

cortex_product.log = logging.getLogger('pytest_surveyor')

result, timestamp = cortex_product.build_query(filters)

assert result == ''

def test_process_search(cortex_product : CortexXDR):
cortex_product._queries = {}
cortex_product.log = logging.getLogger('pytest_surveyor')

cortex_product.process_search(Tag('test_query'), {}, 'FieldA=ValueB')

assert len(cortex_product._queries[Tag('test_query')]) == 1
assert cortex_product._queries[Tag('test_query')][0].parameter is None
assert cortex_product._queries[Tag('test_query')][0].operator is None
assert cortex_product._queries[Tag('test_query')][0].search_value is None
assert cortex_product._queries[Tag('test_query')][0].full_query == 'FieldA=ValueB'
assert cortex_product._queries[Tag('test_query')][0].relative_time_ms == 14 * 24 * 60 * 60 * 1000

def test_nested_process_search(cortex_product : CortexXDR):
cortex_product._queries = {}
cortex_product.log = logging.getLogger('pytest_surveyor')

with open(os.path.join(os.getcwd(), 'tests','data','cortex_surveyor_testing.json')) as f:
programs = json.load(f)

for program, criteria in programs.items():
cortex_product.nested_process_search(Tag(program), criteria, {})

assert len(cortex_product._queries) == 4

assert len(cortex_product._queries[Tag('field_translation')]) == 12
relative_ts = 14 * 24 * 60 * 60 * 1000
assert Query(relative_ts, 'action_process_image_name', 'contains', '"cmd.exe"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_remote_ip', 'contains', '"8.8.8.8"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_process_command_line', 'contains', '"grep"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_file_signature_vendor', 'contains', '"Microsoft Corporation"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_module_path', 'contains', '"asdf.dll"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_file_path', 'contains', '"helloworld.txt"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_registry_key_name', 'contains', '"HKCU"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_process_image_md5', 'contains', '"asdfasdfasdf"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_process_image_sha256', 'contains', '"qwerqwerqwer"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_remote_port', 'contains', '"80"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_file_md5', 'contains', '"zxcvzxcvzxcv"') in cortex_product._queries[Tag('field_translation')]
assert Query(relative_ts, 'action_file_sha256', 'contains', '"poiupoiupoiu"') in cortex_product._queries[Tag('field_translation')]

assert len(cortex_product._queries[Tag('multiple_values')]) == 1
assert Query(relative_ts, 'action_process_image_name', 'in', '("*svchost.exe*", "*services.exe*")') in cortex_product._queries[Tag('multiple_values')]

assert len(cortex_product._queries[Tag('single_query')]) == 1
assert Query(relative_ts, None, None, None, 'FieldA=ValueB') in cortex_product._queries[Tag('single_query')]

assert len(cortex_product._queries[Tag('multiple_query')]) == 2
assert Query(relative_ts, None, None, None, 'FieldA=ValueB') in cortex_product._queries[Tag('multiple_query')]
assert Query(relative_ts, None, None, None, 'FieldC=ValueD') in cortex_product._queries[Tag('multiple_query')]

def test_nested_process_search_unsupported_field(cortex_product : CortexXDR):
criteria = {'foo': 'bar'}
cortex_product._queries = {}
cortex_product.log = logging.getLogger('pytest_surveyor')

cortex_product.nested_process_search(Tag('unsupported_field'), criteria, {})

assert len(cortex_product._queries) == 1
assert cortex_product._queries[Tag('unsupported_field')] == []

def test_process_queries_full_query(cortex_product : CortexXDR, mocker):
cortex_product._queries = {}
cortex_product._results = {}

cortex_product._url = 'https://cortex.xdr.domain'
mocker.patch('products.cortex_xdr.CortexXDR._get_default_header', return_value = {})

criteria = {'query': ['FieldA=cmd.exe']}
cortex_product.nested_process_search(Tag('single_test'), criteria, {})

cortex_product.log = logging.getLogger('pytest_surveyor')

json_response = {'reply': []}
response_mock = mocker.Mock()
response_mock.json.return_value = json_response

cortex_product._session = mocker.Mock()
mocker.patch('products.cortex_xdr.CortexXDR._get_xql_results', return_value= [[], 0])
mocked_func = mocker.patch.object(cortex_product._session, 'post', return_value=response_mock)

cortex_product._process_queries()

params = {
'request_data':{
'query': 'FieldA=cmd.exe | fields agent_hostname, action_process_image_path, action_process_username, action_process_image_command_line, actor_process_image_path, actor_primary_username, actor_process_command_line, event_id',
'tenants': [],
'timeframe':{'relativeTime': 14*24*60*60*1000 }
}
}

mocked_func.assert_called_once_with('https://cortex.xdr.domain/public_api/v1/xql/start_xql_query/', headers={}, data=json.dumps(params))

def test_process_queries_query_parameter(cortex_product : CortexXDR, mocker):
cortex_product._queries = {}
cortex_product._results = {}

cortex_product._url = 'https://cortex.xdr.domain'
mocker.patch('products.cortex_xdr.CortexXDR._get_default_header', return_value = {})

criteria = {'process_name': ['cmd.exe']}
cortex_product.nested_process_search(Tag('single_test'), criteria, {})

cortex_product.log = logging.getLogger('pytest_surveyor')

json_response = {'reply': []}
response_mock = mocker.Mock()
response_mock.json.return_value = json_response

cortex_product._session = mocker.Mock()
mocker.patch('products.cortex_xdr.CortexXDR._get_xql_results', return_value= [[], 0])
mocked_func = mocker.patch.object(cortex_product._session, 'post', return_value=response_mock)

cortex_product._process_queries()

params = {
'request_data':{
'query': 'dataset=xdr_data | filter action_process_image_name contains "cmd.exe" | fields agent_hostname, action_process_image_path, action_process_username, action_process_image_command_line, actor_process_image_path, actor_primary_username, actor_process_command_line, event_id',
'tenants': [],
'timeframe':{'relativeTime': 14*24*60*60*1000 }
}
}

mocked_func.assert_called_once_with('https://cortex.xdr.domain/public_api/v1/xql/start_xql_query/', headers={}, data=json.dumps(params))