From 0c31d9ae0d39ed652a296b8292ebcdf366c2a353 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 9 Jan 2025 15:55:56 -0800
Subject: [PATCH] feat(ingest/datahub): support dropping duplicate schema
 fields (#12308)

---
 .../src/datahub/ingestion/source/datahub/config.py |  6 ++++++
 .../ingestion/source/datahub/datahub_source.py     | 14 ++++++++++++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
index 09f38913f11b19..8622e221940317 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/config.py
@@ -108,6 +108,12 @@ class DataHubSourceConfig(StatefulIngestionConfigBase):
 
     urn_pattern: AllowDenyPattern = Field(default=AllowDenyPattern())
 
+    drop_duplicate_schema_fields: bool = Field(
+        default=False,
+        description="Whether to drop duplicate schema fields in the schemaMetadata aspect. "
+        "Useful if the source system has duplicate field paths in the db, but we're pushing to a system with server-side duplicate checking.",
+    )
+
     @root_validator(skip_on_failure=True)
     def check_ingesting_data(cls, values):
         if (
diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
index 12daba298a2014..472abd0a97ec70 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_source.py
@@ -12,7 +12,10 @@
     support_status,
 )
 from datahub.ingestion.api.source import MetadataWorkUnitProcessor, SourceReport
-from datahub.ingestion.api.source_helpers import auto_workunit_reporter
+from datahub.ingestion.api.source_helpers import (
+    auto_fix_duplicate_schema_field_paths,
+    auto_workunit_reporter,
+)
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.datahub.config import DataHubSourceConfig
 from datahub.ingestion.source.datahub.datahub_api_reader import DataHubApiReader
@@ -57,7 +60,14 @@ def get_report(self) -> SourceReport:
 
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         # Exactly replicate data from DataHub source
-        return [partial(auto_workunit_reporter, self.get_report())]
+        return [
+            (
+                auto_fix_duplicate_schema_field_paths
+                if self.config.drop_duplicate_schema_fields
+                else None
+            ),
+            partial(auto_workunit_reporter, self.get_report()),
+        ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         self.report.stop_time = datetime.now(tz=timezone.utc)
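
Note on the processor wiring: when the flag is set, auto_fix_duplicate_schema_field_paths is inserted ahead of the reporter in the work unit processor chain, so schemaMetadata aspects are cleaned before they are emitted. Its actual implementation lives in datahub.ingestion.api.source_helpers and is not part of this diff; the sketch below only illustrates the core idea (keep the first occurrence of each fieldPath), and the function name drop_duplicate_field_paths is hypothetical.

# Minimal sketch, not the helper's real implementation: deduplicate the
# fields of a schemaMetadata aspect by keeping the first occurrence of
# each fieldPath while preserving the original field order.
from typing import List, Set

from datahub.metadata.schema_classes import SchemaFieldClass, SchemaMetadataClass


def drop_duplicate_field_paths(schema: SchemaMetadataClass) -> SchemaMetadataClass:
    seen: Set[str] = set()
    deduped: List[SchemaFieldClass] = []
    for field in schema.fields:
        if field.fieldPath in seen:
            # Duplicate path from the source db: drop it instead of letting
            # the destination's server-side duplicate check reject the aspect.
            continue
        seen.add(field.fieldPath)
        deduped.append(field)
    schema.fields = deduped
    return schema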
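
Note on usage: drop_duplicate_schema_fields is opt-in and defaults to False. A recipe might enable it roughly as below; this is a sketch, and the database_connection field name plus all connection values are illustrative assumptions, not values taken from this patch.

# Hypothetical wiring of the new flag on the `datahub` source via the
# Python pipeline API. Connection details are placeholders.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "datahub",
            "config": {
                "database_connection": {  # assumed field name for the SQL backend
                    "scheme": "mysql+pymysql",
                    "host_port": "localhost:3306",
                    "username": "datahub",
                    "password": "datahub",
                    "database": "datahub",
                },
                "drop_duplicate_schema_fields": True,
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()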