Ciac 11100 playbook run in build #4658
@@ -53,21 +53,24 @@ def marketplace(self) -> MarketplaceVersions: | |
""" | ||
|
||
def _process_response(self, response, status_code, expected_status=200): | ||
"""Process the HTTP response coming from the XSOAR client.""" # noqa: E999 | ||
"""Process the HTTP response coming from the XSOAR client.""" | ||
if status_code == expected_status: | ||
if response: | ||
try: | ||
return json.loads(response) | ||
except json.JSONDecodeError: | ||
api_response = response.replace("'", '"').replace( | ||
"False", "false").replace("True", "true").replace("None", "null") | ||
api_response = ( | ||
response.replace("'", '"') | ||
.replace("False", "false") | ||
.replace("True", "true") | ||
.replace("None", "null") | ||
) | ||
return json.loads(api_response) | ||
return {} | ||
else: | ||
error_message = f"Expected status {expected_status}, but got {status_code}. Response: {response}" | ||
raise Exception(error_message) | ||
|
||
|
||
""" | ||
############################# | ||
xsoar related methods | ||
|
@@ -82,8 +85,7 @@ def delete_incidents( | |
response_type: str = "object", | ||
): | ||
# if in the future it will be possible to delete incidents in XSIAM, implement this method | ||
raise NotImplementedError( | ||
"it is not possible to delete incidents in XSIAM") | ||
raise NotImplementedError("it is not possible to delete incidents in XSIAM") | ||
|
||
""" | ||
############################# | ||
|
@@ -99,8 +101,7 @@ def push_to_dataset( | |
data_format: str = "json", | ||
): | ||
if self.server_config.token: | ||
endpoint = urljoin( | ||
self.server_config.base_api_url, "logs/v1/xsiam") | ||
endpoint = urljoin(self.server_config.base_api_url, "logs/v1/xsiam") | ||
additional_headers = { | ||
"authorization": self.server_config.token, | ||
"format": data_format, | ||
|
@@ -110,8 +111,7 @@ def push_to_dataset( | |
} | ||
token_type = "xsiam_token" | ||
elif self.server_config.collector_token: | ||
endpoint = urljoin( | ||
self.server_config.base_api_url, "logs/v1/event") | ||
endpoint = urljoin(self.server_config.base_api_url, "logs/v1/event") | ||
additional_headers = { | ||
"authorization": self.server_config.collector_token, | ||
"content-type": "application/json" | ||
|
@@ -191,8 +191,7 @@ def get_xql_query_result(self, execution_id: str, timeout: int = 300): | |
self.server_config.base_api_url, "public_api/v1/xql/get_query_results/" | ||
) | ||
logger.info(f"Getting xql query results: endpoint={endpoint}") | ||
response = self._xdr_client.post( | ||
endpoint, data=payload, timeout=timeout) | ||
response = self._xdr_client.post(endpoint, data=payload, timeout=timeout) | ||
logger.debug("Request completed to get xql query results") | ||
data = response.json() | ||
logger.debug(pformat(data)) | ||
|
@@ -213,7 +212,8 @@ def get_xql_query_result(self, execution_id: str, timeout: int = 300): | |
def get_ioc_rules(self): | ||
# /ioc-rules is only an endpoint in XSIAM. | ||
response, status_code, response_headers = self._xsoar_client.generic_request( | ||
"/ioc-rules", "GET", response_type="object") | ||
"/ioc-rules", "GET", response_type="object" | ||
) | ||
if ( | ||
"text/html" in response_headers.get("Content-Type") | ||
or status_code != requests.codes.ok | ||
|
@@ -232,43 +232,55 @@ def get_ioc_rules(self): | |
|
||
def create_alert_from_json(self, json_content: dict) -> int: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this going to be used since we have the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can leave it here as it offers another simple way to create alerts. |
||
alert_payload = {"request_data": {"alert": json_content}} | ||
res = requests.post(url=f'{self.base_url}/public_api/v1/alerts/create_alert', | ||
headers=self._xdr_client.headers, json=alert_payload) | ||
res = requests.post( | ||
url=f"{self.base_url}/public_api/v1/alerts/create_alert", | ||
headers=self._xdr_client.headers, | ||
json=alert_payload, | ||
) | ||
alert_data = self._process_response(res.content, res.status_code, 200) | ||
return alert_data['reply'] | ||
return alert_data["reply"] | ||
|
||
def get_internal_alert_id(self, alert_external_id: str) -> int: | ||
data = self.search_alerts(alert_external_id) | ||
return data['alerts'][0]['alert_id'] | ||
return data["alerts"][0]["alert_id"] | ||
|
||
def update_alert(self, alert_id: str | list[str], updated_data: dict): | ||
def update_alert(self, alert_id: str | list[str], updated_data: dict) -> dict: | ||
""" | ||
Args: | ||
alert_id (str | list[str]): alert ids to edit. | ||
updated_data (dict): The data to update the alerts with. https://cortex-panw.stoplight.io/docs/cortex-xsiam-1/rpt3p1ne2bwfe-update-alerts | ||
""" | ||
alert_payload = {"request_data": { | ||
"update_data": updated_data, "alert_id_list": alert_id}} | ||
res = requests.post(url=f'{self.base_url}/public_api/v1/alerts/update_alerts', | ||
headers=self._xdr_client.headers, json=alert_payload) | ||
alert_payload = { | ||
"request_data": {"update_data": updated_data, "alert_id_list": alert_id} | ||
} | ||
res = requests.post( | ||
url=f"{self.base_url}/public_api/v1/alerts/update_alerts", | ||
headers=self._xdr_client.headers, | ||
json=alert_payload, | ||
) | ||
alert_data = self._process_response(res.content, res.status_code, 200) | ||
return alert_data | ||
|
||
def search_alerts(self, external_alert_id: str | list[str]): | ||
def search_alerts(self, external_alert_id: str | list[str]) -> dict: | ||
body = { | ||
"request_data": { | ||
"filters": [ | ||
{ | ||
"field": "external_id_list", | ||
"operator": "in", | ||
"value": external_alert_id if isinstance(external_alert_id, list) else [external_alert_id] | ||
"value": external_alert_id | ||
if isinstance(external_alert_id, list) | ||
else [external_alert_id], | ||
} | ||
] | ||
} | ||
} | ||
res = requests.post(url=f'{self.base_url}/public_api/v1/alerts/get_alerts/', | ||
headers=self._xdr_client.headers, json=body) | ||
return self._process_response(res.content, res.status_code, 200)['reply'] | ||
res = requests.post( | ||
url=f"{self.base_url}/public_api/v1/alerts/get_alerts/", | ||
headers=self._xdr_client.headers, | ||
json=body, | ||
) | ||
return self._process_response(res.content, res.status_code, 200)["reply"] | ||
|
||
""" | ||
############################# | ||
|
@@ -279,10 +291,14 @@ def search_alerts(self, external_alert_id: str | list[str]): | |
def get_playbook_data(self, playbook_id: int) -> dict: | ||
playbook_endpoint = f"/playbook/{playbook_id}" | ||
|
||
response, status_code, _ = self._xsoar_client.generic_request(playbook_endpoint, method='GET', accept='application/json') | ||
response, status_code, _ = self._xsoar_client.generic_request( | ||
playbook_endpoint, method="GET", accept="application/json" | ||
) | ||
return self._process_response(response, status_code, 200) | ||
|
||
def update_playbook_input(self, playbook_id: str, new_inputs: dict): | ||
saving_inputs_path = f"/playbook/inputs/{playbook_id}" | ||
response, status_code, _ = self._xsoar_client.generic_request(saving_inputs_path, method='POST', body={"inputs":new_inputs}) | ||
response, status_code, _ = self._xsoar_client.generic_request( | ||
saving_inputs_path, method="POST", body={"inputs": new_inputs} | ||
) | ||
return self._process_response(response, status_code, 200) |
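Review bot: In the hunk at -232,43 the reformatted `create_alert_from_json`, `update_alert` and `search_alerts` still call the module-level `requests.post(...)` with no `timeout=`, unlike `get_xql_query_result` in the -191 hunk, which goes through `self._xdr_client.post(endpoint, data=payload, timeout=timeout)`; a stalled alerts endpoint therefore blocks the build indefinitely instead of failing. Give each call a bounded timeout, e.g. `res = requests.post(url=f"{self.base_url}/public_api/v1/alerts/create_alert", headers=self._xdr_client.headers, json=alert_payload, timeout=60)`, or route the three calls through `self._xdr_client.post` so they also inherit whatever session settings (TLS verification, proxies, retries) that client presumably carries — worth confirming against its definition, which is outside this diff. On the same path these methods hand `res.content` (bytes) to `_process_response`, whose fallback calls `response.replace("'", '"')` with str arguments, so a non-JSON error body raises `TypeError` rather than the intended re-parse; passing `res.text` or guarding the fallback with `isinstance(response, str)` would fix it. The enclosing client class begins before the first hunk and the file header is not shown in this view.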