diff --git a/metadata-ingestion/docs/transformer/dataset_transformer.md b/metadata-ingestion/docs/transformer/dataset_transformer.md
index 1c84a2759d23e..33ff722a0d0dd 100644
--- a/metadata-ingestion/docs/transformer/dataset_transformer.md
+++ b/metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -14,6 +14,7 @@ The below table shows transformer which can transform aspects of entity [Dataset
 | `schemaMetadata`    | - [Pattern Add Dataset Schema Field glossaryTerms](#pattern-add-dataset-schema-field-glossaryterms)<br/> - [Pattern Add Dataset Schema Field globalTags](#pattern-add-dataset-schema-field-globaltags) |
 | `datasetProperties` | - [Simple Add Dataset datasetProperties](#simple-add-dataset-datasetproperties)<br/> - [Add Dataset datasetProperties](#add-dataset-datasetproperties) |
 | `domains`           | - [Simple Add Dataset domains](#simple-add-dataset-domains)<br/> - [Pattern Add Dataset domains](#pattern-add-dataset-domains) |
+| `dataProduct`       | - [Simple Add Dataset dataProduct](#simple-add-dataset-dataproduct)<br/> - [Pattern Add Dataset dataProduct](#pattern-add-dataset-dataproduct)<br/> - [Add Dataset dataProduct](#add-dataset-dataproduct) |
## Extract Ownership from Tags
### Config Details
@@ -961,6 +962,75 @@ in both of the cases domain should be provisioned on DataHub GMS
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.n.*': ["hr"]
'urn:li:dataset:\(urn:li:dataPlatform:postgres,postgres\.public\.t.*': ["urn:li:domain:finance"]
```
+## Simple Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|-------------------------------|----------|-----------------|---------------|----------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns`| ✅ | Dict[str, str] | | Map of dataset entity urn (key) to data product urn (value); each data product is created with its mapped datasets as assets. |
+
+Let’s suppose we’d like to add a set of data products, each with specific datasets as its assets. To do so, we can use the `simple_add_dataset_dataproduct` transformer that’s included in the ingestion framework.
+
+The config, which we’d append to our ingestion recipe YAML, would look like this:
+
+ ```yaml
+ transformers:
+ - type: "simple_add_dataset_dataproduct"
+ config:
+ dataset_to_data_product_urns:
+ "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)": "urn:li:dataProduct:first"
+ "urn:li:dataset:(urn:li:dataPlatform:bigquery,example2,PROD)": "urn:li:dataProduct:second"
+ ```
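+
+Under the hood, the transformer collects the mapped dataset urns at the end of the stream and emits one `dataProductProperties` patch MCP per data product via `DataProductPatchBuilder` (see `add_dataset_dataproduct.py` below). A minimal sketch of the equivalent patch, built directly:
+
+```python
+from datahub.specific.dataproduct import DataProductPatchBuilder
+
+# Build the same kind of patch the transformer emits at end-of-stream:
+# one dataProductProperties patch per data product, with the datasets as assets.
+patch = DataProductPatchBuilder("urn:li:dataProduct:first").add_asset(
+    "urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)"
+)
+for mcp in patch.build():
+    print(mcp.entityUrn, mcp.aspectName)  # urn:li:dataProduct:first dataProductProperties
+```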
+
+## Pattern Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|---------------------------------------|----------|----------------------|-------------|---------------------------------------------------------------------------------------------|
+| `dataset_to_data_product_urns_pattern`| ✅ | map[regex, urn] | | Map of a regex (matched against dataset entity urns) to the data product urn applied to the matching datasets. |
+
+Let’s suppose we’d like to create a series of data products, each with specific datasets as its assets. To do so, we can use the `pattern_add_dataset_dataproduct` module that’s included in the ingestion framework. It matches each dataset’s `urn` against the regex rules and creates the data product entity with the given urn, adding the matched datasets as its assets.
+
+The config, which we’d append to our ingestion recipe YAML, would look like this:
+
+ ```yaml
+ transformers:
+ - type: "pattern_add_dataset_dataproduct"
+ config:
+ dataset_to_data_product_urns_pattern:
+ rules:
+ ".*example1.*": "urn:li:dataProduct:first"
+ ".*example2.*": "urn:li:dataProduct:second"
+ ```
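+
+Each dataset urn is matched against the rules in order, and only the first matching rule is applied, so a dataset ends up in at most one data product (the config validator below also rejects a rule that maps one pattern to multiple data products). A rough, illustrative sketch of this first-match behaviour, assuming rules are evaluated in the order they appear in the YAML:
+
+```python
+import re
+from typing import Optional
+
+rules = {
+    ".*example1.*": "urn:li:dataProduct:first",
+    ".*example2.*": "urn:li:dataProduct:second",
+}
+
+
+def data_product_for(dataset_urn: str) -> Optional[str]:
+    # Hypothetical helper approximating the transformer's use of
+    # KeyValuePattern: the first matching rule wins, unmatched urns get None.
+    for pattern, data_product_urn in rules.items():
+        if re.match(pattern, dataset_urn):
+            return data_product_urn
+    return None
+
+
+print(data_product_for("urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)"))
+# urn:li:dataProduct:first
+```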
+
+## Add Dataset dataProduct
+### Config Details
+| Field | Required | Type | Default | Description |
+|-----------------------------|----------|-----------------------------------|---------------|------------------------------------------------------------------------------------------|
+| `get_data_product_to_add`   | ✅ | callable[[str], Optional[str]] | | A function which takes a dataset entity urn as input and returns the data product urn to create. |
+
+If you'd like to add more complex logic for creating data products, you can use the more generic `add_dataset_dataproduct` transformer, which calls a user-provided function to determine the data product for each dataset.
+
+```yaml
+transformers:
+ - type: "add_dataset_dataproduct"
+ config:
+      get_data_product_to_add: "<your_module>.<your_function>"
+```
+
+Then define your function to return a data product entity urn, for example:
+
+```python
+from typing import Optional
+
+import datahub.emitter.mce_builder as builder
+
+
+def custom_dataproducts(dataset_urn: str) -> Optional[str]:
+    """Compute the data product urn for a given dataset urn."""
+    dataset_to_data_product_map = {
+        builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
+    }
+    return dataset_to_data_product_map.get(dataset_urn)
+```
+Finally, you can install and use your custom transformer as [shown here](#installing-the-package).
+
## Relationship Between replace_existing and semantics
 The transformer behaviour described here is in the context of `simple_add_dataset_ownership`; however, it is applicable to all dataset transformers that support the `replace_existing`
 and `semantics` configuration attributes. For example, `simple_add_dataset_tags` will add or remove tags as per the behaviour described in this section.
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 10db019b51381..8bbabce4f749f 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -649,6 +649,9 @@
"pattern_add_dataset_schema_terms = datahub.ingestion.transformer.add_dataset_schema_terms:PatternAddDatasetSchemaTerms",
"pattern_add_dataset_schema_tags = datahub.ingestion.transformer.add_dataset_schema_tags:PatternAddDatasetSchemaTags",
"extract_ownership_from_tags = datahub.ingestion.transformer.extract_ownership_from_tags:ExtractOwnersFromTagsTransformer",
+ "add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:AddDatasetDataProduct",
+ "simple_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:SimpleAddDatasetDataProduct",
+ "pattern_add_dataset_dataproduct = datahub.ingestion.transformer.add_dataset_dataproduct:PatternAddDatasetDataProduct",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
new file mode 100644
index 0000000000000..45e9262843025
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_dataproduct.py
@@ -0,0 +1,133 @@
+import logging
+from typing import Callable, Dict, List, Optional, Union
+
+import pydantic
+
+from datahub.configuration.common import ConfigModel, KeyValuePattern
+from datahub.configuration.import_resolver import pydantic_resolve_key
+from datahub.emitter.mce_builder import Aspect
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.transformer.dataset_transformer import (
+ DatasetDataproductTransformer,
+)
+from datahub.metadata.schema_classes import MetadataChangeProposalClass
+from datahub.specific.dataproduct import DataProductPatchBuilder
+
+logger = logging.getLogger(__name__)
+
+
+class AddDatasetDataProductConfig(ConfigModel):
+ # dataset_urn -> data product urn
+ get_data_product_to_add: Callable[[str], Optional[str]]
+
+ _resolve_data_product_fn = pydantic_resolve_key("get_data_product_to_add")
+
+
+class AddDatasetDataProduct(DatasetDataproductTransformer):
+ """Transformer that adds dataproduct entity for provided dataset as its asset according to a callback function."""
+
+ ctx: PipelineContext
+ config: AddDatasetDataProductConfig
+
+ def __init__(self, config: AddDatasetDataProductConfig, ctx: PipelineContext):
+ super().__init__()
+ self.ctx = ctx
+ self.config = config
+
+ @classmethod
+ def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetDataProduct":
+ config = AddDatasetDataProductConfig.parse_obj(config_dict)
+ return cls(config, ctx)
+
+ def transform_aspect(
+ self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
+ ) -> Optional[Aspect]:
+ return None
+
+ def handle_end_of_stream(
+ self,
+ ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
+ data_products: Dict[str, DataProductPatchBuilder] = {}
+
+ logger.debug("Generating dataproducts")
+ for entity_urn in self.entity_map.keys():
+ data_product_urn = self.config.get_data_product_to_add(entity_urn)
+ if data_product_urn:
+ if data_product_urn not in data_products:
+ data_products[data_product_urn] = DataProductPatchBuilder(
+ data_product_urn
+ ).add_asset(entity_urn)
+ else:
+ data_products[data_product_urn] = data_products[
+ data_product_urn
+ ].add_asset(entity_urn)
+
+ mcps: List[
+ Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+ ] = []
+ for data_product in data_products.values():
+ mcps.extend(list(data_product.build()))
+ return mcps
+
+
+class SimpleDatasetDataProductConfig(ConfigModel):
+ dataset_to_data_product_urns: Dict[str, str]
+
+
+class SimpleAddDatasetDataProduct(AddDatasetDataProduct):
+ """Transformer that adds a specified dataproduct entity for provided dataset as its asset."""
+
+ def __init__(self, config: SimpleDatasetDataProductConfig, ctx: PipelineContext):
+ generic_config = AddDatasetDataProductConfig(
+ get_data_product_to_add=lambda dataset_urn: config.dataset_to_data_product_urns.get(
+ dataset_urn
+ ),
+ )
+ super().__init__(generic_config, ctx)
+
+ @classmethod
+ def create(
+ cls, config_dict: dict, ctx: PipelineContext
+ ) -> "SimpleAddDatasetDataProduct":
+ config = SimpleDatasetDataProductConfig.parse_obj(config_dict)
+ return cls(config, ctx)
+
+
+class PatternDatasetDataProductConfig(ConfigModel):
+ dataset_to_data_product_urns_pattern: KeyValuePattern = KeyValuePattern.all()
+
+ @pydantic.root_validator(pre=True)
+ def validate_pattern_value(cls, values: Dict) -> Dict:
+ rules = values["dataset_to_data_product_urns_pattern"]["rules"]
+ for key, value in rules.items():
+ if isinstance(value, list) and len(value) > 1:
+ raise ValueError(
+ "Same dataset cannot be an asset of two different data product."
+ )
+ elif isinstance(value, str):
+ rules[key] = [rules[key]]
+ return values
+
+
+class PatternAddDatasetDataProduct(AddDatasetDataProduct):
+ """Transformer that adds a specified dataproduct entity for provided dataset as its asset."""
+
+ def __init__(self, config: PatternDatasetDataProductConfig, ctx: PipelineContext):
+ dataset_to_data_product = config.dataset_to_data_product_urns_pattern
+ generic_config = AddDatasetDataProductConfig(
+ get_data_product_to_add=lambda dataset_urn: dataset_to_data_product.value(
+ dataset_urn
+ )[0]
+ if dataset_to_data_product.value(dataset_urn)
+ else None,
+ )
+ super().__init__(generic_config, ctx)
+
+ @classmethod
+ def create(
+ cls, config_dict: dict, ctx: PipelineContext
+ ) -> "PatternAddDatasetDataProduct":
+ config = PatternDatasetDataProductConfig.parse_obj(config_dict)
+ return cls(config, ctx)
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
index 72a8c226e491e..7508b33c6bfc6 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_tags.py
@@ -1,5 +1,5 @@
import logging
-from typing import Callable, List, Optional, cast
+from typing import Callable, List, Optional, Union, cast
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
@@ -13,6 +13,7 @@
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
from datahub.metadata.schema_classes import (
GlobalTagsClass,
+ MetadataChangeProposalClass,
TagAssociationClass,
TagKeyClass,
)
@@ -65,9 +66,13 @@ def transform_aspect(
self.config, self.ctx.graph, entity_urn, out_global_tags_aspect
)
- def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:
+ def handle_end_of_stream(
+ self,
+ ) -> List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
- mcps: List[MetadataChangeProposalWrapper] = []
+ mcps: List[
+ Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+ ] = []
logger.debug("Generating tags")
@@ -121,7 +126,6 @@ class PatternAddDatasetTags(AddDatasetTags):
"""Transformer that adds a specified set of tags to each dataset."""
def __init__(self, config: PatternDatasetTagsConfig, ctx: PipelineContext):
- config.tag_pattern.all
tag_pattern = config.tag_pattern
generic_config = AddDatasetTagsConfig(
get_tags_to_add=lambda _: [
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
index 8b6f42dcfba4b..254b3d084f2be 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
@@ -1,6 +1,6 @@
import logging
from abc import ABCMeta, abstractmethod
-from typing import Any, Dict, Iterable, List, Optional, Union
+from typing import Any, Dict, Iterable, List, Optional, Sequence, Union
import datahub.emitter.mce_builder as builder
from datahub.emitter.aspect import ASPECT_MAP
@@ -28,7 +28,9 @@ def _update_work_unit_id(
class HandleEndOfStreamTransformer:
- def handle_end_of_stream(self) -> List[MetadataChangeProposalWrapper]:
+ def handle_end_of_stream(
+ self,
+ ) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
return []
@@ -206,15 +208,19 @@ def _handle_end_of_stream(
):
return
- mcps: List[MetadataChangeProposalWrapper] = self.handle_end_of_stream()
+ mcps: Sequence[
+ Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
+ ] = self.handle_end_of_stream()
for mcp in mcps:
- if mcp.aspect is None or mcp.entityUrn is None: # to silent the lint error
+ if (
+ mcp.aspect is None or mcp.aspectName is None or mcp.entityUrn is None
+            ): # to silence the lint error
continue
record_metadata = _update_work_unit_id(
envelope=envelope,
- aspect_name=mcp.aspect.get_aspect_name(), # type: ignore
+ aspect_name=mcp.aspectName,
urn=mcp.entityUrn,
)
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py
index 0b2433c3a1fe2..79151f7b11bf0 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/dataset_transformer.py
@@ -118,3 +118,8 @@ def aspect_name(self) -> str:
class DatasetSchemaMetadataTransformer(DatasetTransformer, metaclass=ABCMeta):
def aspect_name(self) -> str:
return "schemaMetadata"
+
+
+class DatasetDataproductTransformer(DatasetTransformer, metaclass=ABCMeta):
+ def aspect_name(self) -> str:
+ return "dataProductProperties"
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py
index 546549dcf37a4..5152f406ed3ce 100644
--- a/metadata-ingestion/tests/unit/test_transform_dataset.py
+++ b/metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -1,3 +1,4 @@
+import json
import re
from typing import (
Any,
@@ -27,6 +28,11 @@
from datahub.ingestion.transformer.add_dataset_browse_path import (
AddDatasetBrowsePathTransformer,
)
+from datahub.ingestion.transformer.add_dataset_dataproduct import (
+ AddDatasetDataProduct,
+ PatternAddDatasetDataProduct,
+ SimpleAddDatasetDataProduct,
+)
from datahub.ingestion.transformer.add_dataset_ownership import (
AddDatasetOwnership,
PatternAddDatasetOwnership,
@@ -873,7 +879,7 @@ def test_pattern_dataset_tags_transformation(mock_time):
assert builder.make_tag_urn("Needs Documentation") not in tags_aspect.tags
-def test_import_resolver():
+def test_add_dataset_tags_transformation():
transformer = AddDatasetTags.create(
{
"get_tags_to_add": "tests.unit.test_transform_dataset.dummy_tag_resolver_method"
@@ -2665,3 +2671,156 @@ def test_pattern_dataset_schema_tags_transformation_patch(
assert builder.make_tag_urn("pii") in global_tags_urn
assert builder.make_tag_urn("FirstName") in global_tags_urn
assert builder.make_tag_urn("Name") in global_tags_urn
+
+
+def test_simple_dataset_data_product_transformation(mock_time):
+ transformer = SimpleAddDatasetDataProduct.create(
+ {
+ "dataset_to_data_product_urns": {
+ builder.make_dataset_urn(
+ "bigquery", "example1"
+ ): "urn:li:dataProduct:first",
+ builder.make_dataset_urn(
+ "bigquery", "example2"
+ ): "urn:li:dataProduct:second",
+ builder.make_dataset_urn(
+ "bigquery", "example3"
+ ): "urn:li:dataProduct:first",
+ }
+ },
+ PipelineContext(run_id="test-dataproduct"),
+ )
+
+ outputs = list(
+ transformer.transform(
+ [
+ RecordEnvelope(input, metadata={})
+ for input in [
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example1")
+ ),
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example2")
+ ),
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example3")
+ ),
+ EndOfStream(),
+ ]
+ ]
+ )
+ )
+
+ assert len(outputs) == 6
+
+ # Check new dataproduct entity should be there
+ assert outputs[3].record.entityUrn == "urn:li:dataProduct:first"
+ assert outputs[3].record.aspectName == "dataProductProperties"
+
+ first_data_product_aspect = json.loads(
+ outputs[3].record.aspect.value.decode("utf-8")
+ )
+ assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
+ builder.make_dataset_urn("bigquery", "example1"),
+ builder.make_dataset_urn("bigquery", "example3"),
+ ]
+
+ second_data_product_aspect = json.loads(
+ outputs[4].record.aspect.value.decode("utf-8")
+ )
+ assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [
+ builder.make_dataset_urn("bigquery", "example2")
+ ]
+
+ assert isinstance(outputs[5].record, EndOfStream)
+
+
+def test_pattern_dataset_data_product_transformation(mock_time):
+ transformer = PatternAddDatasetDataProduct.create(
+ {
+ "dataset_to_data_product_urns_pattern": {
+ "rules": {
+ ".*example1.*": "urn:li:dataProduct:first",
+ ".*": "urn:li:dataProduct:second",
+ }
+ },
+ },
+ PipelineContext(run_id="test-dataproducts"),
+ )
+
+ outputs = list(
+ transformer.transform(
+ [
+ RecordEnvelope(input, metadata={})
+ for input in [
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example1")
+ ),
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example2")
+ ),
+ make_generic_dataset(
+ entity_urn=builder.make_dataset_urn("bigquery", "example3")
+ ),
+ EndOfStream(),
+ ]
+ ]
+ )
+ )
+
+ assert len(outputs) == 6
+
+ # Check new dataproduct entity should be there
+ assert outputs[3].record.entityUrn == "urn:li:dataProduct:first"
+ assert outputs[3].record.aspectName == "dataProductProperties"
+
+ first_data_product_aspect = json.loads(
+ outputs[3].record.aspect.value.decode("utf-8")
+ )
+ assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
+ builder.make_dataset_urn("bigquery", "example1")
+ ]
+
+ second_data_product_aspect = json.loads(
+ outputs[4].record.aspect.value.decode("utf-8")
+ )
+ assert [item["value"]["destinationUrn"] for item in second_data_product_aspect] == [
+ builder.make_dataset_urn("bigquery", "example2"),
+ builder.make_dataset_urn("bigquery", "example3"),
+ ]
+
+ assert isinstance(outputs[5].record, EndOfStream)
+
+
+def dummy_data_product_resolver_method(dataset_urn):
+ dataset_to_data_product_map = {
+ builder.make_dataset_urn("bigquery", "example1"): "urn:li:dataProduct:first"
+ }
+ return dataset_to_data_product_map.get(dataset_urn)
+
+
+def test_add_dataset_data_product_transformation():
+ transformer = AddDatasetDataProduct.create(
+ {
+ "get_data_product_to_add": "tests.unit.test_transform_dataset.dummy_data_product_resolver_method"
+ },
+ PipelineContext(run_id="test-dataproduct"),
+ )
+ outputs = list(
+ transformer.transform(
+ [
+ RecordEnvelope(input, metadata={})
+ for input in [make_generic_dataset(), EndOfStream()]
+ ]
+ )
+ )
+ # Check new dataproduct entity should be there
+ assert outputs[1].record.entityUrn == "urn:li:dataProduct:first"
+ assert outputs[1].record.aspectName == "dataProductProperties"
+
+ first_data_product_aspect = json.loads(
+ outputs[1].record.aspect.value.decode("utf-8")
+ )
+ assert [item["value"]["destinationUrn"] for item in first_data_product_aspect] == [
+ builder.make_dataset_urn("bigquery", "example1")
+ ]