From ce4354b45469e7662730041c4d6f05b0c45281b2 Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Fri, 22 Nov 2024 21:19:21 +0530 Subject: [PATCH] Fix: mstr removed dependency issues (#18732) * Fix: mstr removed dependency issues * fix session still active error * py_format * fix tests * Addressed Comments * Addressed Comments * addressed comments * Addressed comments * Add constants * Fix pytests --- ingestion/setup.py | 1 - .../src/metadata/ingestion/ometa/client.py | 21 ++- .../ingestion/source/dashboard/mstr/client.py | 174 +++++++++++------- .../source/dashboard/mstr/metadata.py | 95 ++++++---- .../ingestion/source/dashboard/mstr/models.py | 10 +- 5 files changed, 187 insertions(+), 114 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index 18e2d4d35e5f..df5f3a7e1fcb 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -304,7 +304,6 @@ "psycopg2-binary", VERSIONS["geoalchemy2"], }, - "mstr": {"mstr-rest-requests==0.14.1"}, "sagemaker": {VERSIONS["boto3"]}, "salesforce": {"simple_salesforce~=1.11"}, "sample-data": {VERSIONS["avro"], VERSIONS["grpc-tools"]}, diff --git a/ingestion/src/metadata/ingestion/ometa/client.py b/ingestion/src/metadata/ingestion/ometa/client.py index 30a7b0561828..589b09866c7a 100644 --- a/ingestion/src/metadata/ingestion/ometa/client.py +++ b/ingestion/src/metadata/ingestion/ometa/client.py @@ -14,7 +14,7 @@ import time import traceback from datetime import datetime, timezone -from typing import Callable, Dict, List, Optional, Union +from typing import Any, Callable, Dict, List, Optional, Union import requests from requests.exceptions import HTTPError @@ -115,6 +115,7 @@ class ClientConfig(ConfigModel): allow_redirects: Optional[bool] = False auth_token_mode: Optional[str] = "Bearer" verify: Optional[Union[bool, str]] = None + cookies: Optional[Any] = None ttl_cache: int = 60 @@ -138,10 +139,11 @@ def __init__(self, config: ClientConfig): self._auth_token = self.config.auth_token self._auth_token_mode = self.config.auth_token_mode self._verify = self.config.verify + self._cookies = self.config.cookies self._limits_reached = TTLCache(config.ttl_cache) - def _request( # pylint: disable=too-many-arguments + def _request( # pylint: disable=too-many-arguments,too-many-branches self, method, path, @@ -160,10 +162,12 @@ def _request( # pylint: disable=too-many-arguments base_url = base_url or self._base_url version = api_version if api_version else self._api_version url: URL = URL(base_url + "/" + version + path) + cookies = self._cookies if ( self.config.expires_in and datetime.now(timezone.utc).timestamp() >= self.config.expires_in or not self.config.access_token + and self._auth_token ): self.config.access_token, expiry = self._auth_token() if not self.config.access_token == "no_token": @@ -173,12 +177,12 @@ def _request( # pylint: disable=too-many-arguments self.config.expires_in = ( datetime.now(timezone.utc).timestamp() + expiry - 120 ) - - headers[self.config.auth_header] = ( - f"{self._auth_token_mode} {self.config.access_token}" - if self._auth_token_mode - else self.config.access_token - ) + if self.config.auth_header: + headers[self.config.auth_header] = ( + f"{self._auth_token_mode} {self.config.access_token}" + if self._auth_token_mode + else self.config.access_token + ) # Merge extra headers if provided. # If a header value is provided in modulo string format and matches an existing header, @@ -198,6 +202,7 @@ def _request( # pylint: disable=too-many-arguments # It's better to fail early if the URL isn't right. "allow_redirects": self.config.allow_redirects, "verify": self._verify, + "cookies": cookies, } method_key = "params" if method.upper() == "GET" else "data" diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/client.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/client.py index c3627187c6e7..1388364db11c 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mstr/client.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/client.py @@ -15,13 +15,14 @@ from typing import List, Optional import requests -from mstr.requests import MSTRRESTSession from metadata.generated.schema.entity.services.connections.dashboard.mstrConnection import ( MstrConnection, ) from metadata.ingestion.connections.test_connections import SourceConnectionException +from metadata.ingestion.ometa.client import REST, ClientConfig from metadata.ingestion.source.dashboard.mstr.models import ( + AuthHeaderCookie, MstrDashboard, MstrDashboardDetails, MstrDashboardList, @@ -36,6 +37,8 @@ logger = ingestion_logger() API_VERSION = "MicroStrategyLibrary/api" +LOGIN_MODE_GUEST = 8 +APPLICATION_TYPE = 35 class MSTRClient: @@ -45,32 +48,90 @@ class MSTRClient: def _get_base_url(self, path=None): if not path: - return f"{clean_uri(self.config.hostPort)}/{API_VERSION}/" + return f"{clean_uri(self.config.hostPort)}/{API_VERSION}" return f"{clean_uri(self.config.hostPort)}/{API_VERSION}/{path}" - def _get_mstr_session(self) -> MSTRRESTSession: - try: - session = MSTRRESTSession(base_url=self._get_base_url()) - session.login( - username=self.config.username, - password=self.config.password.get_secret_value(), - ) - return session - - except KeyError as exe: - msg = "Failed to fetch mstr session, please validate credentials" - raise SourceConnectionException(msg) from exe - - except Exception as exc: - msg = f"Unknown error in connection: {exc}." - raise SourceConnectionException(msg) from exc - def __init__( self, config: MstrConnection, ): self.config = config - self.session = self._get_mstr_session() + + self.auth_params: AuthHeaderCookie = self._get_auth_header_and_cookies() + + client_config = ClientConfig( + base_url=clean_uri(config.hostPort), + api_version=API_VERSION, + extra_headers=self.auth_params.auth_header, + allow_redirects=True, + cookies=self.auth_params.auth_cookies, + ) + + self.client = REST(client_config) + self._set_api_session() + + def _get_auth_header_and_cookies(self) -> Optional[AuthHeaderCookie]: + """ + Send a request to authenticate the user and get headers and + + To know about the data params below please visit + https://demo.microstrategy.com/MicroStrategyLibrary/api-docs/index.html#/Authentication/postLogin + """ + try: + data = { + "username": self.config.username, + "password": self.config.password.get_secret_value(), + "loginMode": LOGIN_MODE_GUEST, + "applicationType": APPLICATION_TYPE, + } + response = requests.post( + url=self._get_base_url("auth/login"), data=data, timeout=60 + ) + if not response: + raise SourceConnectionException() + return AuthHeaderCookie( + auth_header=response.headers, auth_cookies=response.cookies + ) + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.error( + f"Failed to fetch the auth header and cookies due to [{exc}], please validate credentials" + ) + return None + + def _set_api_session(self) -> bool: + """ + Set the user api session to active this will keep the connection alive + """ + api_session = requests.put( + url=self._get_base_url("sessions"), + headers=self.auth_params.auth_header, + cookies=self.auth_params.auth_cookies, + timeout=60, + ) + if api_session.ok: + logger.info( + f"Connection Successful User {self.config.username} is Authenticated" + ) + return True + raise requests.ConnectionError( + "Connection Failed, Failed to set an api session, Please validate the credentials" + ) + + def close_api_session(self) -> None: + """ + Closes the active api session + """ + try: + close_api_session = self.client.post( + path="/auth/logout", + ) + if close_api_session.ok: + logger.info("API Session Closed Successfully") + + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Failed to close the api sesison due to [{exc}]") def is_project_name(self) -> bool: return bool(self.config.projectName) @@ -80,14 +141,11 @@ def get_projects_list(self) -> List[MstrProject]: Get List of all projects """ try: - resp_projects = self.session.get( - url=self._get_base_url("projects"), params={"include_auth": True} + resp_projects = self.client.get( + path="/projects", ) - if not resp_projects.ok: - raise requests.ConnectionError() - - project_list = MstrProjectList(projects=resp_projects.json()) + project_list = MstrProjectList(projects=resp_projects) return project_list.projects except Exception as exc: @@ -101,15 +159,11 @@ def get_project_by_name(self) -> Optional[MstrProject]: Get Project By Name """ try: - resp_projects = self.session.get( - url=self._get_base_url(f"projects/{self.config.projectName}"), - params={"include_auth": True}, + resp_projects = self.client.get( + path=f"/projects/{self.config.projectName}", ) - if not resp_projects.ok: - raise requests.ConnectionError() - - project = MstrProject(**resp_projects.json()) + project = MstrProject.model_validate(resp_projects) return project except Exception: @@ -123,32 +177,28 @@ def get_search_results_list( ) -> List[MstrSearchResult]: """ Get Search Results + + To know about the data params below please visit + https://demo.microstrategy.com/MicroStrategyLibrary/api-docs/index.html?#/Browsing/doQuickSearch """ try: - resp_results = self.session.get( - url=self._get_base_url("searches/results"), - params={ - "include_auth": True, - "project_id": project_id, - "type": object_type, - "getAncestors": False, - "offset": 0, - "limit": -1, - "certifiedStatus": "ALL", - "isCrossCluster": False, - "result.hidden": False, - }, + data = { + "project_id": project_id, + "type": object_type, + "getAncestors": False, + "offset": 0, + "limit": -1, + "certifiedStatus": "ALL", + "isCrossCluster": False, + "result.hidden": False, + } + resp_results = self.client.get( + path="/searches/results", + data=data, ) - if not resp_results.ok: - raise requests.ConnectionError() - - results = [] - for resp_result in resp_results.json()["result"]: - results.append(resp_result) - - results_list = MstrSearchResultList(results=results) - return results_list.results + results_list = MstrSearchResultList.model_validate(resp_results).result + return results_list except Exception: logger.debug(traceback.format_exc()) @@ -187,19 +237,13 @@ def get_dashboard_details( Get Dashboard Details """ try: - resp_dashboard = self.session.get( - url=self._get_base_url(f"v2/dossiers/{dashboard_id}/definition"), - params={ - "include_auth": True, - }, - headers={"X-MSTR-ProjectID": project_id}, + headers = {"X-MSTR-ProjectID": project_id} | self.auth_params.auth_header + resp_dashboard = self.client._request( # pylint: disable=protected-access + "GET", path=f"/v2/dossiers/{dashboard_id}/definition", headers=headers ) - if not resp_dashboard.ok: - raise requests.ConnectionError() - return MstrDashboardDetails( - projectId=project_id, projectName=project_name, **resp_dashboard.json() + projectId=project_id, projectName=project_name, **resp_dashboard ) except Exception: diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py index ee2648c9abde..d26ee0ef0e60 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/metadata.py @@ -91,6 +91,18 @@ def get_dashboard_name(self, dashboard: MstrDashboard) -> str: """ return dashboard.name + def get_project_name(self, dashboard_details: MstrDashboard) -> Optional[str]: + """ + Get dashboard project name + """ + try: + return dashboard_details.projectName + except Exception as exc: + logger.debug( + f"Cannot get project name from dashboard [{dashboard_details.name}] - [{exc}]" + ) + return None + def get_dashboard_details(self, dashboard: MstrDashboard) -> MstrDashboardDetails: """ Get Dashboard Details @@ -106,40 +118,41 @@ def yield_dashboard( """ Method to Get Dashboard Entity """ - try: - dashboard_url = ( - f"{clean_uri(self.service_connection.hostPort)}/MicroStrategyLibrary/app/" - f"{dashboard_details.projectId}/{dashboard_details.id}" - ) - dashboard_request = CreateDashboardRequest( - name=EntityName(dashboard_details.id), - displayName=dashboard_details.name, - sourceUrl=SourceUrl(dashboard_url), - project=dashboard_details.projectName, - charts=[ - FullyQualifiedEntityName( - fqn.build( - self.metadata, - entity_type=Chart, - service_name=self.context.get().dashboard_service, - chart_name=chart, + if dashboard_details: + try: + dashboard_url = ( + f"{clean_uri(self.service_connection.hostPort)}/MicroStrategyLibrary/app/" + f"{dashboard_details.projectId}/{dashboard_details.id}" + ) + dashboard_request = CreateDashboardRequest( + name=EntityName(dashboard_details.id), + displayName=dashboard_details.name, + sourceUrl=SourceUrl(dashboard_url), + project=dashboard_details.projectName, + charts=[ + FullyQualifiedEntityName( + fqn.build( + self.metadata, + entity_type=Chart, + service_name=self.context.get().dashboard_service, + chart_name=chart, + ) ) + for chart in self.context.get().charts or [] + ], + service=self.context.get().dashboard_service, + owners=self.get_owner_ref(dashboard_details=dashboard_details), + ) + yield Either(right=dashboard_request) + self.register_record(dashboard_request=dashboard_request) + except Exception as exc: + yield Either( + left=StackTraceError( + name=dashboard_details.id, + error=f"Error yielding dashboard for {dashboard_details}: {exc}", + stackTrace=traceback.format_exc(), ) - for chart in self.context.get().charts or [] - ], - service=self.context.get().dashboard_service, - owners=self.get_owner_ref(dashboard_details=dashboard_details), - ) - yield Either(right=dashboard_request) - self.register_record(dashboard_request=dashboard_request) - except Exception as exc: - yield Either( - left=StackTraceError( - name=dashboard_details.id, - error=f"Error yielding dashboard for {dashboard_details}: {exc}", - stackTrace=traceback.format_exc(), ) - ) def yield_dashboard_lineage_details( self, dashboard_details: MstrDashboardDetails, db_service_name: str @@ -156,14 +169,15 @@ def yield_dashboard_chart( Returns: Iterable[CreateChartRequest] """ - try: - for chapter in dashboard_details.chapters: - for page in chapter.pages: - yield from self._yield_chart_from_visualization(page) + if dashboard_details: + try: + for chapter in dashboard_details.chapters: + for page in chapter.pages: + yield from self._yield_chart_from_visualization(page) - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Error creating dashboard: {exc}") + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning(f"Error creating dashboard: {exc}") def _yield_chart_from_visualization( self, page: MstrPage @@ -192,3 +206,8 @@ def _yield_chart_from_visualization( stackTrace=traceback.format_exc(), ) ) + + def close(self): + # close the api session + self.client.close_api_session() + self.metadata.close() diff --git a/ingestion/src/metadata/ingestion/source/dashboard/mstr/models.py b/ingestion/src/metadata/ingestion/source/dashboard/mstr/models.py index 0dc4ee839f97..372dfb92d0a6 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/mstr/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/mstr/models.py @@ -12,7 +12,7 @@ MSTR Models """ from datetime import datetime -from typing import List, Optional +from typing import Any, List, Optional from pydantic import BaseModel @@ -69,7 +69,8 @@ class MstrSearchResult(BaseModel): class MstrSearchResultList(BaseModel): - results: Optional[List[MstrSearchResult]] + totalItems: Optional[int] = 0 + result: Optional[List[MstrSearchResult]] = None class MstrDashboard(BaseModel): @@ -142,3 +143,8 @@ class MstrDashboardDetails(BaseModel): projectName: str currentChapter: str chapters: List[MstrChapter] + + +class AuthHeaderCookie(BaseModel): + auth_header: dict + auth_cookies: Any