feat(ingestion/glue): delta schemas #10299

Merged
Changes from 15 commits

Commits (20)
cd7f751  feat(ingestion/glue): delta schemas (sgomezvillamor, Apr 16, 2024)
ac351e9  lint fix (sgomezvillamor, Apr 16, 2024)
21b3c47  update delta golden files (sgomezvillamor, Apr 17, 2024)
7fe6f0f  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 17, 2024)
1562daa  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 19, 2024)
cecb5aa  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 19, 2024)
5f53955  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 22, 2024)
bd8a97b  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 23, 2024)
7774c34  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 24, 2024)
1a66f4d  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 25, 2024)
dbceda1  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, Apr 26, 2024)
d634b10  refactor: address PR review comments (sgomezvillamor, May 10, 2024)
8fc4f15  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, May 10, 2024)
ee9f466  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, May 13, 2024)
f602238  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, May 15, 2024)
310102e  Update metadata-ingestion/src/datahub/ingestion/source/aws/glue.py (sgomezvillamor, May 16, 2024)
da5c3b7  Update metadata-ingestion/src/datahub/ingestion/source/aws/glue.py (sgomezvillamor, May 16, 2024)
6bec426  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, May 16, 2024)
4bbadbd  fix lint (sgomezvillamor, May 16, 2024)
b8dfc25  Merge branch 'master' into feat-ingestion-glue-delta-schema (sgomezvillamor, May 17, 2024)
88 changes: 87 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1,3 +1,4 @@
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field as dataclass_field
@@ -98,6 +99,7 @@
    UpstreamClass,
    UpstreamLineageClass,
)
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logger = logging.getLogger(__name__)
@@ -161,6 +163,11 @@ class GlueSourceConfig(
    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
        default=None, description=""
    )
    extract_delta_schema_from_parameters: Optional[bool] = Field(
        default=False,
        description="If enabled, delta schemas can be alternatively fetched from table parameters "
        "(https://github.com/delta-io/delta/pull/2310)",
Review comment (Collaborator): Let's please move this PR's link into a code comment on this config, rather than into the config's description.

sgomezvillamor marked this conversation as resolved.
    )

    def is_profiling_enabled(self) -> bool:
        return self.profiling is not None and is_profiling_enabled(
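The new flag is off by default. As a quick illustration, a hypothetical way to enable it when building the source config directly (assuming aws_region is the only other required option; everything else is left at its default):

# Hypothetical usage sketch; config keys other than
# extract_delta_schema_from_parameters are standard Glue source options.
from datahub.ingestion.source.aws.glue import GlueSourceConfig

config = GlueSourceConfig.parse_obj(
    {
        "aws_region": "us-east-1",
        "extract_delta_schema_from_parameters": True,
    }
)
assert config.extract_delta_schema_from_parameters is True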
@@ -204,6 +211,8 @@ class GlueSourceReport(StaleEntityRemovalSourceReport):
    num_job_script_failed_parsing: int = 0
    num_job_without_nodes: int = 0
    num_dataset_to_dataset_edges_in_job: int = 0
    num_dataset_invalid_delta_schema: int = 0
    num_dataset_valid_delta_schema: int = 0

    def report_table_scanned(self) -> None:
        self.tables_scanned += 1
@@ -1147,10 +1156,42 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
            )
            return new_tags

        def _is_delta_schema(
            provider: str, num_parts: int, columns: Optional[List[Mapping[str, Any]]]
        ) -> bool:
            return (
                (self.source_config.extract_delta_schema_from_parameters is True)
                and (provider == "delta")
                and (num_parts > 0)
                and (columns is not None)
                and (len(columns) == 1)
                and (columns[0].get("Name", "") == "col")
                and (columns[0].get("Type", "") == "array<string>")
            )
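For context on the heuristic above: when the Delta/Hive integration registers a table in Glue, the StorageDescriptor carries only a single placeholder column (col: array<string>) while the real schema is serialized into the table parameters (see delta-io/delta#2310). A sketch of a matching table follows, with illustrative values rather than a real boto3 glue.get_table() response:

# Illustrative shape of a Glue table entry that _is_delta_schema would accept.
table = {
    "Parameters": {
        "spark.sql.sources.provider": "delta",
        "spark.sql.sources.schema.numParts": "1",
        "spark.sql.sources.schema.part.0": '{"type": "struct", "fields": []}',
    },
    "StorageDescriptor": {
        # Placeholder column written instead of the real schema; this is what
        # triggers the parameters-based fallback.
        "Columns": [{"Name": "col", "Type": "array<string>"}],
    },
}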

        def get_schema_metadata() -> Optional[SchemaMetadata]:
            # As soon as the hive integration with Spark is correctly providing the
            # schema as expected in the StorageDescriptor, the alternative path to
            # fetch the schema from table parameters can be removed.
            # https://github.com/datahub-project/datahub/pull/10299
            # https://github.com/delta-io/delta/pull/2310
            provider = table.get("Parameters", {}).get("spark.sql.sources.provider", "")
            num_parts = int(
                table.get("Parameters", {}).get(
                    "spark.sql.sources.schema.numParts", "0"
                )
            )
            columns = table.get("StorageDescriptor", {}).get("Columns", [{}])

            if _is_delta_schema(provider, num_parts, columns):
                return _get_delta_schema_metadata()

            elif table.get("StorageDescriptor"):
                return _get_glue_schema_metadata()

            else:
                return None

        def _get_glue_schema_metadata() -> Optional[SchemaMetadata]:
            schema = table["StorageDescriptor"]["Columns"]
            fields: List[SchemaField] = []
            for field in schema:
@@ -1183,6 +1224,51 @@ def get_schema_metadata() -> Optional[SchemaMetadata]:
                platformSchema=MySqlDDL(tableSchema=""),
            )

        def _get_delta_schema_metadata() -> Optional[SchemaMetadata]:
            assert (
                table["Parameters"]["spark.sql.sources.provider"] == "delta"
                and int(table["Parameters"]["spark.sql.sources.schema.numParts"]) > 0
            )

            try:
                numParts = int(table["Parameters"]["spark.sql.sources.schema.numParts"])
                schema_str = "".join(
                    [
                        table["Parameters"][f"spark.sql.sources.schema.part.{i}"]
                        for i in range(numParts)
                    ]
                )
                schema_json = json.loads(schema_str)
                fields: List[SchemaField] = []
                for field in schema_json["fields"]:
                    field_type = delta_type_to_hive_type(field.get("type", "unknown"))
                    schema_fields = get_schema_fields_for_hive_column(
                        hive_column_name=field["name"],
                        hive_column_type=field_type,
                        description=field.get("description"),
                        default_nullable=bool(field.get("nullable", True)),
                    )
                    assert schema_fields
                    fields.extend(schema_fields)

                self.report.num_dataset_valid_delta_schema += 1
                return SchemaMetadata(
                    schemaName=table_name,
                    version=0,
                    fields=fields,
                    platform=f"urn:li:dataPlatform:{self.platform}",
                    hash="",
                    platformSchema=MySqlDDL(tableSchema=""),
                )

            except Exception as e:
                self.report_warning(
                    dataset_urn,
                    f"Could not parse schema for {table_name} because of {type(e).__name__}: {e}",
                )
                self.report.num_dataset_invalid_delta_schema += 1
                return None
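Stripped of the reporting and SchemaMetadata plumbing, the reassembly done above amounts to the following standalone sketch (runnable, with made-up parameter values; the real code additionally maps each Delta type to a Hive type before emitting SchemaFields):

import json

# Made-up table parameters, with the schema JSON split across two parts the
# way Spark writes it when the JSON exceeds the per-parameter size limit.
params = {
    "spark.sql.sources.schema.numParts": "2",
    "spark.sql.sources.schema.part.0": '{"type": "struct", "fields": [{"name": "id",',
    "spark.sql.sources.schema.part.1": ' "type": "long", "nullable": true, "metadata": {}}]}',
}

num_parts = int(params["spark.sql.sources.schema.numParts"])
schema = json.loads(
    "".join(params[f"spark.sql.sources.schema.part.{i}"] for i in range(num_parts))
)
print([(f["name"], f["type"]) for f in schema["fields"]])  # [('id', 'long')]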

        def get_data_platform_instance() -> DataPlatformInstanceClass:
            return DataPlatformInstanceClass(
                platform=make_data_platform_urn(self.platform),
metadata-ingestion/src/datahub/ingestion/source/delta_lake/source.py

@@ -2,7 +2,7 @@
import logging
import os
import time
from typing import Any, Dict, Iterable, List
from typing import Dict, Iterable, List
from urllib.parse import urlparse

from deltalake import DeltaTable
@@ -51,6 +51,7 @@
    SchemaFieldClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.delta import delta_type_to_hive_type
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

logging.getLogger("py4j").setLevel(logging.ERROR)
@@ -126,46 +127,12 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> "Source":
        config = DeltaLakeSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def delta_type_to_hive_type(self, field_type: Any) -> str:
        if isinstance(field_type, str):
            """
            return the field type
            """
            return field_type
        else:
            if field_type.get("type") == "array":
                """
                if array is of complex type, recursively parse the
                fields and create the native datatype
                """
                return (
                    "array<"
                    + self.delta_type_to_hive_type(field_type.get("elementType"))
                    + ">"
                )
            elif field_type.get("type") == "struct":
                parsed_struct = ""
                for field in field_type.get("fields"):
                    """
                    if field is of complex type, recursively parse
                    and create the native datatype
                    """
                    parsed_struct += (
                        "{0}:{1}".format(
                            field.get("name"),
                            self.delta_type_to_hive_type(field.get("type")),
                        )
                        + ","
                    )
                return "struct<" + parsed_struct.rstrip(",") + ">"
            return ""

    def _parse_datatype(self, raw_field_json_str: str) -> List[SchemaFieldClass]:
        raw_field_json = json.loads(raw_field_json_str)

        # get the parent field name and type
        field_name = raw_field_json.get("name")
        field_type = self.delta_type_to_hive_type(raw_field_json.get("type"))
        field_type = delta_type_to_hive_type(raw_field_json.get("type"))

        return get_schema_fields_for_hive_column(field_name, field_type)

34 changes: 34 additions & 0 deletions metadata-ingestion/src/datahub/utilities/delta.py
@@ -0,0 +1,34 @@
from typing import Any


def delta_type_to_hive_type(field_type: Any) -> str:
    if isinstance(field_type, str):
        """
        return the field type
        """
        return field_type
    else:
        if field_type.get("type") == "array":
            """
            if array is of complex type, recursively parse the
            fields and create the native datatype
            """
            return (
                "array<" + delta_type_to_hive_type(field_type.get("elementType")) + ">"
            )
        elif field_type.get("type") == "struct":
            parsed_struct = ""
            for field in field_type.get("fields"):
                """
                if field is of complex type, recursively parse
                and create the native datatype
                """
                parsed_struct += (
                    "{0}:{1}".format(
                        field.get("name"),
                        delta_type_to_hive_type(field.get("type")),
                    )
                    + ","
                )
            return "struct<" + parsed_struct.rstrip(",") + ">"
        return ""
metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py

@@ -28,6 +28,7 @@ class HiveColumnToAvroConverter:
"bigint": "long",
"varchar": "string",
"char": "string",
"long": "long",
"bytes": "bytes",
}
_EXTRA_BIGQUERY_TYPE_TO_AVRO_TYPE = {
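The new "long" entry closes the loop: Delta schemas name the 64-bit integer primitive long, whereas Hive spells it bigint, so without this mapping the Hive type string produced for Delta columns would not resolve cleanly to an Avro type. A hypothetical round-trip through the public helper:

from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column

# "long" now maps to the Avro long type instead of falling through unrecognized.
fields = get_schema_fields_for_hive_column("id", "long")
print(fields[0].type)  # expected: a numeric SchemaFieldDataType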