Skip to content

Commit

Permalink
feat: Data Owner Implementation of Atlas Proxy (amundsen-io#156)
Browse files Browse the repository at this point in the history
* Implements the Data Owner functionality for the Table

Signed-off-by: verdan <verdan.mahmood@gmail.com>

Updates

Signed-off-by: verdan <verdan.mahmood@gmail.com>

Refactoring - Code Reviews

Signed-off-by: verdan <verdan.mahmood@gmail.com>

Refactoring

Signed-off-by: verdan <verdan.mahmood@gmail.com>

Updates the test cases, based on owner changes

Signed-off-by: verdan <verdan.mahmood@gmail.com>

Updates the lambda function definition

Signed-off-by: verdan <verdan.mahmood@gmail.com>

* Code Review Changes

Signed-off-by: verdan <verdan.mahmood@gmail.com>
  • Loading branch information
verdan authored Aug 19, 2020
1 parent 14eb9ba commit 48b4c71
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 50 deletions.
6 changes: 2 additions & 4 deletions docs/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@ Most of the configurations are set through Flask [Config Class](https://github.c

#### USER_DETAIL_METHOD `OPTIONAL`
This is a method that can be used to get the user details from any third-party or custom system.
This custom function takes user_id as a parameter, and returns a tuple consisting of the user details defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py) along with the status code.
This custom function takes user_id as a parameter, and returns a dictionary containing the user detail fields defined in [UserSchema](https://github.com/lyft/amundsencommon/blob/master/amundsen_common/models/user.py).

Example:
```python

def get_user_details(user_id):
from amundsen_common.models.user import UserSchema
from http import HTTPStatus
user_info = {
'email': 'test@email.com',
'user_id': user_id,
'first_name': 'Firstname',
'last_name': 'Lastname',
'full_name': 'Firstname Lastname',
}
return UserSchema().dump(user_info).data, HTTPStatus.OK
return user_info

USER_DETAIL_METHOD = get_user_details
```
Expand Down
3 changes: 2 additions & 1 deletion metadata_service/api/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ def __init__(self) -> None:
def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None]]:
if app.config['USER_DETAIL_METHOD']:
try:
return app.config['USER_DETAIL_METHOD'](id)
user_data = app.config['USER_DETAIL_METHOD'](id)
return UserSchema().dump(user_data).data, HTTPStatus.OK
except Exception:
LOGGER.exception('UserDetailAPI GET Failed - Using "USER_DETAIL_METHOD" config variable')
return {'message': 'user_id {} fetch failed'.format(id)}, HTTPStatus.NOT_FOUND
Expand Down
137 changes: 98 additions & 39 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
from random import randint
from typing import Any, Dict, List, Union, Optional

from amundsen_common.models.dashboard import DashboardSummary
from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader,\
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader, \
ProgrammaticDescription, ResourceReport
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.dashboard import DashboardSummary
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.exceptions import BadRequest, Conflict, NotFound
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import (make_table_qualified_name,
parse_table_qualified_name,
Expand All @@ -23,8 +23,8 @@

from metadata_service.entity.dashboard_detail import DashboardDetail as DashboardDetailEntity
from metadata_service.entity.description import Description
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.entity.resource_type import ResourceType
from metadata_service.entity.tag_detail import TagDetail
from metadata_service.exception import NotFoundException
from metadata_service.proxy import BaseProxy
from metadata_service.util import UserResourceRel
Expand All @@ -35,6 +35,11 @@
_ATLAS_PROXY_CACHE_EXPIRY_SEC = 11 * 60 * 60 + randint(0, 3600)


class Status:
    """
    String constants compared against the ``status``, ``entityStatus`` and
    ``relationshipStatus`` values of Atlas entities and relationships.
    """
    ACTIVE = "ACTIVE"
    DELETED = "DELETED"


# noinspection PyMethodMayBeStatic
class AtlasProxy(BaseProxy):
"""
Expand All @@ -49,11 +54,11 @@ class AtlasProxy(BaseProxy):
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BOOKMARK_ACTIVE_KEY = 'active'
ENTITY_ACTIVE_STATUS = 'ACTIVE'
GUID_KEY = 'guid'
ATTRS_KEY = 'attributes'
REL_ATTRS_KEY = 'relationshipAttributes'
ENTITY_URI_KEY = 'entityUri'
user_detail_method = app.config.get('USER_DETAIL_METHOD') or (lambda *args: None)
_CACHE = CacheManager(**parse_cache_config_options({'cache.regions': 'atlas_proxy',
'cache.atlas_proxy.type': 'memory',
'cache.atlas_proxy.expire': _ATLAS_PROXY_CACHE_EXPIRY_SEC}))
Expand Down Expand Up @@ -355,7 +360,7 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]:
report_entities_collection = self._driver.entity_bulk(guid=guids)
for report_entity in extract_entities(report_entities_collection):
try:
if report_entity.status == self.ENTITY_ACTIVE_STATUS:
if report_entity.status == Status.ACTIVE:
report_attrs = report_entity.attributes
reports.append(
ResourceReport(
Expand All @@ -372,6 +377,23 @@ def _get_reports(self, guids: List[str]) -> List[ResourceReport]:

return parsed_reports

def _get_owners(self, data_owners: Optional[list], fallback_owner: str) -> List[User]:
    """
    Build the list of owners for a table from its ``ownedBy`` relationships.

    Only entries whose ``entityStatus`` and ``relationshipStatus`` are both
    ``Status.ACTIVE`` are considered. For each of them the ``displayText``
    is passed to ``self.user_detail_method``; when that returns a falsy
    value the ``displayText`` itself is used as both email and user id.

    :param data_owners: ``ownedBy`` relationship entries of the table, or
        None/empty when the table has no such relationships.
    :param fallback_owner: Value used as email and user id of a single
        owner when no active owner relationship is found.
    :return: List of User objects, never empty.
    """
    owners_detail = []

    # The caller reads `ownedBy` with dict.get(), so None is a possible
    # input; treat it the same as "no owner relationships".
    for owner in data_owners or []:
        if owner['entityStatus'] != Status.ACTIVE or owner['relationshipStatus'] != Status.ACTIVE:
            continue

        owner_qn = owner['displayText']
        owner_data = self.user_detail_method(owner_qn) or {
            'email': owner_qn,
            'user_id': owner_qn
        }
        owners_detail.append(User(**owner_data))

    return owners_detail or [User(email=fallback_owner, user_id=fallback_owner)]

def get_user(self, *, id: str) -> Union[UserEntity, None]:
    """
    Placeholder with an empty body: no lookup is performed and the call
    returns None.

    :param id: Identifier of the user (unused).
    :return: None
    """
    pass

Expand All @@ -391,15 +413,15 @@ def get_table(self, *, table_uri: str) -> Table:
try:
attrs = table_details[self.ATTRS_KEY]

programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters'))
programmatic_descriptions = self._get_programmatic_descriptions(attrs.get('parameters', dict()))

table_qn = parse_table_qualified_name(
qualified_name=attrs.get(self.QN_KEY)
)

tags = []
# Using or in case, if the key 'classifications' is there with a None
for classification in table_details.get("classifications") or list():
for classification in table_details.get('classifications') or list():
tags.append(
Tag(
tag_name=classification.get('typeName'),
Expand All @@ -420,7 +442,7 @@ def get_table(self, *, table_uri: str) -> Table:
name=attrs.get('name') or table_qn.get("table_name", ''),
tags=tags,
description=attrs.get('description') or attrs.get('comment'),
owners=[User(email=attrs.get('owner'))],
owners=self._get_owners(table_details[self.REL_ATTRS_KEY].get('ownedBy'), attrs.get('owner')),
resource_reports=self._get_reports(guids=reports_guids),
columns=columns,
is_view=is_view,
Expand All @@ -437,20 +459,71 @@ def get_table(self, *, table_uri: str) -> Table:
.format(table_uri=table_uri))

def delete_owner(self, *, table_uri: str, owner: str) -> None:
    """
    Delete the active ``ownedBy`` relationship between a table and an owner.

    Does nothing when the table entity has no ``ownedBy`` relationships.

    :param table_uri: URI of the table whose owner should be removed.
    :param owner: Value matched against the ``displayText`` of the
        ``ownedBy`` relationship entries.
    :return: None
    :raises BadRequest: If the table has ``ownedBy`` entries but none of
        them is an active relationship for ``owner``.
    """
    table = self._get_table_entity(table_uri=table_uri)
    owned_by = table.entity[self.REL_ATTRS_KEY].get("ownedBy")

    if not owned_by:
        return

    # Pick the first matching relationship directly. Checking a filter()
    # iterator with list() and then calling next() on it would always fail,
    # because list() consumes the iterator.
    active_owner = next(
        (item for item in owned_by
         if item['relationshipStatus'] == Status.ACTIVE and item['displayText'] == owner),
        None
    )
    if active_owner is None:
        raise BadRequest('You can not delete this owner.')

    try:
        self._driver.relationship_guid(active_owner.get('relationshipGuid')).delete()
    except NotFound as ex:
        # A missing relationship on the Atlas side is logged, not re-raised.
        LOGGER.exception('Error while removing table data owner. {}'
                         .format(str(ex)))

def add_owner(self, *, table_uri: str, owner: str) -> None:
"""
It simply replaces the owner field in atlas with the new string.
FixMe (Verdan): Implement multiple data owners and
atlas changes in the documentation if needed to make owner field a list
Query on Atlas User entity to find if the entity exist for the
owner string in parameter, if not create one. And then use that User
entity's GUID and add a relationship between Table and User, on ownedBy field.
:param table_uri:
:param owner: Email address of the owner
:return: None, as it simply adds the owner.
"""
entity = self._get_table_entity(table_uri=table_uri)
entity.entity[self.ATTRS_KEY]['owner'] = owner
entity.update()
if not (self.user_detail_method(owner) or owner):
raise NotFoundException(f'User "{owner}" does not exist.')

user_dict = {
"entity": {
"typeName": "User",
"attributes": {"qualifiedName": owner},
}
}

# Get or Create a User
user_entity = self._driver.entity_post.create(data=user_dict)
user_guid = next(iter(user_entity.get("guidAssignments").values()))

table = self._get_table_entity(table_uri=table_uri)

entity_def = {
"typeName": "DataSet_Users_Owner",
"end1": {
"guid": table.entity.get("guid"), "typeName": "Table",
},
"end2": {
"guid": user_guid, "typeName": "User",
},
}
try:
self._driver.relationship.create(data=entity_def)
except Conflict as ex:
LOGGER.exception('Error while adding the owner information. {}'
.format(str(ex)))
raise BadRequest(f'User {owner} is already added as a data owner for '
f'table {table_uri}.')

def get_table_description(self, *,
table_uri: str) -> Union[str, None]:
Expand Down Expand Up @@ -644,7 +717,7 @@ def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[Popul
entity_status = user_reads['entityStatus']
relationship_status = user_reads['relationshipStatus']

if entity_status == 'ACTIVE' and relationship_status == 'ACTIVE':
if entity_status == Status.ACTIVE and relationship_status == Status.ACTIVE:
readers_guids.append(user_reads['guid'])

readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True))
Expand Down Expand Up @@ -682,16 +755,7 @@ def add_resource_relation_by_user(self, *,
if resource_type is not ResourceType.Table:
raise NotImplemented('resource type {} is not supported'.format(resource_type))

self._add_table_relation_by_user(table_uri=id,
user_email=user_id,
relation_type=relation_type)

def _add_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:

entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email)
entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id)
entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = True
entity.update()

Expand All @@ -703,15 +767,7 @@ def delete_resource_relation_by_user(self, *,
if resource_type is not ResourceType.Table:
raise NotImplemented('resource type {} is not supported'.format(resource_type))

self._delete_table_relation_by_user(table_uri=id,
user_email=user_id,
relation_type=relation_type)

def _delete_table_relation_by_user(self, *,
table_uri: str,
user_email: str,
relation_type: UserResourceRel) -> None:
entity = self._get_bookmark_entity(entity_uri=table_uri, user_id=user_email)
entity = self._get_bookmark_entity(entity_uri=id, user_id=user_id)
entity.entity[self.ATTRS_KEY][self.BOOKMARK_ACTIVE_KEY] = False
entity.update()

Expand Down Expand Up @@ -765,9 +821,12 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Rea
read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False))

for read_entity in read_entities:
reader = Reader(user=User(email=read_entity.relationshipAttributes['user']['displayText'],
user_id=read_entity.relationshipAttributes['user']['displayText']),
read_count=read_entity.attributes['count'])
reader_qn = read_entity.relationshipAttributes['user']['displayText']
reader_details = self.user_detail_method(reader_qn) or {
'email': reader_qn,
'user_id': reader_qn
}
reader = Reader(user=User(**reader_details), read_count=read_entity.attributes['count'])

results.append(reader)

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ neotime==1.7.1
pytz==2018.4
requests-aws4auth==0.9
statsd==3.2.1
pyatlasclient==1.0.4
pyatlasclient==1.0.5
beaker>=1.10.0
mocket==3.7.3
overrides==2.5
Expand Down
18 changes: 13 additions & 5 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ def _get_table(self, custom_stats_format: bool = False) -> None:
test_exp_col = self.test_exp_col_stats_formatted
else:
test_exp_col = self.test_exp_col_stats_raw

ent_attrs = cast(dict, self.entity1['attributes'])
self._mock_get_table_entity()
self._create_mocked_report_entities_collection()
self.proxy._get_owners = MagicMock(return_value=[User(email=ent_attrs['owner'])]) # type: ignore
self.proxy._driver.entity_bulk = MagicMock(return_value=self.report_entity_collection)
response = self.proxy.get_table(table_uri=self.table_uri)

classif_name = self.classification_entity['classifications'][0]['typeName']
ent_attrs = cast(dict, self.entity1['attributes'])

col_attrs = cast(dict, self.test_column['attributes'])
exp_col_stats = list()
Expand Down Expand Up @@ -257,10 +257,18 @@ def test_delete_tag(self) -> None:

def test_add_owner(self) -> None:
owner = "OWNER"
entity = self._mock_get_table_entity()
with patch.object(entity, 'update') as mock_execute:
user_guid = 123
self._mock_get_table_entity()
self.proxy._driver.entity_post = MagicMock()
self.proxy._driver.entity_post.create = MagicMock(return_value={"guidAssignments": {user_guid: user_guid}})

with patch.object(self.proxy._driver.relationship, 'create') as mock_execute:
self.proxy.add_owner(table_uri=self.table_uri, owner=owner)
mock_execute.assert_called_with()
mock_execute.assert_called_with(
data={'typeName': 'DataSet_Users_Owner',
'end1': {'guid': self.entity1['guid'], 'typeName': 'Table'},
'end2': {'guid': user_guid, 'typeName': 'User'}}
)

def test_get_column(self) -> None:
self._mock_get_table_entity()
Expand Down

0 comments on commit 48b4c71

Please sign in to comment.