Skip to content

Commit

Permalink
fix(ingestion/transformer): replace the externalUrl container (#11013)
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Jul 30, 2024
1 parent a6eb1f4 commit da72ba2
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 15 deletions.
20 changes: 19 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ Then define your class to return a list of custom properties, for example:
add_properties_resolver_class: "<your_module>.<your_class>"
```

## Replace ExternalUrl
## Replace ExternalUrl Dataset
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
Expand All @@ -971,6 +971,24 @@ transformers:
replacement: "sub"
```

## Replace ExternalUrl Container
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `input_pattern` | ✅ | string | | String or pattern to replace |
| `replacement` | ✅ | string | | Replacement string |


Matches the full/partial string in the externalUrl of the container properties and replace that with the replacement string

```yaml
transformers:
- type: "replace_external_url_container"
config:
input_pattern: '\b\w*hub\b'
replacement: "sub"
```

## Clean User URN in DatasetUsageStatistics Aspect
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,8 @@
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlDataset",
"replace_external_url_container = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlContainer",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ def entity_types(self) -> List[str]:
return ["dataset", "container"]


class ContainerTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each dataset."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
Expand Down Expand Up @@ -143,3 +153,8 @@ def aspect_name(self) -> str:
class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "glossaryTerms"


class ContainerPropertiesTransformer(ContainerTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "containerProperties"
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
ContainerPropertiesTransformer,
DatasetPropertiesTransformer,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.metadata.schema_classes import (
ContainerPropertiesClass,
DatasetPropertiesClass,
)


class ReplaceExternalUrlConfig(ConfigModel):
input_pattern: str
replacement: str


class ReplaceExternalUrl(DatasetPropertiesTransformer):
"""Transformer that clean the ownership URN."""
class ReplaceUrl:
def replace_url(self, pattern: str, replacement: str, external_url: str) -> str:
pattern_obj = re.compile(pattern)
return re.sub(pattern_obj, replacement, external_url)


class ReplaceExternalUrlDataset(DatasetPropertiesTransformer, ReplaceUrl):
"""Transformer that replace the external URL for dataset properties."""

ctx: PipelineContext
config: ReplaceExternalUrlConfig
Expand All @@ -34,7 +44,9 @@ def __init__(
self.resolver_args = resolver_args

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "ReplaceExternalUrlDataset":
config = ReplaceExternalUrlConfig.parse_obj(config_dict)
return cls(config, ctx)

Expand All @@ -55,11 +67,60 @@ def transform_aspect(
in_dataset_properties_aspect
)

pattern = re.compile(self.config.input_pattern)
replacement = self.config.replacement

out_dataset_properties_aspect.externalUrl = re.sub(
pattern, replacement, in_dataset_properties_aspect.externalUrl
out_dataset_properties_aspect.externalUrl = self.replace_url(
self.config.input_pattern,
self.config.replacement,
in_dataset_properties_aspect.externalUrl,
)

return cast(Aspect, out_dataset_properties_aspect)


class ReplaceExternalUrlContainer(ContainerPropertiesTransformer, ReplaceUrl):
"""Transformer that replace the external URL for container properties."""

ctx: PipelineContext
config: ReplaceExternalUrlConfig

def __init__(
self,
config: ReplaceExternalUrlConfig,
ctx: PipelineContext,
**resolver_args: Dict[str, Any],
):
super().__init__()
self.ctx = ctx
self.config = config
self.resolver_args = resolver_args

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "ReplaceExternalUrlContainer":
config = ReplaceExternalUrlConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

in_container_properties_aspect: ContainerPropertiesClass = cast(
ContainerPropertiesClass, aspect
)
if (
not hasattr(in_container_properties_aspect, "externalUrl")
or not in_container_properties_aspect.externalUrl
):
return cast(Aspect, in_container_properties_aspect)
else:
out_container_properties_aspect: ContainerPropertiesClass = copy.deepcopy(
in_container_properties_aspect
)

out_container_properties_aspect.externalUrl = self.replace_url(
self.config.input_pattern,
self.config.replacement,
in_container_properties_aspect.externalUrl,
)

return cast(Aspect, out_container_properties_aspect)
147 changes: 143 additions & 4 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import (
ContainerTransformer,
DatasetTransformer,
TagTransformer,
)
Expand All @@ -88,7 +89,10 @@
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl
from datahub.ingestion.transformer.replace_external_url import (
ReplaceExternalUrlContainer,
ReplaceExternalUrlDataset,
)
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper
from datahub.metadata.schema_classes import (
BrowsePathsClass,
Expand Down Expand Up @@ -134,6 +138,22 @@ def make_generic_dataset_mcp(
)


def make_generic_container_mcp(
entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee",
aspect_name: str = "status",
aspect: Any = None,
) -> MetadataChangeProposalWrapper:
if aspect is None:
aspect = models.StatusClass(removed=False)
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
entityType=Urn.create_from_string(entity_urn).get_type(),
aspectName=aspect_name,
changeType="UPSERT",
aspect=aspect,
)


def create_and_run_test_pipeline(
events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]],
transformers: List[Dict[str, Any]],
Expand Down Expand Up @@ -1929,6 +1949,41 @@ def run_dataset_transformer_pipeline(
return outputs


def run_container_transformer_pipeline(
transformer_type: Type[ContainerTransformer],
aspect: Optional[builder.Aspect],
config: dict,
pipeline_context: Optional[PipelineContext] = None,
use_mce: bool = False,
) -> List[RecordEnvelope]:
if pipeline_context is None:
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
transformer: ContainerTransformer = cast(
ContainerTransformer, transformer_type.create(config, pipeline_context)
)

container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
if use_mce:
container = MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde",
aspects=[],
)
)
else:
assert aspect
container = make_generic_container_mcp(
aspect=aspect, aspect_name=transformer.aspect_name()
)

outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [container, EndOfStream()]]
)
)
return outputs


def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
Expand Down Expand Up @@ -3235,7 +3290,7 @@ def test_replace_external_url_word_replace(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand All @@ -3262,7 +3317,7 @@ def test_replace_external_regex_replace_1(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand All @@ -3289,7 +3344,7 @@ def test_replace_external_regex_replace_2(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand Down Expand Up @@ -3867,3 +3922,87 @@ def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass:
assert isinstance(terms_aspect, models.GlossaryTermsClass)
assert len(terms_aspect.terms) == 1
assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"


def test_replace_external_url_container_word_replace(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": "datahub", "replacement": "starhub"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
)


def test_replace_external_regex_container_replace_1(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/test/foo.view.lkml"
)


def test_replace_external_regex_container_replace_2(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
)

0 comments on commit da72ba2

Please sign in to comment.