From 0cda01e54e0f78bb970deb58d0abc2e598b1fb9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Katarzyna=20Ka=C5=82ek?= Date: Mon, 4 Nov 2024 18:08:02 +0100 Subject: [PATCH 1/2] added fileFormat, locationPath and external table lineage to Glue ingestion --- .../source/database/glue/metadata.py | 43 ++++++++++++++++--- .../ingestion/source/database/glue/models.py | 7 +++ .../resources/datasets/glue_db_dataset.json | 10 ++--- .../tests/unit/topology/database/test_glue.py | 40 ++++++++++++++++- .../connectors/database/glue/index.md | 4 +- 5 files changed, 88 insertions(+), 16 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py index 1cb42e5285ba..35be904cd70b 100755 --- a/ingestion/src/metadata/ingestion/source/database/glue/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/metadata.py @@ -26,7 +26,12 @@ from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Column, Table, TableType +from metadata.generated.schema.entity.data.table import ( + Column, + FileFormat, + Table, + TableType, +) from metadata.generated.schema.entity.services.connections.database.glueConnection import ( GlueConnection, ) @@ -52,6 +57,9 @@ from metadata.ingestion.source.database.column_helpers import truncate_column_name from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser from metadata.ingestion.source.database.database_service import DatabaseServiceSource +from metadata.ingestion.source.database.external_table_lineage_mixin import ( + ExternalTableLineageMixin, +) from metadata.ingestion.source.database.glue.models import Column as GlueColumn from metadata.ingestion.source.database.glue.models import ( DatabasePage, @@ -66,7 +74,7 @@ logger = ingestion_logger() -class GlueSource(DatabaseServiceSource): +class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource): """ Implements the necessary methods to extract Database metadata from Glue Source @@ -84,6 +92,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata): self.connection_obj = self.glue self.schema_description_map = {} + self.external_location_map = {} self.test_connection() @classmethod @@ -305,9 +314,16 @@ def yield_table( table_name, table_type = table_name_and_type table = self.context.get().table_data table_constraints = None + storage_descriptor = table.StorageDescriptor + database_name = self.context.get().database + schema_name = self.context.get().database_schema + if storage_descriptor.Location: + # s3a doesn't occur as a path in containers, so it needs to be replaced for lineage to work + self.external_location_map[ + (database_name, schema_name, table_name) + ] = storage_descriptor.Location.replace("s3a://", "s3://") try: - columns = self.get_columns(table.StorageDescriptor) - + columns = self.get_columns(storage_descriptor) table_request = CreateTableRequest( name=EntityName(table_name), tableType=table_type, @@ -319,8 +335,8 @@ def yield_table( metadata=self.metadata, entity_type=DatabaseSchema, service_name=self.context.get().database_service, - database_name=self.context.get().database, - schema_name=self.context.get().database_schema, + database_name=database_name, + schema_name=schema_name, ) ), sourceUrl=self.get_source_url( @@ -328,6 +344,8 @@ def yield_table( schema_name=self.context.get().database_schema, database_name=self.context.get().database, ), + fileFormat=self.get_format(storage_descriptor), + locationPath=storage_descriptor.Location, ) yield Either(right=table_request) self.register_record(table_request=table_request) @@ -370,6 +388,19 @@ def get_columns(self, column_data: StorageDetails) -> Optional[Iterable[Column]] for column in self.context.get().table_data.PartitionKeys: yield self._get_column_object(column) + @classmethod + def get_format(cls, storage: StorageDetails) -> Optional[FileFormat]: + library = storage.SerdeInfo.SerializationLibrary + if library is None: + return None + if library.endswith(".LazySimpleSerDe"): + return ( + FileFormat.tsv + if storage.SerdeInfo.Parameters.get("serialization.format") == "\t" + else FileFormat.csv + ) + return next((fmt for fmt in FileFormat if fmt.value in library.lower()), None) + def standardize_table_name(self, _: str, table: str) -> str: return table[:128] diff --git a/ingestion/src/metadata/ingestion/source/database/glue/models.py b/ingestion/src/metadata/ingestion/source/database/glue/models.py index 229d18b3a433..3975d774ac86 100644 --- a/ingestion/src/metadata/ingestion/source/database/glue/models.py +++ b/ingestion/src/metadata/ingestion/source/database/glue/models.py @@ -37,8 +37,15 @@ class Column(BaseModel): Comment: Optional[str] = None +class SerializationDetails(BaseModel): + SerializationLibrary: Optional[str] = None + Parameters: Optional[dict] = {} + + class StorageDetails(BaseModel): Columns: Optional[List[Column]] = [] + Location: Optional[str] = None + SerdeInfo: Optional[SerializationDetails] = SerializationDetails() class GlueTable(BaseModel): diff --git a/ingestion/tests/unit/resources/datasets/glue_db_dataset.json b/ingestion/tests/unit/resources/datasets/glue_db_dataset.json index f491335cf4b3..d5c3543d8587 100644 --- a/ingestion/tests/unit/resources/datasets/glue_db_dataset.json +++ b/ingestion/tests/unit/resources/datasets/glue_db_dataset.json @@ -370,15 +370,13 @@ } ], "Location": "s3://athena-postgres/map-test", - "InputFormat": "org.apache.hadoop.mapred.TextInputFormat", - "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat", + "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat", "Compressed": false, "NumberOfBuckets": -1, "SerdeInfo": { - "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", - "Parameters": { - "serialization.format": "1" - } + "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe", + "Parameters": {} }, "BucketColumns": [], "SortColumns": [], diff --git a/ingestion/tests/unit/topology/database/test_glue.py b/ingestion/tests/unit/topology/database/test_glue.py index 36f88a8e6f49..d9da11f3ebbb 100644 --- a/ingestion/tests/unit/topology/database/test_glue.py +++ b/ingestion/tests/unit/topology/database/test_glue.py @@ -21,7 +21,7 @@ from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import TableType +from metadata.generated.schema.entity.data.table import FileFormat, TableType from metadata.generated.schema.entity.services.databaseService import ( DatabaseConnection, DatabaseService, @@ -69,6 +69,11 @@ }, } + +def mock_fqn_build(*args, **kwargs) -> str: + return ".".join((kwargs[key] for key in kwargs if key.endswith("_name"))) + + MOCK_CUSTOM_DB_NAME = "NEW_DB" mock_glue_config_db_test = deepcopy(mock_glue_config) @@ -124,6 +129,14 @@ EXPECTED_TABLE_TYPES = [TableType.External, TableType.Iceberg, TableType.View] +EXPECTED_FILE_FORMATS = [None, FileFormat.tsv, FileFormat.parquet] + +EXPECTED_LOCATION_PATHS = [ + "s3://athena-examples-MyRegion/cloudfront/plaintext", + "s3://athena-postgres/", + "s3://athena-postgres/map-test", +] + class GlueUnitTest(TestCase): @patch( @@ -151,6 +164,11 @@ def __init__(self, methodName, test_connection) -> None: TablePage(**mock_data.get("mock_table_paginator")) ] + def get_table_requests(self): + tables = self.glue_source.get_tables_name_and_type() + for table in tables: + yield next(self.glue_source.yield_table(table)).right + def test_database_names(self): assert EXPECTED_DATABASE_NAMES == list(self.glue_source.get_database_names()) @@ -172,8 +190,26 @@ def test_database_schema_names(self): self.glue_source.get_database_schema_names() ) - def test_table_names(self): + @patch("metadata.ingestion.source.database.glue.metadata.fqn") + def test_table_names(self, fqn): + fqn.build = mock_fqn_build for table_and_table_type in list(self.glue_source.get_tables_name_and_type()): table_and_table_type[0] assert table_and_table_type[0] in EXPECTED_TABLE_NAMES assert table_and_table_type[1] in EXPECTED_TABLE_TYPES + + @patch("metadata.ingestion.source.database.glue.metadata.fqn") + def test_file_formats(self, fqn): + fqn.build = mock_fqn_build + assert ( + list(map(lambda x: x.fileFormat, self.get_table_requests())) + == EXPECTED_FILE_FORMATS + ) + + @patch("metadata.ingestion.source.database.glue.metadata.fqn") + def test_location_paths(self, fqn): + fqn.build = mock_fqn_build + assert ( + list(map(lambda x: x.locationPath, self.get_table_requests())) + == EXPECTED_LOCATION_PATHS + ) diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md index 87c93812bb7f..b5d536787658 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md @@ -7,8 +7,8 @@ slug: /connectors/database/glue name="Glue" stage="PROD" platform="OpenMetadata" -availableFeatures=["Metadata", "dbt"] -unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage"] +availableFeatures=["Metadata", "dbt", "Lineage"] +unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Data Profiler", "Data Quality", "Column-level Lineage"] / %} From 18dc7cc3e45eaab92e5c8083ccb76bf0d8cb8d92 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 5 Nov 2024 16:57:17 +0530 Subject: [PATCH 2/2] Improve Lineage Label --- .../content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md index b5d536787658..c136a8132e4b 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/database/glue/index.md @@ -7,7 +7,7 @@ slug: /connectors/database/glue name="Glue" stage="PROD" platform="OpenMetadata" -availableFeatures=["Metadata", "dbt", "Lineage"] +availableFeatures=["Metadata", "dbt", "External Table Lineage"] unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Data Profiler", "Data Quality", "Column-level Lineage"] / %}