diff --git a/metadata_service/__init__.py b/metadata_service/__init__.py index c52f672b..02c49667 100644 --- a/metadata_service/__init__.py +++ b/metadata_service/__init__.py @@ -25,7 +25,8 @@ from metadata_service.api.system import Neo4jDetailAPI from metadata_service.api.table import (TableBadgeAPI, TableDashboardAPI, TableDescriptionAPI, TableDetailAPI, - TableOwnerAPI, TableTagAPI) + TableLineageAPI, TableOwnerAPI, + TableTagAPI) from metadata_service.api.tag import TagAPI from metadata_service.api.user import (UserDetailAPI, UserFollowAPI, UserFollowsAPI, UserOwnAPI, UserOwnsAPI, @@ -101,6 +102,8 @@ def create_app(*, config_module_class: str) -> Flask: '/table//tag/') api.add_resource(TableBadgeAPI, '/table//badge/') + api.add_resource(TableLineageAPI, + '/table//lineage') api.add_resource(TableOwnerAPI, '/table//owner/') api.add_resource(TableDashboardAPI, diff --git a/metadata_service/api/__init__.py b/metadata_service/api/__init__.py index 9f49fe0e..6bcc2af2 100644 --- a/metadata_service/api/__init__.py +++ b/metadata_service/api/__init__.py @@ -33,11 +33,11 @@ def get_with_kwargs(self, *, id: Optional[str] = None, **kwargs: Optional[Any]) actual_id: Union[str, int] = int(id) if id.isdigit() else id object = get_object(id=actual_id, **kwargs) if object is not None: - return self.schema().dump(object).data, HTTPStatus.OK + return self.schema().dump(object), HTTPStatus.OK return None, HTTPStatus.NOT_FOUND except ValueError as e: return {'message': f'exception:{e}'}, HTTPStatus.BAD_REQUEST else: get_objects = getattr(self.client, f'get_{self.str_type}s') objects: List[Any] = get_objects() - return self.schema(many=True).dump(objects).data, HTTPStatus.OK + return self.schema().dump(objects, many=True), HTTPStatus.OK diff --git a/metadata_service/api/popular_tables.py b/metadata_service/api/popular_tables.py index 7ef6d228..ee6e5919 100644 --- a/metadata_service/api/popular_tables.py +++ b/metadata_service/api/popular_tables.py @@ -26,5 +26,5 @@ def get(self, 
user_id: Optional[str] = None) -> Iterable[Union[Mapping, int, Non limit = request.args.get('limit', 10, type=int) popular_tables: List[PopularTable] = self.client.get_popular_tables(num_entries=limit, user_id=user_id) - popular_tables_json: str = PopularTableSchema(many=True).dump(popular_tables).data + popular_tables_json: str = PopularTableSchema().dump(popular_tables, many=True) return {'popular_tables': popular_tables_json}, HTTPStatus.OK diff --git a/metadata_service/api/swagger_doc/lineage/lineage_get.yml b/metadata_service/api/swagger_doc/lineage/lineage_get.yml new file mode 100644 index 00000000..915c6aa8 --- /dev/null +++ b/metadata_service/api/swagger_doc/lineage/lineage_get.yml @@ -0,0 +1,33 @@ +Get lineage +--- +tags: + - 'lineage' +parameters: + - name: id + in: path + type: string + schema: + type: string + required: true + example: 'hive://gold.test_schema/test_table1' + - name: direction + in: query + type: string + schema: + type: string + required: false + example: 'upstream' + - name: depth + in: query + type: integer + schema: + type: integer + required: false + example: 0 +responses: + 200: + description: 'Lineage for requested direction and depth' + content: + application/json: + schema: + $ref: '#/components/schemas/Lineage' diff --git a/metadata_service/api/swagger_doc/template.yml b/metadata_service/api/swagger_doc/template.yml index 2a338e5e..34caff2a 100644 --- a/metadata_service/api/swagger_doc/template.yml +++ b/metadata_service/api/swagger_doc/template.yml @@ -248,6 +248,55 @@ components: is_view: type: boolean description: 'If the table is a view' + Lineage: + type: object + properties: + key: + type: string + description: 'Key of entity to obtain lineage for' + example: 'db://cluster.schema/test_table_1' + depth: + type: integer + description: 'how many levels of lineage are shown' + example: 1 + direction: + type: string + description: 'upstream, downstream or both' + example: 'downstream' + upstream_entities: + type: array + 
description: 'upstream entities from key' + items: + $ref: '#/components/schemas/LineageItem' + downstream_entities: + type: array + description: 'downstream entities from key' + items: + $ref: '#/components/schemas/LineageItem' + LineageItem: + type: object + properties: + key: + type: string + description: 'upstream or downstream resource key' + example: 'db://cluster.schema/up_table_1' + level: + type: integer + description: 'distance up or downstream from requested' + example: 2 + source: + type: string + description: 'data source the resource is extracted from' + example: 'hive' + badges: + type: array + description: 'badges associated with resource' + items: + $ref: '#/components/schemas/Badge' + usage: + type: integer + description: 'value to sort lineage results by' + example: 541 Neo4jDetail: type: object properties: diff --git a/metadata_service/api/table.py b/metadata_service/api/table.py index 07c045d6..5901e04d 100644 --- a/metadata_service/api/table.py +++ b/metadata_service/api/table.py @@ -5,6 +5,7 @@ from http import HTTPStatus from typing import Any, Iterable, Mapping, Optional, Union +from amundsen_common.models.lineage import LineageSchema from amundsen_common.models.table import TableSchema from flasgger import swag_from from flask import request @@ -31,13 +32,37 @@ def __init__(self) -> None: def get(self, table_uri: str) -> Iterable[Union[Mapping, int, None]]: try: table = self.client.get_table(table_uri=table_uri) - schema = TableSchema(strict=True) - return schema.dump(table).data, HTTPStatus.OK + schema = TableSchema() + return schema.dump(table), HTTPStatus.OK except NotFoundException: return {'message': 'table_uri {} does not exist'.format(table_uri)}, HTTPStatus.NOT_FOUND +class TableLineageAPI(Resource): + def __init__(self) -> None: + self.client = get_proxy_client() + self.parser = reqparse.RequestParser() + self.parser.add_argument('direction', type=str, required=False) + self.parser.add_argument('depth', type=int, required=False) + 
super(TableLineageAPI, self).__init__() + + @swag_from('swagger_doc/lineage/lineage_get.yml') + def get(self, id: str) -> Iterable[Union[Mapping, int, None]]: + args = self.parser.parse_args() + direction = args.get('direction', 'both') + depth = args.get('depth', 0) + try: + lineage = self.client.get_lineage(id=id, + resource_type=ResourceType.Table, + direction=direction, + depth=depth) + schema = LineageSchema() + return schema.dump(lineage), HTTPStatus.OK + except Exception as e: + return {'message': f'Exception raised when getting lineage: {e}'}, HTTPStatus.NOT_FOUND + + class TableOwnerAPI(Resource): """ TableOwner API to add / delete owner info diff --git a/metadata_service/api/user.py b/metadata_service/api/user.py index b0b44e29..4fc07139 100644 --- a/metadata_service/api/user.py +++ b/metadata_service/api/user.py @@ -37,7 +37,7 @@ def get(self, *, id: Optional[str] = None) -> Iterable[Union[Mapping, int, None] if app.config['USER_DETAIL_METHOD']: try: user_data = app.config['USER_DETAIL_METHOD'](id) - return UserSchema().dump(user_data).data, HTTPStatus.OK + return UserSchema().dump(user_data), HTTPStatus.OK except Exception: LOGGER.exception('UserDetailAPI GET Failed - Using "USER_DETAIL_METHOD" config variable') return {'message': 'user_id {} fetch failed'.format(id)}, HTTPStatus.NOT_FOUND @@ -73,13 +73,13 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]: } # type: Dict[str, List[Any]] if resources and table_key in resources and len(resources[table_key]) > 0: - result[table_key] = PopularTableSchema(many=True).dump(resources[table_key]).data + result[table_key] = PopularTableSchema().dump(resources[table_key], many=True) resources = self.client.get_dashboard_by_user_relation(user_email=user_id, relation_type=UserResourceRel.follow) if resources and dashboard_key in resources and len(resources[dashboard_key]) > 0: - result[dashboard_key] = DashboardSummarySchema(many=True).dump(resources[dashboard_key]).data + result[dashboard_key] = 
DashboardSummarySchema().dump(resources[dashboard_key], many=True) return result, HTTPStatus.OK @@ -179,13 +179,13 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]: resources = self.client.get_table_by_user_relation(user_email=user_id, relation_type=UserResourceRel.own) if resources and table_key in resources and len(resources[table_key]) > 0: - result[table_key] = PopularTableSchema(many=True).dump(resources[table_key]).data + result[table_key] = PopularTableSchema().dump(resources[table_key], many=True) resources = self.client.get_dashboard_by_user_relation(user_email=user_id, relation_type=UserResourceRel.own) if resources and dashboard_key in resources and len(resources[dashboard_key]) > 0: - result[dashboard_key] = DashboardSummarySchema(many=True).dump(resources[dashboard_key]).data + result[dashboard_key] = DashboardSummarySchema().dump(resources[dashboard_key], many=True) return result, HTTPStatus.OK @@ -261,7 +261,7 @@ def get(self, user_id: str) -> Iterable[Union[Mapping, int, None]]: try: resources = self.client.get_frequently_used_tables(user_email=user_id) if len(resources['table']) > 0: - return {'table': PopularTableSchema(many=True).dump(resources['table']).data}, HTTPStatus.OK + return {'table': PopularTableSchema().dump(resources['table'], many=True)}, HTTPStatus.OK return {'table': []}, HTTPStatus.OK except NotFoundException: diff --git a/metadata_service/entity/resource_type.py b/metadata_service/entity/resource_type.py index 1322d13e..95eedbbd 100644 --- a/metadata_service/entity/resource_type.py +++ b/metadata_service/entity/resource_type.py @@ -8,6 +8,7 @@ class ResourceType(Enum): Table = auto() Dashboard = auto() User = auto() + Column = auto() def to_resource_type(*, label: str) -> ResourceType: diff --git a/requirements.txt b/requirements.txt index 8ebefc8e..c7188ae7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -35,7 +35,7 @@ typing-extensions==3.7.4 # A common package that holds the models deifnition and 
schemas that are used # accross different amundsen repositories. -amundsen-common>=0.7.0 +amundsen-common>=0.8.1 amundsen-gremlin>=0.0.4 boto3==1.12.12 @@ -51,8 +51,6 @@ gremlinpython==3.4.3 itsdangerous==0.24 Jinja2>=2.10.1 jsonschema==2.6.0 -marshmallow>=2.15.3,<3.0 -marshmallow-annotations>=2.4.0,<3.0 MarkupSafe==1.1 pytz==2018.4 Werkzeug==0.15.5 @@ -67,3 +65,5 @@ beaker>=1.10.0 overrides==2.5 typed-ast==1.4.2 isort[colors]~=5.4 +marshmallow==3.6.0 +git+https://www.github.com/hilearn/marshmallow-annotations.git@a7a2dc96932430369bdef36555082df990ed9bef#egg=marshmallow-annotations diff --git a/setup.py b/setup.py index e7677703..73bad6ff 100644 --- a/setup.py +++ b/setup.py @@ -22,8 +22,14 @@ packages=find_packages(exclude=['tests*']), include_package_data=True, zip_safe=False, - dependency_links=[], - install_requires=requirements, + dependency_links=[ + ('git+https://www.github.com/hilearn/marshmallow-' + 'annotations.git@a7a2dc96932430369bdef36555082df990ed9bef#egg=marshmallow-annotations') + ], + install_requires=[ + 'marshmallow>=3.0,<=3.6', + 'marshmallow-annotations' + ], extras_require={ 'oidc': ['flaskoidc==0.1.1'] }, diff --git a/tests/unit/api/table/test_table_lineage_api.py b/tests/unit/api/table/test_table_lineage_api.py new file mode 100644 index 00000000..1509f99a --- /dev/null +++ b/tests/unit/api/table/test_table_lineage_api.py @@ -0,0 +1,89 @@ +# Copyright Contributors to the Amundsen project. 
+# SPDX-License-Identifier: Apache-2.0 + +from http import HTTPStatus + +from metadata_service.entity.resource_type import ResourceType +from metadata_service.exception import NotFoundException +from tests.unit.api.table.table_test_case import TableTestCase + +TABLE_URI = "db://cluster.schema/test_table_1" + +API_RESPONSE = { + "key": "db://cluster.schema/test_table_1", + "direction": "both", + "depth": 1, + "upstream_entities": [ + { + "level": 1, + "badges": [], + "source": "db", + "usage": 257, + "key": "db://cluster.schema/up_table_1" + }, + { + "level": 1, + "badges": [], + "source": "hive", + "usage": 164, + "key": "hive://cluster.schema/up_table_2" + }, + { + "level": 1, + "badges": [], + "source": "hive", + "usage": 94, + "key": "hive://cluster.schema/up_table_3" + }, + ], + "downstream_entities": [ + { + "level": 1, + "badges": [], + "source": "db", + "usage": 567, + "key": "db://cluster.schema/down_table_1" + }, + { + "level": 1, + "badges": [], + "source": "hive", + "usage": 54, + "key": "hive://cluster.schema/down_table_2" + }, + { + "level": 2, + "badges": [], + "source": "hive", + "usage": 17, + "key": "hive://cluster.schema/down_table_3" + }, + ] +} + +LINEAGE_RESPONSE = API_RESPONSE + + +class TestTableLineageAPI(TableTestCase): + def setUp(self) -> None: + super().setUp() + + def tearDown(self) -> None: + super().tearDown() + + def test_should_return_response(self) -> None: + self.mock_proxy.get_lineage.return_value = LINEAGE_RESPONSE + response = self.app.test_client().get(f'/table/{TABLE_URI}/lineage') + self.assertEqual(response.json, API_RESPONSE) + self.assertEqual(response.status_code, HTTPStatus.OK) + self.mock_proxy.get_lineage.assert_called_with(id=TABLE_URI, + resource_type=ResourceType.Table, + depth=None, + direction=None) + + def test_should_fail_when_table_doesnt_exist(self) -> None: + self.mock_proxy.get_lineage.side_effect = NotFoundException(message='table not found') + + response = 
self.app.test_client().get(f'/table/{TABLE_URI}/lineage') + + self.assertEqual(response.status_code, HTTPStatus.NOT_FOUND) diff --git a/tests/unit/proxy/test_neo4j_proxy.py b/tests/unit/proxy/test_neo4j_proxy.py index 952c42e1..0f166985 100644 --- a/tests/unit/proxy/test_neo4j_proxy.py +++ b/tests/unit/proxy/test_neo4j_proxy.py @@ -12,7 +12,7 @@ from amundsen_common.models.table import (Application, Badge, Column, ProgrammaticDescription, Source, Stat, Table, Tag, User, Watermark) -from amundsen_common.models.user import UserSchema +from amundsen_common.models.user import User as UserModel from neo4j import GraphDatabase from metadata_service import create_app @@ -614,10 +614,22 @@ def test_get_users(self) -> None: 'email': 'test_email', 'manager_fullname': 'test_manager', } + test_user_obj = UserModel(email='test_email', + first_name='test_first_name', + last_name='test_last_name', + full_name='test_full_name', + is_active=True, + github_username='test-github', + team_name='test_team', + slack_id='test_id', + employee_type='teamMember', + manager_fullname='test_manager') + + # TODO: Add frequent_used, bookmarked, & owned resources) mock_execute.return_value.single.return_value = {'users': [test_user]} neo4j_proxy = Neo4jProxy(host='DOES_NOT_MATTER', port=0000) users = neo4j_proxy.get_users() - actual_data = UserSchema(many=True).load([test_user]).data + actual_data = [test_user_obj] for attr in ['employee_type', 'full_name', 'is_active',