feat(ingestion): Add typeUrn handling to ownership transformers (data…
skrydal authored and Salman-Apptware committed Dec 15, 2023
1 parent c8ad63a commit c13897b
Showing 5 changed files with 86 additions and 60 deletions.
32 changes: 16 additions & 16 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
-| Field | Required | Type | Default | Description |
-|-----------------------------|----------|--------------|---------------|------------------------------------------------------------------|
-| `owner_urns` | ✅ | list[string] | | List of owner urns. |
-| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
-| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
-| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
+| Field | Required | Type | Default | Description |
+|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
+| `owner_urns` | ✅ | list[string] | | List of owner urns. |
+| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
+| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
+| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |

For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).

Expand Down Expand Up @@ -95,7 +95,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
-ownership_type: "PRODUCER"
+ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however overwrite the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -107,7 +107,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
-ownership_type: "PRODUCER"
+ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however keep the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -124,12 +124,12 @@ transformers:

## Pattern Add Dataset ownership
### Config Details
-| Field | Required | Type | Default | Description |
-|-----------------------------|--------- |-----------------------|------------------|-----------------------------------------------------------------------------------------|
-| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
-| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
-| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
-| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
+| Field | Required | Type | Default | Description |
+|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
+| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
+| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
+| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
+| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |

let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners.

Expand Down Expand Up @@ -158,7 +158,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
-ownership_type: "PRODUCER"
+ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however overwrite the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -170,7 +170,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
-ownership_type: "PRODUCER"
+ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however keep the owners available for the dataset on DataHub GMS
```yaml
Expand Down
31 changes: 13 additions & 18 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Expand Up @@ -9,12 +9,13 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    Iterable,
     List,
     Optional,
+    Tuple,
     Type,
     TypeVar,
     Union,
-    cast,
     get_type_hints,
 )

Expand Down Expand Up @@ -342,26 +343,20 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
)


-def is_valid_ownership_type(ownership_type: Optional[str]) -> bool:
-    return ownership_type is not None and ownership_type in [
-        OwnershipTypeClass.TECHNICAL_OWNER,
-        OwnershipTypeClass.BUSINESS_OWNER,
-        OwnershipTypeClass.DATA_STEWARD,
-        OwnershipTypeClass.NONE,
-        OwnershipTypeClass.DEVELOPER,
-        OwnershipTypeClass.DATAOWNER,
-        OwnershipTypeClass.DELEGATE,
-        OwnershipTypeClass.PRODUCER,
-        OwnershipTypeClass.CONSUMER,
-        OwnershipTypeClass.STAKEHOLDER,
+def get_class_fields(_class: Type[object]) -> Iterable[str]:
+    return [
+        f
+        for f in dir(_class)
+        if not callable(getattr(_class, f)) and not f.startswith("_")
     ]


-def validate_ownership_type(ownership_type: Optional[str]) -> str:
-    if is_valid_ownership_type(ownership_type):
-        return cast(str, ownership_type)
-    else:
-        raise ValueError(f"Unexpected ownership type: {ownership_type}")
+def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
+    if ownership_type.startswith("urn:li:"):
+        return OwnershipTypeClass.CUSTOM, ownership_type
+    if ownership_type in get_class_fields(OwnershipTypeClass):
+        return ownership_type, None
+    raise ValueError(f"Unexpected ownership type: {ownership_type}")


def make_lineage_mce(
Expand Down
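As a rough illustration of the new return contract, the sketch below re-creates `validate_ownership_type` against a small stand-in class so it can run without DataHub installed. `OwnershipTypeStandIn` is hypothetical and lists only a few members. It assumes, as the membership check in the diff implies, that each attribute name equals its string value. It is not the library's own code path.

```python
from typing import Iterable, Optional, Tuple, Type


class OwnershipTypeStandIn:
    """Hypothetical stand-in for OwnershipTypeClass (assumption: name == value)."""

    CUSTOM = "CUSTOM"
    DATAOWNER = "DATAOWNER"
    PRODUCER = "PRODUCER"
    TECHNICAL_OWNER = "TECHNICAL_OWNER"


def get_class_fields(_class: Type[object]) -> Iterable[str]:
    # Collect the names of public, non-callable class attributes.
    return [
        f
        for f in dir(_class)
        if not callable(getattr(_class, f)) and not f.startswith("_")
    ]


def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
    # A URN is passed through as the type URN, with the enum set to CUSTOM.
    if ownership_type.startswith("urn:li:"):
        return OwnershipTypeStandIn.CUSTOM, ownership_type
    # A known enum member is returned unchanged, with no type URN.
    if ownership_type in get_class_fields(OwnershipTypeStandIn):
        return ownership_type, None
    raise ValueError(f"Unexpected ownership type: {ownership_type}")


enum_result = validate_ownership_type("PRODUCER")
urn_result = validate_ownership_type("urn:li:ownershipType:__system__producer")
```

One thing the sketch makes visible: as written in the diff, any string beginning with `urn:li:` is accepted and mapped to `CUSTOM`. The prefix check alone does not confirm that the value is an ownership-type URN.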
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
-from datahub.metadata.schema_classes import (
-    OwnerClass,
-    OwnershipClass,
-    OwnershipTypeClass,
-)
+from datahub.metadata._schema_classes import OwnershipTypeClass
+from datahub.metadata.schema_classes import OwnerClass, OwnershipClass


class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
Expand Down Expand Up @@ -102,7 +99,7 @@ def transform_aspect(


class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel):
-ownership_type: Optional[str] = OwnershipTypeClass.DATAOWNER
+ownership_type: str = OwnershipTypeClass.DATAOWNER


class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
Expand All @@ -114,11 +111,14 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""

def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
-ownership_type = builder.validate_ownership_type(config.ownership_type)
+ownership_type, ownership_type_urn = builder.validate_ownership_type(
+    config.ownership_type
+)
 owners = [
     OwnerClass(
         owner=owner,
         type=ownership_type,
+        typeUrn=ownership_type_urn,
     )
     for owner in config.owner_urns
 ]
Expand Down Expand Up @@ -147,29 +147,17 @@ class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
class PatternAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""

-def getOwners(
-    self,
-    key: str,
-    owner_pattern: KeyValuePattern,
-    ownership_type: Optional[str] = None,
-) -> List[OwnerClass]:
-    owners = [
-        OwnerClass(
-            owner=owner,
-            type=builder.validate_ownership_type(ownership_type),
-        )
-        for owner in owner_pattern.value(key)
-    ]
-    return owners
-
 def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext):
-    ownership_type = builder.validate_ownership_type(config.ownership_type)
     owner_pattern = config.owner_pattern
+    ownership_type, ownership_type_urn = builder.validate_ownership_type(
+        config.ownership_type
+    )
     generic_config = AddDatasetOwnershipConfig(
         get_owners_to_add=lambda urn: [
             OwnerClass(
                 owner=owner,
                 type=ownership_type,
+                typeUrn=ownership_type_urn,
             )
             for owner in owner_pattern.value(urn)
         ],
Expand Down
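The pattern transformer change can be pictured with the following minimal sketch, which validates the ownership type once and then reuses the resulting `(type, typeUrn)` pair for every owner produced by the lookup closure. `OwnerStandIn` and `make_get_owners` are hypothetical names, and the regex lookup is a simplified stand-in for `owner_pattern.value(urn)`; the real `KeyValuePattern` matching rules may differ.

```python
import re
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class OwnerStandIn:
    """Hypothetical stand-in for OwnerClass, limited to the fields used in the diff."""

    owner: str
    type: str
    typeUrn: Optional[str] = None


def make_get_owners(
    rules: Dict[str, List[str]],
    ownership_type: str,
    ownership_type_urn: Optional[str],
) -> Callable[[str], List[OwnerStandIn]]:
    # The (type, typeUrn) pair is fixed up front and captured by the closure.
    def get_owners(urn: str) -> List[OwnerStandIn]:
        return [
            OwnerStandIn(owner=owner, type=ownership_type, typeUrn=ownership_type_urn)
            for pattern, owners in rules.items()
            if re.match(pattern, urn)  # assumption: simple regex match on the entity urn
            for owner in owners
        ]

    return get_owners


get_owners = make_get_owners(
    {
        ".*example1.*": ["urn:li:corpuser:username1"],
        ".*example2.*": ["urn:li:corpuser:username2"],
    },
    "CUSTOM",
    "urn:li:ownershipType:__system__producer",
)
matched = get_owners("urn:li:dataset:(urn:li:dataPlatform:hive,example1,PROD)")
unmatched = get_owners("urn:li:dataset:(urn:li:dataPlatform:hive,other,PROD)")
```

Validating once in the constructor, rather than per owner as the removed `getOwners` helper did, means a bad `ownership_type` should surface when the transformer is created instead of partway through a run.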
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/unit/test_pipeline.py
Expand Up @@ -214,7 +214,10 @@ def test_run_including_registered_transformation(self):
"transformers": [
 {
     "type": "simple_add_dataset_ownership",
-    "config": {"owner_urns": ["urn:li:corpuser:foo"]},
+    "config": {
+        "owner_urns": ["urn:li:corpuser:foo"],
+        "ownership_type": "urn:li:ownershipType:__system__technical_owner",
+    },
 }
],
"sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"},
Expand Down
44 changes: 42 additions & 2 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Expand Up @@ -234,7 +234,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn
assert all(
[
-owner.type == models.OwnershipTypeClass.DATAOWNER
+owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in last_event.aspect.owners
]
)
Expand All @@ -247,7 +247,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert len(second_ownership_aspect.owners) == 3
assert all(
[
-owner.type == models.OwnershipTypeClass.DATAOWNER
+owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)
Expand Down Expand Up @@ -293,6 +293,44 @@ def test_simple_dataset_ownership_with_type_transformation(mock_time):
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER


+def test_simple_dataset_ownership_with_type_urn_transformation(mock_time):
+    input = make_generic_dataset()
+
+    transformer = SimpleAddDatasetOwnership.create(
+        {
+            "owner_urns": [
+                builder.make_user_urn("person1"),
+            ],
+            "ownership_type": "urn:li:ownershipType:__system__technical_owner",
+        },
+        PipelineContext(run_id="test"),
+    )
+
+    output = list(
+        transformer.transform(
+            [
+                RecordEnvelope(input, metadata={}),
+                RecordEnvelope(EndOfStream(), metadata={}),
+            ]
+        )
+    )
+
+    assert len(output) == 3
+
+    # original MCE is unchanged
+    assert input == output[0].record
+
+    ownership_aspect = output[1].record.aspect
+
+    assert isinstance(ownership_aspect, OwnershipClass)
+    assert len(ownership_aspect.owners) == 1
+    assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM
+    assert (
+        ownership_aspect.owners[0].typeUrn
+        == "urn:li:ownershipType:__system__technical_owner"
+    )


def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None:
input = make_generic_dataset(entity_urn=in_urn)
transformer = ExtractDatasetTags.create(
Expand Down Expand Up @@ -883,6 +921,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
".*example2.*": [builder.make_user_urn("person2")],
}
 },
+"ownership_type": "DATAOWNER",
},
PipelineContext(run_id="test"),
)
Expand Down Expand Up @@ -2233,6 +2272,7 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass:
 "replace_existing": False,
 "semantics": TransformerSemantics.PATCH,
 "owner_urns": [owner2],
+"ownership_type": "DATAOWNER",
},
pipeline_context=pipeline_context,
)
Expand Down
