Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Glue ingestion with external table features #18511

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import Column, Table, TableType
from metadata.generated.schema.entity.data.table import (
Column,
FileFormat,
Table,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
GlueConnection,
)
Expand All @@ -52,6 +57,9 @@
from metadata.ingestion.source.database.column_helpers import truncate_column_name
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.database_service import DatabaseServiceSource
from metadata.ingestion.source.database.external_table_lineage_mixin import (
ExternalTableLineageMixin,
)
from metadata.ingestion.source.database.glue.models import Column as GlueColumn
from metadata.ingestion.source.database.glue.models import (
DatabasePage,
Expand All @@ -66,7 +74,7 @@
logger = ingestion_logger()


class GlueSource(DatabaseServiceSource):
class GlueSource(ExternalTableLineageMixin, DatabaseServiceSource):
"""
Implements the necessary methods to extract
Database metadata from Glue Source
Expand All @@ -84,6 +92,7 @@ def __init__(self, config: WorkflowSource, metadata: OpenMetadata):

self.connection_obj = self.glue
self.schema_description_map = {}
self.external_location_map = {}
self.test_connection()

@classmethod
Expand Down Expand Up @@ -305,9 +314,16 @@ def yield_table(
table_name, table_type = table_name_and_type
table = self.context.get().table_data
table_constraints = None
storage_descriptor = table.StorageDescriptor
database_name = self.context.get().database
schema_name = self.context.get().database_schema
if storage_descriptor.Location:
# s3a doesn't occur as a path in containers, so it needs to be replaced for lineage to work
self.external_location_map[
(database_name, schema_name, table_name)
] = storage_descriptor.Location.replace("s3a://", "s3://")
try:
columns = self.get_columns(table.StorageDescriptor)

columns = self.get_columns(storage_descriptor)
table_request = CreateTableRequest(
name=EntityName(table_name),
tableType=table_type,
Expand All @@ -319,15 +335,17 @@ def yield_table(
metadata=self.metadata,
entity_type=DatabaseSchema,
service_name=self.context.get().database_service,
database_name=self.context.get().database,
schema_name=self.context.get().database_schema,
database_name=database_name,
schema_name=schema_name,
)
),
sourceUrl=self.get_source_url(
table_name=table_name,
schema_name=self.context.get().database_schema,
database_name=self.context.get().database,
),
fileFormat=self.get_format(storage_descriptor),
locationPath=storage_descriptor.Location,
)
yield Either(right=table_request)
self.register_record(table_request=table_request)
Expand Down Expand Up @@ -370,6 +388,19 @@ def get_columns(self, column_data: StorageDetails) -> Optional[Iterable[Column]]
for column in self.context.get().table_data.PartitionKeys:
yield self._get_column_object(column)

@classmethod
def get_format(cls, storage: StorageDetails) -> Optional[FileFormat]:
    """Derive the table's file format from its Glue storage descriptor.

    The SerDe library name encodes the on-disk format. LazySimpleSerDe is
    used for both CSV and TSV; the two are distinguished by the
    ``serialization.format`` SerDe parameter (a tab character means TSV).
    For every other library, the first ``FileFormat`` whose value appears
    in the lowercased library name is returned.

    :param storage: StorageDetails parsed from the Glue table response
    :return: the matching FileFormat, or None if it cannot be determined
    """
    serde = storage.SerdeInfo
    # SerdeInfo and its fields are Optional in the model: an explicit
    # null in the Glue API response would make them None, so guard
    # before dereferencing.
    if serde is None or serde.SerializationLibrary is None:
        return None
    library = serde.SerializationLibrary
    if library.endswith(".LazySimpleSerDe"):
        parameters = serde.Parameters or {}
        return (
            FileFormat.tsv
            if parameters.get("serialization.format") == "\t"
            else FileFormat.csv
        )
    return next((fmt for fmt in FileFormat if fmt.value in library.lower()), None)

def standardize_table_name(self, _: str, table: str) -> str:
    """Return the table name truncated to Glue's 128-character limit.

    The schema argument is accepted for interface compatibility but unused.
    """
    max_name_length = 128
    return table[:max_name_length]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,15 @@ class Column(BaseModel):
Comment: Optional[str] = None


class SerializationDetails(BaseModel):
    """SerDe information from a Glue table's StorageDescriptor."""

    # Fully-qualified SerDe class name, e.g.
    # "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe".
    SerializationLibrary: Optional[str] = None
    # SerDe configuration parameters, e.g. {"serialization.format": "\t"}
    # for tab-separated data. (Pydantic copies this default per instance.)
    Parameters: Optional[dict] = {}


class StorageDetails(BaseModel):
    """Subset of a Glue table's StorageDescriptor used during ingestion."""

    # Physical columns of the table (partition keys live elsewhere).
    Columns: Optional[List[Column]] = []
    # External storage path, e.g. "s3://bucket/prefix" — may also arrive
    # with an "s3a://" scheme that callers normalize for lineage.
    Location: Optional[str] = None
    # Serialization details; defaults to an empty SerializationDetails
    # when the API response omits SerdeInfo.
    SerdeInfo: Optional[SerializationDetails] = SerializationDetails()


class GlueTable(BaseModel):
Expand Down
10 changes: 4 additions & 6 deletions ingestion/tests/unit/resources/datasets/glue_db_dataset.json
Original file line number Diff line number Diff line change
Expand Up @@ -370,15 +370,13 @@
}
],
"Location": "s3://athena-postgres/map-test",
"InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
"OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
"Compressed": false,
"NumberOfBuckets": -1,
"SerdeInfo": {
"SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"Parameters": {
"serialization.format": "1"
}
"SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
"Parameters": {}
},
"BucketColumns": [],
"SortColumns": [],
Expand Down
40 changes: 38 additions & 2 deletions ingestion/tests/unit/topology/database/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.data.table import FileFormat, TableType
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
Expand Down Expand Up @@ -69,6 +69,11 @@
},
}


def mock_fqn_build(*args, **kwargs) -> str:
    """Stand-in for fqn.build: join every *_name keyword value with dots."""
    name_values = [value for key, value in kwargs.items() if key.endswith("_name")]
    return ".".join(name_values)


MOCK_CUSTOM_DB_NAME = "NEW_DB"

mock_glue_config_db_test = deepcopy(mock_glue_config)
Expand Down Expand Up @@ -124,6 +129,14 @@

EXPECTED_TABLE_TYPES = [TableType.External, TableType.Iceberg, TableType.View]

EXPECTED_FILE_FORMATS = [None, FileFormat.tsv, FileFormat.parquet]

EXPECTED_LOCATION_PATHS = [
"s3://athena-examples-MyRegion/cloudfront/plaintext",
"s3://athena-postgres/",
"s3://athena-postgres/map-test",
]


class GlueUnitTest(TestCase):
@patch(
Expand Down Expand Up @@ -151,6 +164,11 @@ def __init__(self, methodName, test_connection) -> None:
TablePage(**mock_data.get("mock_table_paginator"))
]

def get_table_requests(self):
    """Yield the first CreateTableRequest produced for each discovered table."""
    source = self.glue_source
    for name_and_type in source.get_tables_name_and_type():
        either = next(source.yield_table(name_and_type))
        yield either.right

def test_database_names(self):
    """Database names discovered from the mocked paginator match expectations."""
    actual_names = list(self.glue_source.get_database_names())
    assert actual_names == EXPECTED_DATABASE_NAMES

Expand All @@ -172,8 +190,26 @@ def test_database_schema_names(self):
self.glue_source.get_database_schema_names()
)

def test_table_names(self):
@patch("metadata.ingestion.source.database.glue.metadata.fqn")
def test_table_names(self, fqn):
fqn.build = mock_fqn_build
for table_and_table_type in list(self.glue_source.get_tables_name_and_type()):
table_and_table_type[0]
assert table_and_table_type[0] in EXPECTED_TABLE_NAMES
assert table_and_table_type[1] in EXPECTED_TABLE_TYPES

@patch("metadata.ingestion.source.database.glue.metadata.fqn")
def test_file_formats(self, fqn):
    """File formats derived from each table's SerDe match expectations.

    fqn.build is mocked so table discovery does not need a live backend.
    """
    fqn.build = mock_fqn_build
    # List comprehension instead of list(map(lambda ...)) for clarity.
    actual_formats = [request.fileFormat for request in self.get_table_requests()]
    assert actual_formats == EXPECTED_FILE_FORMATS

@patch("metadata.ingestion.source.database.glue.metadata.fqn")
def test_location_paths(self, fqn):
    """External storage locations recorded for each table match expectations.

    fqn.build is mocked so table discovery does not need a live backend.
    """
    fqn.build = mock_fqn_build
    # List comprehension instead of list(map(lambda ...)) for clarity.
    actual_paths = [request.locationPath for request in self.get_table_requests()]
    assert actual_paths == EXPECTED_LOCATION_PATHS
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ slug: /connectors/database/glue
name="Glue"
stage="PROD"
platform="OpenMetadata"
availableFeatures=["Metadata", "dbt"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Data Profiler", "Data Quality", "Lineage", "Column-level Lineage"]
availableFeatures=["Metadata", "dbt", "External Table Lineage"]
unavailableFeatures=["Query Usage", "Owners", "Tags", "Stored Procedures", "Data Profiler", "Data Quality", "Column-level Lineage"]
/ %}


Expand Down
Loading