Skip to content

Commit

Permalink
fix(ingest/transformer): Add dataset domains based on tags using tran…
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware authored and sleeperdeep committed Jun 25, 2024
1 parent 1e46d5d commit d461357
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 1 deletion.
57 changes: 56 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
| `glossaryTerms` | - [Simple Add Dataset glossaryTerms ](#simple-add-dataset-glossaryterms)<br/> - [Pattern Add Dataset glossaryTerms](#pattern-add-dataset-glossaryterms) |
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains)<br/> - [Domain Mapping Based on Tags](#domain-mapping-based-on-tags) |
| `dataProduct` | - [Simple Add Dataset dataProduct ](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct)

## Extract Ownership from Tags
Expand Down Expand Up @@ -1064,6 +1064,61 @@ in both of the cases domain should be provisioned on DataHub GMS
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```



## Domain Mapping Based on Tags
### Config Details

| Field | Required | Type | Default | Description |
|-----------------|----------|-------------------------|-------------|---------------------------------------------------------------------------------------------------------|
| `domain_mapping`| ✅ | Dict[str, str] | | Dataset Entity tag as key and domain urn or name as value to map with dataset as asset. |
| `semantics` | | enum | "OVERWRITE" | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.|

<br/>

let’s suppose we’d like to add domain to dataset based on tag, in this case you can use `domain_mapping_based_on_tags` transformer.

The config, which we’d append to our ingestion recipe YAML, would look like this:

Here we can set domains to either urn (i.e. urn:li:domain:engineering) or simple domain name (i.e. engineering) in both of the cases domain should be provisioned on DataHub GMS

When specifying tags within the domain mapping, use the tag's simple name rather than the full tag URN.

For example, instead of using the tag URN urn:li:tag:NeedsDocumentation, you should specify just the simple tag name NeedsDocumentation in the domain mapping configuration

```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
domain_mapping:
'NeedsDocumentation': "urn:li:domain:documentation"
```


`domain_mapping_based_on_tags` can be configured in below different way

- Add domains based on tags, however overwrite the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: OVERWRITE # OVERWRITE is default behaviour
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```
- Add domains based on tags, however keep the domains available for the dataset on DataHub GMS
```yaml
transformers:
- type: "domain_mapping_based_on_tags"
config:
semantics: PATCH
domain_mapping:
'example1': "urn:li:domain:engineering"
'example2': "urn:li:domain:hr"
```

## Simple Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Dict, List, Optional, Set, cast

from datahub.configuration.common import (
TransformerSemantics,
TransformerSemanticsConfigModel,
)
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_domain import AddDatasetDomain
from datahub.ingestion.transformer.dataset_transformer import DatasetDomainTransformer
from datahub.metadata.schema_classes import DomainsClass, GlobalTagsClass


class DatasetTagDomainMapperConfig(TransformerSemanticsConfigModel):
domain_mapping: Dict[str, str]


class DatasetTagDomainMapper(DatasetDomainTransformer):
"""A transformer that appends a predefined set of domains to each dataset that includes specific tags defined in the transformer."""

def __init__(self, config: DatasetTagDomainMapperConfig, ctx: PipelineContext):
super().__init__()
self.ctx: PipelineContext = ctx
self.config: DatasetTagDomainMapperConfig = config

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "DatasetTagDomainMapper":
config = DatasetTagDomainMapperConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
# Initialize the existing domain aspect
existing_domain_aspect: DomainsClass = cast(DomainsClass, aspect)
assert self.ctx.graph
global_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_tags(entity_urn)
# Check if we have tags received in existing aspect
if global_tags:
domain_mapping = self.config.domain_mapping
transformer_tags = domain_mapping.keys()
tags_seen: Set[str] = set()
for tag_item in global_tags.tags:
tag = tag_item.tag.split("urn:li:tag:")[-1]
if tag in transformer_tags:
tags_seen.add(tag)

if tags_seen:
domain_aspect = DomainsClass(domains=[])
domains_to_add: List[str] = []
for tag in tags_seen:
if domain_mapping.get(tag):
domains_to_add.append(domain_mapping[tag])

mapped_domains = AddDatasetDomain.get_domain_class(
self.ctx.graph, domains_to_add
)
domain_aspect.domains.extend(mapped_domains.domains)
if self.config.semantics == TransformerSemantics.PATCH:
# Try merging with server-side domains
patch_domain_aspect: Optional[
DomainsClass
] = AddDatasetDomain._merge_with_server_domains(
self.ctx.graph, entity_urn, domain_aspect
)
return cast(Optional[Aspect], patch_domain_aspect)
return cast(Optional[Aspect], domain_aspect)
return cast(Optional[Aspect], existing_domain_aspect)
193 changes: 193 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@
PatternAddDatasetDomain,
SimpleAddDatasetDomain,
)
from datahub.ingestion.transformer.dataset_domain_based_on_tags import (
DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import DatasetTransformer
from datahub.ingestion.transformer.extract_dataset_tags import ExtractDatasetTags
from datahub.ingestion.transformer.extract_ownership_from_tags import (
Expand Down Expand Up @@ -3458,3 +3461,193 @@ def test_pattern_cleanup_usage_statistics_user_3(
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_domain_mapping_based_on_tags_with_valid_tags(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")

tag_one = builder.make_tag_urn("test:tag_1")

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=tag_one)])

pipeline_context = PipelineContext(run_id="transformer_pipe_line")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[server_domain]),
config={"domain_mapping": {"test:tag_1": acryl_domain}},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0] is not None
assert output[0].record is not None
assert isinstance(output[0].record, MetadataChangeProposalWrapper)
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_matching_tags(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
non_matching_tag = builder.make_tag_urn("nonMatching")

pipeline_context = PipelineContext(run_id="no_match_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=non_matching_tag)])

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[server_domain]),
config={
"domain_mapping": {"test:tag_1": acryl_domain},
},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain not in transformed_aspect.domains
assert server_domain in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_empty_config(mock_datahub_graph):
some_tag = builder.make_tag_urn("someTag")

pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[TagAssociationClass(tag=some_tag)])

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[]),
config={"domain_mapping": {}},
pipeline_context=pipeline_context,
)
assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 0


def test_domain_mapping_based__r_on_tags_with_multiple_tags(mock_datahub_graph):
# Two tags that match different rules in the domain mapping configuration
tag_one = builder.make_tag_urn("test:tag_1")
tag_two = builder.make_tag_urn("test:tag_2")
existing_domain = builder.make_domain_urn("existing.io")
finance = builder.make_domain_urn("finance")
hr = builder.make_domain_urn("hr")

pipeline_context = PipelineContext(run_id="multiple_matches_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(
tags=[TagAssociationClass(tag=tag_one), TagAssociationClass(tag=tag_two)]
)

# Return fake aspect to simulate server behaviour
def fake_get_domain(entity_urn: str) -> models.DomainsClass:
return models.DomainsClass(domains=[existing_domain])

pipeline_context.graph.get_tags = fake_get_tags # type: ignore
pipeline_context.graph.get_domain = fake_get_domain # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[existing_domain]),
config={
"domain_mapping": {"test:tag_1": finance, "test:tag_2": hr},
"semantics": "PATCH",
},
pipeline_context=pipeline_context,
)

# Assertions to verify the expected outcome
assert len(output) == 2
assert output[0].record is not None
assert output[0].record.aspect is not None
assert isinstance(output[0].record.aspect, models.DomainsClass)
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)

# Expecting domains from both matched tags
assert set(output[0].record.aspect.domains) == {existing_domain, finance, hr}
assert len(transformed_aspect.domains) == 3


def test_domain_mapping_based_on_tags_with_empty_tags(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> models.GlobalTagsClass:
return models.GlobalTagsClass(tags=[])

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains


def test_domain_mapping_based_on_tags_with_no_tags(mock_datahub_graph):
acryl_domain = builder.make_domain_urn("acryl.io")
server_domain = builder.make_domain_urn("test.io")
pipeline_context = PipelineContext(run_id="empty_config_pipeline")
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig())

# Return fake aspect to simulate server behaviour
def fake_get_tags(entity_urn: str) -> Optional[models.GlobalTagsClass]:
return None

pipeline_context.graph.get_tags = fake_get_tags # type: ignore

output = run_dataset_transformer_pipeline(
transformer_type=DatasetTagDomainMapper,
aspect=models.DomainsClass(domains=[acryl_domain]),
config={"domain_mapping": {"test:tag_1": server_domain}},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert isinstance(output[0].record.aspect, models.DomainsClass)
assert len(output[0].record.aspect.domains) == 1
transformed_aspect = cast(models.DomainsClass, output[0].record.aspect)
assert len(transformed_aspect.domains) == 1
assert acryl_domain in transformed_aspect.domains
assert server_domain not in transformed_aspect.domains

0 comments on commit d461357

Please sign in to comment.