Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Table Lineage API #262

Merged
merged 28 commits into from
Mar 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5de083f
Table Lingeage API
allisonsuarez Feb 26, 2021
e759562
cleaned up implementation and tested manually
allisonsuarez Mar 2, 2021
ce937d2
flake fixes
allisonsuarez Mar 2, 2021
a3e1195
added response template
allisonsuarez Mar 2, 2021
552775f
some import changes
allisonsuarez Mar 2, 2021
90a2804
addind unit tests
allisonsuarez Mar 2, 2021
2abe2b7
finished tests
allisonsuarez Mar 3, 2021
f3bc13e
flake
allisonsuarez Mar 3, 2021
3c12730
forgot to add this change
allisonsuarez Mar 3, 2021
c946400
reqs and chnge
allisonsuarez Mar 3, 2021
7c1e124
changed reqs back
allisonsuarez Mar 3, 2021
206e3ac
reqs test
allisonsuarez Mar 3, 2021
1b003b3
marsh req
allisonsuarez Mar 3, 2021
c70bada
marsh req
allisonsuarez Mar 3, 2021
23b709f
marshmalllow-annotations in setup
allisonsuarez Mar 3, 2021
8f85297
marshmalllow-annotations in setup
allisonsuarez Mar 3, 2021
dc19bf1
lets see if this works
allisonsuarez Mar 3, 2021
2b26951
marshmallow version doesn't require .data
allisonsuarez Mar 3, 2021
7c8fba6
marshmallow version doesn't require .data
allisonsuarez Mar 3, 2021
7e602ea
removed use of strict
allisonsuarez Mar 3, 2021
e828170
removed many
allisonsuarez Mar 3, 2021
1034d17
put back in correct place
allisonsuarez Mar 3, 2021
aa77f0d
updated failing test after marshmallow update
allisonsuarez Mar 3, 2021
b4f5973
resolved naming conflict for user
allisonsuarez Mar 3, 2021
fe2af78
indent
allisonsuarez Mar 3, 2021
0a2cd55
organized imports
allisonsuarez Mar 3, 2021
bfefe5d
more isort
allisonsuarez Mar 3, 2021
4e77da1
typo fix
allisonsuarez Mar 4, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion metadata_service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from metadata_service.api.system import Neo4jDetailAPI
from metadata_service.api.table import (TableBadgeAPI, TableDashboardAPI,
TableDescriptionAPI, TableDetailAPI,
TableOwnerAPI, TableTagAPI)
TableLineageAPI, TableOwnerAPI,
TableTagAPI)
from metadata_service.api.tag import TagAPI
from metadata_service.api.user import (UserDetailAPI, UserFollowAPI,
UserFollowsAPI, UserOwnAPI, UserOwnsAPI,
Expand Down Expand Up @@ -101,6 +102,8 @@ def create_app(*, config_module_class: str) -> Flask:
'/table/<path:id>/tag/<tag>')
api.add_resource(TableBadgeAPI,
'/table/<path:id>/badge/<badge>')
api.add_resource(TableLineageAPI,
'/table/<path:id>/lineage')
api.add_resource(TableOwnerAPI,
'/table/<path:table_uri>/owner/<owner>')
api.add_resource(TableDashboardAPI,
Expand Down
4 changes: 2 additions & 2 deletions metadata_service/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ def get_with_kwargs(self, *, id: Optional[str] = None, **kwargs: Optional[Any])
actual_id: Union[str, int] = int(id) if id.isdigit() else id
object = get_object(id=actual_id, **kwargs)
if object is not None:
return self.schema().dump(object).data, HTTPStatus.OK
return self.schema().dump(object), HTTPStatus.OK
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
return None, HTTPStatus.NOT_FOUND
except ValueError as e:
return {'message': f'exception:{e}'}, HTTPStatus.BAD_REQUEST
else:
get_objects = getattr(self.client, f'get_{self.str_type}s')
objects: List[Any] = get_objects()
return self.schema(many=True).dump(objects).data, HTTPStatus.OK
return self.schema().dump(objects, many=True), HTTPStatus.OK
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 1 addition & 1 deletion metadata_service/api/popular_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ def get(self, user_id: Optional[str] = None) -> Iterable[Union[Mapping, int, Non
limit = request.args.get('limit', 10, type=int)
popular_tables: List[PopularTable] = self.client.get_popular_tables(num_entries=limit,
user_id=user_id)
popular_tables_json: str = PopularTableSchema(many=True).dump(popular_tables).data
popular_tables_json: str = PopularTableSchema().dump(popular_tables, many=True)
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
return {'popular_tables': popular_tables_json}, HTTPStatus.OK
33 changes: 33 additions & 0 deletions metadata_service/api/swagger_doc/lineage/lineage_get.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
Get lineage
---
tags:
- 'lineage'
parameters:
- name: id
in: path
type: string
schema:
type: string
required: true
example: 'hive://gold.test_schema/test_table1'
- name: direction
in: query
type: string
schema:
type: string
required: false
example: 'upstream'
- name: depth
in: query
type: integer
schema:
type: integer
required: false
example: 0
responses:
200:
description: 'Lineage for requested direction and depth'
content:
application/json:
schema:
$ref: '#/components/schemas/Lineage'
49 changes: 49 additions & 0 deletions metadata_service/api/swagger_doc/template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,55 @@ components:
is_view:
type: boolean
description: 'If the table is a view'
Lineage:
type: object
properties:
key:
type: string
description: 'Key of entity to obtain lineage for'
example: 'db://cluster.schema/test_table_1'
depth:
type: integer
description: 'how many levels of lineage are shown'
example: 1
direction:
type: string
description: 'upstream, dowstream or both'
example: 'downstream'
upstream_entities:
type: array
description: 'upstream entities from key'
items:
$ref: '#/components/schemas/LineageItem'
upstream_entities:
type: array
description: 'downstream entities from key'
items:
$ref: '#/components/schemas/LineageItem'
LineageItem:
type: object
properties:
key:
type: string
description: 'upstream or downstream resource key'
example: 'db://cluster.schema/up_table_1'
level:
type: integer
description: 'distance up or downstream from requested'
example: 2
source:
type: string
description: 'data source the resouce is extracted from'
example: 'hive'
badges:
type: array
description: 'badges associated with resource'
items:
$ref: '#/components/schemas/Badge'
usage:
type: integer
description: 'value to sort lineage results by'
example: 541
Neo4jDetail:
type: object
properties:
Expand Down
29 changes: 27 additions & 2 deletions metadata_service/api/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from http import HTTPStatus
from typing import Any, Iterable, Mapping, Optional, Union

from amundsen_common.models.lineage import LineageSchema
from amundsen_common.models.table import TableSchema
from flasgger import swag_from
from flask import request
Expand All @@ -31,13 +32,37 @@ def __init__(self) -> None:
def get(self, table_uri: str) -> Iterable[Union[Mapping, int, None]]:
try:
table = self.client.get_table(table_uri=table_uri)
schema = TableSchema(strict=True)
return schema.dump(table).data, HTTPStatus.OK
schema = TableSchema()
allisonsuarez marked this conversation as resolved.
Show resolved Hide resolved
return schema.dump(table), HTTPStatus.OK

except NotFoundException:
return {'message': 'table_uri {} does not exist'.format(table_uri)}, HTTPStatus.NOT_FOUND


class TableLineageAPI(Resource):
def __init__(self) -> None:
self.client = get_proxy_client()
self.parser = reqparse.RequestParser()
self.parser.add_argument('direction', type=str, required=False)
self.parser.add_argument('depth', type=int, required=False)
super(TableLineageAPI, self).__init__()

@swag_from('swagger_doc/lineage/lineage_get.yml')
def get(self, id: str) -> Iterable[Union[Mapping, int, None]]:
args = self.parser.parse_args()
direction = args.get('direction', 'both')
depth = args.get('depth', 0)
try:
lineage = self.client.get_lineage(id=id,
resource_type=ResourceType.Table,
direction=direction,
depth=depth)
schema = LineageSchema()
return schema.dump(lineage), HTTPStatus.OK
except Exception as e:
return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND


class TableOwnerAPI(Resource):
"""
TableOwner API to add / delete owner info
Expand Down
12 changes: 6 additions & 6 deletions metadata_service/api/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None]
if app.config['USER_DETAIL_METHOD']:
try:
user_data = app.config['USER_DETAIL_METHOD'](id)
return UserSchema().dump(user_data).data, HTTPStatus.OK
return UserSchema().dump(user_data), HTTPStatus.OK
except Exception:
LOGGER.exception('UserDetailAPI GET Failed - Using "USER_DETAIL_METHOD" config variable')
return {'message': 'user_id {} fetch failed'.format(id)}, HTTPStatus.NOT_FOUND
Expand Down Expand Up @@ -73,13 +73,13 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
} # type: Dict[str, List[Any]]

if resources and table_key in resources and len(resources[table_key]) > 0:
result[table_key] = PopularTableSchema(many=True).dump(resources[table_key]).data
result[table_key] = PopularTableSchema().dump(resources[table_key], many=True)

resources = self.client.get_dashboard_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.follow)

if resources and dashboard_key in resources and len(resources[dashboard_key]) > 0:
result[dashboard_key] = DashboardSummarySchema(many=True).dump(resources[dashboard_key]).data
result[dashboard_key] = DashboardSummarySchema().dump(resources[dashboard_key], many=True)

return result, HTTPStatus.OK

Expand Down Expand Up @@ -179,13 +179,13 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
resources = self.client.get_table_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.own)
if resources and table_key in resources and len(resources[table_key]) > 0:
result[table_key] = PopularTableSchema(many=True).dump(resources[table_key]).data
result[table_key] = PopularTableSchema().dump(resources[table_key], many=True)

resources = self.client.get_dashboard_by_user_relation(user_email=user_id,
relation_type=UserResourceRel.own)

if resources and dashboard_key in resources and len(resources[dashboard_key]) > 0:
result[dashboard_key] = DashboardSummarySchema(many=True).dump(resources[dashboard_key]).data
result[dashboard_key] = DashboardSummarySchema().dump(resources[dashboard_key], many=True)

return result, HTTPStatus.OK

Expand Down Expand Up @@ -261,7 +261,7 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]:
try:
resources = self.client.get_frequently_used_tables(user_email=user_id)
if len(resources['table']) > 0:
return {'table': PopularTableSchema(many=True).dump(resources['table']).data}, HTTPStatus.OK
return {'table': PopularTableSchema().dump(resources['table'], many=True)}, HTTPStatus.OK
return {'table': []}, HTTPStatus.OK

except NotFoundException:
Expand Down
1 change: 1 addition & 0 deletions metadata_service/entity/resource_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class ResourceType(Enum):
Table = auto()
Dashboard = auto()
User = auto()
Column = auto()


def to_resource_type(*, label: str) -> ResourceType:
Expand Down
6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ typing-extensions==3.7.4

# A common package that holds the models deifnition and schemas that are used
# accross different amundsen repositories.
amundsen-common>=0.7.0
amundsen-common>=0.8.1
amundsen-gremlin>=0.0.4

boto3==1.12.12
Expand All @@ -51,8 +51,6 @@ gremlinpython==3.4.3
itsdangerous==0.24
Jinja2>=2.10.1
jsonschema==2.6.0
marshmallow>=2.15.3,<3.0
marshmallow-annotations>=2.4.0,<3.0
MarkupSafe==1.1
pytz==2018.4
Werkzeug==0.15.5
Expand All @@ -67,3 +65,5 @@ beaker>=1.10.0
overrides==2.5
typed-ast==1.4.2
isort[colors]~=5.4
marshmallow==3.6.0
git+https://www.github.com/hilearn/marshmallow-annotations.git@a7a2dc96932430369bdef36555082df990ed9bef#egg=marshmallow-annotations
10 changes: 8 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,14 @@
packages=find_packages(exclude=['tests*']),
include_package_data=True,
zip_safe=False,
dependency_links=[],
install_requires=requirements,
dependency_links=[
('git+https://www.github.com/hilearn/marshmallow-'
'annotations.git@a7a2dc96932430369bdef36555082df990ed9bef#egg=marshmallow-annotations')
],
install_requires=[
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@allisonsuarez is there a reason we removed the requirements file from the install_requires? as it breaks the projects if we simply install metadata using setup.py, and hence via pip. Or it's just a typo?

cc: @feng-tao

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah removed it because it breaks when you get to the link line

'marshmallow>=3.0,<=3.6',
'marshmallow-annotations'
],
extras_require={
'oidc': ['flaskoidc==0.1.1']
},
Expand Down
89 changes: 89 additions & 0 deletions tests/unit/api/table/test_table_lineage_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

from http import HTTPStatus

from metadata_service.entity.resource_type import ResourceType
from metadata_service.exception import NotFoundException
from tests.unit.api.table.table_test_case import TableTestCase

TABLE_URI = "db://cluster.schema/test_table_1"

API_RESPONSE = {
"key": "db://cluster.schema/test_table_1",
"direction": "both",
"depth": 1,
"upstream_entities": [
{
"level": 1,
"badges": [],
"source": "db",
"usage": 257,
"key": "db://cluster.schema/up_table_1"
},
{
"level": 1,
"badges": [],
"source": "hive",
"usage": 164,
"key": "hive://cluster.schema/up_table_2"
},
{
"level": 1,
"badges": [],
"source": "hive",
"usage": 94,
"key": "hive://cluster.schema/up_table_3"
},
],
"downstream_entities": [
{
"level": 1,
"badges": [],
"source": "db",
"usage": 567,
"key": "db://cluster.schema/down_table_1"
},
{
"level": 1,
"badges": [],
"source": "hive",
"usage": 54,
"key": "hive://cluster.schema/down_table_2"
},
{
"level": 2,
"badges": [],
"source": "hive",
"usage": 17,
"key": "hive://cluster.schema/down_table_3"
},
]
}

LINEAGE_RESPONSE = API_RESPONSE


class TestTableLineageAPI(TableTestCase):
def setUp(self) -> None:
super().setUp()

def tearDown(self) -> None:
super().tearDown()

def test_should_return_response(self) -> None:
self.mock_proxy.get_lineage.return_value = LINEAGE_RESPONSE
response = self.app.test_client().get(f'/table/{TABLE_URI}/lineage')
self.assertEqual(response.json, API_RESPONSE)
self.assertEqual(response.status_code, HTTPStatus.OK)
self.mock_proxy.get_lineage.assert_called_with(id=TABLE_URI,
resource_type=ResourceType.Table,
depth=None,
direction=None)

def test_should_fail_when_table_doesnt_exist(self) -> None:
self.mock_proxy.get_lineage.side_effect = NotFoundException(message='table not found')

response = self.app.test_client().get(f'/table/{TABLE_URI}/lineage')

self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND)
16 changes: 14 additions & 2 deletions tests/unit/proxy/test_neo4j_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from amundsen_common.models.table import (Application, Badge, Column,
ProgrammaticDescription, Source,
Stat, Table, Tag, User, Watermark)
from amundsen_common.models.user import UserSchema
from amundsen_common.models.user import User as UserModel
from neo4j import GraphDatabase

from metadata_service import create_app
Expand Down Expand Up @@ -614,10 +614,22 @@ def test_get_users(self) -> None:
'email': 'test_email',
'manager_fullname': 'test_manager',
}
test_user_obj = UserModel(email='test_email',
first_name='test_first_name',
last_name='test_last_name',
full_name='test_full_name',
is_active=True,
github_username='test-github',
team_name='test_team',
slack_id='test_id',
employee_type='teamMember',
manager_fullname='test_manager')

# TODO: Add frequent_used, bookmarked, & owned resources)
mock_execute.return_value.single.return_value = {'users': [test_user]}
neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000)
users = neo4j_proxy.get_users()
actual_data = UserSchema(many=True).load([test_user]).data
actual_data = [test_user_obj]
for attr in ['employee_type',
'full_name',
'is_active',
Expand Down