Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestion/transformer): new transformer to clean user URN for DatasetUsageStatistics aspect #10398

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,24 @@ transformers:
replacement: "sub"
```

## Clean User URN in DatasetUsageStatistics Aspect
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `pattern_for_cleanup` | ✅ | list[string] | | List of regex patterns (e.g. a prefix or suffix) to remove from the user URN(s) |


Matches the configured patterns against each user URN in the DatasetUsageStatistics aspect and removes the matching part from it.
```yaml
transformers:
- type: "pattern_cleanup_dataset_usage_user"
config:
pattern_for_cleanup:
- "ABCDEF"
- (?<=_)(\w+)
```


## Simple Add Dataset domains
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,8 @@ def aspect_name(self) -> str:
class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta):
    """Abstract dataset transformer bound to the ``dataProductProperties`` aspect."""

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer works on."""
        return "dataProductProperties"


class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
    """Abstract dataset transformer bound to the ``datasetUsageStatistics`` aspect."""

    def aspect_name(self) -> str:
        """Return the name of the aspect this transformer works on."""
        return "datasetUsageStatistics"
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import copy
import re
from typing import Any, Dict, List, Optional, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetUsageStatisticsTransformer,
)
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

_USER_URN_PREFIX: str = "urn:li:corpuser:"


class PatternCleanupDatasetUsageUserConfig(ConfigModel):
    """Configuration for the user-URN cleanup transformer."""

    # Patterns handed to ``re.sub``; matching text is removed from the user id.
    pattern_for_cleanup: List[str]


class PatternCleanupDatasetUsageUser(DatasetUsageStatisticsTransformer):
    """Transformer that cleans the user URNs of a DatasetUsageStatistics aspect.

    Every regular expression in ``config.pattern_for_cleanup`` is removed, in
    the configured order, from the id part of each ``urn:li:corpuser:`` URN
    found in the aspect's ``userCounts``.
    """

    ctx: PipelineContext
    config: PatternCleanupDatasetUsageUserConfig

    def __init__(
        self,
        config: PatternCleanupDatasetUsageUserConfig,
        ctx: PipelineContext,
        **resolver_args: Dict[str, Any],
    ):
        """Store the parsed config, the pipeline context and any extra resolver args."""
        super().__init__()
        self.config = config
        self.ctx = ctx
        self.resolver_args = resolver_args

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "PatternCleanupDatasetUsageUser":
        """Build the transformer from a raw configuration dictionary."""
        config = PatternCleanupDatasetUsageUserConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def _clean_user_urn(self, user_urn: str) -> str:
        """Return ``user_urn`` with every configured pattern removed from its id part."""
        if not user_urn.startswith(_USER_URN_PREFIX):
            # Not a corpuser URN: there is no user id to clean, so leave the
            # value untouched instead of failing on it.
            return user_urn

        # Slice rather than split so an id that happens to contain the prefix
        # text again is kept whole.
        user_id = user_urn[len(_USER_URN_PREFIX) :]
        for pattern in self.config.pattern_for_cleanup:
            # Feed each result into the next substitution so that all patterns
            # take effect, not only the last one in the list.
            user_id = re.sub(pattern, "", user_id)
        return _USER_URN_PREFIX + user_id

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
        """Return a copy of ``aspect`` whose user URNs have been cleaned.

        Args:
            entity_urn: URN of the entity the aspect belongs to (not used here).
            aspect_name: Name of the aspect being transformed (not used here).
            aspect: The DatasetUsageStatistics aspect, or ``None``.

        Returns:
            ``None`` when no aspect is given; the input aspect itself when it
            has no ``userCounts``; otherwise a deep copy with every user URN
            cleaned. The input aspect is never mutated.
        """
        if aspect is None:
            # The signature allows a missing aspect; there is nothing to clean.
            return None

        usage_aspect: DatasetUsageStatisticsClass = cast(
            DatasetUsageStatisticsClass, aspect
        )
        if usage_aspect.userCounts is None:
            return cast(Aspect, usage_aspect)

        cleaned_aspect: DatasetUsageStatisticsClass = copy.deepcopy(usage_aspect)
        for user_count in cleaned_aspect.userCounts or []:
            user_count.user = self._clean_user_urn(user_count.user)

        return cast(Aspect, cleaned_aspect)
167 changes: 167 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import re
from datetime import datetime, timezone
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -72,6 +73,9 @@
ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user import (
PatternCleanupDatasetUsageUser,
)
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)
Expand All @@ -82,6 +86,7 @@
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
DatasetUserUsageCountsClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnershipClass,
Expand Down Expand Up @@ -3291,3 +3296,165 @@ def test_replace_external_regex_replace_2(
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
)


def test_pattern_cleanup_usage_statistics_user_1(
    mock_datahub_graph,
):
    """A literal ``IAM:`` pattern is removed from the user URNs that contain it."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    reference_time = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    input_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("IAM:user1"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("user2"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=input_aspect,
        config={"pattern_for_cleanup": ["IAM:"]},
        pipeline_context=ctx,
    )

    expected_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("user1"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("user2"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    actual_user_counts = first_record.aspect.userCounts
    assert len(actual_user_counts) == 2
    assert actual_user_counts == expected_aspect.userCounts


def test_pattern_cleanup_usage_statistics_user_2(
    mock_datahub_graph,
):
    """A literal ``_user`` pattern is removed from the middle of each user id."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    reference_time = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    input_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_user_1"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_user_2"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=input_aspect,
        config={"pattern_for_cleanup": ["_user"]},
        pipeline_context=ctx,
    )

    expected_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_1"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("test_2"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    actual_user_counts = first_record.aspect.userCounts
    assert len(actual_user_counts) == 2
    assert actual_user_counts == expected_aspect.userCounts


def test_pattern_cleanup_usage_statistics_user_3(
    mock_datahub_graph,
):
    """A regex pattern (``_user_\\d+``) is removed from the end of each user id."""
    ctx: PipelineContext = PipelineContext(
        run_id="test_pattern_cleanup_usage_statistics_user"
    )
    ctx.graph = mock_datahub_graph(DatahubClientConfig)

    reference_time = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

    input_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("abc_user_1"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("xyz_user_2"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    output = run_dataset_transformer_pipeline(
        transformer_type=PatternCleanupDatasetUsageUser,
        aspect=input_aspect,
        config={"pattern_for_cleanup": [r"_user_\d+"]},
        pipeline_context=ctx,
    )

    expected_aspect = models.DatasetUsageStatisticsClass(
        timestampMillis=int(reference_time.timestamp() * 1000),
        userCounts=[
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("abc"),
                count=1,
                userEmail="user1@exaple.com",
            ),
            DatasetUserUsageCountsClass(
                user=builder.make_user_urn("xyz"),
                count=2,
                userEmail="user2@exaple.com",
            ),
        ],
    )

    assert len(output) == 2
    first_record = output[0].record
    assert first_record
    assert first_record.aspect
    actual_user_counts = first_record.aspect.userCounts
    assert len(actual_user_counts) == 2
    assert actual_user_counts == expected_aspect.userCounts
Loading