Skip to content

Commit

Permalink
fix(ingest/transformer): new transformer to clean user URN for datase…
Browse files Browse the repository at this point in the history
…tUsageStatistics aspect (datahub-project#10398)
  • Loading branch information
dushayntAW authored and sleeperdeep committed Jun 25, 2024
1 parent 0b54f0c commit e69bd4f
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 0 deletions.
18 changes: 18 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,24 @@ transformers:
replacement: "sub"
```

## Clean User URN in DatasetUsageStatistics Aspect
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|---------|---------------|---------------------------------------------|
| `pattern_for_cleanup` | ✅ | list[string] | | List of suffix/prefix to remove from the Owner URN(s) |


Matches against a User URN in DatasetUsageStatistics aspect and remove the matching part from it
```yaml
transformers:
- type: "pattern_cleanup_dataset_usage_user"
config:
pattern_for_cleanup:
- "ABCDEF"
- (?<=_)(\w+)
```


## Simple Add Dataset domains
### Config Details
| Field | Required | Type | Default | Description |
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
"replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl",
"pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,3 +123,8 @@ def aspect_name(self) -> str:
class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "dataProductProperties"


class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "datasetUsageStatistics"
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import copy
import re
from typing import Any, Dict, List, Optional, cast

from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import Aspect
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
DatasetUsageStatisticsTransformer,
)
from datahub.metadata.schema_classes import DatasetUsageStatisticsClass

_USER_URN_PREFIX: str = "urn:li:corpuser:"


class PatternCleanupDatasetUsageUserConfig(ConfigModel):
pattern_for_cleanup: List[str]


class PatternCleanupDatasetUsageUser(DatasetUsageStatisticsTransformer):
"""Transformer that clean the user URN for DatasetUsageStatistics aspect."""

ctx: PipelineContext
config: PatternCleanupDatasetUsageUserConfig

def __init__(
self,
config: PatternCleanupDatasetUsageUserConfig,
ctx: PipelineContext,
**resolver_args: Dict[str, Any],
):
super().__init__()
self.config = config
self.ctx = ctx
self.resolver_args = resolver_args

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "PatternCleanupDatasetUsageUser":
config = PatternCleanupDatasetUsageUserConfig.parse_obj(config_dict)
return cls(config, ctx)

def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
) -> Optional[Aspect]:
in_dataset_properties_aspect: DatasetUsageStatisticsClass = cast(
DatasetUsageStatisticsClass, aspect
)

if in_dataset_properties_aspect.userCounts is not None:
out_dataset_properties_aspect: DatasetUsageStatisticsClass = copy.deepcopy(
in_dataset_properties_aspect
)

if out_dataset_properties_aspect.userCounts is not None:
for user in out_dataset_properties_aspect.userCounts:
user_id: str = user.user.split(_USER_URN_PREFIX)[1]
for value in self.config.pattern_for_cleanup:
cleaned_user_id = re.sub(value, "", user_id)
user.user = _USER_URN_PREFIX + cleaned_user_id

return cast(Aspect, out_dataset_properties_aspect)
else:
return cast(Aspect, out_dataset_properties_aspect)
else:
return cast(Aspect, in_dataset_properties_aspect)
167 changes: 167 additions & 0 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import re
from datetime import datetime, timezone
from typing import (
Any,
Callable,
Expand Down Expand Up @@ -72,6 +73,9 @@
ExtractOwnersFromTagsTransformer,
)
from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus
from datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user import (
PatternCleanupDatasetUsageUser,
)
from datahub.ingestion.transformer.pattern_cleanup_ownership import (
PatternCleanUpOwnership,
)
Expand All @@ -82,6 +86,7 @@
from datahub.metadata.schema_classes import (
BrowsePathsClass,
DatasetPropertiesClass,
DatasetUserUsageCountsClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnershipClass,
Expand Down Expand Up @@ -3291,3 +3296,165 @@ def test_replace_external_regex_replace_2(
output[0].record.aspect.externalUrl
== "https://test.com/test/looker-demo/blob/master/foo.view.lkml"
)


def test_pattern_cleanup_usage_statistics_user_1(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("IAM:user1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": ["IAM:"]},
pipeline_context=pipeline_context,
)

expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("user2"),
count=2,
userEmail="user2@exaple.com",
),
],
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_2(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_user_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_user_2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": ["_user"]},
pipeline_context=pipeline_context,
)

expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("test_2"),
count=2,
userEmail="user2@exaple.com",
),
],
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts


def test_pattern_cleanup_usage_statistics_user_3(
mock_datahub_graph,
):
pipeline_context: PipelineContext = PipelineContext(
run_id="test_pattern_cleanup_usage_statistics_user"
)
pipeline_context.graph = mock_datahub_graph(DatahubClientConfig)

TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc)

output = run_dataset_transformer_pipeline(
transformer_type=PatternCleanupDatasetUsageUser,
aspect=models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("abc_user_1"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("xyz_user_2"),
count=2,
userEmail="user2@exaple.com",
),
],
),
config={"pattern_for_cleanup": [r"_user_\d+"]},
pipeline_context=pipeline_context,
)

expectedUsageStatistics = models.DatasetUsageStatisticsClass(
timestampMillis=int(TS_1.timestamp() * 1000),
userCounts=[
DatasetUserUsageCountsClass(
user=builder.make_user_urn("abc"),
count=1,
userEmail="user1@exaple.com",
),
DatasetUserUsageCountsClass(
user=builder.make_user_urn("xyz"),
count=2,
userEmail="user2@exaple.com",
),
],
)

assert len(output) == 2
assert output[0].record
assert output[0].record.aspect
assert len(output[0].record.aspect.userCounts) == 2
assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts

0 comments on commit e69bd4f

Please sign in to comment.