Skip to content

Commit

Permalink
feat(ingest/dbt): produce multiple assertions for multi-table dbt tes…
Browse files Browse the repository at this point in the history
…ts (#11451)
  • Loading branch information
hsheth2 authored Sep 25, 2024
1 parent ba9bfa0 commit d754650
Show file tree
Hide file tree
Showing 5 changed files with 364 additions and 174 deletions.
104 changes: 58 additions & 46 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ def can_emit_node_type(self, node_type: str) -> bool:

return allowed == EmitDirective.YES

@property
def can_emit_test_definitions(self) -> bool:
return self.test_definitions == EmitDirective.YES

@property
def can_emit_test_results(self) -> bool:
return self.test_results == EmitDirective.YES
Expand Down Expand Up @@ -736,8 +740,8 @@ def get_upstreams_for_test(
all_nodes_map: Dict[str, DBTNode],
platform_instance: Optional[str],
environment: str,
) -> List[str]:
upstream_urns = []
) -> Dict[str, str]:
upstreams = {}

for upstream in test_node.upstream_nodes:
if upstream not in all_nodes_map:
Expand All @@ -748,15 +752,13 @@ def get_upstreams_for_test(

upstream_manifest_node = all_nodes_map[upstream]

upstream_urns.append(
upstream_manifest_node.get_urn(
target_platform=DBT_PLATFORM,
data_platform_instance=platform_instance,
env=environment,
)
upstreams[upstream] = upstream_manifest_node.get_urn(
target_platform=DBT_PLATFORM,
data_platform_instance=platform_instance,
env=environment,
)

return upstream_urns
return upstreams


def make_mapping_upstream_lineage(
Expand Down Expand Up @@ -893,53 +895,63 @@ def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str)
def create_test_entity_mcps(
self,
test_nodes: List[DBTNode],
custom_props: Dict[str, str],
extra_custom_props: Dict[str, str],
all_nodes_map: Dict[str, DBTNode],
) -> Iterable[MetadataWorkUnit]:
for node in sorted(test_nodes, key=lambda n: n.dbt_name):
assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
{
k: v
for k, v in {
"platform": DBT_PLATFORM,
"name": node.dbt_name,
"instance": self.config.platform_instance,
**(
# Ideally we'd include the env unconditionally. However, we started out
# not including env in the guid, so we need to maintain backwards compatibility
# with existing PROD assertions.
{"env": self.config.env}
if self.config.env != mce_builder.DEFAULT_ENV
and self.config.include_env_in_assertion_guid
else {}
),
}.items()
if v is not None
}
)
)

if self.config.entities_enabled.can_emit_node_type("test"):
yield MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

upstream_urns = get_upstreams_for_test(
upstreams = get_upstreams_for_test(
test_node=node,
all_nodes_map=all_nodes_map,
platform_instance=self.config.platform_instance,
environment=self.config.env,
)

# In case a dbt test depends on multiple tables, we create separate assertions for each.
# TODO: This logic doesn't actually work properly, since we're reusing the same assertion_urn
# across multiple upstream tables, so we're actually only creating one assertion and the last
# upstream_urn gets used. Luckily, most dbt tests are associated with a single table, so this
# doesn't cause major issues in practice.
for upstream_urn in sorted(upstream_urns):
if self.config.entities_enabled.can_emit_node_type("test"):
for upstream_node_name, upstream_urn in upstreams.items():
guid_upstream_part = {}
if len(upstreams) > 1:
# If we depend on multiple upstreams, we need to generate a unique guid for each assertion.
# If there was only one upstream, we want to maintain the original assertion for backwards compatibility.
guid_upstream_part = {
"on_dbt_upstream": upstream_node_name,
}

assertion_urn = mce_builder.make_assertion_urn(
mce_builder.datahub_guid(
{
k: v
for k, v in {
"platform": DBT_PLATFORM,
"name": node.dbt_name,
"instance": self.config.platform_instance,
**(
# Ideally we'd include the env unconditionally. However, we started out
# not including env in the guid, so we need to maintain backwards compatibility
# with existing PROD assertions.
{"env": self.config.env}
if self.config.env != mce_builder.DEFAULT_ENV
and self.config.include_env_in_assertion_guid
else {}
),
**guid_upstream_part,
}.items()
if v is not None
}
)
)

custom_props = {
"dbt_unique_id": node.dbt_name,
"dbt_test_upstream_unique_id": upstream_node_name,
**extra_custom_props,
}

if self.config.entities_enabled.can_emit_test_definitions:
yield MetadataChangeProposalWrapper(
entityUrn=assertion_urn,
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

yield make_assertion_from_test(
custom_props,
node,
Expand Down
Loading

0 comments on commit d754650

Please sign in to comment.