diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index f5d63c68d137ec..7d0d6f24dbdb62 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -2,6 +2,7 @@ import json from typing import Any, Dict, Iterable, List, Optional, TypeVar +from deprecated import deprecated from pydantic.fields import Field from pydantic.main import BaseModel @@ -18,7 +19,6 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.container import ContainerProperties from datahub.metadata.schema_classes import ( - ChangeTypeClass, ContainerClass, DomainsClass, GlobalTagsClass, @@ -31,7 +31,6 @@ TagAssociationClass, _Aspect, ) -from datahub.utilities.urns.urn import guess_entity_type def _stable_guid_from_dict(d: dict) -> str: @@ -129,24 +128,18 @@ def default(self, obj: Any) -> Any: def add_domain_to_entity_wu( - entity_type: str, entity_urn: str, domain_urn: str + entity_urn: str, domain_urn: str ) -> Iterable[MetadataWorkUnit]: - mcp = MetadataChangeProposalWrapper( - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{entity_urn}", aspect=DomainsClass(domains=[domain_urn]), - ) - wu = MetadataWorkUnit(id=f"{domain_urn}-to-{entity_urn}", mcp=mcp) - yield wu + ).as_workunit() def add_owner_to_entity_wu( entity_type: str, entity_urn: str, owner_urn: str ) -> Iterable[MetadataWorkUnit]: - mcp = MetadataChangeProposalWrapper( - entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{entity_urn}", aspect=OwnershipClass( owners=[ @@ -156,26 +149,22 @@ def add_owner_to_entity_wu( ) ] ), - ) - wu = MetadataWorkUnit(id=f"{owner_urn}-to-{entity_urn}", mcp=mcp) - yield wu + ).as_workunit() def add_tags_to_entity_wu( entity_type: str, entity_urn: str, tags: List[str] ) -> Iterable[MetadataWorkUnit]: - mcp = MetadataChangeProposalWrapper( + yield MetadataChangeProposalWrapper( entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, entityUrn=f"{entity_urn}", aspect=GlobalTagsClass( tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags] ), - ) - wu = MetadataWorkUnit(id=f"tags-to-{entity_urn}", mcp=mcp) - yield wu + ).as_workunit() +@deprecated("use MetadataChangeProposalWrapper(...).as_workunit() instead") def wrap_aspect_as_workunit( entityName: str, entityUrn: str, @@ -210,9 +199,7 @@ def gen_containers( container_urn = make_container_urn( guid=container_key.guid(), ) - mcp = MetadataChangeProposalWrapper( - entityType="container", - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{container_urn}", # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()), aspect=ContainerProperties( @@ -229,51 +216,32 @@ def gen_containers( if last_modified is not None else None, ), - ) - wu = MetadataWorkUnit(id=f"container-info-{name}-{container_urn}", mcp=mcp) - yield wu + ).as_workunit() # add status - yield wrap_aspect_as_workunit( - entityName="container", + yield MetadataChangeProposalWrapper( entityUrn=f"{container_urn}", aspect=StatusClass(removed=False), - aspectName=StatusClass.get_aspect_name(), - ) + ).as_workunit() - mcp = MetadataChangeProposalWrapper( - entityType="container", - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{container_urn}", - # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()), aspect=DataPlatformInstance( platform=f"{make_data_platform_urn(container_key.platform)}", instance=f"{make_dataplatform_instance_urn(container_key.platform, container_key.instance)}" if container_key.instance else None, ), - ) - wu = MetadataWorkUnit( - id=f"container-platforminstance-{name}-{container_urn}", mcp=mcp - ) - yield wu + ).as_workunit() # Set subtype - subtype_mcp = MetadataChangeProposalWrapper( - entityType="container", - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{container_urn}", - # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()), aspect=SubTypesClass(typeNames=sub_types), - ) - wu = MetadataWorkUnit( - id=f"container-subtypes-{name}-{container_urn}", mcp=subtype_mcp - ) - yield wu + ).as_workunit() if domain_urn: yield from add_domain_to_entity_wu( - entity_type="container", entity_urn=container_urn, domain_urn=domain_urn, ) @@ -299,39 +267,23 @@ def gen_containers( # Set database container parent_container_mcp = MetadataChangeProposalWrapper( - entityType="container", - changeType=ChangeTypeClass.UPSERT, entityUrn=f"{container_urn}", - # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()), aspect=ContainerClass(container=parent_container_urn), - # aspect=ContainerKeyClass(guid=database_container_key.guid()) ) - wu = MetadataWorkUnit( - id=f"container-parent-container-{name}-{container_urn}-{parent_container_urn}", - mcp=parent_container_mcp, - ) - - yield wu + yield parent_container_mcp.as_workunit() def add_dataset_to_container( - # FIXME: Union requires two or more type arguments - container_key: KeyType, - dataset_urn: str, + container_key: KeyType, dataset_urn: str ) -> Iterable[MetadataWorkUnit]: container_urn = make_container_urn( guid=container_key.guid(), ) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, + yield MetadataChangeProposalWrapper( entityUrn=f"{dataset_urn}", aspect=ContainerClass(container=f"{container_urn}"), - # aspect=ContainerKeyClass(guid=schema_container_key.guid()) - ) - wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{dataset_urn}", mcp=mcp) - yield wu + ).as_workunit() def add_entity_to_container( @@ -340,14 +292,11 @@ def add_entity_to_container( container_urn = make_container_urn( guid=container_key.guid(), ) - mcp = MetadataChangeProposalWrapper( + yield MetadataChangeProposalWrapper( entityType=entity_type, - changeType=ChangeTypeClass.UPSERT, entityUrn=entity_urn, aspect=ContainerClass(container=f"{container_urn}"), - ) - wu = MetadataWorkUnit(id=f"container-{container_urn}-to-{entity_urn}", mcp=mcp) - yield wu + ).as_workunit() def mcps_from_mce( @@ -355,8 +304,6 @@ def mcps_from_mce( ) -> Iterable[MetadataChangeProposalWrapper]: for aspect in mce.proposedSnapshot.aspects: yield MetadataChangeProposalWrapper( - entityType=guess_entity_type(mce.proposedSnapshot.urn), - changeType=ChangeTypeClass.UPSERT, entityUrn=mce.proposedSnapshot.urn, auditHeader=mce.auditHeader, aspect=aspect, diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index b1e6197fa19dfd..02465936e8063c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -930,12 +930,11 @@ def _gen_domain_urn(self, dataset_name: str) -> Optional[str]: return None def _get_domain_wu( - self, dataset_name: str, entity_urn: str, entity_type: str + self, dataset_name: str, entity_urn: str ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(dataset_name) if domain_urn: wus = add_domain_to_entity_wu( - entity_type=entity_type, entity_urn=entity_urn, domain_urn=domain_urn, ) @@ -985,7 +984,6 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: yield from self._get_domain_wu( dataset_name=full_table_name, entity_urn=dataset_urn, - entity_type="dataset", ) yield from self.add_table_to_database_container( dataset_urn=dataset_urn, db_name=database_name diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 0b7f2a2c2d66ff..c2b31081c645ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -800,12 +800,10 @@ def _get_domain_wu( self, dataset_name: str, entity_urn: str, - entity_type: str, ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(dataset_name) if domain_urn: wus = add_domain_to_entity_wu( - entity_type=entity_type, entity_urn=entity_urn, domain_urn=domain_urn, ) @@ -963,7 +961,6 @@ def gen_dataset_workunits( yield from self._get_domain_wu( dataset_name=str(datahub_dataset_name), entity_urn=dataset_urn, - entity_type="dataset", ) def gen_lineage( diff --git a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py index 824c5dfa5e6238..d0c7f85a3e4c65 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py +++ b/metadata-ingestion/src/datahub/ingestion/source/identity/azure_ad.py @@ -29,7 +29,6 @@ ) from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( - ChangeTypeClass, CorpGroupInfoClass, CorpUserInfoClass, GroupMembershipClass, @@ -296,10 +295,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: yield wu group_origin_mcp = MetadataChangeProposalWrapper( - entityType="corpGroup", entityUrn=datahub_corp_group_snapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="origin", aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"), ) group_origin_wu_id = f"group-origin-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}" @@ -310,10 +306,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: yield group_origin_wu group_status_mcp = MetadataChangeProposalWrapper( - entityType="corpGroup", entityUrn=datahub_corp_group_snapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="status", aspect=StatusClass(removed=False), ) group_status_wu_id = f"group-status-{group_count + 1 if self.config.mask_group_id else datahub_corp_group_snapshot.urn}" @@ -445,10 +438,7 @@ def ingest_ad_users( yield wu user_origin_mcp = MetadataChangeProposalWrapper( - entityType="corpuser", entityUrn=datahub_corp_user_snapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="origin", aspect=OriginClass(OriginTypeClass.EXTERNAL, "AZURE_AD"), ) user_origin_wu_id = f"user-origin-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}" @@ -457,10 +447,7 @@ def ingest_ad_users( yield user_origin_wu user_status_mcp = MetadataChangeProposalWrapper( - entityType="corpuser", entityUrn=datahub_corp_user_snapshot.urn, - changeType=ChangeTypeClass.UPSERT, - aspectName="status", aspect=StatusClass(removed=False), ) user_status_wu_id = f"user-status-{user_count + 1 if self.config.mask_user_id else datahub_corp_user_snapshot.urn}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index 20c5818b0afb41..81685426e9de88 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -312,7 +312,6 @@ def _extract_record( if domain_urn: wus = add_domain_to_entity_wu( - entity_type="dataset", entity_urn=dataset_urn, domain_urn=domain_urn, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index dc2f6550217ddc..044a2fb551741b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -500,7 +500,6 @@ def _extract_record( if domain_urn: wus = add_domain_to_entity_wu( - entity_type="dataset", entity_urn=dataset_urn, domain_urn=domain_urn, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py index 69228e5f61b928..aaf254b0ee8151 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/salesforce.py +++ b/metadata-ingestion/src/datahub/ingestion/source/salesforce.py @@ -350,7 +350,7 @@ def get_domain_workunit( if domain_urn: yield from add_domain_to_entity_wu( - domain_urn=domain_urn, entity_type="dataset", entity_urn=datasetUrn + domain_urn=domain_urn, entity_urn=datasetUrn ) def get_platform_instance_workunit(self, datasetUrn: str) -> WorkUnit: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py index 0d8f975ebda605..4bb49f7359d275 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_utils.py @@ -200,16 +200,10 @@ def wrap_aspect_as_workunit( aspectName: str, aspect: _Aspect, ) -> MetadataWorkUnit: - id = f"{aspectName}-for-{entityUrn}" - if "timestampMillis" in aspect._inner_dict: - id = f"{aspectName}-{aspect.timestampMillis}-for-{entityUrn}" # type: ignore - wu = MetadataWorkUnit( - id=id, - mcp=MetadataChangeProposalWrapper( - entityUrn=entityUrn, - aspect=aspect, - ), - ) + wu = MetadataChangeProposalWrapper( + entityUrn=entityUrn, + aspect=aspect, + ).as_workunit() self.report.report_workunit(wu) return wu diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 97b4b04700ddff..779cf3b09f5d8f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -905,7 +905,6 @@ def gen_dataset_workunits( yield from self._get_domain_wu( dataset_name=dataset_name, entity_urn=dataset_urn, - entity_type="dataset", ) if ( @@ -1082,12 +1081,10 @@ def _get_domain_wu( self, dataset_name: str, entity_urn: str, - entity_type: str, ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(dataset_name) if domain_urn: wus = add_domain_to_entity_wu( - entity_type=entity_type, entity_urn=entity_urn, domain_urn=domain_urn, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py index 718585b49283fc..b37e2712e2625f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py @@ -527,7 +527,6 @@ def loop_tables( yield from self._get_domain_wu( dataset_name=dataset_name, entity_urn=dataset_urn, - entity_type="dataset", sql_config=sql_config, ) @@ -744,7 +743,6 @@ def loop_views( yield from self._get_domain_wu( dataset_name=dataset.dataset_name, entity_urn=dataset_urn, - entity_type="dataset", sql_config=sql_config, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 6db79f2fb6f69a..098ae15679bcf4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -734,13 +734,11 @@ def _get_domain_wu( self, dataset_name: str, entity_urn: str, - entity_type: str, sql_config: SQLAlchemyConfig, ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(dataset_name) if domain_urn: wus = add_domain_to_entity_wu( - entity_type=entity_type, entity_urn=entity_urn, domain_urn=domain_urn, ) @@ -901,7 +899,6 @@ def _process_table( yield from self._get_domain_wu( dataset_name=dataset_name, entity_urn=dataset_urn, - entity_type="dataset", sql_config=sql_config, ) @@ -1185,7 +1182,6 @@ def _process_view( yield from self._get_domain_wu( dataset_name=dataset_name, entity_urn=dataset_urn, - entity_type="dataset", sql_config=sql_config, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 42620d24a974a7..0a0af54516ff28 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -246,7 +246,6 @@ def process_tables(self, schema: proxy.Schema) -> Iterable[MetadataWorkUnit]: f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" ), entity_urn=dataset_urn, - entity_type="dataset", ) if self.config.include_column_lineage: @@ -340,12 +339,10 @@ def _get_domain_wu( self, dataset_name: str, entity_urn: str, - entity_type: str, ) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(dataset_name) if domain_urn: wus = add_domain_to_entity_wu( - entity_type=entity_type, entity_urn=entity_urn, domain_urn=domain_urn, ) diff --git a/metadata-ingestion/tests/unit/test_kafka_sink.py b/metadata-ingestion/tests/unit/test_kafka_sink.py index 1e3d66fa50039b..9f4062bf93bf82 100644 --- a/metadata-ingestion/tests/unit/test_kafka_sink.py +++ b/metadata-ingestion/tests/unit/test_kafka_sink.py @@ -39,10 +39,7 @@ def test_kafka_sink_mcp(self, mock_producer, mock_callback): from datahub.emitter.mcp import MetadataChangeProposalWrapper mcp = MetadataChangeProposalWrapper( - entityType="dataset", entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)", - changeType=models.ChangeTypeClass.UPSERT, - aspectName="datasetProfile", aspect=models.DatasetProfileClass( rowCount=2000, columnCount=15, diff --git a/metadata-ingestion/tests/unit/test_rest_sink.py b/metadata-ingestion/tests/unit/test_rest_sink.py index fec2bf47843477..8390e8be0ed375 100644 --- a/metadata-ingestion/tests/unit/test_rest_sink.py +++ b/metadata-ingestion/tests/unit/test_rest_sink.py @@ -249,10 +249,7 @@ ), ( MetadataChangeProposalWrapper( - entityType="dataset", entityUrn="urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)", - changeType=models.ChangeTypeClass.UPSERT, - aspectName="ownership", aspect=models.OwnershipClass( owners=[ models.OwnerClass( diff --git a/metadata-ingestion/tests/unit/test_source.py b/metadata-ingestion/tests/unit/test_source.py index 41fc83caf30962..58c008049f2a66 100644 --- a/metadata-ingestion/tests/unit/test_source.py +++ b/metadata-ingestion/tests/unit/test_source.py @@ -4,7 +4,7 @@ from datahub.ingestion.api import workunit from datahub.ingestion.api.common import PipelineContext, WorkUnit from datahub.ingestion.api.source import Source, SourceReport -from datahub.metadata.schema_classes import ChangeTypeClass, StatusClass +from datahub.metadata.schema_classes import StatusClass from datahub.utilities.urns.dataset_urn import DatasetUrn @@ -14,8 +14,6 @@ def get_workunits(self) -> Iterable[WorkUnit]: workunit.MetadataWorkUnit( id="test-workunit", mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=str( DatasetUrn.create_from_ids( platform_id="elasticsearch", @@ -23,7 +21,6 @@ def get_workunits(self) -> Iterable[WorkUnit]: env="PROD", ) ), - aspectName="status", aspect=StatusClass(removed=False), ), ) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index da5cd4343fbd43..493c3d086cea66 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -64,7 +64,6 @@ ) from datahub.metadata.schema_classes import ( BrowsePathsClass, - ChangeTypeClass, DatasetPropertiesClass, GlobalTagsClass, MetadataChangeEventClass, @@ -1301,8 +1300,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path): Union[MetadataChangeEventClass, MetadataChangeProposalWrapper] ] = [ MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=str( DatasetUrn.create_from_ids( platform_id="elasticsearch", @@ -1310,7 +1307,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path): env="PROD", ) ), - aspectName="globalTags", aspect=GlobalTagsClass(tags=[TagAssociationClass(tag="urn:li:tag:Test")]), ) for i in range(0, 10) @@ -1318,8 +1314,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path): mcps.extend( [ MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, entityUrn=str( DatasetUrn.create_from_ids( platform_id="elasticsearch", @@ -1327,7 +1321,6 @@ def test_mcp_multiple_transformers_replace(mock_time, tmp_path): env="PROD", ) ), - aspectName="datasetProperties", aspect=DatasetPropertiesClass(description="test dataset"), ) for i in range(0, 10)