diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index c0a8d31bca4c0..217d65ed05d6a 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -925,6 +925,24 @@ transformers: replacement: "sub" ``` +## Clean User URN in DatasetUsageStatistics Aspect +### Config Details +| Field | Required | Type | Default | Description | +|-----------------------------|----------|---------|---------------|---------------------------------------------| +| `pattern_for_cleanup` | ✅ | list[string] | | List of suffix/prefix to remove from the Owner URN(s) | + + +Matches against a User URN in DatasetUsageStatistics aspect and remove the matching part from it +```yaml +transformers: + - type: "pattern_cleanup_dataset_usage_user" + config: + pattern_for_cleanup: + - "ABCDEF" + - (?<=_)(\w+) +``` + + ## Simple Add Dataset domains ### Config Details | Field | Required | Type | Default | Description | diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 307e519cc9cc6..c9cbe66cdbca7 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -706,6 +706,7 @@ "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct", "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct", "replace_external_url = datahub.ingestion.transformer.replace_external_url:ReplaceExternalUrl", + "pattern_cleanup_dataset_usage_user = datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user:PatternCleanupDatasetUsageUser", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py index 79151f7b11bf0..a78a79141e8e4 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py @@ -123,3 +123,8 @@ def aspect_name(self) -> str: class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta): def aspect_name(self) -> str: return "dataProductProperties" + + +class DatasetUsageStatisticsTransformer(DatasetTransformer, metaclass=ABCMeta): + def aspect_name(self) -> str: + return "datasetUsageStatistics" diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_dataset_usage_user.py b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_dataset_usage_user.py new file mode 100644 index 0000000000000..a3d41c8e91ec5 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/transformer/pattern_cleanup_dataset_usage_user.py @@ -0,0 +1,67 @@ +import copy +import re +from typing import Any, Dict, List, Optional, cast + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mce_builder import Aspect +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.transformer.dataset_transformer import ( + DatasetUsageStatisticsTransformer, +) +from datahub.metadata.schema_classes import DatasetUsageStatisticsClass + +_USER_URN_PREFIX: str = "urn:li:corpuser:" + + +class PatternCleanupDatasetUsageUserConfig(ConfigModel): + pattern_for_cleanup: List[str] + + +class PatternCleanupDatasetUsageUser(DatasetUsageStatisticsTransformer): + """Transformer that clean the user URN for DatasetUsageStatistics aspect.""" + + ctx: PipelineContext + config: PatternCleanupDatasetUsageUserConfig + + def __init__( + self, + config: PatternCleanupDatasetUsageUserConfig, + ctx: PipelineContext, + **resolver_args: Dict[str, Any], + ): + super().__init__() + self.config = config + self.ctx = ctx + self.resolver_args = resolver_args + + @classmethod + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "PatternCleanupDatasetUsageUser": + config = PatternCleanupDatasetUsageUserConfig.parse_obj(config_dict) + return cls(config, ctx) + + def transform_aspect( + self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect] + ) -> Optional[Aspect]: + in_dataset_properties_aspect: DatasetUsageStatisticsClass = cast( + DatasetUsageStatisticsClass, aspect + ) + + if in_dataset_properties_aspect.userCounts is not None: + out_dataset_properties_aspect: DatasetUsageStatisticsClass = copy.deepcopy( + in_dataset_properties_aspect + ) + + if out_dataset_properties_aspect.userCounts is not None: + for user in out_dataset_properties_aspect.userCounts: + user_id: str = user.user.split(_USER_URN_PREFIX)[1] + for value in self.config.pattern_for_cleanup: + cleaned_user_id = re.sub(value, "", user_id) + user.user = _USER_URN_PREFIX + cleaned_user_id + + return cast(Aspect, out_dataset_properties_aspect) + else: + return cast(Aspect, out_dataset_properties_aspect) + else: + return cast(Aspect, in_dataset_properties_aspect) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 89d4fcca8801c..7e01dd8909568 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -1,5 +1,6 @@ import json import re +from datetime import datetime, timezone from typing import ( Any, Callable, @@ -72,6 +73,9 @@ ExtractOwnersFromTagsTransformer, ) from datahub.ingestion.transformer.mark_dataset_status import MarkDatasetStatus +from datahub.ingestion.transformer.pattern_cleanup_dataset_usage_user import ( + PatternCleanupDatasetUsageUser, +) from datahub.ingestion.transformer.pattern_cleanup_ownership import ( PatternCleanUpOwnership, ) @@ -82,6 +86,7 @@ from datahub.metadata.schema_classes import ( BrowsePathsClass, DatasetPropertiesClass, + DatasetUserUsageCountsClass, GlobalTagsClass, MetadataChangeEventClass, OwnershipClass, @@ -3291,3 +3296,165 @@ def test_replace_external_regex_replace_2( output[0].record.aspect.externalUrl == "https://test.com/test/looker-demo/blob/master/foo.view.lkml" ) + + +def test_pattern_cleanup_usage_statistics_user_1( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_pattern_cleanup_usage_statistics_user" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc) + + output = run_dataset_transformer_pipeline( + transformer_type=PatternCleanupDatasetUsageUser, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("IAM:user1"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("user2"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ), + config={"pattern_for_cleanup": ["IAM:"]}, + pipeline_context=pipeline_context, + ) + + expectedUsageStatistics = models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("user1"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("user2"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert len(output[0].record.aspect.userCounts) == 2 + assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts + + +def test_pattern_cleanup_usage_statistics_user_2( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_pattern_cleanup_usage_statistics_user" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc) + + output = run_dataset_transformer_pipeline( + transformer_type=PatternCleanupDatasetUsageUser, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("test_user_1"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("test_user_2"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ), + config={"pattern_for_cleanup": ["_user"]}, + pipeline_context=pipeline_context, + ) + + expectedUsageStatistics = models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("test_1"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("test_2"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert len(output[0].record.aspect.userCounts) == 2 + assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts + + +def test_pattern_cleanup_usage_statistics_user_3( + mock_datahub_graph, +): + pipeline_context: PipelineContext = PipelineContext( + run_id="test_pattern_cleanup_usage_statistics_user" + ) + pipeline_context.graph = mock_datahub_graph(DatahubClientConfig) + + TS_1 = datetime(year=2023, month=1, day=1, tzinfo=timezone.utc) + + output = run_dataset_transformer_pipeline( + transformer_type=PatternCleanupDatasetUsageUser, + aspect=models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("abc_user_1"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("xyz_user_2"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ), + config={"pattern_for_cleanup": [r"_user_\d+"]}, + pipeline_context=pipeline_context, + ) + + expectedUsageStatistics = models.DatasetUsageStatisticsClass( + timestampMillis=int(TS_1.timestamp() * 1000), + userCounts=[ + DatasetUserUsageCountsClass( + user=builder.make_user_urn("abc"), + count=1, + userEmail="user1@exaple.com", + ), + DatasetUserUsageCountsClass( + user=builder.make_user_urn("xyz"), + count=2, + userEmail="user2@exaple.com", + ), + ], + ) + + assert len(output) == 2 + assert output[0].record + assert output[0].record.aspect + assert len(output[0].record.aspect.userCounts) == 2 + assert output[0].record.aspect.userCounts == expectedUsageStatistics.userCounts