Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Frequent Users feature in [AtlasProxy] #147

Merged
merged 4 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 88 additions & 4 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from typing import Any, Dict, List, Union, Optional

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.dashboard import DashboardSummary
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import (make_table_qualified_name,
parse_table_qualified_name)
parse_table_qualified_name,
extract_entities)
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from flask import current_app as app
Expand Down Expand Up @@ -41,6 +42,7 @@ class AtlasProxy(BaseProxy):
STATISTICS_FORMAT_SPEC = app.config['STATISTICS_FORMAT_SPEC']
BOOKMARK_TYPE = 'Bookmark'
USER_TYPE = 'User'
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BOOKMARK_ACTIVE_KEY = 'active'
GUID_KEY = 'guid'
Expand Down Expand Up @@ -386,6 +388,7 @@ def get_table(self, *, table_uri: str) -> Table:
description=attrs.get('description') or attrs.get('comment'),
owners=[User(email=attrs.get('owner'))],
columns=columns,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
last_updated_timestamp=self._parse_date(table_details.get('updateTime')))

return table
Expand Down Expand Up @@ -596,8 +599,42 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso

return {'table': results}

def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, Any]:
pass
def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[PopularTable]]:
    """
    Return the tables read by a user, ordered by ascending read count.

    The User entity is looked up by its qualified name. Every 'entityReads'
    relationship whose entity status and relationship status are both
    'ACTIVE' contributes a Reader guid; those Reader entities are fetched
    in bulk and each one with a truthy 'count' attribute is converted into
    a PopularTable built from its entity URI.

    :param user_email: value matched against the User entity's qualifiedName.
    :return: ``{'table': [PopularTable, ...]}``; the list is empty when the
        user has no active reads.
    """
    user = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_email).entity

    # 'entityReads' can be missing/None for a user without any reads.
    user_reads = user['relationshipAttributes'].get('entityReads') or []
    readers_guids = [
        user_read['guid']
        for user_read in user_reads
        if user_read['entityStatus'] == 'ACTIVE' and user_read['relationshipStatus'] == 'ACTIVE'
    ]

    # Same guard as _get_readers: do not hit the bulk endpoint with no guids.
    if not readers_guids:
        return {'table': []}

    readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True))

    # Keep (count, table) pairs instead of a dict keyed by count, so that
    # two tables sharing the same read count do not overwrite each other.
    counted_tables = []
    for reader in readers:
        count = reader.attributes.get('count')
        if not count:
            continue

        entity_uri = reader.attributes.get(self.ENTITY_URI_KEY)
        details = self._extract_info_from_uri(table_uri=entity_uri)

        counted_tables.append((count, PopularTable(cluster=details.get('cluster'),
                                                   name=details.get('name'),
                                                   schema=details.get('db'),
                                                   database=details.get('entity'))))

    # Ascending order is preserved from the previous implementation; the
    # sort is stable, so ties keep the order returned by the bulk fetch.
    # NOTE(review): "frequently used" may be expected most-read first - confirm with callers.
    counted_tables.sort(key=lambda counted: counted[0])

    return {'table': [table for _, table in counted_tables]}

def add_resource_relation_by_user(self, *,
id: str,
Expand Down Expand Up @@ -652,6 +689,53 @@ def _parse_date(self, date: int) -> Optional[int]:
except Exception:
return None

def _get_readers(self, qualified_name: str, top: Optional[int] = 15) -> List[Reader]:
    """
    Fetch the Reader entities recorded for an entity and map them to Reader models.

    A basic search is issued for entities of READER_TYPE whose qualified name
    starts with the part of ``qualified_name`` before '@' and whose 'count'
    is at least POPULAR_TABLE_MINIMUM_READER_COUNT, sorted by 'count'
    descending. The matching guids are then fetched in bulk (with
    relationships) to read each reader's related user.

    :param qualified_name: qualified name of the entity being read.
    :param top: value sent as the search 'limit'.
    :return: list of Reader models; empty when the search matches nothing.
    """
    name_prefix = qualified_name.split('@')[0]
    minimum_count = f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}'

    name_criterion = {
        'attributeName': self.QN_KEY,
        'operator': 'STARTSWITH',
        'attributeValue': name_prefix
    }
    count_criterion = {
        'attributeName': 'count',
        'operator': 'gte',
        'attributeValue': minimum_count
    }

    search_payload = {
        'typeName': self.READER_TYPE,
        'offset': '0',
        'limit': top,
        'excludeDeletedEntities': True,
        'entityFilters': {
            'condition': 'AND',
            'criterion': [name_criterion, count_criterion]
        },
        'attributes': ['count', self.QN_KEY],
        'sortBy': 'count',
        'sortOrder': 'DESCENDING'
    }

    found = self._driver.search_basic.create(data=search_payload, ignoreRelationships=False)
    reader_guids = [entity.guid for entity in found.entities]

    # Nothing matched: skip the bulk request entirely.
    if not reader_guids:
        return []

    bulk_response = self._driver.entity_bulk(guid=reader_guids, ignoreRelationships=False)

    collected = []
    for entity in extract_entities(bulk_response):
        # The related user's displayText is used for both email and user_id.
        reader_user = User(email=entity.relationshipAttributes['user']['displayText'],
                           user_id=entity.relationshipAttributes['user']['displayText'])
        collected.append(Reader(user=reader_user, read_count=entity.attributes['count']))

    return collected

def get_dashboard(self,
dashboard_uri: str,
) -> DashboardDetailEntity:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ neotime==1.7.1
pytz==2018.4
requests-aws4auth==0.9
statsd==3.2.1
pyatlasclient==1.0.3
pyatlasclient==1.0.4
beaker>=1.10.0
mocket==3.7.3
overrides==2.5
Expand Down
86 changes: 86 additions & 0 deletions tests/unit/proxy/fixtures/atlas_test_data.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import copy


class DottedDict(dict):
    """dict whose keys can also be read, written and deleted as attributes."""

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails; missing keys give None.
        return dict.get(self, name)

    def __setattr__(self, name, value):
        # Attribute assignment stores the value as a dictionary item.
        dict.__setitem__(self, name, value)

    def __delattr__(self, name):
        # Attribute deletion removes the item; a missing key raises KeyError.
        dict.__delitem__(self, name)


class Data:
entity_type = 'hive_table'
column_type = 'hive_column'
Expand Down Expand Up @@ -166,3 +173,82 @@ class Data:
bookmark_entity2,
]
}

# User fixture that carries no "relationshipAttributes" key at all.
user_entity_1 = {
"typeName": "User",
"attributes": {
"qualifiedName": "test_user_1"
},
"guid": "",
"status": "ACTIVE",
"displayText": 'test_user_1',
"classificationNames": [],
"meaningNames": [],
"meanings": []
}

# User fixture with three "entityReads" relationships. Only the one with
# guid "1" has both entityStatus and relationshipStatus set to "ACTIVE";
# guid "2" has an INACTIVE entity and guid "3" an INACTIVE relationship.
user_entity_2 = {
"typeName": "User",
"attributes": {
"qualifiedName": "test_user_2"
},
"guid": "",
"status": "ACTIVE",
"displayText": 'test_user_2',
"classificationNames": [],
"meaningNames": [],
"meanings": [],
"relationshipAttributes": {
"entityReads": [
{
"entityStatus": "ACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "1"
},
{
"entityStatus": "INACTIVE",
"relationshipStatus": "ACTIVE",
"guid": "2"
},
{
"entityStatus": "ACTIVE",
"relationshipStatus": "INACTIVE",
"guid": "3"
}
]
}
}

# Reader fixture (guid "1", count 5) whose entityUri points at Table1 and
# whose "user" relationship is user_entity_1.
# NOTE(review): displayText is built with 'test_user' while qualifiedName
# uses 'test_user_1' - confirm whether the mismatch is intentional.
reader_entity_1 = {
"typeName": "Reader",
"attributes": {
"count": 5,
"qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_1', cluster),
"entityUri": f"hive_table://{cluster}.{db}/Table1",
},
"guid": "1",
"status": "ACTIVE",
"displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user', cluster),
"classificationNames": [],
"meaningNames": [],
"meanings": [],
"relationshipAttributes": {"user": user_entity_1}
}

# Reader fixture (guid "2", count 150) for the same Table1 entityUri, whose
# "user" relationship is user_entity_2.
reader_entity_2 = {
"typeName": "Reader",
"attributes": {
"count": 150,
"qualifiedName": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster),
"entityUri": f"hive_table://{cluster}.{db}/Table1",
},
"guid": "2",
"status": "ACTIVE",
"displayText": '{}.{}.{}.reader@{}'.format(db, 'Table1', 'test_user_2', cluster),
"classificationNames": [],
"meaningNames": [],
"meanings": [],
"relationshipAttributes": {"user": user_entity_2}
}

# Both reader fixtures wrapped in DottedDict so their top-level keys
# (e.g. guid, attributes, relationshipAttributes) can be read as attributes.
reader_entities = [DottedDict(reader_entity) for reader_entity in [reader_entity_1, reader_entity_2]]
43 changes: 40 additions & 3 deletions tests/unit/proxy/test_atlas_proxy.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import copy
import unittest
from typing import Any, Dict, Optional, cast
from typing import Any, Dict, Optional, cast, List

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader
from atlasclient.exceptions import BadRequest
from mock import MagicMock, patch
from tests.unit.proxy.fixtures.atlas_test_data import Data
from tests.unit.proxy.fixtures.atlas_test_data import Data, DottedDict

from metadata_service import create_app
from metadata_service.entity.tag_detail import TagDetail
Expand Down Expand Up @@ -303,6 +303,43 @@ def test_delete_resource_relation_by_user(self) -> None:
resource_type=ResourceType.Table)
mock_execute.assert_called_with()

def test_get_readers(self) -> None:
    """_get_readers maps every bulk-fetched reader entity to a Reader model."""
    # Both the search and the bulk fetch hand back the same reader fixtures.
    search_response = MagicMock()
    search_response.entities = self.reader_entities
    bulk_response = MagicMock()
    bulk_response.entities = self.reader_entities

    self.proxy._driver.search_basic.create = MagicMock(return_value=search_response)
    self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_response])

    actual = self.proxy._get_readers('dummy', 1)

    expected: List[Reader] = [
        Reader(user=User(email=user_name, user_id=user_name), read_count=read_count)
        for user_name, read_count in (('test_user_1', 5), ('test_user_2', 150))
    ]

    self.assertEqual(actual, expected)

def test_get_frequently_used_tables(self) -> None:
    """The reader entity returned for the user is reported as a PopularTable."""
    # User lookup returns the fixture that has entityReads relationships.
    user_lookup = MagicMock()
    user_lookup.entity = DottedDict(self.user_entity_2)
    self.proxy._driver.entity_unique_attribute = MagicMock(return_value=user_lookup)

    # Bulk fetch returns a single reader entity pointing at Table1.
    bulk_response = MagicMock()
    bulk_response.entities = [DottedDict(self.reader_entity_1)]
    self.proxy._driver.entity_bulk = MagicMock(return_value=[bulk_response])

    expected_table = PopularTable(cluster=self.cluster,
                                  name='Table1',
                                  schema=self.db,
                                  database=self.entity_type)
    expected = {'table': [expected_table]}

    actual = self.proxy.get_frequently_used_tables(user_email='dummy')

    self.assertEqual(expected, actual)


if __name__ == '__main__':
unittest.main()