Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/transformer): tranformer to replace the externalUrl in container properties #11013

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ Then define your class to return a list of custom properties, for example:
add_properties_resolver_class: "<your_module>.<your_class>"
```

## Replace ExternalUrl
## Replace ExternalUrl Dataset
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
Expand All @@ -971,6 +971,24 @@ transformers:
replacement: "sub"
```

## Replace ExternalUrl Container
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `input_pattern` | ✅ | string | | String or pattern to replace |
| `replacement` | ✅ | string | | Replacement string |


Matches the full/partial string in the externalUrl of the container properties and replace that with the replacement string

```yaml
transformers:
- type: "replace_external_url_container"
config:
input_pattern: '\b\w*hub\b'
replacement: "sub"
```

## Clean User URN in DatasetUsageStatistics Aspect
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,8 @@
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlDataset",
"replace_external_url_container = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrlContainer",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
"domain_mapping_based_on_tags = datahub.ingestion.transformer.dataset_domain_based_on_tags:DatasetTagDomainMapper",
"tags_to_term = datahub.ingestion.transformer.tags_to_terms:TagsToTermMapper",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ def entity_types(self) -> List[str]:
return ["dataset", "container"]


class ContainerTransformer(BaseTransformer, SingleAspectTransformer, metaclass=ABCMeta):
"""Transformer that does transform sequentially on each dataset."""

def __init__(self):
super().__init__()

def entity_types(self) -> List[str]:
return ["container"]


class DatasetOwnershipTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "ownership"
Expand Down Expand Up @@ -143,3 +153,8 @@ def aspect_name(self) -> str:
class TagsToTermTransformer(TagTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "glossaryTerms"


class ContainerPropertiesTransformer(ContainerTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "containerProperties"
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,28 @@
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
ContainerPropertiesTransformer,
DatasetPropertiesTransformer,
)
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.metadata.schema_classes import (
ContainerPropertiesClass,
DatasetPropertiesClass,
)


class ReplaceExternalUrlConfig(ConfigModel):
input_pattern: str
replacement: str


class ReplaceExternalUrl(DatasetPropertiesTransformer):
"""Transformer that clean the ownership URN."""
class ReplaceUrl:
def replace_url(self, pattern: str, replacement: str, external_url: str) -> str:
pattern_obj = re.compile(pattern)
return re.sub(pattern_obj, replacement, external_url)
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved


class ReplaceExternalUrlDataset(DatasetPropertiesTransformer, ReplaceUrl):
"""Transformer that replace the external URL for dataset properties."""

ctx: PipelineContext
config: ReplaceExternalUrlConfig
Expand All @@ -34,7 +44,9 @@ def __init__(
self.resolver_args = resolver_args

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "ReplaceExternalUrl":
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "ReplaceExternalUrlDataset":
config = ReplaceExternalUrlConfig.parse_obj(config_dict)
return cls(config, ctx)

Expand All @@ -55,11 +67,60 @@ def transform_aspect(
in_dataset_properties_aspect
)

pattern = re.compile(self.config.input_pattern)
replacement = self.config.replacement

out_dataset_properties_aspect.externalUrl = re.sub(
pattern, replacement, in_dataset_properties_aspect.externalUrl
out_dataset_properties_aspect.externalUrl = self.replace_url(
self.config.input_pattern,
self.config.replacement,
in_dataset_properties_aspect.externalUrl,
)

return cast(Aspect, out_dataset_properties_aspect)


class ReplaceExternalUrlContainer(ContainerPropertiesTransformer, ReplaceUrl):
"""Transformer that replace the external URL for container properties."""

ctx: PipelineContext
config: ReplaceExternalUrlConfig

def __init__(
self,
config: ReplaceExternalUrlConfig,
ctx: PipelineContext,
**resolver_args: Dict[str, Any],
):
super().__init__()
self.ctx = ctx
self.config = config
self.resolver_args = resolver_args

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "ReplaceExternalUrlContainer":
config = ReplaceExternalUrlConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:

in_container_properties_aspect: ContainerPropertiesClass = cast(
ContainerPropertiesClass, aspect
)
if (
not hasattr(in_container_properties_aspect, "externalUrl")
or not in_container_properties_aspect.externalUrl
):
return cast(Aspect, in_container_properties_aspect)
else:
out_container_properties_aspect: ContainerPropertiesClass = copy.deepcopy(
in_container_properties_aspect
)

out_container_properties_aspect.externalUrl = self.replace_url(
self.config.input_pattern,
self.config.replacement,
in_container_properties_aspect.externalUrl,
)

return cast(Aspect, out_container_properties_aspect)
147 changes: 143 additions & 4 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
DatasetTagDomainMapper,
)
from datahub.ingestion.transformer.dataset_transformer import (
ContainerTransformer,
DatasetTransformer,
TagTransformer,
)
Expand All @@ -88,7 +89,10 @@
from datahub.ingestion.transformer.remove_dataset_ownership import (
SimpleRemoveDatasetOwnership,
)
from datahub.ingestion.transformer.replace_external_url import ReplaceExternalUrl
from datahub.ingestion.transformer.replace_external_url import (
ReplaceExternalUrlContainer,
ReplaceExternalUrlDataset,
)
from datahub.ingestion.transformer.tags_to_terms import TagsToTermMapper
from datahub.metadata.schema_classes import (
BrowsePathsClass,
Expand Down Expand Up @@ -134,6 +138,22 @@ def make_generic_dataset_mcp(
)


def make_generic_container_mcp(
entity_urn: str = "urn:li:container:6338f55439c7ae58243a62c4d6fbffeee",
aspect_name: str = "status",
aspect: Any = None,
) -> MetadataChangeProposalWrapper:
if aspect is None:
aspect = models.StatusClass(removed=False)
return MetadataChangeProposalWrapper(
entityUrn=entity_urn,
entityType=Urn.create_from_string(entity_urn).get_type(),
aspectName=aspect_name,
changeType="UPSERT",
aspect=aspect,
)


def create_and_run_test_pipeline(
events: List[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]],
transformers: List[Dict[str, Any]],
Expand Down Expand Up @@ -1929,6 +1949,41 @@ def run_dataset_transformer_pipeline(
return outputs


def run_container_transformer_pipeline(
transformer_type: Type[ContainerTransformer],
aspect: Optional[builder.Aspect],
config: dict,
pipeline_context: Optional[PipelineContext] = None,
use_mce: bool = False,
) -> List[RecordEnvelope]:
if pipeline_context is None:
pipeline_context = PipelineContext(run_id="transformer_pipe_line")
transformer: ContainerTransformer = cast(
ContainerTransformer, transformer_type.create(config, pipeline_context)
)

container: Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]
if use_mce:
container = MetadataChangeEventClass(
proposedSnapshot=models.DatasetSnapshotClass(
urn="urn:li:container:6338f55439c7ae58243a62c4d6fbffde",
aspects=[],
)
)
else:
assert aspect
container = make_generic_container_mcp(
aspect=aspect, aspect_name=transformer.aspect_name()
)

outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [container, EndOfStream()]]
)
)
return outputs


def test_simple_add_dataset_domain_aspect_name(mock_datahub_graph):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_simple_add_dataset_domain"
Expand Down Expand Up @@ -3235,7 +3290,7 @@ def test_replace_external_url_word_replace(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand All @@ -3262,7 +3317,7 @@ def test_replace_external_regex_replace_1(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand All @@ -3289,7 +3344,7 @@ def test_replace_external_regex_replace_2(
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_dataset_transformer_pipeline(
transformer_type=ReplaceExternalUrl,
transformer_type=ReplaceExternalUrlDataset,
aspect=models.DatasetPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
Expand Down Expand Up @@ -3867,3 +3922,87 @@ def fake_get_partial_match_tags(entity_urn: str) -> models.GlobalTagsClass:
assert isinstance(terms_aspect, models.GlossaryTermsClass)
assert len(terms_aspect.terms) == 1
assert terms_aspect.terms[0].urn == "urn:li:glossaryTerm:example1"


def test_replace_external_url_container_word_replace(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": "datahub", "replacement": "starhub"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/looker-demo/blob/master/foo.view.lkml"
)


def test_replace_external_regex_container_replace_1(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"datahub/.*/", "replacement": "starhub/test/"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://github.com/starhub/test/foo.view.lkml"
)


def test_replace_external_regex_container_replace_2(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_replace_external_url_container"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

output = run_container_transformer_pipeline(
transformer_type=ReplaceExternalUrlContainer,
aspect=models.ContainerPropertiesClass(
externalUrl="https://github.com/datahub/looker-demo/blob/master/foo.view.lkml",
customProperties=EXISTING_PROPERTIES.copy(),
name="sample_test",
),
config={"input_pattern": r"\b\w*hub\b", "replacement": "test"},
pipeline_context=pipeline_context,
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert (
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
)
Loading