fix(ingest): downgrade column type mapping warning to info (#11115)
hsheth2 authored Aug 7, 2024
1 parent a25df8e commit d6e46b9
Showing 4 changed files with 15 additions and 152 deletions.
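The substance of the change, excerpted from the hunks below: the legacy two-argument warning call becomes a structured info entry, and `log=False` keeps this potentially per-column event out of the console output while still recording it in the ingestion report. The fallback to `NullTypeClass` is unchanged; `report`, `dataset_name`, and `column_type` are the names used in the patched code.

```python
# Before: every unmappable column type surfaced as a source-level warning.
report.report_warning(
    dataset_name, f"unable to map type {column_type} to metadata schema"
)

# After: the same event is recorded as structured info, silently (log=False).
report.info(
    title="Unable to map column types to DataHub types",
    message="Got an unexpected column type. The column's parsed field type will not be populated.",
    context=f"{dataset_name} - {column_type}",
    log=False,
)
```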
74 changes: 2 additions & 72 deletions metadata-ingestion/src/datahub/ingestion/source/abs/source.py
@@ -8,29 +8,10 @@
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple

 import smart_open.compression as so_compression
 from more_itertools import peekable
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from smart_open import open as smart_open

 from datahub.emitter.mce_builder import (
@@ -48,7 +29,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.abs.config import DataLakeSourceConfig, PathSpec
 from datahub.ingestion.source.abs.report import DataLakeSourceReport
@@ -72,22 +53,14 @@
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
     SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -100,55 +73,12 @@
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)

-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000

 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])

-
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-

 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
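For context on the helper deleted here (and again from s3/source.py below): it resolved a DataHub type class by running `isinstance` checks against the mapping keys, so a subclass of a mapped Spark type still matched, which a plain `type(...)` dict lookup would miss. Note that the `column_type: str` annotation was inaccurate — the value compared is a Spark `DataType` instance. A self-contained sketch of the same lookup pattern, with a trimmed mapping and placeholder classes standing in for the real DataHub type classes:

```python
from typing import Any, Dict, Type

from pyspark.sql.types import DataType, DecimalType, IntegerType, StringType


# Placeholder stand-ins, NOT the real DataHub classes.
class StringTypeClass: ...
class NumberTypeClass: ...
class NullTypeClass: ...


_mapping: Dict[Type[DataType], Any] = {
    StringType: StringTypeClass,
    IntegerType: NumberTypeClass,
    DecimalType: NumberTypeClass,
}


def resolve(column_type: DataType) -> Any:
    # isinstance() rather than a type(...) dict lookup, so subclasses of a
    # mapped Spark type still resolve -- the deleted helper worked the same way.
    for spark_type, type_class in _mapping.items():
        if isinstance(column_type, spark_type):
            return type_class
    return NullTypeClass  # same fallback the deleted helper used


print(resolve(StringType()).__name__)        # StringTypeClass
print(resolve(DecimalType(10, 2)).__name__)  # NumberTypeClass
```

The surviving `get_column_type` implementations patched in the later hunks follow the same loop-and-fallback pattern; only their reporting changes.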
@@ -849,8 +849,11 @@ def get_column_type(
     # if still not found, report the warning
     if TypeClass is None:
         if column_type:
-            report.report_warning(
-                dataset_name, f"unable to map type {column_type} to metadata schema"
+            report.info(
+                title="Unable to map column types to DataHub types",
+                message="Got an unexpected column type. The column's parsed field type will not be populated.",
+                context=f"{dataset_name} - {column_type}",
+                log=False,
             )
         TypeClass = NullTypeClass

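Compared with the positional `(key, message)` warning API, the `title`/`message`/`context`/`log` form gives the report enough structure to aggregate repeated occurrences of the same issue, and `log=False` keeps a per-column event from flooding the run output. A rough sketch of that shape — a hypothetical stand-in, not DataHub's actual `SourceReport` API:

```python
import logging
from dataclasses import dataclass, field
from typing import List, Optional

logger = logging.getLogger(__name__)


@dataclass
class ReportEntry:
    title: str
    message: str
    context: Optional[str] = None


@dataclass
class MiniReport:
    """Hypothetical stand-in illustrating the title/message/context/log shape."""

    infos: List[ReportEntry] = field(default_factory=list)
    warnings: List[ReportEntry] = field(default_factory=list)

    def info(
        self,
        title: str,
        message: str,
        context: Optional[str] = None,
        log: bool = True,
    ) -> None:
        self.infos.append(ReportEntry(title, message, context))
        if log:  # log=False records the entry without emitting a log line
            logger.info("%s: %s (%s)", title, message, context)


report = MiniReport()
report.info(
    title="Unable to map column types to DataHub types",
    message="Got an unexpected column type. The column's parsed field type will not be populated.",
    context="my_dataset - GEOMETRY",  # illustrative dataset/type pair
    log=False,
)
assert len(report.infos) == 1 and not report.warnings
```

The entry is recorded but nothing is logged and nothing lands in `warnings`, which mirrors the point of the commit: unmappable column types stay visible in the ingestion report without flagging the run at warning level.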
79 changes: 3 additions & 76 deletions metadata-ingestion/src/datahub/ingestion/source/s3/source.py
@@ -8,32 +8,13 @@
 from collections import OrderedDict
 from datetime import datetime
 from pathlib import PurePath
-from typing import Any, Dict, Iterable, List, Optional, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple

 import smart_open.compression as so_compression
 from more_itertools import peekable
 from pyspark.conf import SparkConf
 from pyspark.sql import SparkSession
 from pyspark.sql.dataframe import DataFrame
-from pyspark.sql.types import (
-    ArrayType,
-    BinaryType,
-    BooleanType,
-    ByteType,
-    DateType,
-    DecimalType,
-    DoubleType,
-    FloatType,
-    IntegerType,
-    LongType,
-    MapType,
-    NullType,
-    ShortType,
-    StringType,
-    StructField,
-    StructType,
-    TimestampType,
-)
 from pyspark.sql.utils import AnalysisException
 from smart_open import open as smart_open

@@ -52,7 +33,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
+from datahub.ingestion.api.source import MetadataWorkUnitProcessor
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws.s3_boto_utils import get_s3_tags, list_folders
 from datahub.ingestion.source.aws.s3_util import (
@@ -72,22 +53,13 @@
     StatefulIngestionSourceBase,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.schema import (
-    BooleanTypeClass,
-    BytesTypeClass,
-    DateTypeClass,
-    NullTypeClass,
-    NumberTypeClass,
-    RecordTypeClass,
     SchemaField,
-    SchemaFieldDataType,
     SchemaMetadata,
     StringTypeClass,
-    TimeTypeClass,
 )
 from datahub.metadata.schema_classes import (
     DataPlatformInstanceClass,
     DatasetPropertiesClass,
-    MapTypeClass,
     OperationClass,
     OperationTypeClass,
     OtherSchemaClass,
@@ -101,55 +73,12 @@
 logging.getLogger("py4j").setLevel(logging.ERROR)
 logger: logging.Logger = logging.getLogger(__name__)

-# for a list of all types, see https://spark.apache.org/docs/3.0.3/api/python/_modules/pyspark/sql/types.html
-_field_type_mapping = {
-    NullType: NullTypeClass,
-    StringType: StringTypeClass,
-    BinaryType: BytesTypeClass,
-    BooleanType: BooleanTypeClass,
-    DateType: DateTypeClass,
-    TimestampType: TimeTypeClass,
-    DecimalType: NumberTypeClass,
-    DoubleType: NumberTypeClass,
-    FloatType: NumberTypeClass,
-    ByteType: BytesTypeClass,
-    IntegerType: NumberTypeClass,
-    LongType: NumberTypeClass,
-    ShortType: NumberTypeClass,
-    ArrayType: NullTypeClass,
-    MapType: MapTypeClass,
-    StructField: RecordTypeClass,
-    StructType: RecordTypeClass,
-}
 PAGE_SIZE = 1000

 # Hack to support the .gzip extension with smart_open.
 so_compression.register_compressor(".gzip", so_compression._COMPRESSOR_REGISTRY[".gz"])

-
-def get_column_type(
-    report: SourceReport, dataset_name: str, column_type: str
-) -> SchemaFieldDataType:
-    """
-    Maps known Spark types to datahub types
-    """
-    TypeClass: Any = None
-
-    for field_type, type_class in _field_type_mapping.items():
-        if isinstance(column_type, field_type):
-            TypeClass = type_class
-            break
-
-    # if still not found, report the warning
-    if TypeClass is None:
-        report.report_warning(
-            dataset_name, f"unable to map type {column_type} to metadata schema"
-        )
-        TypeClass = NullTypeClass
-
-    return SchemaFieldDataType(type=TypeClass())
-

 # config flags to emit telemetry for
 config_options_to_report = [
     "platform",
@@ -490,9 +419,7 @@ def add_partition_columns_to_schema(
                 if not is_fieldpath_v2
                 else f"[version=2.0].[type=string].{partition_key}",
                 nativeDataType="string",
-                type=SchemaFieldDataType(StringTypeClass())
-                if not is_fieldpath_v2
-                else SchemaFieldDataTypeClass(type=StringTypeClass()),
+                type=SchemaFieldDataTypeClass(StringTypeClass()),
                 isPartitioningKey=True,
                 nullable=True,
                 recursive=False,
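The last hunk above also simplifies the partition-column field: the old code built `SchemaFieldDataType(StringTypeClass())` for v1 field paths and `SchemaFieldDataTypeClass(type=StringTypeClass())` for v2, although the pegasus2avro name appears to be an alias of the `schema_classes` one, making the branch redundant. A sketch of the partition-key field the new code emits, assuming the standard `datahub.metadata.schema_classes` imports (the `fieldPath` value is illustrative):

```python
from datahub.metadata.schema_classes import (
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
)

partition_field = SchemaFieldClass(
    fieldPath="[version=2.0].[type=string].year",  # illustrative partition key
    nativeDataType="string",
    # One constructor call now covers both v1 and v2 field paths.
    type=SchemaFieldDataTypeClass(StringTypeClass()),
    isPartitioningKey=True,
    nullable=True,
    recursive=False,
)
print(partition_field.fieldPath)
```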
@@ -263,8 +263,11 @@ def get_column_type(
             break

     if TypeClass is None:
-        sql_report.report_warning(
-            dataset_name, f"unable to map type {column_type!r} to metadata schema"
+        sql_report.info(
+            title="Unable to map column types to DataHub types",
+            message="Got an unexpected column type. The column's parsed field type will not be populated.",
+            context=f"{dataset_name} - {column_type!r}",
+            log=False,
         )
         TypeClass = NullTypeClass
