
Commit

feat(ingestion/transformer): Add dataset dataproduct transformer (datahub-project#9491)

Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
shubhamjagtap639 and hsheth2 committed Jan 5, 2024
1 parent f4b05a8 commit cb80024
Showing 7 changed files with 390 additions and 10 deletions.
70 changes: 70 additions & 0 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -14,6 +14,7 @@ The table below shows the transformers that can transform aspects of the entity [Dataset
| `schemaMetadata` | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
| `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
| `domains` | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
| `dataProduct` | - [Simple Add Dataset dataProduct](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct) |

## Extract Ownership from Tags
### Config Details
@@ -961,6 +962,75 @@ in both cases the domain should be provisioned on DataHub GMS
          'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
          'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```
## Simple Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
|-------------------------------|----------|-----------------|---------------|----------------------------------------------------------------------------------------|
| `dataset_to_data_product_urns`| ✅ | Dict[str, str] |  | Dataset entity urn as key and data product urn as value; each data product is created with the mapped dataset as an asset. |

Let’s suppose we’d like to add a set of data products, each with specific datasets as its assets. To do so, we can use the `simple_add_dataset_dataproduct` transformer that’s included in the ingestion framework.

The config, which we’d append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
  - type: "simple_add_dataset_dataproduct"
    config:
      dataset_to_data_product_urns:
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)": "urn:li:dataProduct:first"
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)": "urn:li:dataProduct:second"
```
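
Under the hood, this transformer simply wraps the mapping in a lookup callable and defers to the generic `add_dataset_dataproduct` transformer described below. A minimal sketch of that equivalence, using the illustrative urns from the recipe above:

```python
from typing import Dict, Optional

# The mapping from the recipe above.
dataset_to_data_product_urns: Dict[str, str] = {
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)": "urn:li:dataProduct:first",
    "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)": "urn:li:dataProduct:second",
}

def get_data_product_to_add(dataset_urn: str) -> Optional[str]:
    # Datasets absent from the mapping return None and are left untouched.
    return dataset_to_data_product_urns.get(dataset_urn)
```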

## Pattern Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
|---------------------------------------|----------|----------------------|-------------|---------------------------------------------------------------------------------------------|
| `dataset_to_data_product_urns_pattern`| ✅ | Map[regex, urn] |  | Regex patterns matched against dataset entity urns, each mapped to the data product urn to apply to the matching datasets. |

Let’s suppose we’d like to append a series of data products, each with specific datasets as its assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that’s included in the ingestion framework. It matches each regex pattern against the `urn` of the dataset and creates the data product entity with the given urn, adding the matched datasets as its assets.

The config, which we’d append to our ingestion recipe YAML, would look like this:

```yaml
transformers:
  - type: "pattern_add_dataset_dataproduct"
    config:
      dataset_to_data_product_urns_pattern:
        rules:
          ".*example1.*": "urn:li:dataProduct:first"
          ".*example2.*": "urn:li:dataProduct:second"
```
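
To illustrate how a dataset urn resolves to a data product, here is a simplified sketch of the rule matching using plain `re` (the transformer itself relies on the framework's `KeyValuePattern` machinery, and each dataset may resolve to at most one data product):

```python
import re
from typing import Dict, Optional

# The rules from the recipe above.
rules: Dict[str, str] = {
    ".*example1.*": "urn:li:dataProduct:first",
    ".*example2.*": "urn:li:dataProduct:second",
}

def resolve_data_product(dataset_urn: str) -> Optional[str]:
    # Return the data product urn of the first rule whose regex matches.
    for pattern, data_product_urn in rules.items():
        if re.match(pattern, dataset_urn):
            return data_product_urn
    return None

# resolve_data_product("urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)")
# -> "urn:li:dataProduct:first"
```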

## Add Dataset dataProduct
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|-----------------------------------|---------------|------------------------------------------------------------------------------------------|
| `get_data_product_to_add` | ✅ | callable[[str], Optional[str]] |  | A function that takes a dataset entity urn as input and returns the data product urn to create, or `None` to skip the dataset. |

If you'd like to add more complex logic for creating data products, you can use the more generic `add_dataset_dataproduct` transformer, which calls a user-provided function to determine the data product to create with the specified datasets as its assets.

```yaml
transformers:
  - type: "add_dataset_dataproduct"
    config:
      get_data_product_to_add: "<your_module>.<your_function>"
```

Then define your function to return a data product entity urn, for example:

```python
from typing import Optional

import datahub.emitter.mce_builder as builder


def custom_dataproducts(entity_urn: str) -> Optional[str]:
    """Compute the data product urn for a given dataset urn."""
    dataset_to_data_product_map = {
        builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
    }
    return dataset_to_data_product_map.get(entity_urn)
```
Finally, you can install and use your custom transformer as [shown here](#installing-the-package).

## Relationship Between replace_existing and semantics
The transformer behaviour described here is in the context of `simple_add_dataset_ownership`; however, it applies to all dataset transformers that support the `replace_existing`
and `semantics` configuration attributes. For example, `simple_add_dataset_tags` will add or remove tags as described in this section.
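
For instance, a recipe combining the two attributes might look like this (a minimal sketch using `simple_add_dataset_tags` with an illustrative tag urn):

```yaml
transformers:
  - type: "simple_add_dataset_tags"
    config:
      replace_existing: true # replace tags emitted by the source instead of appending
      semantics: PATCH # merge with tags already in DataHub; requires a datahub-rest sink
      tag_urns:
        - "urn:li:tag:NeedsDocumentation"
```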
3 changes: 3 additions & 0 deletions metadata-ingestion/setup.py
@@ -649,6 +649,9 @@
"pattern_add_dataset_schema_terms = datahub.ingestion.transformer.add_dataset_schema_terms:PatternAddDatasetSchemaTerms",
"pattern_add_dataset_schema_tags = datahub.ingestion.transformer.add_dataset_schema_tags:PatternAddDatasetSchemaTags",
"extract_ownership_from_tags = datahub.ingestion.transformer.extract_ownership_from_tags:ExtractOwnersFromTagsTransformer",
"add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
"simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
"pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
133 changes: 133 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
@@ -0,0 +1,133 @@
import logging
from typing import Callable, Dict, List, Optional, Union

import pydantic

from datahub.configuration.common import ConfigModel, KeyValuePattern
from datahub.configuration.import_resolver import pydantic_resolve_key
from datahub.emitter.mce_builder import Aspect
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.transformer.dataset_transformer import (
    DatasetDataproductTransformer,
)
from datahub.metadata.schema_classes import MetadataChangeProposalClass
from datahub.specific.dataproduct import DataProductPatchBuilder

logger = logging.getLogger(__name__)


class AddDatasetDataProductConfig(ConfigModel):
    # dataset_urn -> data product urn
    get_data_product_to_add: Callable[[str], Optional[str]]

    _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")


class AddDatasetDataProduct(DatasetDataproductTransformer):
    """Transformer that creates data product entities, with the datasets chosen by a callback function as their assets."""

    ctx: PipelineContext
    config: AddDatasetDataProductConfig

    def __init__(self, config: AddDatasetDataProductConfig, ctx: PipelineContext):
        super().__init__()
        self.ctx = ctx
        self.config = config

    @classmethod
    def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDataProduct":
        config = AddDatasetDataProductConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def transform_aspect(
        self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
    ) -> Optional[Aspect]:
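        # No per-aspect transformation is needed; all the work happens at end of stream.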
        return None

    def handle_end_of_stream(
        self,
    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
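        # Group dataset urns by target data product, building one patch builder per data product.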
        data_products: Dict[str, DataProductPatchBuilder] = {}

        logger.debug("Generating dataproducts")
        for entity_urn in self.entity_map.keys():
            data_product_urn = self.config.get_data_product_to_add(entity_urn)
            if data_product_urn:
                if data_product_urn not in data_products:
                    data_products[data_product_urn] = DataProductPatchBuilder(
                        data_product_urn
                    ).add_asset(entity_urn)
                else:
                    data_products[data_product_urn] = data_products[
                        data_product_urn
                    ].add_asset(entity_urn)

        mcps: List[
            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
        ] = []
        for data_product in data_products.values():
            mcps.extend(list(data_product.build()))
        return mcps


class SimpleDatasetDataProductConfig(ConfigModel):
    dataset_to_data_product_urns: Dict[str, str]


class SimpleAddDatasetDataProduct(AddDatasetDataProduct):
    """Transformer that creates the specified data products, with the mapped datasets as their assets."""

    def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext):
        generic_config = AddDatasetDataProductConfig(
            get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get(
                dataset_urn
            ),
        )
        super().__init__(generic_config, ctx)

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "SimpleAddDatasetDataProduct":
        config = SimpleDatasetDataProductConfig.parse_obj(config_dict)
        return cls(config, ctx)


class PatternDatasetDataProductConfig(ConfigModel):
    dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all()

    # A dataset may belong to at most one data product; scalar rule values are normalized to lists.
    @pydantic.root_validator(pre=True)
    def validate_pattern_value(cls, values: Dict) -> Dict:
        rules = values["dataset_to_data_product_urns_pattern"]["rules"]
        for key, value in rules.items():
            if isinstance(value, list) and len(value) > 1:
                raise ValueError(
                    "Same dataset cannot be an asset of two different data products."
                )
            elif isinstance(value, str):
                rules[key] = [rules[key]]
        return values


class PatternAddDatasetDataProduct(AddDatasetDataProduct):
    """Transformer that creates data products for datasets whose urns match regex patterns, with the matched datasets as assets."""

    def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext):
        dataset_to_data_product = config.dataset_to_data_product_urns_pattern
        generic_config = AddDatasetDataProductConfig(
            get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value(
                dataset_urn
            )[0]
            if dataset_to_data_product.value(dataset_urn)
            else None,
        )
        super().__init__(generic_config, ctx)

    @classmethod
    def create(
        cls, config_dict: dict, ctx: PipelineContext
    ) -> "PatternAddDatasetDataProduct":
        config = PatternDatasetDataProductConfig.parse_obj(config_dict)
        return cls(config, ctx)
metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
@@ -1,5 +1,5 @@
import logging
-from typing import Callable, List, Optional, cast
+from typing import Callable, List, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
@@ -13,6 +13,7 @@
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata.schema_classes import (
    GlobalTagsClass,
    MetadataChangeProposalClass,
    TagAssociationClass,
    TagKeyClass,
)
@@ -65,9 +66,13 @@ def transform_aspect(
            self.config, self.ctx.graph, entity_urn, out_global_tags_aspect
        )

-    def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:
+    def handle_end_of_stream(
+        self,
+    ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:

-        mcps: List[MetadataChangeProposalWrapper] = []
+        mcps: List[
+            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+        ] = []

        logger.debug("Generating tags")

@@ -121,7 +126,6 @@ class PatternAddDatasetTags(AddDatasetTags):
"""Transformer that adds a specified set of tags to each dataset."""

def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext):
config.tag_pattern.all
tag_pattern = config.tag_pattern
generic_config = AddDatasetTagsConfig(
get_tags_to_add=lambda _: [
metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
@@ -1,6 +1,6 @@
import logging
from abc import ABCMeta, abstractmethod
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

import datahub.emitter.mce_builder as builder
from datahub.emitter.aspect import ASPECT_MAP
@@ -28,7 +28,9 @@ def _update_work_unit_id(


class HandleEndOfStreamTransformer:
-    def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:
+    def handle_end_of_stream(
+        self,
+    ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
        return []


@@ -206,15 +208,19 @@ def _handle_end_of_stream(
        ):
            return

-        mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream()
+        mcps: Sequence[
+            Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+        ] = self.handle_end_of_stream()

        for mcp in mcps:
-            if mcp.aspect is None or mcp.entityUrn is None:  # to silence the lint error
+            if (
+                mcp.aspect is None or mcp.aspectName is None or mcp.entityUrn is None
+            ):  # to silence the lint error
                continue

            record_metadata = _update_work_unit_id(
                envelope=envelope,
-                aspect_name=mcp.aspect.get_aspect_name(),  # type: ignore
+                aspect_name=mcp.aspectName,
                urn=mcp.entityUrn,
            )

metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py
@@ -118,3 +118,8 @@ def aspect_name(self) -> str:
class DatasetSchemaMetadataTransformer(DatasetTransformer, metaclass=ABCMeta):
    def aspect_name(self) -> str:
        return "schemaMetadata"


class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta):
    def aspect_name(self) -> str:
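        # Data products attach their assets via the dataProductProperties aspect.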
return "dataProductProperties"
