Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest): use correct native data type in all SQLAlchemy sources by compiling data type using dialect #10898

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/athena.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,15 @@ def get_schema_fields_for_column(
self,
dataset_name: str,
column: Dict,
inspector: Inspector,
pk_constraints: Optional[dict] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:
fields = get_schema_fields_for_sqlalchemy_column(
column_name=column["name"],
column_type=column["type"],
inspector=inspector,
description=column.get("comment", None),
nullable=column.get("nullable", True),
is_part_of_key=(
Expand Down
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,16 @@ def get_schema_fields_for_column(
self,
dataset_name: str,
column: Dict[Any, Any],
inspector: Inspector,
pk_constraints: Optional[Dict[Any, Any]] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:
fields = super().get_schema_fields_for_column(
dataset_name, column, pk_constraints
dataset_name,
column,
inspector,
pk_constraints,
)

if self._COMPLEX_TYPE.match(fields[0].nativeDataType) and isinstance(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ def loop_tables(
)

# add table schema fields
schema_fields = self.get_schema_fields(dataset_name, columns)
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)

self._set_partition_key(columns, schema_fields)

Expand Down Expand Up @@ -754,7 +754,9 @@ def loop_views(

# add view schema fields
schema_fields = self.get_schema_fields(
dataset.dataset_name, dataset.columns
dataset.dataset_name,
dataset.columns,
inspector,
)

schema_metadata = get_schema_metadata(
Expand Down Expand Up @@ -877,6 +879,7 @@ def get_schema_fields_for_column(
self,
dataset_name: str,
column: Dict[Any, Any],
inspector: Inspector,
pk_constraints: Optional[Dict[Any, Any]] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
Expand Down
17 changes: 15 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport
from datahub.utilities.sqlalchemy_type_converter import (
get_native_data_type_for_sqlalchemy_type,
)

if TYPE_CHECKING:
from datahub.ingestion.source.ge_data_profiler import (
Expand Down Expand Up @@ -788,6 +791,7 @@ def _process_table(
schema_fields = self.get_schema_fields(
dataset_name,
columns,
inspector,
pk_constraints,
tags=extra_tags,
partition_keys=partitions,
Expand Down Expand Up @@ -968,6 +972,7 @@ def get_schema_fields(
self,
dataset_name: str,
columns: List[dict],
inspector: Inspector,
pk_constraints: Optional[dict] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[Dict[str, List[str]]] = None,
Expand All @@ -980,6 +985,7 @@ def get_schema_fields(
fields = self.get_schema_fields_for_column(
dataset_name,
column,
inspector,
pk_constraints,
tags=column_tags,
partition_keys=partition_keys,
Expand All @@ -991,6 +997,7 @@ def get_schema_fields_for_column(
self,
dataset_name: str,
column: dict,
inspector: Inspector,
pk_constraints: Optional[dict] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
Expand All @@ -1000,10 +1007,16 @@ def get_schema_fields_for_column(
tags_str = [make_tag_urn(t) for t in tags]
tags_tac = [TagAssociationClass(t) for t in tags_str]
gtc = GlobalTagsClass(tags_tac)
full_type = column.get("full_type")
field = SchemaField(
fieldPath=column["name"],
type=get_column_type(self.report, dataset_name, column["type"]),
nativeDataType=column.get("full_type", repr(column["type"])),
nativeDataType=full_type
if full_type is not None
else get_native_data_type_for_sqlalchemy_type(
column["type"],
inspector=inspector,
),
description=column.get("comment", None),
nullable=column["nullable"],
recursive=False,
Expand Down Expand Up @@ -1076,7 +1089,7 @@ def _process_view(
self.warn(logger, dataset_name, "unable to get schema for this view")
schema_metadata = None
else:
schema_fields = self.get_schema_fields(dataset_name, columns)
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
Expand Down
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/sql/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,12 +387,16 @@ def get_schema_fields_for_column(
self,
dataset_name: str,
column: dict,
inspector: Inspector,
pk_constraints: Optional[dict] = None,
partition_keys: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
) -> List[SchemaField]:
fields = super().get_schema_fields_for_column(
dataset_name, column, pk_constraints
dataset_name,
column,
inspector,
pk_constraints,
)

if isinstance(column["type"], (datatype.ROW, sqltypes.ARRAY, datatype.MAP)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,12 @@ def _process_projections(
foreign_keys = self._get_foreign_keys(
dataset_urn, inspector, schema, projection
)
schema_fields = self.get_schema_fields(dataset_name, columns, pk_constraints)
schema_fields = self.get_schema_fields(
dataset_name,
columns,
inspector,
pk_constraints,
)
schema_metadata = get_schema_metadata(
self.report,
dataset_name,
Expand Down Expand Up @@ -673,7 +678,7 @@ def _process_models(
)
dataset_snapshot.aspects.append(dataset_properties)

schema_fields = self.get_schema_fields(dataset_name, columns)
schema_fields = self.get_schema_fields(dataset_name, columns, inspector)

schema_metadata = get_schema_metadata(
self.report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from typing import Any, Dict, List, Optional, Type, Union

from sqlalchemy import types
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.visitors import Visitable

from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
Expand Down Expand Up @@ -176,6 +178,7 @@ def get_avro_for_sqlalchemy_column(
def get_schema_fields_for_sqlalchemy_column(
column_name: str,
column_type: types.TypeEngine,
inspector: Inspector,
description: Optional[str] = None,
nullable: Optional[bool] = True,
is_part_of_key: Optional[bool] = False,
Expand Down Expand Up @@ -216,7 +219,10 @@ def get_schema_fields_for_sqlalchemy_column(
SchemaField(
fieldPath=column_name,
type=SchemaFieldDataTypeClass(type=NullTypeClass()),
nativeDataType=str(column_type),
nativeDataType=get_native_data_type_for_sqlalchemy_type(
column_type,
inspector,
),
)
]

Expand All @@ -240,3 +246,25 @@ def get_schema_fields_for_sqlalchemy_column(
)

return schema_fields


def get_native_data_type_for_sqlalchemy_type(
    column_type: types.TypeEngine, inspector: Inspector
) -> str:
    """Return a dialect-native string rendering of a SQLAlchemy column type.

    Preference order:
    1. ``NullType`` short-circuits to its ``__visit_name__`` (it has no
       meaningful dialect compilation).
    2. Compile the type against the inspector's dialect — this yields the
       backend-native spelling (e.g. dialect-specific names/lengths).
    3. If compilation raises, fall back to ``__visit_name__`` when present,
       and finally to ``repr`` of the type object.
    """
    if isinstance(column_type, types.NullType):
        return column_type.__visit_name__

    try:
        # Dialect-aware compilation produces the backend's own type name.
        return column_type.compile(dialect=inspector.dialect)
    except Exception as e:
        logger.debug(
            f"Unable to compile sqlalchemy type {column_type} the error was: {e}"
        )

    fallback_name = (
        column_type.__visit_name__
        if isinstance(column_type, Visitable)
        else None
    )
    if fallback_name is not None:
        return fallback_name

    # Last resort: an unambiguous (if non-native) representation.
    return repr(column_type)
8 changes: 2 additions & 6 deletions metadata-ingestion/tests/integration/hana/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,11 @@
version: '3.4'
services:
testhana:
image: "store/saplabs/hanaexpress:2.00.054.00.20210603.1"
image: "saplabs/hanaexpress:latest"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider pinning the Docker image to a specific version.

Using the latest version of the Docker image can introduce variability and potential instability due to untested changes. Pinning to a specific version ensures stability and reproducibility.

-    image: "saplabs/hanaexpress:latest"
+    image: "saplabs/hanaexpress:2.00.054.00.20210603.1"
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
image: "saplabs/hanaexpress:latest"
image: "saplabs/hanaexpress:2.00.054.00.20210603.1"

container_name: "testhana"
restart: "unless-stopped"
ports:
- 39013:39013
- 39017:39017
- 39041-39045:39041-39045
- 1128-1129:1128-1129
- 59013-59014:59013-59014
- 39041:39041
volumes:
- ./post_start:/hana/hooks/post_start/
- ./setup:/hana/mounts/setup/
Expand Down
Loading
Loading