Skip to content

Commit

Permalink
fix(ingest/dbt): always encode tag urns
Browse files: browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Jun 27, 2024
1 parent fa2ab1b commit f9aa820
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def get_or_add_aspect(mce: MetadataChangeEventClass, default: Aspect) -> Aspect:

def make_global_tag_aspect_with_tag_list(tags: List[str]) -> GlobalTagsClass:
return GlobalTagsClass(
tags=[TagAssociationClass(f"urn:li:tag:{tag}") for tag in tags]
tags=[TagAssociationClass(make_tag_urn(tag)) for tag in tags]
)


Expand Down
17 changes: 11 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import itertools
import logging
import re
Expand Down Expand Up @@ -114,7 +115,7 @@
UpstreamLineageClass,
ViewPropertiesClass,
)
from datahub.metadata.urns import DatasetUrn
from datahub.metadata.urns import DatasetUrn, TagUrn
from datahub.sql_parsing.schema_resolver import SchemaResolver
from datahub.sql_parsing.sqlglot_lineage import (
SchemaInfo,
Expand All @@ -132,6 +133,7 @@
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.time import datetime_to_ts_millis
from datahub.utilities.topological_sort import topological_sort
from datahub.utilities.urns.error import InvalidUrnError

logger = logging.getLogger(__name__)
DBT_PLATFORM = "dbt"
Expand Down Expand Up @@ -1483,7 +1485,7 @@ def get_patched_mce(self, mce):
transformed_tag_list = self.get_transformed_tags_by_prefix(
tag_aspect.tags,
mce.proposedSnapshot.urn,
mce_builder.make_tag_urn(self.config.tag_prefix),
tag_prefix=self.config.tag_prefix,
)
tag_aspect.tags = transformed_tag_list

Expand Down Expand Up @@ -1874,16 +1876,19 @@ def get_transformed_tags_by_prefix(
self,
new_tags: List[TagAssociationClass],
entity_urn: str,
tags_prefix_filter: str,
tag_prefix: str,
) -> List[TagAssociationClass]:
tag_set = {new_tag.tag for new_tag in new_tags}

if self.ctx.graph:
existing_tags_class = self.ctx.graph.get_tags(entity_urn)
if existing_tags_class and existing_tags_class.tags:
for exiting_tag in existing_tags_class.tags:
if not exiting_tag.tag.startswith(tags_prefix_filter):
tag_set.add(exiting_tag.tag)
for existing_tag in existing_tags_class.tags:
with contextlib.suppress(InvalidUrnError):
existing_tag_urn = TagUrn.from_string(existing_tag.tag)
if tag_prefix and existing_tag_urn.name.startswith(tag_prefix):
continue
tag_set.add(existing_tag.tag)
return [TagAssociationClass(tag) for tag in sorted(tag_set)]

# This method attempts to read-modify and return the glossary terms of a dataset.
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/test_dbt_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_dbt_source_patching_tags():
["new_non_dbt", "dbt:new_dbt"]
)
transformed_tags = source.get_transformed_tags_by_prefix(
new_tag_aspect.tags, "urn:li:dataset:dummy", "urn:li:tag:dbt:"
new_tag_aspect.tags, "urn:li:dataset:dummy", "dbt:"
)
expected_tags = {
"urn:li:tag:new_non_dbt",
Expand Down

0 comments on commit f9aa820

Please sign in to comment.