Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#11799 - Fix Airfow ownership & add pipeline tasks #14510

Merged
merged 11 commits into from
Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions ingestion/src/metadata/ingestion/ometa/mixins/es_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ def _search_es_entity(

return None

def _get_entity_from_es(
self, entity: Type[T], query_string: str, fields: Optional[list] = None
) -> Optional[T]:
"""Fetch an entity instance from ES"""

try:
entity_list = self._search_es_entity(
entity_type=entity, query_string=query_string, fields=fields
)
for instance in entity_list or []:
return instance
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(f"Could not get {entity.__name__} info from ES due to {err}")

return None

def es_search_from_fqn(
self,
entity_type: Type[T],
Expand Down
139 changes: 117 additions & 22 deletions ingestion/src/metadata/ingestion/ometa/mixins/user_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,15 @@

To be used by OpenMetadata class
"""
import traceback
from functools import lru_cache
from typing import Optional
from typing import Optional, Type

from metadata.generated.schema.entity.teams.team import Team
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import T
from metadata.ingestion.ometa.client import REST
from metadata.utils.constants import ENTITY_REFERENCE_TYPE_MAP
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.logger import ometa_logger

Expand All @@ -34,42 +37,134 @@ class OMetaUserMixin:

client: REST

email_search = (
"/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
+ ES_INDEX_MAP[User.__name__]
)
@staticmethod
def email_search_query_es(entity: Type[T]) -> str:
return (
"/search/query?q=email.keyword:{email}&from={from_}&size={size}&index="
+ ES_INDEX_MAP[entity.__name__]
)

@lru_cache(maxsize=None)
def get_user_by_email(
@staticmethod
def name_search_query_es(entity: Type[T]) -> str:
"""
Allow for more flexible lookup following what the UI is doing when searching users.

We don't want to stick to `q=name:{name}` since in case a user is named `random.user`
but looked as `Random User`, we want to find this match.
"""
return (
"/search/query?q={name}&from={from_}&size={size}&index="
+ ES_INDEX_MAP[entity.__name__]
)

def _search_by_email(
self,
entity: Type[T],
email: Optional[str],
from_count: int = 0,
size: int = 1,
fields: Optional[list] = None,
) -> Optional[User]:
) -> Optional[T]:
"""
GET user entity by name
GET user or team entity by mail

Args:
email: user email to search
from_count: records to expect
size: number of records
fields: Optional field list to pass to ES request
"""
if email:
query_string = self.email_search.format(
query_string = self.email_search_query_es(entity=entity).format(
email=email, from_=from_count, size=size
)
return self._get_entity_from_es(
entity=entity, query_string=query_string, fields=fields
)

return None

def _search_by_name(
self,
entity: Type[T],
name: Optional[str],
from_count: int = 0,
size: int = 1,
fields: Optional[list] = None,
) -> Optional[T]:
"""
GET entity by name

Args:
name: user name to search
from_count: records to expect
size: number of records
fields: Optional field list to pass to ES request
"""
if name:
query_string = self.name_search_query_es(entity=entity).format(
name=name, from_=from_count, size=size
)
return self._get_entity_from_es(
entity=entity, query_string=query_string, fields=fields
)

try:
entity_list = self._search_es_entity(
entity_type=User, query_string=query_string, fields=fields
)
for user in entity_list or []:
return user
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(
f"Could not get user info from ES for user email {email} due to {err}"
)
return None

@lru_cache(maxsize=None)
def get_reference_by_email(
self,
email: Optional[str],
from_count: int = 0,
size: int = 1,
fields: Optional[list] = None,
) -> Optional[EntityReference]:
"""
Get a User or Team Entity Reference by searching by its mail
"""
maybe_user = self._search_by_email(
entity=User, email=email, from_count=from_count, size=size, fields=fields
)
if maybe_user:
return EntityReference(
id=maybe_user.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[User.__name__]
)

maybe_team = self._search_by_email(
entity=Team, email=email, from_count=from_count, size=size, fields=fields
)
if maybe_team:
return EntityReference(
id=maybe_team.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[Team.__name__]
)

return None

@lru_cache(maxsize=None)
def get_reference_by_name(
self,
name: Optional[str],
from_count: int = 0,
size: int = 1,
fields: Optional[list] = None,
) -> Optional[EntityReference]:
"""
Get a User or Team Entity Reference by searching by its name
"""
maybe_user = self._search_by_name(
entity=User, name=name, from_count=from_count, size=size, fields=fields
)
if maybe_user:
return EntityReference(
id=maybe_user.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[User.__name__]
)

maybe_team = self._search_by_name(
entity=Team, name=name, from_count=from_count, size=size, fields=fields
)
if maybe_team:
return EntityReference(
id=maybe_team.id.__root__, type=ENTITY_REFERENCE_TYPE_MAP[Team.__name__]
)

return None
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,7 @@ def get_owner_details(
try:
owner_details = self.client.domo.users_get(owner.id)
if owner_details.get("email"):
user = self.metadata.get_user_by_email(owner_details["email"])
if user:
return EntityReference(id=user.id.__root__, type="user")
logger.warning(
f"No user found with email [{owner_details['email']}] in OMD"
)
return self.metadata.get_reference_by_email(owner_details["email"])
except Exception as exc:
logger.warning(
f"Error while getting details of user {owner.displayName} - {exc}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -634,9 +634,7 @@ def get_owner_details(
try:
if dashboard_details.user_id is not None:
dashboard_owner = self.client.user(dashboard_details.user_id)
user = self.metadata.get_user_by_email(dashboard_owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_owner.email)

except Exception as err:
logger.debug(traceback.format_exc())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ def get_dashboard_details(self, dashboard: dict) -> dict:
def get_owner_details(self, dashboard_details) -> Optional[EntityReference]:
"""Get owner from mail"""
if dashboard_details.get("user") and dashboard_details["user"].get("email"):
user = self.metadata.get_user_by_email(
return self.metadata.get_reference_by_email(
dashboard_details["user"].get("email")
)
if user:
return EntityReference(id=user.id.__root__, type="user")
return None

def get_dashboard_url(self, dashboard_details: dict) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,7 @@ def get_dashboard_details(

def _get_user_by_email(self, email: Optional[str]) -> Optional[EntityReference]:
if email:
user = self.metadata.get_user_by_email(email)
if user:
return EntityReference(id=user.id.__root__, type="user")

return self.metadata.get_reference_by_email(email)
return None

def get_owner_details(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,7 @@ def get_owner_details(
) -> Optional[EntityReference]:
"""Get dashboard owner from email"""
if dashboard_details.owner and dashboard_details.owner.email:
user = self.metadata.get_user_by_email(dashboard_details.owner.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(dashboard_details.owner.email)
return None

def yield_tag(self, *_, **__) -> Iterable[Either[OMetaTagAndClassification]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,7 @@ def get_owners(self, owner: Owner) -> Optional[EntityReference]:
try:
owner_details = User(**self.domo_client.users_get(owner.id))
if owner_details.email:
user = self.metadata.get_user_by_email(owner_details.email)
if user:
return EntityReference(id=user.id.__root__, type="user")
return self.metadata.get_reference_by_email(owner_details.email)
except Exception as exc:
logger.warning(f"Error while getting details of user {owner.name} - {exc}")
return None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1054,9 +1054,9 @@ def ingest_pipelines(self) -> Iterable[Either[Pipeline]]:
for pipeline in self.pipelines["pipelines"]:
owner = None
if pipeline.get("owner"):
user = self.metadata.get_user_by_email(email=pipeline.get("owner"))
if user:
owner = EntityReference(id=user.id.__root__, type="user")
owner = self.metadata.get_reference_by_email(
email=pipeline.get("owner")
)
pipeline_ev = CreatePipelineRequest(
name=pipeline["name"],
displayName=pipeline["displayName"],
Expand Down
Loading
Loading