From 74fdad7811acae21fb31d0cf80c9f5e0b251bb43 Mon Sep 17 00:00:00 2001 From: Thomas Legros Date: Thu, 11 Jan 2024 14:03:19 +0100 Subject: [PATCH 1/2] Added new custom scripts APIs --- .github/workflows/lint.yml | 2 +- .github/workflows/release.yml | 3 +- README.md | 12 ++- pyproject.toml | 6 +- src/pytmv1/__init__.py | 8 ++ src/pytmv1/caller.py | 134 +++++++++++++++++++++++++++++ src/pytmv1/core.py | 72 ++++++---------- src/pytmv1/mapper.py | 6 +- src/pytmv1/model/commons.py | 55 ++++++++---- src/pytmv1/model/enums.py | 70 ++++++++------- src/pytmv1/model/responses.py | 29 +++++-- src/pytmv1/utils.py | 10 ++- tests/conftest.py | 21 ++++- tests/integration/test_endpoint.py | 19 +--- tests/integration/test_script.py | 74 ++++++++++++++++ tests/unit/test_core.py | 12 +-- tests/unit/test_utils.py | 9 ++ 17 files changed, 399 insertions(+), 143 deletions(-) create mode 100644 tests/integration/test_script.py diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 5326e8d..2759a25 100755 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -52,6 +52,6 @@ jobs: with: python-version: "3.7" - name: Install dependencies - run: pip install --upgrade pip mypy==1.0.1 pydantic==1.10.4 + run: pip install --upgrade pip mypy pydantic - name: Run mypy run: mypy --install-types --non-interactive ./src diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 53833ff..b58e6a6 100755 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,13 +42,12 @@ jobs: git checkout main git merge --no-ff ${{ env.BRANCH }} git tag -a v${{ env.VERSION }} -m "Release ${{ env.VERSION }}" - git push --atomic origin main refs/tags/v${{ env.VERSION }} # Try to merge back release branch git checkout ${{ env.BRANCH }} git merge --ff-only main git checkout develop git merge --no-ff ${{ env.BRANCH }} - git push origin develop :${{ env.BRANCH }} + git push --atomic origin main develop refs/tags/v${{ env.VERSION }} :${{ env.BRANCH }} - name: Publish package to PyPI uses: pypa/gh-action-pypi-publish@v1.5.2 with: diff --git a/README.md b/README.md index 76b2d86..ae2fb82 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,11 @@ ResultCode.SUCCESS #### Build the project +Set virtual env +```console +python3 -m venv venv +source venv/bin/activate +``` Install dependencies ```console pip install -e ".[dev]" @@ -89,7 +94,12 @@ Supported APIs | `get_base_task_result` | [Download response task results](https://automation.trendmicro.com/xdr/api-v3#tag/Common/paths/~1v3.0~1response~1tasks~1%7Bid%7D/get) | | `get_task_result` | [Download response task results](https://automation.trendmicro.com/xdr/api-v3#tag/Common/paths/~1v3.0~1response~1tasks~1{id}/get) | | **Custom Scripts** | | -| `run_custom_script` | [Run Custom Script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1endpoints~1runScript/post) | +| `get_custom_script_list` | [List custom scripts](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1customScripts/get) | +| `add_custom_script` | [Add custom script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1customScripts/post) | +| `update_custom_script` | [Update custom script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1customScripts~1%7Bid%7D~1update/post) | +| `download_custom_script` | [Download custom script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1customScripts~1%7Bid%7D/get) | +| `delete_custom_script` | [Delete custom script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1customScripts~1%7Bid%7D/delete) | +| `run_custom_script` | [Run custom script](https://automation.trendmicro.com/xdr/api-v3#tag/Custom-Script/paths/~1v3.0~1response~1endpoints~1runScript/post) | | **Domain Account** | | | `disable_account` | [Disable user account](https://automation.trendmicro.com/xdr/api-v3#tag/Domain-Account/paths/~1v3.0~1response~1domainAccounts~1disable/post) | | `enable_account` | [Enable user account](https://automation.trendmicro.com/xdr/api-v3#tag/Domain-Account/paths/~1v3.0~1response~1domainAccounts~1enable/post) | diff --git a/pyproject.toml b/pyproject.toml index 7b9e4e2..a614261 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,7 @@ classifiers = [ dependencies = [ "beautifulsoup4 ~= 4.11.1", "requests ~= 2.31.0", - "pydantic ~= 1.10.4", + "pydantic ~= 2.5.3", ] [project.optional-dependencies] @@ -49,7 +49,7 @@ dev = [ "Issues" = "https://github.com/TrendATI/pytmv1/issues" [tool.hatch.build.targets.sdist] -exclude = [".github", "tests"] +exclude = [".github", "template", "tests"] [tool.hatch.version] path = "src/pytmv1/__about__.py" @@ -67,7 +67,7 @@ color_output = true [tool.mypy] python_version = "3.7" -exclude = ["dist", "tests", "venv"] +exclude = ["dist", "template", "tests", "venv"] show_column_numbers = true warn_unused_configs = true pretty = true diff --git a/src/pytmv1/__init__.py b/src/pytmv1/__init__.py index 02fedc8..2957aa1 100755 --- a/src/pytmv1/__init__.py +++ b/src/pytmv1/__init__.py @@ -35,6 +35,7 @@ EntityType, EventID, EventSubID, + FileType, Iam, IntegrityLevel, InvestigationStatus, @@ -67,6 +68,7 @@ from .model.responses import ( AccountTaskResp, AddAlertNoteResp, + AddCustomScriptResp, BaseTaskResp, BlockListTaskResp, BytesResp, @@ -78,6 +80,7 @@ EndpointTaskResp, GetAlertDetailsResp, GetAlertListResp, + GetCustomScriptListResp, GetEmailActivityDataCountResp, GetEmailActivityDataResp, GetEndpointActivityDataCountResp, @@ -94,6 +97,7 @@ SandboxSuspiciousListResp, SubmitFileToSandboxResp, TerminateProcessTaskResp, + TextResp, ) from .results import MultiResult, Result, ResultCode @@ -105,6 +109,7 @@ "AccountTask", "AccountTaskResp", "AddAlertNoteResp", + "AddCustomScriptResp", "Alert", "BaseTaskResp", "BlockListTaskResp", @@ -133,6 +138,7 @@ "FileTask", "GetAlertDetailsResp", "GetAlertListResp", + "GetCustomScriptListResp", "GetEmailActivityDataResp", "GetEmailActivityDataCountResp", "GetEndpointActivityDataResp", @@ -142,6 +148,7 @@ "GetSuspiciousListResp", "HostInfo", "Iam", + "FileType", "ImpactScope", "Indicator", "IntegrityLevel", @@ -187,6 +194,7 @@ "SuspiciousObjectTask", "TaskAction", "TerminateProcessTaskResp", + "TextResp", "TiAlert", "TiIndicator", "Value", diff --git a/src/pytmv1/caller.py b/src/pytmv1/caller.py index 55cd334..e03d944 100755 --- a/src/pytmv1/caller.py +++ b/src/pytmv1/caller.py @@ -13,11 +13,13 @@ EndpointActivity, ExceptionObject, SaeAlert, + Script, SuspiciousObject, TiAlert, ) from .model.enums import ( Api, + FileType, HttpMethod, InvestigationStatus, QueryOp, @@ -36,12 +38,14 @@ ) from .model.responses import ( AddAlertNoteResp, + AddCustomScriptResp, BaseTaskResp, BytesResp, ConnectivityResp, ConsumeLinkableResp, GetAlertDetailsResp, GetAlertListResp, + GetCustomScriptListResp, GetEmailActivityDataCountResp, GetEmailActivityDataResp, GetEndpointActivityDataCountResp, @@ -57,6 +61,7 @@ SandboxSubmissionStatusResp, SandboxSuspiciousListResp, SubmitFileToSandboxResp, + TextResp, ) from .results import MultiResult, Result @@ -131,6 +136,36 @@ def add_alert_note( json={"content": note}, ) + def add_custom_script( + self, + file_type: FileType, + file_name: str, + file: bytes, + description: Optional[str] = None, + ) -> Result[AddCustomScriptResp]: + """ + Uploads a custom script. Supported file extensions: .ps1, .sh. + Note: Custom scripts must use UTF-8 encoding. + :param file_type: File type. + :type file_type: FileType + :param file_name: File name. + :type file_name: str + :param file: Raw content in bytes. + :type file: bytes + :param description: Description. + :type description: Optional[str] + :return: Result[AddACustomScriptResp] + """ + return self._core.send( + AddCustomScriptResp, + Api.ADD_CUSTOM_SCRIPT, + HttpMethod.POST, + data=utils.filter_none( + {"fileType": file_type.value, "description": description} + ), + files={"file": (file_name, file, "text/plain")}, + ) + def add_to_block_list( self, *objects: ObjectTask ) -> MultiResult[MultiResp]: @@ -220,6 +255,30 @@ def consume_alert_list( ), ) + def consume_custom_script_list( + self, + consumer: Callable[[Script], None], + op: QueryOp = QueryOp.AND, + **fields: str, + ) -> Result[ConsumeLinkableResp]: + """Retrieves and consume cust. scripts filtered by provided values. + + :param consumer: Function which will consume every record in result. + :type consumer: Callable[[Script], None] + :param op: Operator to apply between fields (ie: ... OR ...). + :type op: QueryOp + :param fields: Field/value used to filter result (i.e:fileName="1.sh"), + check Vision One API documentation for full list of supported fields. + :type fields: Dict[str, str] + :return: Result[ConsumeLinkableResp] + """ + return self._core.send_linkable( + GetCustomScriptListResp, + Api.GET_CUSTOM_SCRIPT_LIST, + consumer, + params={"filter": utils.custom_script_query(op, **fields)}, + ) + def consume_email_activity_data( self, consumer: Callable[[EmailActivity], None], @@ -368,6 +427,19 @@ def consume_suspicious_list( GetSuspiciousListResp, Api.GET_SUSPICIOUS_LIST, consumer ) + def delete_custom_script(self, script_id: str) -> Result[NoContentResp]: + """Deletes custom script. + + :param script_id: Unique string that identifies a script file. + :type script_id: str + :return: Result[NoContentResp] + """ + return self._core.send( + NoContentResp, + Api.DELETE_CUSTOM_SCRIPT.value.format(script_id), + HttpMethod.DELETE, + ) + def delete_email_message( self, *messages: Union[EmailMessageUIdTask, EmailMessageIdTask] ) -> MultiResult[MultiResp]: @@ -405,6 +477,17 @@ def disable_account( ], ) + def download_custom_script(self, script_id: str) -> Result[TextResp]: + """Downloads custom script. + + :param script_id: Unique string that identifies a script file. + :type script_id: str + :return: Result[BytesResp] + """ + return self._core.send( + TextResp, Api.DOWNLOAD_CUSTOM_SCRIPT.value.format(script_id) + ) + def download_sandbox_analysis_result( self, submit_id: str, @@ -565,6 +648,24 @@ def get_base_task_result( BaseTaskResp, task_id, poll, poll_time_sec ) + def get_custom_script_list( + self, op: QueryOp = QueryOp.AND, **fields: str + ) -> Result[GetCustomScriptListResp]: + """Retrieves scripts in a paginated list filtered by provided values. + + :param op: Operator to apply between fields (ie: ... OR ...). + :type op: QueryOp + :param fields: Field/value used to filter result (i.e:fileName="1.sh"), + check Vision One API documentation for full list of supported fields. + :type fields: Dict[str, str] + :return: Result[GetCustomScriptsResp] + """ + return self._core.send( + GetCustomScriptListResp, + Api.GET_CUSTOM_SCRIPT_LIST, + params={"filter": utils.custom_script_query(op, **fields)}, + ) + def get_email_activity_data( self, start_time: Optional[str] = None, @@ -1093,6 +1194,39 @@ def terminate_process( Api.TERMINATE_ENDPOINT_PROCESS, *processes ) + def update_custom_script( + self, + script_id: str, + file_type: FileType, + file_name: str, + file: bytes, + description: Optional[str] = None, + ) -> Result[NoContentResp]: + """ + Updates a custom script. Supported file extensions: .ps1, .sh. + Note: Custom scripts must use UTF-8 encoding. + :param script_id: Unique string that identifies a script file. + :type script_id: str + :param file_type: File type. + :type file_type: FileType + :param file_name: File name. + :type file_name: str + :param file: Raw content in bytes. + :type file: bytes + :param description: Description. + :type description: Optional[str] + :return: Result[NoContentResp] + """ + return self._core.send( + NoContentResp, + Api.UPDATE_CUSTOM_SCRIPT.value.format(script_id), + HttpMethod.POST, + data=utils.filter_none( + {"fileType": file_type.value, "description": description} + ), + files={"file": (file_name, file, "text/plain")}, + ) + def check_connectivity(self) -> Result[ConnectivityResp]: """Checks the connection to the API service and verifies if your authentication token is valid. diff --git a/src/pytmv1/core.py b/src/pytmv1/core.py index 8ee1159..d3f1907 100755 --- a/src/pytmv1/core.py +++ b/src/pytmv1/core.py @@ -6,7 +6,7 @@ from urllib.parse import SplitResult, urlsplit from bs4 import BeautifulSoup -from pydantic import AnyHttpUrl, parse_obj_as +from pydantic import AnyHttpUrl, TypeAdapter from requests import PreparedRequest, Request, Response from .__about__ import __version__ @@ -18,22 +18,12 @@ ServerMultiJsonError, ServerTextError, ) -from .model.commons import ( - Error, - MsData, - MsDataUrl, - MsError, - MsStatus, - SaeAlert, - TiAlert, -) -from .model.enums import Api, HttpMethod, Provider, Status +from .model.commons import Error, MsError, MsStatus +from .model.enums import Api, HttpMethod, Status from .model.requests import EndpointTask from .model.responses import ( MR, - AddAlertNoteResp, BaseLinkableResp, - BaseMultiResponse, BytesResp, C, ConsumeLinkableResp, @@ -44,6 +34,7 @@ R, S, SandboxSubmissionStatusResp, + TextResp, ) from .results import multi_result, result @@ -69,7 +60,7 @@ def __init__( self._r_timeout = read_timeout self._appname = appname self._token = token - self._url = parse_obj_as(AnyHttpUrl, _format(url)) + self._url = str(TypeAdapter(AnyHttpUrl).validate_python(_format(url))) self._headers: Dict[str, str] = { "Authorization": f"Bearer {self._token}", "User-Agent": f"{self._appname}-{USERAGENT_SUFFIX}/{__version__}", @@ -279,41 +270,22 @@ def _is_http_success(status_codes: List[int]) -> bool: def _parse_data(raw_response: Response, class_: Type[R]) -> R: content_type = raw_response.headers.get("Content-Type", "") if "json" in content_type: - if issubclass(class_, BaseMultiResponse): - log.debug( - "Parsing json multi response [Class=%s]", class_.__name__ - ) - class_d: Type[List[Any]] - if issubclass(class_, MultiUrlResp): - class_d = List[MsDataUrl] - else: - class_d = List[MsData] - return class_( - items=parse_obj_as( - class_d, - raw_response.json(), - ) - ) - log.info("Parsing json response [Class=%s]", class_.__name__) + log.debug("Parsing json response [Class=%s]", class_.__name__) + if class_ in [MultiResp, MultiUrlResp]: + return class_(items=raw_response.json()) if class_ == GetAlertDetailsResp: - response_json: Dict[str, str] = raw_response.json() return class_( - alert=parse_obj_as( - ( - SaeAlert - if response_json.get("alertProvider") == Provider.SAE - else TiAlert - ), - response_json, - ), + alert=raw_response.json(), etag=raw_response.headers.get("ETag", ""), ) - return class_.parse_obj(raw_response.json()) + return class_(**raw_response.json()) if "application" in content_type and class_ == BytesResp: - log.info("Parsing binary response") - return class_(content=raw_response.content) - if raw_response.status_code == 201 and class_ == AddAlertNoteResp: - return class_.parse_obj(raw_response.headers) + log.debug("Parsing binary response") + return class_.model_construct(content=raw_response.content) + if "text" in content_type and class_ == TextResp: + return class_.model_construct(text=raw_response.text) + if raw_response.status_code == 201: + return class_(**raw_response.headers) if raw_response.status_code == 204 and class_ == NoContentResp: return class_() raise ParseModelError(class_.__name__, raw_response) @@ -336,6 +308,7 @@ def _poll_status( response: S = status_call() while elapsed_time < poll_time_sec: if response.status in [Status.QUEUED, Status.RUNNING]: + time.sleep(2) response = status_call() elapsed_time = time.time() - start_time else: @@ -354,13 +327,18 @@ def _validate(raw_response: Response) -> None: error: Dict[str, Any] = raw_response.json().get("error") error["status"] = raw_response.status_code raise ServerJsonError( - Error.parse_obj(error), + Error(**error), ) raise ServerTextError(raw_response.status_code, raw_response.text) if raw_response.status_code == 207: if not _is_http_success( - MsStatus.parse_obj(raw_response.json()).values() + MsStatus( + root=[int(d.get("status", 500)) for d in raw_response.json()] + ).values() ): raise ServerMultiJsonError( - parse_obj_as(List[MsError], raw_response.json()) + [ + MsError.model_validate(error) + for error in raw_response.json() + ] ) diff --git a/src/pytmv1/mapper.py b/src/pytmv1/mapper.py index 6ae3277..59a4c36 100755 --- a/src/pytmv1/mapper.py +++ b/src/pytmv1/mapper.py @@ -1,6 +1,6 @@ from typing import Dict, List -from pydantic.utils import to_lower_camel +from pydantic.alias_generators import to_camel from .model.commons import ( Alert, @@ -83,9 +83,7 @@ def _map_indicators(data: Dict[str, str], indicators: List[Indicator]) -> None: data["src"] = ", ".join(indicator.value.ips) else: data[ - INDICATOR_CEF_MAP.get( - indicator.type, to_lower_camel(indicator.type) - ) + INDICATOR_CEF_MAP.get(indicator.type, to_camel(indicator.type)) ] = indicator.value diff --git a/src/pytmv1/model/commons.py b/src/pytmv1/model/commons.py index 40d873c..54366e4 100644 --- a/src/pytmv1/model/commons.py +++ b/src/pytmv1/model/commons.py @@ -4,12 +4,14 @@ from pydantic import BaseModel as PydanticBaseModel from pydantic import Field -from pydantic.utils import to_lower_camel +from pydantic import RootModel as PydanticRootModel +from pydantic.alias_generators import to_camel from .enums import ( EntityType, EventID, EventSubID, + FileType, Iam, IntegrityLevel, InvestigationStatus, @@ -25,13 +27,23 @@ class BaseModel(PydanticBaseModel): + def __init__(self, **data: Any): + super().__init__(**data) + class Config: - alias_generator = to_lower_camel + alias_generator = to_camel + allow_population_by_field_name = True + + +class RootModel(PydanticRootModel[List[int]]): + class Config: + alias_generator = to_camel allow_population_by_field_name = True class BaseConsumable(BaseModel): - ... + def __init__(self, **data: Any): + super().__init__(**data) def _get_task_id(headers: List[Dict[str, str]]) -> Optional[str]: @@ -192,12 +204,15 @@ class Error(BaseModel): message: Optional[str] = None number: Optional[int] = None + def __init__(self, **data: Any): + super().__init__(**data) + class ExceptionObject(BaseConsumable): value: str type: ObjectType last_modified_date_time: str - description: Optional[str] + description: Optional[str] = None def __init__(self, **data: str) -> None: super().__init__(value=self._obj_value(data), **data) @@ -266,8 +281,8 @@ def __init__(self, **data: Any): class MsDataUrl(MsData): url: str - id: Optional[str] - digest: Optional[Digest] + id: Optional[str] = None + digest: Optional[Digest] = None def __init__(self, **data: Any): data.update(data.pop("body", {})) @@ -276,7 +291,7 @@ def __init__(self, **data: Any): class MsError(Error): extra: Dict[str, str] = {} - task_id: Optional[str] + task_id: Optional[str] = None def __init__(self, **data: Any): data.update(data.pop("body", {})) @@ -288,16 +303,11 @@ def __init__(self, **data: Any): ) -class MsStatus(BaseModel): - __root__: List[int] - - def __init__(self, **data: Any): - super().__init__( - root=[int(d.get("status", 500)) for d in data.get("__root__", [])] - ) +class MsStatus(RootModel): + root: List[int] def values(self) -> List[int]: - return self.__root__ + return self.root class SaeAlert(Alert): @@ -318,7 +328,7 @@ class SandboxSuspiciousObject(BaseModel): type: ObjectType value: str - def __init__(self, **data: str) -> None: + def __init__(self, **data: Any): obj: Tuple[str, str] = self._map(data) super().__init__(type=obj[0], value=obj[1], **data) @@ -331,6 +341,13 @@ def _map(args: Dict[str, str]) -> Tuple[str, str]: }.pop() +class Script(BaseConsumable): + id: str + file_name: str + file_type: FileType + description: Optional[str] = None + + class SuspiciousObject(ExceptionObject): scan_action: ScanAction risk_level: RiskLevel @@ -339,9 +356,9 @@ class SuspiciousObject(ExceptionObject): class TiAlert(Alert): - campaign: Optional[str] - industry: Optional[str] - region_and_country: Optional[str] + campaign: Optional[str] = None + industry: Optional[str] = None + region_and_country: Optional[str] = None created_by: str total_indicator_count: int matched_indicator_count: int diff --git a/src/pytmv1/model/enums.py b/src/pytmv1/model/enums.py index 20a4a2f..af6dfa8 100644 --- a/src/pytmv1/model/enums.py +++ b/src/pytmv1/model/enums.py @@ -2,48 +2,53 @@ class Api(str, Enum): - ADD_ALERT_NOTE = "/workbench/alerts/{0}/notes" - ADD_TO_BLOCK_LIST = "/response/suspiciousObjects" - ADD_TO_EXCEPTION_LIST = "/threatintel/suspiciousObjectExceptions" - ADD_TO_SUSPICIOUS_LIST = "/threatintel/suspiciousObjects" - COLLECT_ENDPOINT_FILE = "/response/endpoints/collectFile" - CONNECTIVITY = ("/healthcheck/connectivity",) + CONNECTIVITY = "/healthcheck/connectivity" + GET_ENDPOINT_DATA = "/eiqs/endpoints" + GET_CUSTOM_SCRIPT_LIST = "/response/customScripts" + ADD_CUSTOM_SCRIPT = "/response/customScripts" + DELETE_CUSTOM_SCRIPT = "/response/customScripts/{0}" + DOWNLOAD_CUSTOM_SCRIPT = "/response/customScripts/{0}" + UPDATE_CUSTOM_SCRIPT = "/response/customScripts/{0}/update" DELETE_EMAIL_MESSAGE = "/response/emails/delete" + QUARANTINE_EMAIL_MESSAGE = "/response/emails/quarantine" + RESTORE_EMAIL_MESSAGE = "/response/emails/restore" DISABLE_ACCOUNT = "/response/domainAccounts/disable" - DOWNLOAD_SANDBOX_ANALYSIS_RESULT = "/sandbox/analysisResults/{0}/report" - DOWNLOAD_SANDBOX_INVESTIGATION_PACKAGE = ( - "/sandbox/analysisResults/{0}/investigationPackage" - ) - EDIT_ALERT_STATUS = "/workbench/alerts/{0}" ENABLE_ACCOUNT = "/response/domainAccounts/enable" + RESET_PASSWORD = "/response/domainAccounts/resetPassword" + SIGN_OUT_ACCOUNT = "/response/domainAccounts/signOut" + COLLECT_ENDPOINT_FILE = "/response/endpoints/collectFile" ISOLATE_ENDPOINT = "/response/endpoints/isolate" - GET_ALERT_DETAILS = "/workbench/alerts/{0}" - GET_ALERT_LIST = "/workbench/alerts" - GET_EMAIL_ACTIVITY_DATA = "/search/emailActivities" - GET_ENDPOINT_ACTIVITY_DATA = "/search/endpointActivities" - GET_ENDPOINT_DATA = "/eiqs/endpoints" - GET_EXCEPTION_LIST = "/threatintel/suspiciousObjectExceptions" - GET_SANDBOX_SUBMISSION_STATUS = "/sandbox/tasks/{0}" + RESTORE_ENDPOINT = "/response/endpoints/restore" + RUN_CUSTOM_SCRIPT = "/response/endpoints/runScript" + TERMINATE_ENDPOINT_PROCESS = "/response/endpoints/terminateProcess" + ADD_TO_BLOCK_LIST = "/response/suspiciousObjects" + REMOVE_FROM_BLOCK_LIST = "/response/suspiciousObjects/delete" + GET_TASK_RESULT = "/response/tasks/{0}" GET_SANDBOX_ANALYSIS_RESULT = "/sandbox/analysisResults/{0}" + DOWNLOAD_SANDBOX_INVESTIGATION_PACKAGE = ( + "/sandbox/analysisResults/{0}/investigationPackage" + ) + DOWNLOAD_SANDBOX_ANALYSIS_RESULT = "/sandbox/analysisResults/{0}/report" GET_SANDBOX_SUSPICIOUS_LIST = ( "/sandbox/analysisResults/{0}/suspiciousObjects" ) + SUBMIT_FILE_TO_SANDBOX = "/sandbox/files/analyze" + GET_SANDBOX_SUBMISSION_STATUS = "/sandbox/tasks/{0}" + SUBMIT_URLS_TO_SANDBOX = "/sandbox/urls/analyze" + GET_EMAIL_ACTIVITY_DATA = "/search/emailActivities" + GET_ENDPOINT_ACTIVITY_DATA = "/search/endpointActivities" GET_SUSPICIOUS_LIST = "/threatintel/suspiciousObjects" - GET_TASK_RESULT = "/response/tasks/{0}" - QUARANTINE_EMAIL_MESSAGE = "/response/emails/quarantine" - REMOVE_FROM_BLOCK_LIST = "/response/suspiciousObjects/delete" + ADD_TO_SUSPICIOUS_LIST = "/threatintel/suspiciousObjects" + REMOVE_FROM_SUSPICIOUS_LIST = "/threatintel/suspiciousObjects/delete" + ADD_TO_EXCEPTION_LIST = "/threatintel/suspiciousObjectExceptions" + GET_EXCEPTION_LIST = "/threatintel/suspiciousObjectExceptions" REMOVE_FROM_EXCEPTION_LIST = ( "/threatintel/suspiciousObjectExceptions/delete" ) - REMOVE_FROM_SUSPICIOUS_LIST = "/threatintel/suspiciousObjects/delete" - RESET_PASSWORD = "/response/domainAccounts/resetPassword" - RESTORE_EMAIL_MESSAGE = "/response/emails/restore" - RESTORE_ENDPOINT = "/response/endpoints/restore" - RUN_CUSTOM_SCRIPT = "/response/endpoints/runScript" - SIGN_OUT_ACCOUNT = "/response/domainAccounts/signOut" - SUBMIT_FILE_TO_SANDBOX = "/sandbox/files/analyze" - SUBMIT_URLS_TO_SANDBOX = "/sandbox/urls/analyze" - TERMINATE_ENDPOINT_PROCESS = "/response/endpoints/terminateProcess" + GET_ALERT_LIST = "/workbench/alerts" + EDIT_ALERT_STATUS = "/workbench/alerts/{0}" + GET_ALERT_DETAILS = "/workbench/alerts/{0}" + ADD_ALERT_NOTE = "/workbench/alerts/{0}/notes" class Iam(str, Enum): @@ -144,6 +149,11 @@ class EventSubID(int, Enum): TELEMETRY_BM_INVOKE_API = 1102 +class FileType(str, Enum): + POWERSHELL = "powershell" + BASH = "bash" + + class HttpMethod(str, Enum): GET = "GET" PATCH = "PATCH" diff --git a/src/pytmv1/model/responses.py b/src/pytmv1/model/responses.py index b083eb2..6c855f2 100644 --- a/src/pytmv1/model/responses.py +++ b/src/pytmv1/model/responses.py @@ -1,7 +1,6 @@ from typing import Any, Dict, Generic, List, Optional, Tuple, TypeVar, Union from pydantic import Field -from pydantic.generics import GenericModel from .commons import ( Account, @@ -17,6 +16,7 @@ MsDataUrl, SaeAlert, SandboxSuspiciousObject, + Script, SuspiciousObject, TiAlert, ) @@ -34,15 +34,16 @@ class BaseResponse(BaseModel): - ... + def __init__(self, **data: Any): + super().__init__(**data) -class BaseLinkableResp(BaseResponse, GenericModel, Generic[C]): +class BaseLinkableResp(BaseResponse, Generic[C]): next_link: Optional[str] = None items: List[C] = [] -class BaseMultiResponse(BaseResponse, GenericModel, Generic[M]): +class BaseMultiResponse(BaseResponse, Generic[M]): items: List[M] = [] @@ -58,6 +59,9 @@ class BaseTaskResp(BaseStatusResponse): description: Optional[str] = None account: Optional[str] = None + def __init__(self, **data: Any): + super().__init__(**data) + MR = TypeVar("MR", bound=BaseMultiResponse[Any]) R = TypeVar("R", bound=BaseResponse) @@ -75,11 +79,18 @@ def note_id(self) -> str: return self.location.split("/")[-1] +class AddCustomScriptResp(BaseResponse): + location: str = Field(alias="Location") + + def script_id(self) -> str: + return self.location.split("/")[-1] + + class BlockListTaskResp(BaseTaskResp): type: ObjectType value: str - def __init__(self, **data: str) -> None: + def __init__(self, **data: Any): obj: Tuple[str, str] = self._map(data) super().__init__(type=obj[0], value=obj[1], **data) @@ -131,6 +142,10 @@ class GetAlertListResp(BaseLinkableResp[Union[SaeAlert, TiAlert]]): count: int +class GetCustomScriptListResp(BaseLinkableResp[Script]): + ... + + class GetEndpointActivityDataResp(BaseLinkableResp[EndpointActivity]): progress_rate: int @@ -225,3 +240,7 @@ class TerminateProcessTaskResp(BaseTaskResp): endpoint_name: str file_sha1: str file_name: Optional[str] = None + + +class TextResp(BaseResponse): + text: str diff --git a/src/pytmv1/utils.py b/src/pytmv1/utils.py index e68805c..5e16b45 100755 --- a/src/pytmv1/utils.py +++ b/src/pytmv1/utils.py @@ -2,7 +2,7 @@ import re from typing import Any, Dict, List, Optional, Pattern, Tuple -from pydantic import IPvAnyAddress, IPvAnyAddressError +from pydantic import IPvAnyAddress, TypeAdapter from .model.enums import ( OperatingSystem, @@ -84,6 +84,10 @@ def build_suspicious_request( ] +def custom_script_query(op: QueryOp, **fields: str) -> str: + return (" " + op + " ").join([f"{k} eq '{v}'" for k, v in fields.items()]) + + def activity_query(op: QueryOp, **fields: str) -> Dict[str, str]: return { "TMV1-Query": (" " + op + " ").join( @@ -130,6 +134,6 @@ def _b64_encode(value: Optional[str]) -> Optional[str]: def _is_ip_address(endpoint_value: str) -> bool: try: - return bool(IPvAnyAddress.validate(endpoint_value)) - except IPvAnyAddressError: + return bool(TypeAdapter(IPvAnyAddress).validate_python(endpoint_value)) + except ValueError: return False diff --git a/tests/conftest.py b/tests/conftest.py index 9fb2827..43dc94c 100755 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,6 +12,13 @@ def pytest_addoption(parser): dest="mock-url", help="Mock URL for Vision One API", ) + parser.addoption( + "--token", + action="store", + default="", + dest="token", + help="Token Vision One API", + ) @pytest.fixture(scope="package") @@ -21,19 +28,25 @@ def url(pytestconfig): @pytest.fixture(scope="package") -def client(pytestconfig, url): +def token(pytestconfig): + token = pytestconfig.getoption("token") + return token if token else "dummyToken" + + +@pytest.fixture(scope="package") +def client(pytestconfig, token, url): return pytmv1.client( "appname", - "token", + token, url, ) @pytest.fixture(scope="package") -def core(pytestconfig, url): +def core(pytestconfig, token, url): return Core( "appname", - "token", + token, url, 0, 0, diff --git a/tests/integration/test_endpoint.py b/tests/integration/test_endpoint.py index e04e3b4..a54b5d6 100755 --- a/tests/integration/test_endpoint.py +++ b/tests/integration/test_endpoint.py @@ -1,11 +1,4 @@ -from pytmv1 import ( - CustomScriptTask, - EndpointTask, - FileTask, - MultiResp, - ProcessTask, - ResultCode, -) +from pytmv1 import EndpointTask, FileTask, MultiResp, ProcessTask, ResultCode def test_collect_file(client): @@ -34,16 +27,6 @@ def test_restore_endpoint(client): assert result.response.items[0].status == 202 -def test_run_custom_script(client): - result = client.run_custom_script( - CustomScriptTask(fileName="test", endpointName="client1") - ) - assert isinstance(result.response, MultiResp) - assert result.result_code == ResultCode.SUCCESS - assert len(result.response.items) > 0 - assert result.response.items[0].status == 202 - - def test_terminate_process(client): result = client.terminate_process( ProcessTask( diff --git a/tests/integration/test_script.py b/tests/integration/test_script.py new file mode 100644 index 0000000..2388fd7 --- /dev/null +++ b/tests/integration/test_script.py @@ -0,0 +1,74 @@ +from pytmv1 import ( + AddCustomScriptResp, + ConsumeLinkableResp, + CustomScriptTask, + FileType, + GetCustomScriptListResp, + MultiResp, + NoContentResp, + ResultCode, + TextResp, +) + + +def test_add_custom_script(client): + result = client.add_custom_script( + file_type=FileType.BASH, + file_name="add_script.sh", + file=bytes("#!/bin/sh\necho 'Add script'", "utf-8"), + ) + assert isinstance(result.response, AddCustomScriptResp) + assert result.result_code == ResultCode.SUCCESS + assert result.response.script_id() + + +def test_delete_custom_script(client): + result = client.delete_custom_script("delete_script") + assert isinstance(result.response, NoContentResp) + assert result.result_code == ResultCode.SUCCESS + + +def test_download_custom_script(client): + result = client.download_custom_script("download_script") + assert isinstance(result.response, TextResp) + assert result.result_code == ResultCode.SUCCESS + assert result.response.text == "#!/bin/sh Download Script" + + +def test_run_custom_script(client): + result = client.run_custom_script( + CustomScriptTask(fileName="test", endpointName="client1") + ) + assert isinstance(result.response, MultiResp) + assert result.result_code == ResultCode.SUCCESS + assert len(result.response.items) > 0 + assert result.response.items[0].status == 202 + + +def test_update_custom_script(client): + result = client.update_custom_script( + script_id="123", + file_type=FileType.BASH, + file_name="update_script.sh", + file=bytes("#!/bin/sh Update script", "utf-8"), + ) + assert isinstance(result.response, NoContentResp) + assert result.result_code == ResultCode.SUCCESS + + +def test_consume_custom_script_list(client): + result = client.consume_custom_script_list( + lambda s: None, fileName="random_script.ps1", fileType="powershell" + ) + assert isinstance(result.response, ConsumeLinkableResp) + assert result.result_code == ResultCode.SUCCESS + assert result.response.total_consumed == 1 + + +def test_get_custom_script_list(client): + result = client.get_custom_script_list( + fileName="random_script.ps1", fileType="powershell" + ) + assert isinstance(result.response, GetCustomScriptListResp) + assert result.result_code == ResultCode.SUCCESS + assert len(result.response.items) > 0 diff --git a/tests/unit/test_core.py b/tests/unit/test_core.py index 2c947e9..bcc7bec 100755 --- a/tests/unit/test_core.py +++ b/tests/unit/test_core.py @@ -133,7 +133,7 @@ def test_errors(): def test_headers(core): - assert core._headers["Authorization"] == "Bearer token" + assert core._headers["Authorization"] == "Bearer dummyToken" assert core._headers["User-Agent"] == "appname-{}/{}".format( USERAGENT_SUFFIX, __version__ ) @@ -174,8 +174,8 @@ def test_parse_data_with_html_is_failed(): def test_parse_data_with_json(): raw_response = Response() raw_response.headers = {"Content-Type": "application/json"} - raw_response.json = lambda: SandboxSuspiciousListResp( - items=[ + raw_response.json = lambda: { + "items": [ SandboxSuspiciousObject( riskLevel=RiskLevel.HIGH, analysisCompletionDateTime="2021-05-07T03:08:40", @@ -184,7 +184,7 @@ def test_parse_data_with_json(): ip="6.6.6.6", ) ] - ) + } response = core_m._parse_data(raw_response, SandboxSuspiciousListResp) assert response.items[0].risk_level == "high" assert ( @@ -205,7 +205,7 @@ def test_parse_data_with_multi_and_wrong_model_is_failed(): raw_response.headers = {"Content-Type": "application/json"} raw_response.status_code = 207 raw_response.json = lambda: MultiResp(items=[MsData(status=200)]) - with pytest.raises(ValidationError): + with pytest.raises(TypeError): core_m._parse_data(raw_response, AddAlertNoteResp) @@ -417,7 +417,7 @@ def test_send_with_validation_error_is_failed(core, mocker): mocker.patch.object( core, "_send_internal", - side_effect=ValidationError([], NoContentResp), + side_effect=ValidationError.from_exception_data("Test", []), ) result = core.send(GetExceptionListResp, Api.GET_EXCEPTION_LIST) assert result.result_code == ResultCode.ERROR diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 8e8d784..9a65f46 100755 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -9,6 +9,15 @@ def test_b64_encode_with_none(): assert utils._b64_encode(None) is None +def test_custom_script_query(): + assert ( + utils.custom_script_query( + op=QueryOp.AND, fileName="test.sh", fileType="bash" + ) + == "fileName eq 'test.sh' and fileType eq 'bash'" + ) + + def test_activity_query(): assert ( utils.activity_query( From d01b7b809eefb21cfa77d8f72b2ce34528fecab2 Mon Sep 17 00:00:00 2001 From: Thomas Legros Date: Fri, 12 Jan 2024 14:23:42 +0000 Subject: [PATCH 2/2] Release 0.7.5: increment version --- src/pytmv1/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pytmv1/__about__.py b/src/pytmv1/__about__.py index ed9d4d8..ab55bb1 100755 --- a/src/pytmv1/__about__.py +++ b/src/pytmv1/__about__.py @@ -1 +1 @@ -__version__ = "0.7.4" +__version__ = "0.7.5"