Skip to content

Commit

Permalink
feat(ingest): support domains in meta -> "datahub" section (#10967)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Jul 25, 2024
1 parent f4fb89e commit 1fa7998
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 18 deletions.
9 changes: 7 additions & 2 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@
"numpy<2",
}

dbt_common = {
*sqlglot_lib,
"more_itertools",
}

sql_common = (
{
# Required for all SQL sources.
Expand Down Expand Up @@ -352,8 +357,8 @@
"datahub-lineage-file": set(),
"datahub-business-glossary": set(),
"delta-lake": {*data_lake_profiling, *delta_lake},
"dbt": {"requests"} | sqlglot_lib | aws_common,
"dbt-cloud": {"requests"} | sqlglot_lib,
"dbt": {"requests"} | dbt_common | aws_common,
"dbt-cloud": {"requests"} | dbt_common,
"druid": sql_common | {"pydruid>=0.6.2"},
"dynamodb": aws_common | classification_lib,
# Starting with 7.14.0 python client is checking if it is connected to elasticsearch client. If its not it throws
Expand Down
13 changes: 10 additions & 3 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

import typing_inspect
from avrogen.dict_wrapper import DictWrapper

from datahub.configuration.source_common import DEFAULT_ENV as DEFAULT_ENV_CONFIGURATION
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -412,9 +413,9 @@ def make_lineage_mce(
return mce


def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)

def can_add_aspect_to_snapshot(
SnapshotType: Type[DictWrapper], AspectType: Type[Aspect]
) -> bool:
constructor_annotations = get_type_hints(SnapshotType.__init__)
aspect_list_union = typing_inspect.get_args(constructor_annotations["aspects"])[0]

Expand All @@ -423,6 +424,12 @@ def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> b
return issubclass(AspectType, supported_aspect_types)


def can_add_aspect(mce: MetadataChangeEventClass, AspectType: Type[Aspect]) -> bool:
SnapshotType = type(mce.proposedSnapshot)

return can_add_aspect_to_snapshot(SnapshotType, AspectType)


def assert_can_add_aspect(
mce: MetadataChangeEventClass, AspectType: Type[Aspect]
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from enum import auto
from typing import Any, Dict, Iterable, List, Optional, Tuple

import more_itertools
import pydantic
from pydantic import root_validator, validator
from pydantic.fields import Field
Expand Down Expand Up @@ -1309,8 +1310,23 @@ def create_dbt_platform_mces(
aspect=self._make_data_platform_instance_aspect(),
).as_workunit()

standalone_aspects, snapshot_aspects = more_itertools.partition(
(
lambda aspect: mce_builder.can_add_aspect_to_snapshot(
DatasetSnapshot, type(aspect)
)
),
aspects,
)
for aspect in standalone_aspects:
# The domains aspect, and some others, may not support being added to the snapshot.
yield MetadataChangeProposalWrapper(
entityUrn=node_datahub_urn,
aspect=aspect,
).as_workunit()

dataset_snapshot = DatasetSnapshot(
urn=node_datahub_urn, aspects=aspects
urn=node_datahub_urn, aspects=list(snapshot_aspects)
)
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
if self.config.write_semantics == "PATCH":
Expand Down Expand Up @@ -1588,6 +1604,10 @@ def _generate_base_dbt_aspects(
):
aspects.append(meta_aspects.get(Constants.ADD_TERM_OPERATION))

# add meta domains aspect
if meta_aspects.get(Constants.ADD_DOMAIN_OPERATION):
aspects.append(meta_aspects.get(Constants.ADD_DOMAIN_OPERATION))

# add meta links aspect
meta_links_aspect = meta_aspects.get(Constants.ADD_DOC_LINK_OPERATION)
if meta_links_aspect and self.config.enable_meta_mapping:
Expand Down
45 changes: 33 additions & 12 deletions metadata-ingestion/src/datahub/utilities/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from datahub.metadata.schema_classes import (
AuditStampClass,
DomainsClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
OwnerClass,
Expand Down Expand Up @@ -70,6 +71,8 @@ class Constants:
ADD_TERM_OPERATION = "add_term"
ADD_TERMS_OPERATION = "add_terms"
ADD_OWNER_OPERATION = "add_owner"
ADD_DOMAIN_OPERATION = "add_domain"

OPERATION = "operation"
OPERATION_CONFIG = "config"
TAG = "tag"
Expand All @@ -94,9 +97,15 @@ class _MappingOwner(ConfigModel):


class _DatahubProps(ConfigModel):
owners: List[Union[str, _MappingOwner]]
tags: Optional[List[str]] = None
terms: Optional[List[str]] = None
owners: Optional[List[Union[str, _MappingOwner]]] = None
domain: Optional[str] = None

def make_owner_category_list(self) -> List[Dict]:
if self.owners is None:
return []

res = []
for owner in self.owners:
if isinstance(owner, str):
Expand Down Expand Up @@ -176,26 +185,29 @@ def process(self, raw_props: Mapping[str, Any]) -> Dict[str, Any]: # noqa: C901
# Process the special "datahub" property, which supports tags, terms, and owners.
operations_map: Dict[str, list] = {}
try:
datahub_prop = raw_props.get("datahub")
if datahub_prop and isinstance(datahub_prop, dict):
if datahub_prop.get("tags"):
raw_datahub_prop = raw_props.get("datahub")
if raw_datahub_prop:
datahub_prop = _DatahubProps.parse_obj_allow_extras(raw_datahub_prop)
if datahub_prop.tags:
# Note that tags get converted to urns later because we need to support the tag prefix.
tags = datahub_prop["tags"]
operations_map.setdefault(Constants.ADD_TAG_OPERATION, []).extend(
tags
datahub_prop.tags
)

if datahub_prop.get("terms"):
terms = datahub_prop["terms"]
if datahub_prop.terms:
operations_map.setdefault(Constants.ADD_TERM_OPERATION, []).extend(
mce_builder.make_term_urn(term) for term in terms
mce_builder.make_term_urn(term) for term in datahub_prop.terms
)

if datahub_prop.get("owners"):
owners = _DatahubProps.parse_obj_allow_extras(datahub_prop)
if datahub_prop.owners:
operations_map.setdefault(Constants.ADD_OWNER_OPERATION, []).extend(
owners.make_owner_category_list()
datahub_prop.make_owner_category_list()
)

if datahub_prop.domain:
operations_map.setdefault(
Constants.ADD_DOMAIN_OPERATION, []
).append(mce_builder.make_domain_urn(datahub_prop.domain))
except Exception as e:
logger.error(f"Error while processing datahub property: {e}")

Expand Down Expand Up @@ -299,6 +311,15 @@ def convert_to_aspects(self, operation_map: Dict[str, list]) -> Dict[str, Any]:
)
aspect_map[Constants.ADD_TERM_OPERATION] = term_aspect

if Constants.ADD_DOMAIN_OPERATION in operation_map:
domain_aspect = DomainsClass(
domains=[
mce_builder.make_domain_urn(domain)
for domain in operation_map[Constants.ADD_DOMAIN_OPERATION]
]
)
aspect_map[Constants.ADD_DOMAIN_OPERATION] = domain_aspect

if Constants.ADD_DOC_LINK_OPERATION in operation_map:
try:
if len(
Expand Down
5 changes: 5 additions & 0 deletions metadata-ingestion/tests/unit/test_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from datahub.metadata.com.linkedin.pegasus2avro.common import GlobalTags
from datahub.metadata.schema_classes import (
DomainsClass,
GlobalTagsClass,
GlossaryTermsClass,
InstitutionalMemoryClass,
Expand Down Expand Up @@ -366,6 +367,7 @@ def test_operation_processor_datahub_props():
"owner_type": "urn:li:ownershipType:steward",
},
],
"domain": "domain1",
}
}

Expand Down Expand Up @@ -396,3 +398,6 @@ def test_operation_processor_datahub_props():
assert [
term_association.urn for term_association in aspect_map["add_term"].terms
] == ["urn:li:glossaryTerm:term1", "urn:li:glossaryTerm:term2"]

assert isinstance(aspect_map["add_domain"], DomainsClass)
assert aspect_map["add_domain"].domains == ["urn:li:domain:domain1"]

0 comments on commit 1fa7998

Please sign in to comment.