Skip to content

Commit

Permalink
fix(ingest/dbt): support emitting only model performance (#10714)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and yoonhyejin committed Jul 16, 2024
1 parent 946d866 commit 0908407
Showing 1 changed file with 29 additions and 32 deletions.
61 changes: 29 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1251,9 +1251,6 @@ def create_dbt_platform_mces(
) -> Iterable[MetadataWorkUnit]:
"""Create MCEs and MCPs for the dbt platform."""

mce_platform = DBT_PLATFORM
mce_platform_instance = self.config.platform_instance

action_processor = OperationProcessor(
self.config.meta_mapping,
self.config.tag_prefix,
Expand All @@ -1269,15 +1266,10 @@ def create_dbt_platform_mces(
)
for node in sorted(dbt_nodes, key=lambda n: n.dbt_name):
node_datahub_urn = node.get_urn(
mce_platform,
DBT_PLATFORM,
self.config.env,
mce_platform_instance,
self.config.platform_instance,
)
if not self.config.entities_enabled.can_emit_node_type(node.node_type):
logger.debug(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)
continue

meta_aspects: Dict[str, Any] = {}
if self.config.enable_meta_mapping and node.meta:
Expand All @@ -1289,7 +1281,7 @@ def create_dbt_platform_mces(
) # mutates meta_aspects

aspects = self._generate_base_dbt_aspects(
node, additional_custom_props_filtered, mce_platform, meta_aspects
node, additional_custom_props_filtered, DBT_PLATFORM, meta_aspects
)

# Upstream lineage.
Expand All @@ -1304,29 +1296,36 @@ def create_dbt_platform_mces(
if view_prop_aspect:
aspects.append(view_prop_aspect)

# Subtype.
sub_type_wu = self._create_subType_wu(node, node_datahub_urn)
if sub_type_wu:
yield sub_type_wu
# Generate main MCE.
if self.config.entities_enabled.can_emit_node_type(node.node_type):
# Subtype.
sub_type_wu = self._create_subType_wu(node, node_datahub_urn)
if sub_type_wu:
yield sub_type_wu

# DataPlatformInstance aspect.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()
# DataPlatformInstance aspect.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

if len(aspects) == 0:
continue
dataset_snapshot = DatasetSnapshot(urn=node_datahub_urn, aspects=aspects)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
mce = self.get_patched_mce(mce)
yield MetadataWorkUnit(id=dataset_snapshot.urn, mce=mce)
else:
logger.debug(
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
)

# Model performance.
yield from auto_workunit(
self._create_dataprocess_instance_mcps(node, upstream_lineage_class)
)
if self.config.entities_enabled.can_emit_model_performance:
yield from auto_workunit(
self._create_dataprocess_instance_mcps(node, upstream_lineage_class)
)

def _create_dataprocess_instance_mcps(
self,
Expand All @@ -1335,8 +1334,6 @@ def _create_dataprocess_instance_mcps(
) -> Iterable[MetadataChangeProposalWrapper]:
if not node.model_performances:
return
if not self.config.entities_enabled.can_emit_model_performance:
return

node_datahub_urn = node.get_urn(
DBT_PLATFORM,
Expand Down

0 comments on commit 0908407

Please sign in to comment.