diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md index d1a1555a3ca02..1c84a2759d23e 100644 --- a/metadata-ingestion/docs/transformer/dataset_transformer.md +++ b/metadata-ingestion/docs/transformer/dataset_transformer.md @@ -55,12 +55,12 @@ transformers: ``` ## Simple Add Dataset ownership ### Config Details -| Field | Required | Type | Default | Description | -|-----------------------------|----------|--------------|---------------|------------------------------------------------------------------| -| `owner_urns` | ✅ | list[string] | | List of owner urns. | -| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. | -| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| Field | Required | Type | Default | Description | +|--------------------|----------|--------------|-------------|---------------------------------------------------------------------| +| `owner_urns` | ✅ | list[string] | | List of owner urns. | +| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | +| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | +| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics). @@ -95,7 +95,7 @@ transformers: - "urn:li:corpuser:username1" - "urn:li:corpuser:username2" - "urn:li:corpGroup:groupname" - ownership_type: "PRODUCER" + ownership_type: "urn:li:ownershipType:__system__producer" ``` - Add owners, however overwrite the owners available for the dataset on DataHub GMS ```yaml @@ -107,7 +107,7 @@ transformers: - "urn:li:corpuser:username1" - "urn:li:corpuser:username2" - "urn:li:corpGroup:groupname" - ownership_type: "PRODUCER" + ownership_type: "urn:li:ownershipType:__system__producer" ``` - Add owners, however keep the owners available for the dataset on DataHub GMS ```yaml @@ -124,12 +124,12 @@ transformers: ## Pattern Add Dataset ownership ### Config Details -| Field | Required | Type | Default | Description | -|-----------------------------|--------- |-----------------------|------------------|-----------------------------------------------------------------------------------------| -| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. | -| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. | -| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | -| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | +| Field | Required | Type | Default | Description | +|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------| +| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. | +| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) | +| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. | +| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. | let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners. @@ -158,7 +158,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th rules: ".*example1.*": ["urn:li:corpuser:username1"] ".*example2.*": ["urn:li:corpuser:username2"] - ownership_type: "PRODUCER" + ownership_type: "urn:li:ownershipType:__system__producer" ``` - Add owner, however overwrite the owners available for the dataset on DataHub GMS ```yaml @@ -170,7 +170,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th rules: ".*example1.*": ["urn:li:corpuser:username1"] ".*example2.*": ["urn:li:corpuser:username2"] - ownership_type: "PRODUCER" + ownership_type: "urn:li:ownershipType:__system__producer" ``` - Add owner, however keep the owners available for the dataset on DataHub GMS ```yaml diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 64c9ec1bb5704..3b2c87ea25a31 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -9,12 +9,13 @@ from typing import ( TYPE_CHECKING, Any, + Iterable, List, Optional, + Tuple, Type, TypeVar, Union, - cast, get_type_hints, ) @@ -342,26 +343,20 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str: ) -def is_valid_ownership_type(ownership_type: Optional[str]) -> bool: - return ownership_type is not None and ownership_type in [ - OwnershipTypeClass.TECHNICAL_OWNER, - OwnershipTypeClass.BUSINESS_OWNER, - OwnershipTypeClass.DATA_STEWARD, - OwnershipTypeClass.NONE, - OwnershipTypeClass.DEVELOPER, - OwnershipTypeClass.DATAOWNER, - OwnershipTypeClass.DELEGATE, - OwnershipTypeClass.PRODUCER, - OwnershipTypeClass.CONSUMER, - OwnershipTypeClass.STAKEHOLDER, +def get_class_fields(_class: Type[object]) -> Iterable[str]: + return [ + f + for f in dir(_class) + if not callable(getattr(_class, f)) and not f.startswith("_") ] -def validate_ownership_type(ownership_type: Optional[str]) -> str: - if is_valid_ownership_type(ownership_type): - return cast(str, ownership_type) - else: - raise ValueError(f"Unexpected ownership type: {ownership_type}") +def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]: + if ownership_type.startswith("urn:li:"): + return OwnershipTypeClass.CUSTOM, ownership_type + if ownership_type in get_class_fields(OwnershipTypeClass): + return ownership_type, None + raise ValueError(f"Unexpected ownership type: {ownership_type}") def make_lineage_mce( diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py index 71cf6cfa7e92b..73cb8e4d6739b 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_ownership.py @@ -14,11 +14,8 @@ from datahub.ingestion.transformer.dataset_transformer import ( DatasetOwnershipTransformer, ) -from datahub.metadata.schema_classes import ( - OwnerClass, - OwnershipClass, - OwnershipTypeClass, -) +from datahub.metadata._schema_classes import OwnershipTypeClass +from datahub.metadata.schema_classes import OwnerClass, OwnershipClass class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): @@ -102,7 +99,7 @@ def transform_aspect( class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel): - ownership_type: Optional[str] = OwnershipTypeClass.DATAOWNER + ownership_type: str = OwnershipTypeClass.DATAOWNER class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig): @@ -114,11 +111,14 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership): """Transformer that adds a specified set of owners to each dataset.""" def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext): - ownership_type = builder.validate_ownership_type(config.ownership_type) + ownership_type, ownership_type_urn = builder.validate_ownership_type( + config.ownership_type + ) owners = [ OwnerClass( owner=owner, type=ownership_type, + typeUrn=ownership_type_urn, ) for owner in config.owner_urns ] @@ -147,29 +147,17 @@ class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig): class PatternAddDatasetOwnership(AddDatasetOwnership): """Transformer that adds a specified set of owners to each dataset.""" - def getOwners( - self, - key: str, - owner_pattern: KeyValuePattern, - ownership_type: Optional[str] = None, - ) -> List[OwnerClass]: - owners = [ - OwnerClass( - owner=owner, - type=builder.validate_ownership_type(ownership_type), - ) - for owner in owner_pattern.value(key) - ] - return owners - def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext): - ownership_type = builder.validate_ownership_type(config.ownership_type) owner_pattern = config.owner_pattern + ownership_type, ownership_type_urn = builder.validate_ownership_type( + config.ownership_type + ) generic_config = AddDatasetOwnershipConfig( get_owners_to_add=lambda urn: [ OwnerClass( owner=owner, type=ownership_type, + typeUrn=ownership_type_urn, ) for owner in owner_pattern.value(urn) ], diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index 7ce78f0ab3e13..0f3c984196a78 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -214,7 +214,10 @@ def test_run_including_registered_transformation(self): "transformers": [ { "type": "simple_add_dataset_ownership", - "config": {"owner_urns": ["urn:li:corpuser:foo"]}, + "config": { + "owner_urns": ["urn:li:corpuser:foo"], + "ownership_type": "urn:li:ownershipType:__system__technical_owner", + }, } ], "sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"}, diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index bc95451620d22..8014df2f5c519 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -234,7 +234,7 @@ def test_simple_dataset_ownership_transformation(mock_time): assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn assert all( [ - owner.type == models.OwnershipTypeClass.DATAOWNER + owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None for owner in last_event.aspect.owners ] ) @@ -247,7 +247,7 @@ def test_simple_dataset_ownership_transformation(mock_time): assert len(second_ownership_aspect.owners) == 3 assert all( [ - owner.type == models.OwnershipTypeClass.DATAOWNER + owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None for owner in second_ownership_aspect.owners ] ) @@ -293,6 +293,44 @@ def test_simple_dataset_ownership_with_type_transformation(mock_time): assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER +def test_simple_dataset_ownership_with_type_urn_transformation(mock_time): + input = make_generic_dataset() + + transformer = SimpleAddDatasetOwnership.create( + { + "owner_urns": [ + builder.make_user_urn("person1"), + ], + "ownership_type": "urn:li:ownershipType:__system__technical_owner", + }, + PipelineContext(run_id="test"), + ) + + output = list( + transformer.transform( + [ + RecordEnvelope(input, metadata={}), + RecordEnvelope(EndOfStream(), metadata={}), + ] + ) + ) + + assert len(output) == 3 + + # original MCE is unchanged + assert input == output[0].record + + ownership_aspect = output[1].record.aspect + + assert isinstance(ownership_aspect, OwnershipClass) + assert len(ownership_aspect.owners) == 1 + assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM + assert ( + ownership_aspect.owners[0].typeUrn + == "urn:li:ownershipType:__system__technical_owner" + ) + + def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None: input = make_generic_dataset(entity_urn=in_urn) transformer = ExtractDatasetTags.create( @@ -883,6 +921,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): ".*example2.*": [builder.make_user_urn("person2")], } }, + "ownership_type": "DATAOWNER", }, PipelineContext(run_id="test"), ) @@ -2233,6 +2272,7 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: "replace_existing": False, "semantics": TransformerSemantics.PATCH, "owner_urns": [owner2], + "ownership_type": "DATAOWNER", }, pipeline_context=pipeline_context, )