diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index 772a638b6a948..773a7e8554832 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -10,7 +10,7 @@ The below table shows transformer which can transform aspects of entity [Dataset | `ownership` | - [Simple Add Dataset ownership](#simple-add-dataset-ownership)
- [Pattern Add Dataset ownership](#pattern-add-dataset-ownership)
- [Simple Remove Dataset Ownership](#simple-remove-dataset-ownership)
- [Extract Ownership from Tags](#extract-ownership-from-tags)
- [Clean suffix prefix from Ownership](#clean-suffix-prefix-from-ownership) | | `globalTags` | - [Simple Add Dataset globalTags ](#simple-add-dataset-globaltags)
- [Pattern Add Dataset globalTags](#pattern-add-dataset-globaltags)
- [Add Dataset globalTags](#add-dataset-globaltags) | | `browsePaths` | - [Set Dataset browsePath](#set-dataset-browsepath) | -| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) | +| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)
- [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms)
- [Tags to Term Mapping](#tags-to-term-mapping) | | `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)
- [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) | | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)
- [Add Dataset datasetProperties](#add-dataset-datasetproperties) | | `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)
- [Pattern Add Dataset domains](#pattern-add-dataset-domains)
- [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) | @@ -668,6 +668,57 @@ We can add glossary terms to datasets based on a regex filter. ".*example1.*": ["urn:li:glossaryTerm:Email", "urn:li:glossaryTerm:Address"] ".*example2.*": ["urn:li:glossaryTerm:PostalCode"] ``` + +## Tags to Term Mapping +### Config Details + +| Field | Required | Type | Default | Description | +|---------------|----------|--------------------|-------------|-------------------------------------------------------------------------------------------------------| +| `tags` | ✅ | List[str] | | List of tag names based on which terms will be created and associated with the dataset. | +| `semantics` | | enum | "OVERWRITE" | Determines whether to OVERWRITE or PATCH the terms associated with the dataset on DataHub GMS. | + +
+ +The `tags_to_term` transformer is designed to map specific tags to glossary terms within DataHub. It takes a configuration of tags that should be translated into corresponding glossary terms. This transformer can apply these mappings to any tags found either at the column level of a dataset or at the dataset top level. + +When specifying tags in the configuration, use the tag's simple name rather than the full tag URN. + +For example, instead of using the tag URN `urn:li:tag:snowflakedb.snowflakeschema.tag_name:tag_value`, you should specify just the tag name `tag_name` in the mapping configuration. + +```yaml +transformers: + - type: "tags_to_term" + config: + semantics: OVERWRITE # OVERWRITE is the default behavior + tags: + - "tag_name" +``` + +The `tags_to_term` transformer can be configured in the following ways: + +- Add terms based on tags, however overwrite the terms available for the dataset on DataHub GMS +```yaml + transformers: + - type: "tags_to_term" + config: + semantics: OVERWRITE # OVERWRITE is default behaviour + tags: + - "example1" + - "example2" + - "example3" + ``` +- Add terms based on tags, however keep the terms available for the dataset on DataHub GMS +```yaml + transformers: + - type: "tags_to_term" + config: + semantics: PATCH + tags: + - "example1" + - "example2" + - "example3" + ``` + ## Pattern Add Dataset Schema Field glossaryTerms ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index e8508a6e7c827..cd4ed37d110bd 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -715,6 +715,7 @@ "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl", "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser", "domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper", + "tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index 252846326b49e..7ba412b3e772c 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1278,6 +1278,20 @@ def create_tag(self, tag_name: str) -> str: # return urn return res["createTag"] + def remove_tag(self, tag_urn: str, resource_urn: str) -> bool: + graph_query = f""" + mutation removeTag {{ + removeTag( + input: {{ + tagUrn: "{tag_urn}", + resourceUrn: "{resource_urn}" + }}) + }} + """ + + res = self.execute_graphql(query=graph_query) + return res["removeTag"] + def _assertion_result_shared(self) -> str: fragment: str = """ fragment assertionResult on AssertionResult { diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index a78a79141e8e4..3e313ddd356be 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -27,6 +27,16 @@ def entity_types(self) -> List[str]: return ["dataset"] +class TagTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta): + """Transformer that does transform sequentially on each tag.""" + + def __init__(self): + super().__init__() + + def entity_types(self) -> List[str]: + return ["dataset", "container"] + + class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "ownership" @@ -128,3 +138,8 @@ def aspect_name(self) -> str: class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "datasetUsageStatistics" + + +class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta): + def aspect_name(self) -> str: + return "glossaryTerms" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py b/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py new file mode 100644 index 0000000000000..338f191c0829d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/tags_to_terms.py @@ -0,0 +1,145 @@ +from typing import List, Optional, Set, cast + +import datahub.emitter.mce_builder as builder +from datahub.configuration.common import ( + TransformerSemantics, + TransformerSemanticsConfigModel, +) +from datahub.emitter.mce_builder import Aspect, make_term_urn +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.graph.client import DataHubGraph +from datahub.ingestion.transformer.dataset_transformer import TagsToTermTransformer +from datahub.metadata.schema_classes import ( + AuditStampClass, + GlobalTagsClass, + GlossaryTermAssociationClass, + GlossaryTermsClass, + SchemaMetadataClass, +) + + +class TagsToTermMapperConfig(TransformerSemanticsConfigModel): + tags: List[str] + + +class TagsToTermMapper(TagsToTermTransformer): + """This transformer maps specified tags to corresponding glossary terms for a dataset.""" + + def __init__(self, config: TagsToTermMapperConfig, ctx: PipelineContext): + super().__init__() + self.ctx: PipelineContext = ctx + self.config: TagsToTermMapperConfig = config + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "TagsToTermMapper": + config = TagsToTermMapperConfig.parse_obj(config_dict) + return cls(config, ctx) + + @staticmethod + def _merge_with_server_glossary_terms( + graph: DataHubGraph, + urn: str, + glossary_terms_aspect: Optional[GlossaryTermsClass], + ) -> Optional[GlossaryTermsClass]: + if not glossary_terms_aspect or not glossary_terms_aspect.terms: + # nothing to add, no need to consult server + return None + + # Merge the transformed terms with existing server terms. + # The transformed terms takes precedence, which may change the term context. + server_glossary_terms_aspect = graph.get_glossary_terms(entity_urn=urn) + if server_glossary_terms_aspect is not None: + glossary_terms_aspect.terms = list( + { + **{term.urn: term for term in server_glossary_terms_aspect.terms}, + **{term.urn: term for term in glossary_terms_aspect.terms}, + }.values() + ) + + return glossary_terms_aspect + + @staticmethod + def get_tags_from_global_tags(global_tags: Optional[GlobalTagsClass]) -> Set[str]: + """Extracts tags urn from GlobalTagsClass.""" + if not global_tags or not global_tags.tags: + return set() + + return {tag_assoc.tag for tag_assoc in global_tags.tags} + + @staticmethod + def get_tags_from_schema_metadata( + schema_metadata: Optional[SchemaMetadataClass], + ) -> Set[str]: + """Extracts globalTags from all fields in SchemaMetadataClass.""" + if not schema_metadata or not schema_metadata.fields: + return set() + tags = set() + for field in schema_metadata.fields: + if field.globalTags: + tags.update( + TagsToTermMapper.get_tags_from_global_tags(field.globalTags) + ) + return tags + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + + in_glossary_terms: Optional[GlossaryTermsClass] = cast( + Optional[GlossaryTermsClass], aspect + ) + + assert self.ctx.graph + in_global_tags_aspect: Optional[GlobalTagsClass] = self.ctx.graph.get_tags( + entity_urn + ) + in_schema_metadata_aspect: Optional[ + SchemaMetadataClass + ] = self.ctx.graph.get_schema_metadata(entity_urn) + + if in_global_tags_aspect is None and in_schema_metadata_aspect is None: + return cast(Aspect, in_glossary_terms) + + global_tags = TagsToTermMapper.get_tags_from_global_tags(in_global_tags_aspect) + schema_metadata_tags = TagsToTermMapper.get_tags_from_schema_metadata( + in_schema_metadata_aspect + ) + + # Combine tags from both global and schema level + combined_tags = global_tags.union(schema_metadata_tags) + + tag_set = set(self.config.tags) + terms_to_add = set() + tags_to_delete = set() + + # Check each global tag against the configured tag list and prepare terms + for full_tag in combined_tags: + tag_name = full_tag.split("urn:li:tag:")[-1].split(".")[-1].split(":")[0] + if tag_name in tag_set: + term_urn = make_term_urn(tag_name) + terms_to_add.add(term_urn) + tags_to_delete.add(full_tag) # Full URN for deletion + + if not terms_to_add: + return cast(Aspect, in_glossary_terms) # No new terms to add + + for tag_urn in tags_to_delete: + self.ctx.graph.remove_tag(tag_urn=tag_urn, resource_urn=entity_urn) + + # Initialize the Glossary Terms properly + out_glossary_terms = GlossaryTermsClass( + terms=[GlossaryTermAssociationClass(urn=term) for term in terms_to_add], + auditStamp=AuditStampClass( + time=builder.get_sys_time(), actor="urn:li:corpUser:restEmitter" + ), + ) + + if self.config.semantics == TransformerSemantics.PATCH: + patch_glossary_terms: Optional[ + GlossaryTermsClass + ] = TagsToTermMapper._merge_with_server_glossary_terms( + self.ctx.graph, entity_urn, out_glossary_terms + ) + return cast(Optional[Aspect], patch_glossary_terms) + else: + return cast(Aspect, out_glossary_terms) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index a0deae972badb..4170fb5bf8b67 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -70,7 +70,10 @@ from datahub.ingestion.transformer.dataset_domain_based_on_tags import ( DatasetTagDomainMapper, ) -from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer +from datahub.ingestion.transformer.dataset_transformer import ( + DatasetTransformer, + TagTransformer, +) from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags from datahub.ingestion.transformer.extract_ownership_from_tags import ( ExtractOwnersFromTagsTransformer, @@ -86,6 +89,7 @@ SimpleRemoveDatasetOwnership, ) from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl +from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper from datahub.metadata.schema_classes import ( BrowsePathsClass, DatasetPropertiesClass, @@ -1891,12 +1895,14 @@ def test_pattern_dataset_schema_tags_transformation(mock_time): def run_dataset_transformer_pipeline( - transformer_type: Type[DatasetTransformer], + transformer_type: Type[Union[DatasetTransformer, TagTransformer]], aspect: Optional[builder.Aspect], config: dict, - pipeline_context: PipelineContext = PipelineContext(run_id="transformer_pipe_line"), + pipeline_context: Optional[PipelineContext] = None, use_mce: bool = False, ) -> List[RecordEnvelope]: + if pipeline_context is None: + pipeline_context = PipelineContext(run_id="transformer_pipe_line") transformer: DatasetTransformer = cast( DatasetTransformer, transformer_type.create(config, pipeline_context) ) @@ -3651,3 +3657,213 @@ def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]: assert len(transformed_aspect.domains) == 1 assert acryl_domain in transformed_aspect.domains assert server_domain not in transformed_aspect.domains + + +def test_tags_to_terms_transformation(mock_datahub_graph): + # Create domain URNs for the test + term_urn_example1 = builder.make_term_urn("example1") + term_urn_example2 = builder.make_term_urn("example2") + + def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[ + TagAssociationClass(tag=builder.make_tag_urn("example1")), + TagAssociationClass(tag=builder.make_tag_urn("example2")), + ] + ) + + # fake the server response + def fake_schema_metadata(entity_urn: str) -> models.SchemaMetadataClass: + return models.SchemaMetadataClass( + schemaName="customer", # not used + platform=builder.make_data_platform_urn( + "hive" + ), # important <- platform must be an urn + version=0, + # when the source system has a notion of versioning of schemas, insert this in, otherwise leave as 0 + hash="", + # when the source system has a notion of unique schemas identified via hash, include a hash, else leave it as empty string + platformSchema=models.OtherSchemaClass( + rawSchema="__insert raw schema here__" + ), + fields=[ + models.SchemaFieldClass( + fieldPath="first_name", + globalTags=models.GlobalTagsClass( + tags=[ + models.TagAssociationClass( + tag=builder.make_tag_urn("example2") + ) + ], + ), + glossaryTerms=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass( + urn=builder.make_term_urn("pii") + ) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + models.SchemaFieldClass( + fieldPath="mobile_number", + glossaryTerms=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass( + urn=builder.make_term_urn("pii") + ) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()), + nativeDataType="VARCHAR(100)", + # use this to provide the type of the field in the source system's vernacular + ), + ], + ) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_tags = fake_get_tags # type: ignore + pipeline_context.graph.get_schema_metadata = fake_schema_metadata # type: ignore + + # Configuring the transformer + config = {"tags": ["example1", "example2"]} + + # Running the transformer within a test pipeline + output = run_dataset_transformer_pipeline( + transformer_type=TagsToTermMapper, + aspect=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii")) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + config=config, + pipeline_context=pipeline_context, + ) + + # Expected results + expected_terms = [term_urn_example2, term_urn_example1] + + # Verify the output + assert len(output) == 2 # One for result and one for end of stream + terms_aspect = output[0].record.aspect + assert isinstance(terms_aspect, models.GlossaryTermsClass) + assert len(terms_aspect.terms) == len(expected_terms) + assert set(term.urn for term in terms_aspect.terms) == { + "urn:li:glossaryTerm:example1", + "urn:li:glossaryTerm:example2", + } + + +def test_tags_to_terms_with_no_matching_terms(mock_datahub_graph): + # Setup for test where no tags match the provided term mappings + def fake_get_tags_no_match(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[ + TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag1")), + TagAssociationClass(tag=builder.make_tag_urn("nonMatchingTag2")), + ] + ) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_tags = fake_get_tags_no_match # type: ignore + + # No matching terms in config + config = {"tags": ["example1", "example2"]} + + # Running the transformer within a test pipeline + output = run_dataset_transformer_pipeline( + transformer_type=TagsToTermMapper, + aspect=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii")) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + config=config, + pipeline_context=pipeline_context, + ) + + # Verify the output + assert len(output) == 2 # One for result and one for end of stream + terms_aspect = output[0].record.aspect + assert isinstance(terms_aspect, models.GlossaryTermsClass) + assert len(terms_aspect.terms) == 1 + + +def test_tags_to_terms_with_missing_tags(mock_datahub_graph): + # Setup for test where no tags are present + def fake_get_no_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass(tags=[]) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_tags = fake_get_no_tags # type: ignore + + config = {"tags": ["example1", "example2"]} + + # Running the transformer with no tags + output = run_dataset_transformer_pipeline( + transformer_type=TagsToTermMapper, + aspect=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii")) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + config=config, + pipeline_context=pipeline_context, + ) + + # Verify that no terms are added when there are no tags + assert len(output) == 2 + terms_aspect = output[0].record.aspect + assert isinstance(terms_aspect, models.GlossaryTermsClass) + assert len(terms_aspect.terms) == 1 + + +def test_tags_to_terms_with_partial_match(mock_datahub_graph): + # Setup for partial match scenario + def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass: + return models.GlobalTagsClass( + tags=[ + TagAssociationClass( + tag=builder.make_tag_urn("example1") + ), # Should match + TagAssociationClass( + tag=builder.make_tag_urn("nonMatchingTag") + ), # No match + ] + ) + + pipeline_context = PipelineContext(run_id="transformer_pipe_line") + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig()) + pipeline_context.graph.get_tags = fake_get_partial_match_tags # type: ignore + + config = {"tags": ["example1"]} # Only 'example1' has a term mapped + + # Running the transformer with partial matching tags + output = run_dataset_transformer_pipeline( + transformer_type=TagsToTermMapper, + aspect=models.GlossaryTermsClass( + terms=[ + models.GlossaryTermAssociationClass(urn=builder.make_term_urn("pii")) + ], + auditStamp=models.AuditStampClass._construct_with_defaults(), + ), + config=config, + pipeline_context=pipeline_context, + ) + + # Verify that only matched term is added + assert len(output) == 2 + terms_aspect = output[0].record.aspect + assert isinstance(terms_aspect, models.GlossaryTermsClass) + assert len(terms_aspect.terms) == 1 + assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"