feat(ingestion/glue): delta schemas (#10299)
Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
sgomezvillamor and mayurinehate authored May 17, 2024
1 parent ad41d20 commit 0059960
Showing 9 changed files with 1,973 additions and 440 deletions.
86 changes: 85 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1,3 +1,4 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
@@ -98,6 +99,7 @@
UpstreamClass,
UpstreamLineageClass,
)
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logger = logging.getLogger(__name__)
@@ -161,6 +163,10 @@ class GlueSourceConfig(
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
default=None, description=""
)
extract_delta_schema_from_parameters: Optional[bool] = Field(
default=False,
description="If enabled, delta schemas can be alternatively fetched from table parameters.",
)

def is_profiling_enabled(self) -> bool:
return self.profiling is not None and is_profiling_enabled(
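
For orientation, here is a minimal sketch of switching the new flag on when constructing the source config. It is not part of the commit: the aws_region value is a made-up placeholder for whatever connection settings a real recipe needs, and parse_obj is the same pydantic entry point the Delta Lake source uses further down in this diff.

from datahub.ingestion.source.aws.glue import GlueSourceConfig

# Hypothetical config fragment; only extract_delta_schema_from_parameters is new in this commit.
config = GlueSourceConfig.parse_obj(
    {
        "aws_region": "us-east-1",  # assumed connection setting, adjust for your environment
        "extract_delta_schema_from_parameters": True,
    }
)
assert config.extract_delta_schema_from_parameters is True
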
@@ -204,6 +210,8 @@ class GlueSourceReport(StaleEntityRemovalSourceReport):
num_job_script_failed_parsing: int = 0
num_job_without_nodes: int = 0
num_dataset_to_dataset_edges_in_job: int = 0
num_dataset_invalid_delta_schema: int = 0
num_dataset_valid_delta_schema: int = 0

def report_table_scanned(self) -> None:
self.tables_scanned += 1
@@ -1147,10 +1155,41 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
)
return new_tags

def _is_delta_schema(
provider: str, num_parts: int, columns: Optional[List[Mapping[str, Any]]]
) -> bool:
return (
(self.source_config.extract_delta_schema_from_parameters is True)
and (provider == "delta")
and (num_parts > 0)
and (columns is not None)
and (len(columns) == 1)
and (columns[0].get("Name", "") == "col")
and (columns[0].get("Type", "") == "array<string>")
)
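# Illustrative example (not part of the commit): Spark registers Delta tables in Glue with a
# single placeholder column "col: array<string>" and stores the real schema in the table
# parameters, which is exactly the shape this check looks for. The values below are made up.
#
#   {
#       "Parameters": {
#           "spark.sql.sources.provider": "delta",
#           "spark.sql.sources.schema.numParts": "1",
#           "spark.sql.sources.schema.part.0":
#               '{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}',
#       },
#       "StorageDescriptor": {"Columns": [{"Name": "col", "Type": "array<string>"}]},
#   }
#
# With extract_delta_schema_from_parameters enabled, _is_delta_schema("delta", 1, columns)
# would return True for such a table.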

def get_schema_metadata() -> Optional[SchemaMetadata]:
    # As soon as the hive integration with Spark is correctly providing the schema as expected in the
    # StorageProperties, the alternative path to fetch schema from table parameters for delta schemas can be removed.
    # https://github.com/delta-io/delta/pull/2310
    provider = table.get("Parameters", {}).get("spark.sql.sources.provider", "")
    num_parts = int(
        table.get("Parameters", {}).get(
            "spark.sql.sources.schema.numParts", "0"
        )
    )
    columns = table.get("StorageDescriptor", {}).get("Columns", [{}])

    if _is_delta_schema(provider, num_parts, columns):
        return _get_delta_schema_metadata()

    elif table.get("StorageDescriptor"):
        return _get_glue_schema_metadata()

    else:
        return None

def _get_glue_schema_metadata() -> Optional[SchemaMetadata]:
schema = table["StorageDescriptor"]["Columns"]
fields: List[SchemaField] = []
for field in schema:
@@ -1183,6 +1222,51 @@ def get_schema_metadata() -> Optional[SchemaMetadata]:
platformSchema=MySqlDDL(tableSchema=""),
)

def _get_delta_schema_metadata() -> Optional[SchemaMetadata]:
assert (
table["Parameters"]["spark.sql.sources.provider"] == "delta"
and int(table["Parameters"]["spark.sql.sources.schema.numParts"]) > 0
)

try:
numParts = int(table["Parameters"]["spark.sql.sources.schema.numParts"])
schema_str = "".join(
[
table["Parameters"][f"spark.sql.sources.schema.part.{i}"]
for i in range(numParts)
]
)
schema_json = json.loads(schema_str)
fields: List[SchemaField] = []
for field in schema_json["fields"]:
field_type = delta_type_to_hive_type(field.get("type", "unknown"))
schema_fields = get_schema_fields_for_hive_column(
hive_column_name=field["name"],
hive_column_type=field_type,
description=field.get("description"),
default_nullable=bool(field.get("nullable", True)),
)
assert schema_fields
fields.extend(schema_fields)

self.report.num_dataset_valid_delta_schema += 1
return SchemaMetadata(
schemaName=table_name,
version=0,
fields=fields,
platform=f"urn:li:dataPlatform:{self.platform}",
hash="",
platformSchema=MySqlDDL(tableSchema=""),
)

except Exception as e:
self.report_warning(
dataset_urn,
f"Could not parse schema for {table_name} because of {type(e).__name__}: {e}",
)
self.report.num_dataset_invalid_delta_schema += 1
return None
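
# Illustrative note (not part of the commit): the concatenated schema parts decode to a
# Spark StructType JSON document, for example
#   {"type": "struct", "fields": [
#       {"name": "id", "type": "long", "nullable": true, "metadata": {}},
#       {"name": "tags", "type": {"type": "array", "elementType": "string", "containsNull": true},
#        "nullable": true, "metadata": {}}]}
# delta_type_to_hive_type renders "long" as "long" and the array type as "array<string>",
# and get_schema_fields_for_hive_column then emits the corresponding SchemaField entries.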

def get_data_platform_instance() -> DataPlatformInstanceClass:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py
@@ -2,7 +2,7 @@
import logging
import os
import time
from typing import Any, Dict, Iterable, List
from typing import Dict, Iterable, List
from urllib.parse import urlparse

from deltalake import DeltaTable
@@ -51,6 +51,7 @@
SchemaFieldClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logging.getLogger("py4j").setLevel(logging.ERROR)
@@ -126,46 +127,12 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
config = DeltaLakeSourceConfig.parse_obj(config_dict)
return cls(config, ctx)

def delta_type_to_hive_type(self, field_type: Any) -> str:
if isinstance(field_type, str):
"""
return the field type
"""
return field_type
else:
if field_type.get("type") == "array":
"""
if array is of complex type, recursively parse the
fields and create the native datatype
"""
return (
"array<"
+ self.delta_type_to_hive_type(field_type.get("elementType"))
+ ">"
)
elif field_type.get("type") == "struct":
parsed_struct = ""
for field in field_type.get("fields"):
"""
if field is of complex type, recursively parse
and create the native datatype
"""
parsed_struct += (
"{}:{}".format(
field.get("name"),
self.delta_type_to_hive_type(field.get("type")),
)
+ ","
)
return "struct<" + parsed_struct.rstrip(",") + ">"
return ""

def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]:
raw_field_json = json.loads(raw_field_json_str)

# get the parent field name and type
field_name = raw_field_json.get("name")
field_type = self.delta_type_to_hive_type(raw_field_json.get("type"))
field_type = delta_type_to_hive_type(raw_field_json.get("type"))

return get_schema_fields_for_hive_column(field_name, field_type)

34 changes: 34 additions & 0 deletions metadata-ingestion/src/datahub/utilities/delta.py
@@ -0,0 +1,34 @@
from typing import Any


def delta_type_to_hive_type(field_type: Any) -> str:
if isinstance(field_type, str):
"""
return the field type
"""
return field_type
else:
if field_type.get("type") == "array":
"""
if array is of complex type, recursively parse the
fields and create the native datatype
"""
return (
"array<" + delta_type_to_hive_type(field_type.get("elementType")) + ">"
)
elif field_type.get("type") == "struct":
parsed_struct = ""
for field in field_type.get("fields"):
"""
if field is of complex type, recursively parse
and create the native datatype
"""
parsed_struct += (
"{}:{}".format(
field.get("name"),
delta_type_to_hive_type(field.get("type")),
)
+ ","
)
return "struct<" + parsed_struct.rstrip(",") + ">"
return ""
metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
Expand Up @@ -28,6 +28,7 @@ class HiveColumnToAvroConverter:
"bigint": "long",
"varchar": "string",
"char": "string",
"long": "long",
"bytes": "bytes",
}
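    # Illustrative note (not part of the commit's own comments): Delta schemas use "long" for
    # 64-bit integers and delta_type_to_hive_type passes that name through unchanged, so adding
    # it to this map lets get_schema_fields_for_hive_column resolve such columns to Avro "long"
    # instead of treating "long" as an unrecognized Hive type.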
_EXTRA_BIGQUERY_TYPE_TO_AVRO_TYPE = {