def _update_metadata_tag(table_key: str, method: str, tag: str) -> int:
    """
    call the metadata service endpoint to add or remove a tag on the table identified by table_key
    :param table_key: table key e.g. 'database://cluster.schema/table'
    :param method: PUT or DELETE
    :param tag: tag name to be added or removed
    :return: HTTP status code returned by the metadata service
    """
    table_endpoint = _get_table_endpoint()
    url = f'{table_endpoint}/{table_key}/tag/{tag}'
    response = request_metadata(url=url, method=method)
    status_code = response.status_code
    if status_code != HTTPStatus.OK:
        LOGGER.info(f'Fail to update tag in metadataservice, http status code: {status_code}')
        LOGGER.debug(response.text)
    return status_code


def _update_search_tag(table_key: str, method: str, tag: str) -> int:
    """
    call the search service endpoint to get whole table information uniquely identified by table_key
    update tags list, call search service endpoint again to write back the updated field
    TODO: we should update dashboard tag in the future
    :param table_key: table key e.g. 'database://cluster.schema/table'
    :param method: PUT or DELETE
    :param tag: tag name to be added or removed
    :return: HTTP status code: the search service's code when one of its calls fails,
             NOT_FOUND when the search service has no document for the table, OK otherwise
    """
    searchservice_base = app.config['SEARCHSERVICE_BASE']
    searchservice_get_table_url = f'{searchservice_base}/search_table'

    # searchservice currently doesn't allow colon or / inside filters, thus can't get item based on key
    # table key e.g: 'database://cluster.schema/table'
    table_uri = TableUri.from_uri(table_key)

    request_param_map = {
        'search_request': {
            'type': 'AND',
            'filters': {
                'database': [table_uri.database],
                'schema': [table_uri.schema],
                'table': [table_uri.table],
                'cluster': [table_uri.cluster],
            },
        },
        'query_term': '',
    }

    get_table_response = request_search(url=searchservice_get_table_url, method='POST', json=request_param_map)
    get_status_code = get_table_response.status_code
    if get_status_code != HTTPStatus.OK:
        LOGGER.info(f'Fail to get table info from searchservice, http status code: {get_status_code}')
        LOGGER.debug(get_table_response.text)
        return get_status_code

    raw_data_map = json.loads(get_table_response.text)
    results = raw_data_map.get('results') or []
    if not results:
        # Nothing to update: report it explicitly instead of failing with IndexError on results[0].
        LOGGER.error(f'Table not found in searchservice, table key: {table_key}')
        return HTTPStatus.NOT_FOUND
    # key is unique, thus (database, cluster, schema, table) should uniquely identify the table
    if len(results) > 1:
        LOGGER.error(f'Error! Duplicate table key: {table_key}')
    table = results[0]

    # A document may carry no 'tags' field or a null one; treat both as an empty tag list.
    old_tags_list = table.get('tags') or []
    # Dropping any existing entry first keeps PUT idempotent (no duplicate tag entries).
    new_tags_list = [item for item in old_tags_list if item['tag_name'] != tag]
    if method != 'DELETE':
        new_tags_list.append({'tag_name': tag})
    table['tags'] = new_tags_list

    # remove None values
    pruned_table = {k: v for k, v in table.items() if v is not None}

    post_param_map = {"data": pruned_table}
    searchservice_update_url = f'{searchservice_base}/document_table'
    update_table_response = request_search(url=searchservice_update_url, method='PUT', json=post_param_map)
    update_status_code = update_table_response.status_code
    if update_status_code != HTTPStatus.OK:
        LOGGER.info(f'Fail to update table info in searchservice, http status code: {update_status_code}')
        LOGGER.debug(update_table_response.text)
        return update_status_code

    return HTTPStatus.OK
= jsonify({'msg': message}) - return make_response(payload, status_code) + return make_response(payload, http_status_code) + except Exception as e: message = 'Encountered exception: ' + str(e) logging.exception(message) diff --git a/amundsen_application/api/utils/request_utils.py b/amundsen_application/api/utils/request_utils.py index 334ab6dd0..9b104b866 100644 --- a/amundsen_application/api/utils/request_utils.py +++ b/amundsen_application/api/utils/request_utils.py @@ -20,7 +20,8 @@ def request_metadata(*, # type: ignore method: str = 'GET', headers=None, timeout_sec: int = 0, - data=None): + data=None, + json=None): """ Helper function to make a request to metadata service. Sets the client and header information based on the configuration @@ -43,7 +44,8 @@ def request_metadata(*, # type: ignore client=app.config['METADATASERVICE_REQUEST_CLIENT'], headers=headers, timeout_sec=timeout_sec, - data=data) + data=data, + json=json) def request_search(*, # type: ignore @@ -51,7 +53,8 @@ def request_search(*, # type: ignore method: str = 'GET', headers=None, timeout_sec: int = 0, - data=None): + data=None, + json=None): """ Helper function to make a request to search service. 
Sets the client and header information based on the configuration @@ -75,11 +78,12 @@ def request_search(*, # type: ignore client=app.config['SEARCHSERVICE_REQUEST_CLIENT'], headers=headers, timeout_sec=timeout_sec, - data=data) + data=data, + json=json) # TODO: Define an interface for envoy_client -def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, data=None): # type: ignore +def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, data=None, json=None): # type: ignore """ Wraps a request to use Envoy client and headers, if available :param method: DELETE | GET | POST | PUT @@ -99,9 +103,9 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da elif method == 'GET': return client.get(url, headers=headers, raw_response=True) elif method == 'POST': - return client.post(url, headers=headers, raw_response=True, raw_request=True, data=data) + return client.post(url, headers=headers, raw_response=True, raw_request=True, data=data, json=json) elif method == 'PUT': - return client.put(url, headers=headers, raw_response=True, raw_request=True, data=data) + return client.put(url, headers=headers, raw_response=True, raw_request=True, data=data, json=json) else: raise Exception('Method not allowed: {}'.format(method)) else: @@ -111,9 +115,9 @@ def request_wrapper(method: str, url: str, client, headers, timeout_sec: int, da elif method == 'GET': return s.get(url, headers=headers, timeout=timeout_sec) elif method == 'POST': - return s.post(url, headers=headers, timeout=timeout_sec, data=data) + return s.post(url, headers=headers, timeout=timeout_sec, data=data, json=json) elif method == 'PUT': - return s.put(url, headers=headers, timeout=timeout_sec, data=data) + return s.put(url, headers=headers, timeout=timeout_sec, data=data, json=json) else: raise Exception('Method not allowed: {}'.format(method)) diff --git a/tests/unit/api/metadata/test_v0.py b/tests/unit/api/metadata/test_v0.py index 
4a191dee8..2f0513322 100644 --- a/tests/unit/api/metadata/test_v0.py +++ b/tests/unit/api/metadata/test_v0.py @@ -803,6 +803,15 @@ def test_update_table_tags_put(self) -> None: url = local_app.config['METADATASERVICE_BASE'] + TABLE_ENDPOINT + '/db://cluster.schema/table/tag/tag_5' responses.add(responses.PUT, url, json={}, status=HTTPStatus.OK) + searchservice_base = local_app.config['SEARCHSERVICE_BASE'] + get_table_url = f'{searchservice_base}/search_table' + responses.add(responses.POST, get_table_url, + json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]}, + status=HTTPStatus.OK) + + post_table_url = f'{searchservice_base}/document_table' + responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK) + with local_app.test_client() as test: response = test.put( '/api/metadata/v0/update_table_tags', @@ -822,6 +831,15 @@ def test_update_table_tags_delete(self) -> None: url = local_app.config['METADATASERVICE_BASE'] + TABLE_ENDPOINT + '/db://cluster.schema/table/tag/tag_5' responses.add(responses.DELETE, url, json={}, status=HTTPStatus.OK) + searchservice_base = local_app.config['SEARCHSERVICE_BASE'] + get_table_url = f'{searchservice_base}/search_table' + responses.add(responses.POST, get_table_url, + json={'results': [{'id': '1', 'tags': [{'tag_name': 'tag_1'}, {'tag_name': 'tag_2'}]}]}, + status=HTTPStatus.OK) + + post_table_url = f'{searchservice_base}/document_table' + responses.add(responses.PUT, post_table_url, json={}, status=HTTPStatus.OK) + with local_app.test_client() as test: response = test.delete( '/api/metadata/v0/update_table_tags',