From 4fc6f8c63619cdd81f7cd655a93aded0c26d2522 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Thu, 9 Jul 2020 08:39:05 +0200 Subject: [PATCH 1/4] :tada: Initial commit. --- metadata_service/proxy/atlas_proxy.py | 92 +++++++++++++++++++++++++-- requirements.txt | 2 +- 2 files changed, 89 insertions(+), 5 deletions(-) diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index 291a7ac6..1330e371 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -4,14 +4,15 @@ from typing import Any, Dict, List, Union, Optional from amundsen_common.models.popular_table import PopularTable -from amundsen_common.models.table import Column, Statistics, Table, Tag, User +from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader from amundsen_common.models.user import User as UserEntity from amundsen_common.models.dashboard import DashboardSummary from atlasclient.client import Atlas from atlasclient.exceptions import BadRequest from atlasclient.models import EntityUniqueAttribute from atlasclient.utils import (make_table_qualified_name, - parse_table_qualified_name) + parse_table_qualified_name, + extract_entities) from beaker.cache import CacheManager from beaker.util import parse_cache_config_options from flask import current_app as app @@ -41,6 +42,7 @@ class AtlasProxy(BaseProxy): STATISTICS_FORMAT_SPEC = app.config['STATISTICS_FORMAT_SPEC'] BOOKMARK_TYPE = 'Bookmark' USER_TYPE = 'User' + READER_TYPE = 'Reader' QN_KEY = 'qualifiedName' BOOKMARK_ACTIVE_KEY = 'active' GUID_KEY = 'guid' @@ -386,6 +388,7 @@ def get_table(self, *, table_uri: str) -> Table: description=attrs.get('description') or attrs.get('comment'), owners=[User(email=attrs.get('owner'))], columns=columns, + table_readers=self._get_readers(attrs.get(self.QN_KEY)), last_updated_timestamp=self._parse_date(table_details.get('updateTime'))) return table @@ -596,8 +599,42 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso return {'table': results} - def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, Any]: - pass + def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[PopularTable]]: + user = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_email).entity + + readers_guids = [] + for user_reads in user['relationshipAttributes'].get('entityReads'): + entity_status = user_reads['entityStatus'] + relationship_status = user_reads['relationshipStatus'] + + if entity_status == 'ACTIVE' and relationship_status == 'ACTIVE': + readers_guids.append(user_reads['guid']) + + readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True)) + + _results = {} + for reader in readers: + entity_uri = reader.attributes.get(self.ENTITY_URI_KEY) + count = reader.attributes.get('count') + + if count: + details = self._extract_info_from_uri(table_uri=entity_uri) + + _results[count] = dict(cluster=details.get('cluster'), + name=details.get('name'), + schema=details.get('db'), + database=details.get('entity')) + + sorted_counts = sorted(_results.keys()) + + results = [] + for count in sorted_counts: + data: dict = _results.get(count, dict()) + table = PopularTable(**data) + + results.append(table) + + return {'table': results} def add_resource_relation_by_user(self, *, id: str, @@ -652,6 +689,53 @@ def _parse_date(self, date: int) -> Optional[int]: except Exception: return None + def _get_readers(self, qualified_name: str, top: Optional[int] = None) -> List[Reader]: + params = { + 'typeName': self.READER_TYPE, + 'offset': '0', + 'limit': top or 15, + 'excludeDeletedEntities': True, + 'entityFilters': { + 'condition': 'AND', + 'criterion': [ + { + 'attributeName': self.QN_KEY, + 'operator': 'STARTSWITH', + 'attributeValue': qualified_name.split('@')[0] + }, + { + 'attributeName': 'count', + 'operator': 'gte', + 'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}' + } + ] + }, + 'attributes': ['count', self.QN_KEY], + 'sortBy': 'count', + 'sortOrder': 'DESCENDING' + } + + search_results = self._driver.search_basic.create(data=params, ignoreRelationships=False) + + readers = [] + + for record in search_results.entities: + readers.append(record.guid) + + results = [] + + if len(readers) > 0: + full_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) + + for r in full_entities: + reader = Reader(user=User(email=r.relationshipAttributes['user']['displayText'], + user_id=r.relationshipAttributes['user']['displayText']), + read_count=r.attributes['count']) + + results.append(reader) + + return results + def get_dashboard(self, dashboard_uri: str, ) -> DashboardDetailEntity: diff --git a/requirements.txt b/requirements.txt index 78073dd8..41c4f56b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -67,7 +67,7 @@ neotime==1.7.1 pytz==2018.4 requests-aws4auth==0.9 statsd==3.2.1 -pyatlasclient==1.0.3 +pyatlasclient==1.0.4 beaker>=1.10.0 mocket==3.7.3 overrides==2.5 From 046dce214de1835a5ff0631b9dccbba9b4923989 Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 10 Jul 2020 14:24:35 +0200 Subject: [PATCH 2/4] :recycle: Refactoring code. --- metadata_service/proxy/atlas_proxy.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index 1330e371..474b3e33 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -725,12 +725,12 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = None) -> List[R results = [] if len(readers) > 0: - full_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) + read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) - for r in full_entities: - reader = Reader(user=User(email=r.relationshipAttributes['user']['displayText'], - user_id=r.relationshipAttributes['user']['displayText']), - read_count=r.attributes['count']) + for read_entity in read_entities: + reader = Reader(user=User(email=read_entity.relationshipAttributes['user']['displayText'], + user_id=read_entity.relationshipAttributes['user']['displayText']), + read_count=read_entity.attributes['count']) results.append(reader) From 61e901846bbd66501b9f38d3ffa2517d8a15fc4b Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Fri, 10 Jul 2020 14:43:03 +0200 Subject: [PATCH 3/4] :ok_hand: Updating code due to code review changes. --- metadata_service/proxy/atlas_proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index 474b3e33..b744c01c 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -724,7 +724,7 @@ def _get_readers(self, qualified_name: str, top: Optional[int] = None) -> List[R results = [] - if len(readers) > 0: + if readers: read_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False)) for read_entity in read_entities: From 81aa8d1231faccf9260ebc39c91338f3b0d7e3ab Mon Sep 17 00:00:00 2001 From: mgorsk1 Date: Sat, 11 Jul 2020 14:13:08 +0200 Subject: [PATCH 4/4] :ok_hand: Updating code due to code review changes. (tests) --- metadata_service/proxy/atlas_proxy.py | 4 +- tests/unit/proxy/fixtures/atlas_test_data.py | 86 ++++++++++++++++++++ tests/unit/proxy/test_atlas_proxy.py | 43 +++++++++- 3 files changed, 128 insertions(+), 5 deletions(-) diff --git a/metadata_service/proxy/atlas_proxy.py b/metadata_service/proxy/atlas_proxy.py index b744c01c..e84505bd 100644 --- a/metadata_service/proxy/atlas_proxy.py +++ b/metadata_service/proxy/atlas_proxy.py @@ -689,11 +689,11 @@ def _parse_date(self, date: int) -> Optional[int]: except Exception: return None - def _get_readers(self, qualified_name: str, top: Optional[int] = None) -> List[Reader]: + def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]: params = { 'typeName': self.READER_TYPE, 'offset': '0', - 'limit': top or 15, + 'limit': top, 'excludeDeletedEntities': True, 'entityFilters': { 'condition': 'AND', diff --git a/tests/unit/proxy/fixtures/atlas_test_data.py b/tests/unit/proxy/fixtures/atlas_test_data.py index 8b94aab0..3caa25a4 100644 --- a/tests/unit/proxy/fixtures/atlas_test_data.py +++ b/tests/unit/proxy/fixtures/atlas_test_data.py @@ -1,6 +1,13 @@ import copy +class DottedDict(dict): + """dot.notation access to dictionary attributes""" + __getattr__ = dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + + class Data: entity_type = 'hive_table' column_type = 'hive_column' @@ -166,3 +173,82 @@ class Data: bookmark_entity2, ] } + + user_entity_1 = { + "typeName": "User", + "attributes": { + "qualifiedName": "test_user_1" + }, + "guid": "", + "status": "ACTIVE", + "displayText": 'test_user_1', + "classificationNames": [], + "meaningNames": [], + "meanings": [] + } + + user_entity_2 = { + "typeName": "User", + "attributes": { + "qualifiedName": "test_user_2" + }, + "guid": "", + "status": "ACTIVE", + "displayText": 'test_user_2', + "classificationNames": [], + "meaningNames": [], + "meanings": [], + "relationshipAttributes": { + "entityReads": [ + { + "entityStatus": "ACTIVE", + "relationshipStatus": "ACTIVE", + "guid": "1" + }, + { + "entityStatus": "INACTIVE", + "relationshipStatus": "ACTIVE", + "guid": "2" + }, + { + "entityStatus": "ACTIVE", + "relationshipStatus": "INACTIVE", + "guid": "3" + } + ] + } + } + + reader_entity_1 = { + "typeName": "Reader", + "attributes": { + "count": 5, + "qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_1', cluster), + "entityUri": f"hive_table://{cluster}.{db}/Table1", + }, + "guid": "1", + "status": "ACTIVE", + "displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user', cluster), + "classificationNames": [], + "meaningNames": [], + "meanings": [], + "relationshipAttributes": {"user": user_entity_1} + } + + reader_entity_2 = { + "typeName": "Reader", + "attributes": { + "count": 150, + "qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster), + "entityUri": f"hive_table://{cluster}.{db}/Table1", + }, + "guid": "2", + "status": "ACTIVE", + "displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster), + "classificationNames": [], + "meaningNames": [], + "meanings": [], + "relationshipAttributes": {"user": user_entity_2} + } + + reader_entities = [DottedDict(reader_entity) for reader_entity in [reader_entity_1, reader_entity_2]] diff --git a/tests/unit/proxy/test_atlas_proxy.py b/tests/unit/proxy/test_atlas_proxy.py index 4db0d7a4..2cde4ded 100644 --- a/tests/unit/proxy/test_atlas_proxy.py +++ b/tests/unit/proxy/test_atlas_proxy.py @@ -1,12 +1,12 @@ import copy import unittest -from typing import Any, Dict, Optional, cast +from typing import Any, Dict, Optional, cast, List from amundsen_common.models.popular_table import PopularTable -from amundsen_common.models.table import Column, Statistics, Table, Tag, User +from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader from atlasclient.exceptions import BadRequest from mock import MagicMock, patch -from tests.unit.proxy.fixtures.atlas_test_data import Data +from tests.unit.proxy.fixtures.atlas_test_data import Data, DottedDict from metadata_service import create_app from metadata_service.entity.tag_detail import TagDetail @@ -303,6 +303,43 @@ def test_delete_resource_relation_by_user(self) -> None: resource_type=ResourceType.Table) mock_execute.assert_called_with() + def test_get_readers(self) -> None: + basic_search_result = MagicMock() + basic_search_result.entities = self.reader_entities + + self.proxy._driver.search_basic.create = MagicMock(return_value=basic_search_result) + + entity_bulk_result = MagicMock() + entity_bulk_result.entities = self.reader_entities + self.proxy._driver.entity_bulk = MagicMock(return_value=[entity_bulk_result]) + + res = self.proxy._get_readers('dummy', 1) + + expected: List[Reader] = [] + + expected += [Reader(user=User(email='test_user_1', user_id='test_user_1'), read_count=5)] + expected += [Reader(user=User(email='test_user_2', user_id='test_user_2'), read_count=150)] + + self.assertEqual(res, expected) + + def test_get_frequently_used_tables(self) -> None: + entity_unique_attribute_result = MagicMock() + entity_unique_attribute_result.entity = DottedDict(self.user_entity_2) + self.proxy._driver.entity_unique_attribute = MagicMock(return_value=entity_unique_attribute_result) + + entity_bulk_result = MagicMock() + entity_bulk_result.entities = [DottedDict(self.reader_entity_1)] + self.proxy._driver.entity_bulk = MagicMock(return_value=[entity_bulk_result]) + + expected = {'table': [PopularTable(cluster=self.cluster, + name='Table1', + schema=self.db, + database=self.entity_type)]} + + res = self.proxy.get_frequently_used_tables(user_email='dummy') + + self.assertEqual(expected, res) + if __name__ == '__main__': unittest.main()