diff --git a/databuilder/extractor/atlas_search_data_extractor.py b/databuilder/extractor/atlas_search_data_extractor.py new file mode 100644 index 000000000..84869f177 --- /dev/null +++ b/databuilder/extractor/atlas_search_data_extractor.py @@ -0,0 +1,204 @@ +# Copyright Contributors to the Amundsen project. +# SPDX-License-Identifier: Apache-2.0 + +import importlib +import logging +from copy import deepcopy +from functools import reduce +from typing import Iterator, Optional, List, Tuple, Any, Dict, Union + +from atlasclient.client import Atlas +from atlasclient.models import Entity, SearchBasic +from atlasclient.utils import extract_entities +from pyhocon import ConfigTree, ConfigFactory + +from databuilder.extractor.base_extractor import Extractor + +LOGGER = logging.getLogger(__name__) + +# custom types +type_fields_mapping_spec = Dict[str, List[Tuple[str, str, Any, Any]]] +type_fields_mapping = List[Tuple[str, str, Any, Any]] + +# @todo document classes/methods +# @todo write tests + + +class AtlasSearchDataExtractor(Extractor): + ATLAS_URL_CONFIG_KEY = 'atlas_url' + ATLAS_PORT_CONFIG_KEY = 'atlas_port' + ATLAS_PROTOCOL_CONFIG_KEY = 'atlas_protocol' + ATLAS_VALIDATE_SSL_CONFIG_KEY = 'atlas_validate_ssl' + ATLAS_USERNAME_CONFIG_KEY = 'atlas_auth_user' + ATLAS_PASSWORD_CONFIG_KEY = 'atlas_auth_pw' + ATLAS_BATCH_CHUNK_SIZE_KEY = 'atlas_batch_chunk_size' + ATLAS_TIMEOUT_SECONDS_KEY = 'atlas_timeout_seconds' + + ENTITY_TYPE_KEY = 'entity_type' + + DEFAULT_QUERY_PARAMS_BY_ENTITY = { + 'Table': { + 'typeName': 'Table', + 'excludeDeletedEntities': True, + 'query': '*' + } + } + + DEFAULT_CONFIG = ConfigFactory.from_dict({ATLAS_URL_CONFIG_KEY: "localhost", + ATLAS_PORT_CONFIG_KEY: 21000, + ATLAS_PROTOCOL_CONFIG_KEY: 'http', + ATLAS_VALIDATE_SSL_CONFIG_KEY: False, + ATLAS_BATCH_CHUNK_SIZE_KEY: 100, + ATLAS_TIMEOUT_SECONDS_KEY: 60}) + + # @todo fill out below fields for TableESDocument + # tags: List[str], + + # es_document field, atlas field path, modification function, default_value + FIELDS_MAPPING_SPEC: type_fields_mapping_spec = { + 'Table': [ + ('database', 'typeName', None, None), + ('cluster', 'attributes.qualifiedName', lambda x: x.split('@')[-1], None), + ('schema', 'relationshipAttributes.db.displayText', None, None), + ('name', 'attributes.name', None, None), + ('key', 'attributes.qualifiedName', None, None), + ('description', 'attributes.description', None, None), + ('last_updated_timestamp', 'updateTime', lambda x: int(x) / 1000, 0), + ('total_usage', 'attributes.popularityScore', lambda x: int(x), 0), + ('unique_usage', 'attributes.uniqueUsage', lambda x: int(x), 1), + ('column_names', 'relationshipAttributes.columns', lambda x: [c.get('attributes').get('name') + for c in x if + c.get('status').lower() == 'active'], []), + ('column_descriptions', 'relationshipAttributes.columns', + lambda x: [c.get('attributes').get('description') for c in x if c.get('status').lower() == 'active'], []), + ('tags', 'tags', None, []), + ('badges', 'classifications', + lambda x: [c.get('typeName') for c in x if c.get('entityStatus', '').lower() == 'active'], []), + ('display_name', 'attributes.qualifiedName', lambda x: x.split('@')[0], None), + ('schema_description', 'attributes.parameters.sourceDescription', None, None), + ('programmatic_descriptions', 'attributes.parameters', lambda x: [str(s) for s in list(x.values())], None) + ] + } + + ENTITY_MODEL_BY_TYPE = { + 'Table': 'databuilder.models.table_elasticsearch_document.TableESDocument' + } + + REQUIRED_RELATIONSHIPS_BY_TYPE = { + 'Table': ['columns'] + } + + def init(self, conf: ConfigTree) -> None: + self.conf = conf.with_fallback(AtlasSearchDataExtractor.DEFAULT_CONFIG) + self.driver = self._get_driver() + + self._extract_iter: Optional[Iterator[Any]] = None + + @property + def entity_type(self) -> str: + return self.conf.get(AtlasSearchDataExtractor.ENTITY_TYPE_KEY) + + @property + def search_query(self) -> Dict: + return AtlasSearchDataExtractor.DEFAULT_QUERY_PARAMS_BY_ENTITY.get(self.entity_type) or {} + + @property + def model_class(self) -> Any: + model_class = AtlasSearchDataExtractor.ENTITY_MODEL_BY_TYPE.get(self.entity_type) + + if model_class: + module_name, class_name = model_class.rsplit(".", 1) + mod = importlib.import_module(module_name) + + return getattr(mod, class_name) + + @property + def field_mappings(self) -> type_fields_mapping: + return AtlasSearchDataExtractor.FIELDS_MAPPING_SPEC.get(self.entity_type) or [] + + def extract(self) -> Any: + if not self._extract_iter: + self._extract_iter = self._get_extract_iter() + + try: + return next(self._extract_iter) + except StopIteration: + return None + + def get_scope(self) -> str: + return 'extractor.atlas_search_data' + + def _get_driver(self) -> Any: + return Atlas(host=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY), + port=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY), + username=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY), + password=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY), + protocol=self.conf.get_string(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY), + validate_ssl=self.conf.get_bool(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY), + timeout=self.conf.get_int(AtlasSearchDataExtractor.ATLAS_TIMEOUT_SECONDS_KEY)) + + def _execute_query(self, params: Dict, relationships: Optional[List[str]] = None) -> Any: + + search_result: List[Union[Entity, SearchBasic]] = [] + offset = 0 + chunk_size = self.conf.get_int(AtlasSearchDataExtractor.ATLAS_BATCH_CHUNK_SIZE_KEY) + + while True: + _params = {'offset': str(offset), 'limit': str(chunk_size)} + + full_params = deepcopy(params) + full_params.update(**_params) + + results = self.driver.search_basic(**full_params) + + search_chunk = extract_entities(results) + + if relationships: + guids = [table.guid for table in search_chunk] + + if guids: + bulk_collection = self.driver.entity_bulk(guid=guids) + + for collection in bulk_collection: + search_chunk = list(collection.entities_with_relationships(attributes=relationships)) + + search_result = search_result + search_chunk + + for table in search_chunk: + yield table + + if len(search_chunk) == 0: + break + + offset = offset + chunk_size + + def _get_extract_iter(self) -> Iterator[Any]: + relationships = AtlasSearchDataExtractor.REQUIRED_RELATIONSHIPS_BY_TYPE.get(self.entity_type) + + for atlas_entity in self._execute_query(self.search_query, relationships=relationships): + model_dict = dict() + + data = atlas_entity.__dict__['_data'] + + for spec in self.field_mappings: + model_field, atlas_field_path, _transform_spec, default_value = spec + + atlas_value = reduce(lambda x, y: x.get(y, dict()), atlas_field_path.split('.'), data) or default_value + + transform_spec = _transform_spec or (lambda x: x) + + try: + es_entity_value = transform_spec(atlas_value) + model_dict[model_field] = es_entity_value + + except Exception: + LOGGER.warning( + f'Error processing entity. model_field: {model_field} | atlas_field_path: {atlas_field_path} ', + exc_info=True) + + try: + result = self.model_class(**model_dict) + + yield result + except Exception: + LOGGER.warning(f'Error building model object.', exc_info=True) diff --git a/example/scripts/sample_atlas_search_extractor.py b/example/scripts/sample_atlas_search_extractor.py new file mode 100644 index 000000000..d86169361 --- /dev/null +++ b/example/scripts/sample_atlas_search_extractor.py @@ -0,0 +1,82 @@ +# Copyright Contributors to the Amundsen project. +# SPDX-License-Identifier: Apache-2.0 + +import uuid + +from elasticsearch import Elasticsearch +from pyhocon import ConfigFactory + +from databuilder.extractor.atlas_search_data_extractor import AtlasSearchDataExtractor +from databuilder.job.job import DefaultJob +from databuilder.loader.file_system_elasticsearch_json_loader import FSElasticsearchJSONLoader +from databuilder.publisher.elasticsearch_publisher import ElasticsearchPublisher +from databuilder.task.task import DefaultTask +from databuilder.transformer.base_transformer import NoopTransformer + +entity_type = 'Table' +extracted_search_data_path = '/tmp/search_data.json' + +# atlas config +atlas_url = 'localhost' +atlas_port = 21000 +atlas_protocol = 'http' +atlas_verify_ssl = False +atlas_username = 'admin' +atlas_password = 'admin' +atlas_batch_chunk_size = 50 + +# elastic config +es = Elasticsearch([ + {'host': 'localhost'}, +]) + +elasticsearch_client = es +elasticsearch_new_index_key = 'tables-' + str(uuid.uuid4()) +elasticsearch_new_index_key_type = 'table' +elasticsearch_index_alias = 'table_search_index' + +job_config = ConfigFactory.from_dict({ + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_URL_CONFIG_KEY): + atlas_url, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PORT_CONFIG_KEY): + atlas_port, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PROTOCOL_CONFIG_KEY): + atlas_protocol, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_VALIDATE_SSL_CONFIG_KEY): + atlas_verify_ssl, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_USERNAME_CONFIG_KEY): + atlas_username, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_PASSWORD_CONFIG_KEY): + atlas_password, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ATLAS_BATCH_CHUNK_SIZE_KEY): + atlas_batch_chunk_size, + 'extractor.atlas_search_data.{}'.format(AtlasSearchDataExtractor.ENTITY_TYPE_KEY): + entity_type, + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_PATH_CONFIG_KEY): + extracted_search_data_path, + 'loader.filesystem.elasticsearch.{}'.format(FSElasticsearchJSONLoader.FILE_MODE_CONFIG_KEY): + 'w', + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_PATH_CONFIG_KEY): + extracted_search_data_path, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.FILE_MODE_CONFIG_KEY): + 'r', + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_CLIENT_CONFIG_KEY): + elasticsearch_client, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_NEW_INDEX_CONFIG_KEY): + elasticsearch_new_index_key, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_DOC_TYPE_CONFIG_KEY): + elasticsearch_new_index_key_type, + 'publisher.elasticsearch.{}'.format(ElasticsearchPublisher.ELASTICSEARCH_ALIAS_CONFIG_KEY): + elasticsearch_index_alias +}) + +if __name__ == "__main__": + task = DefaultTask(extractor=AtlasSearchDataExtractor(), + transformer=NoopTransformer(), + loader=FSElasticsearchJSONLoader()) + + job = DefaultJob(conf=job_config, + task=task, + publisher=ElasticsearchPublisher()) + + job.launch() diff --git a/requirements.txt b/requirements.txt index 05836a95c..f6150768b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -57,3 +57,5 @@ pandas>=0.21.0,<1.2.0 requests==2.23.0,<3.0 responses==0.10.6 + +pyatlasclient==1.1.1 \ No newline at end of file