Skip to content

Commit

Permalink
Fixed pagination and meta file deletion for AWS Athena (#1176)
Browse files Browse the repository at this point in the history
  • Loading branch information
Arthur Muradyan committed Dec 7, 2022
1 parent a9c637f commit 020d640
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 87 deletions.
24 changes: 12 additions & 12 deletions stix_shifter_modules/aws_athena/stix_translation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,13 @@ STOP t'2020-11-30T10:43:10.003Z'"
{
"queries": [
{
"vpcflow": "((lower(sourceaddress) = lower('172.31.76.105') OR lower(destinationaddress) = lower('172.31.76.105')) AND start BETWEEN 1588322590 AND 1606732990) LIMIT 10000"
"vpcflow": "((lower(sourceaddress) = lower('172.31.76.105') OR lower(destinationaddress) = lower('172.31.76.105')) AND start BETWEEN 1588322590 AND 1606732990)"
},
{
"guardduty": "((lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicip')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.networkconnectionaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.portprobeaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.awsapicallaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') LIMIT 10000"
"guardduty": "((lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicip')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.networkconnectionaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.portprobeaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.awsapicallaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z')"
},
{
"ocsf": "(lower(src_endpoint.intermediate_ips) = lower('172.31.76.105') AND _time BETWEEN 1588322590000 AND 1606732990000) LIMIT 10000"
"ocsf": "(lower(src_endpoint.intermediate_ips) = lower('172.31.76.105') AND _time BETWEEN 1588322590000 AND 1606732990000)"
}
]
}
Expand All @@ -113,7 +113,7 @@ STOP t'2020-11-30T10:43:10.003Z'"
\"guardduty_database_name\":\"logs_db\",\"guardduty_table_name\":\"gd_logs\"}" "{\"auth\":{\"aws_access_key_id\":
\"xxxx\",
\"aws_secret_access_key\": \"yyyy\"}}" query "{\"guardduty\": \"((lower(json_extract_scalar(resource,'$
.instancedetails.networkinterfaces.0.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicip')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.networkconnectionaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.portprobeaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.awsapicallaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') LIMIT 10000\"}"
.instancedetails.networkinterfaces.0.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privateipaddress')) = lower('172.31.76.105') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicip')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.networkconnectionaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.portprobeaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105') OR lower(json_extract_scalar(service,'$.action.awsapicallaction.remoteipdetails.ipaddressv4')) = lower('172.31.76.105')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z')\"}"
```
#### Transmit query: VPCFlow query is passed to STIX transmission module
Expand All @@ -123,7 +123,7 @@ transmit aws_athena "{\"region\": \"us-east-1\", \"s3_bucket_location\": \"s3://
\"vpcflow_database_name\":\"logs_db\", \"vpcflow_table_name\":\"vpc_flow_log\",
\"guardduty_database_name\":\"logs_db\",\"guardduty_table_name\":\"gd_logs\"}" "{\"auth\":{\"aws_access_key_id\": \"xxxx\",
\"aws_secret_access_key\": \"yyyy\"}}" query "{\"vpcflow\": \"((lower(sourceaddress) = lower('172.31.76.105') OR lower
(destinationaddress) = lower('172.31.76.105')) AND starttime BETWEEN 1588322590 AND 1606732990) LIMIT 10000\"}"
(destinationaddress) = lower('172.31.76.105')) AND starttime BETWEEN 1588322590 AND 1606732990)\"}"
```
#### GuardDuty Search ID:
Expand Down Expand Up @@ -376,10 +376,10 @@ translate aws_athena query '{}' "[x-ibm-finding:name = '146.168.246.36 is perfor
{
"queries": [
{
"guardduty": "(lower(title) = lower('146.168.246.36 is performing SSH brute force attacks against i-03d7e6195920aa4c0.') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') LIMIT 10000"
"guardduty": "(lower(title) = lower('146.168.246.36 is performing SSH brute force attacks against i-03d7e6195920aa4c0.') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z')"
},
{
"ocsf": "(lower(observables.name) = lower('146.168.246.36 is performing SSH brute force attacks against i-03d7e6195920aa4c0.') AND _time BETWEEN 1588322590000 AND 1606732990000) LIMIT 10000"
"ocsf": "(lower(observables.name) = lower('146.168.246.36 is performing SSH brute force attacks against i-03d7e6195920aa4c0.') AND _time BETWEEN 1588322590000 AND 1606732990000)"
}
]
}
Expand All @@ -399,10 +399,10 @@ translate aws_athena query '{}' "([network-traffic:src_port = '3389'] OR [domain
{
"queries": [
{
"vpcflow": "(CAST(sourceport AS varchar) = '3389' AND start BETWEEN 1588322590 AND 1606732990) LIMIT 10000"
"vpcflow": "(CAST(sourceport AS varchar) = '3389' AND start BETWEEN 1588322590 AND 1606732990)"
},
{
"guardduty": "(((CAST(json_extract_scalar(service,'$.action.networkconnectionaction.localportdetails.port') AS varchar) = '3389' OR CAST(json_extract_scalar(service,'$.action.portprobeaction.localportdetails.port') AS varchar) = '3389') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') UNION ((lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.privatednsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privatednsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicdnsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(service,'$.action.dnsrequestaction.domain')) = lower('guarddutyc2activityb.com')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z')) LIMIT 10000"
"guardduty": "(((CAST(json_extract_scalar(service,'$.action.networkconnectionaction.localportdetails.port') AS varchar) = '3389' OR CAST(json_extract_scalar(service,'$.action.portprobeaction.localportdetails.port') AS varchar) = '3389') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') UNION ((lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.privatednsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.1.privatednsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(resource,'$.instancedetails.networkinterfaces.0.publicdnsname')) = lower('guarddutyc2activityb.com') OR lower(json_extract_scalar(service,'$.action.dnsrequestaction.domain')) = lower('guarddutyc2activityb.com')) AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z'))"
}
]
}
Expand All @@ -422,13 +422,13 @@ translate aws_athena query '{}' "([x-aws-details:account_id = '979326520502'] AN
{
"queries": [
{
"vpcflow": "((CAST(account AS varchar) = '979326520502' AND start BETWEEN 1588322590 AND 1606732990) INTERSECT (lower(action) = lower('accept') AND start BETWEEN 1588322590 AND 1606732990)) LIMIT 10000"
"vpcflow": "((CAST(account AS varchar) = '979326520502' AND start BETWEEN 1588322590 AND 1606732990) INTERSECT (lower(action) = lower('accept') AND start BETWEEN 1588322590 AND 1606732990))"
},
{
"guardduty": "((CAST(accountid AS varchar) = '979326520502' AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') INTERSECT (lower(type) = lower('accept') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z')) LIMIT 10000"
"guardduty": "((CAST(accountid AS varchar) = '979326520502' AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z') INTERSECT (lower(type) = lower('accept') AND updatedat BETWEEN '2020-05-01T08:43:10.003Z' AND '2020-11-30T10:43:10.003Z'))"
},
{
"ocsf": "(lower(observables.type) = lower('accept') AND _time BETWEEN 1588322590000 AND 1606732990000) LIMIT 10000"
"ocsf": "(lower(observables.type) = lower('accept') AND _time BETWEEN 1588322590000 AND 1606732990000)"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,8 +490,7 @@ def translate_pattern(pattern: Pattern, data_model_mapping, options):
:param options: dict, time_range defaults to 5 and result_limit defaults to 10000
:return: dict, AWS Athena SQL query
"""
result_limit = options['result_limit']
time_range = options['time_range']
query = QueryStringPatternTranslator(pattern, data_model_mapping, time_range)
query_string = "({condition}) LIMIT {result_limit}".format(condition=query.translated, result_limit=result_limit)
query_string = "({condition})".format(condition=query.translated)
return [{query.service_type: query_string}]
143 changes: 96 additions & 47 deletions stix_shifter_modules/aws_athena/stix_transmission/results_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,70 +14,119 @@ class AccessDeniedException(Exception):


class ResultsConnector(BaseResultsConnector):
# https://docs.aws.amazon.com/athena/latest/APIReference/API_GetQueryResults.html#API_GetQueryResults_RequestSyntax
# Athena MaxResults count is 1000 when calling get_query_results
ATHENA_MAX_RESULTS_SIZE = 1000

def __init__(self, client, s3_client):
self.client = client
self.s3_client = s3_client
self.logger = logger.set_logger(__name__)
self.connector = __name__.split('.')[1]

def create_results_connection(self, search_id, offset, length):
def create_results_connection(self, search_id, offset, length, metadata=None):
"""
Fetching the results using search id, offset and length
:param search_id: str, search id generated in transmit query
:param offset: str, offset value
:param offset: str, offset value, not used in AWS requests
:param length: str, length value
:param metadata: str, None or string in format "<NextToken>:::json(<schema_columns>)"
:return: dict
"""

return_obj = dict()
response_dict = dict()
try:
offset = int(offset)
length = int(length)
total_records = offset+length
search_id, service_type = search_id.split(':')[0], search_id.split(':')[1]
if 'dummy' in search_id:
return_obj = {'success': True, 'data': []}
return return_obj
paginator = self.client.get_paginator('get_query_results')
get_query_response = paginator.paginate(QueryExecutionId=search_id)
next_page_token = None
schema_columns = []
result_response_list = []
for page in get_query_response:
result_response_list.extend(page['ResultSet']['Rows'])
# Formatting the response from api
schema_columns = result_response_list[0]['Data']
schema_columns = [list(x.values()) for x in schema_columns]
schema_columns_list = [column_name for sublist in schema_columns for column_name in sublist]
schema_row_values = result_response_list[1:]
result_list = []
for row in schema_row_values:
row_values = [list('-') if list(x.values()) == [] else list(x.values()) for x in row['Data']]
row_value_list = [row_value for sublist in row_values for row_value in sublist]
response_dict = dict(zip(schema_columns_list, row_value_list))
result_list.append(response_dict)
results = result_list[offset:total_records]
# Flattening the response
if service_type == 'ocsf':
formatted_result = self.format_result_ocsf(results)
process_on = True

if metadata:
next_page_token, schema_columns_str = metadata.split(':::')
schema_columns = json.loads(schema_columns_str)

if next_page_token == 'COMPLETE': # we got to the end of the iteration, stop here
return_obj['success'] = True
return_obj['data'] = []
else:
flatten_result_cleansed = self.flatten_result(results, service_type)
# Unflatten results using to_stix_map keys to avoid lengthy key value
formatted_result = self.format_result(flatten_result_cleansed, service_type)
return_obj['success'] = True
return_obj['data'] = formatted_result
# Delete output files(search_id.csv, search_id.csv.metadata) in s3 bucket
get_query_response = self.client.get_query_execution(QueryExecutionId=search_id)
s3_output_location = get_query_response['QueryExecution']['ResultConfiguration']['OutputLocation']
s3_output_bucket_with_file = s3_output_location.split('//')[1]
s3_output_bucket = s3_output_bucket_with_file.split('/')[0]
s3_output_key = '/'.join(s3_output_bucket_with_file.split('/')[1:])
s3_output_key_metadata = s3_output_key + '.metadata'
delete = dict()
delete['Objects'] = [{'Key': s3_output_key}, {'Key': s3_output_key_metadata}]
# Api call to delete s3 object
delete_object = self.s3_client.delete_objects(Bucket=s3_output_bucket, Delete=delete)
if delete_object.get('Errors'):
message = delete_object.get('Errors')[0].get('Message')
raise AccessDeniedException(message)
length = int(length)

search_id, service_type = search_id.split(':')[0], search_id.split(':')[1]
if 'dummy' in search_id:
return_obj = {'success': True, 'data': []}
return return_obj

if length > self.ATHENA_MAX_RESULTS_SIZE:
page_size = self.ATHENA_MAX_RESULTS_SIZE
else:
page_size = length

received_result_count = 0
while process_on:
if received_result_count + page_size > length:
page_size = length - received_result_count

if next_page_token:
result = self.client.get_query_results(QueryExecutionId=search_id, NextToken=next_page_token, MaxResults=page_size)
else:
result = self.client.get_query_results(QueryExecutionId=search_id, MaxResults=page_size)

if 'NextToken' in result and len(result['NextToken']) > 1:
next_page_token = result['NextToken']
else:
next_page_token = 'COMPLETE'
process_on = False

if not schema_columns: # this means that this is the first call for the search_id and the first row will contain schema_columns
schema_columns = result['ResultSet']['Rows'][0]['Data']
result_response_list.extend(result['ResultSet']['Rows'][1:])
else:
result_response_list.extend(result['ResultSet']['Rows'])

received_result_count = len(result_response_list)

if received_result_count >= length:
process_on = False

schema_columns_list = [list(x.values()) for x in schema_columns]
schema_columns_list = [column_name for sublist in schema_columns_list for column_name in sublist]
schema_row_values = result_response_list
result_list = []
for row in schema_row_values:
row_values = [list('-') if list(x.values()) == [] else list(x.values()) for x in row['Data']]
row_value_list = [row_value for sublist in row_values for row_value in sublist]
response_dict = dict(zip(schema_columns_list, row_value_list))
result_list.append(response_dict)
results = result_list

# Flattening the response
if service_type == 'ocsf':
formatted_result = self.format_result_ocsf(results)
else:
flatten_result_cleansed = self.flatten_result(results, service_type)
# Unflatten results using to_stix_map keys to avoid lengthy key value
formatted_result = self.format_result(flatten_result_cleansed, service_type)
return_obj['success'] = True
return_obj['data'] = formatted_result

if next_page_token:
return_obj['metadata'] = next_page_token + ':::' + json.dumps(schema_columns)
if next_page_token == 'COMPLETE':
# Delete output files(search_id.csv, search_id.csv.metadata) in s3 bucket
get_query_response = self.client.get_query_execution(QueryExecutionId=search_id)
s3_output_location = get_query_response['QueryExecution']['ResultConfiguration']['OutputLocation']
s3_output_bucket_with_file = s3_output_location.split('//')[1]
s3_output_bucket = s3_output_bucket_with_file.split('/')[0]
s3_output_key = '/'.join(s3_output_bucket_with_file.split('/')[1:])
s3_output_key_metadata = s3_output_key + '.metadata'
delete = dict()
delete['Objects'] = [{'Key': s3_output_key}, {'Key': s3_output_key_metadata}]
# Api call to delete s3 object
delete_object = self.s3_client.delete_objects(Bucket=s3_output_bucket, Delete=delete)
if delete_object.get('Errors'):
message = delete_object.get('Errors')[0].get('Message')
raise AccessDeniedException(message)
except Exception as ex:
return_obj = dict()
response_dict['__type'] = ex.__class__.__name__
Expand Down
Loading

0 comments on commit 020d640

Please sign in to comment.