Skip to content

Commit

Permalink
feat(ingestion): extend feast plugin to ingest tags and owners (datah…
Browse files Browse the repository at this point in the history
…ub-project#11784)

Co-authored-by: John Joyce <john@acryl.io>
  • Loading branch information
margaridafernandes-trip and jjoyce0510 authored Nov 27, 2024
1 parent d9d6255 commit f3eda31
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 19 deletions.
103 changes: 97 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/feast.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathsClass,
GlobalTagsClass,
MLFeaturePropertiesClass,
MLFeatureTablePropertiesClass,
MLPrimaryKeyPropertiesClass,
OwnerClass,
OwnershipClass,
StatusClass,
TagAssociationClass,
)

# FIXME: ValueType module cannot be used as a type
Expand Down Expand Up @@ -91,6 +95,24 @@ class FeastRepositorySourceConfig(ConfigModel):
environment: str = Field(
default=DEFAULT_ENV, description="Environment to use when constructing URNs"
)
# owner_mappings example:
# This must be added to the recipe in order to extract owners, otherwise NO owners will be extracted
# owner_mappings:
# - feast_owner_name: "<owner>"
# datahub_owner_urn: "urn:li:corpGroup:<owner>"
# datahub_ownership_type: "BUSINESS_OWNER"
owner_mappings: Optional[List[Dict[str, str]]] = Field(
default=None, description="Mapping of owner names to owner types"
)
enable_owner_extraction: bool = Field(
default=False,
description="If this is disabled, then we NEVER try to map owners. "
"If this is enabled, then owner_mappings is REQUIRED to extract ownership.",
)
enable_tag_extraction: bool = Field(
default=False,
description="If this is disabled, then we NEVER try to extract tags.",
)


@platform_name("Feast")
Expand Down Expand Up @@ -215,10 +237,15 @@ def _get_entity_workunit(
"""

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = (
[StatusClass(removed=False)]
+ self._get_tags(entity)
+ self._get_owners(entity)
)

entity_snapshot = MLPrimaryKeySnapshot(
urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

entity_snapshot.aspects.append(
Expand All @@ -243,10 +270,11 @@ def _get_feature_workunit(
Generate an MLFeature work unit for a Feast feature.
"""
feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = [StatusClass(removed=False)] + self._get_tags(field)

feature_snapshot = MLFeatureSnapshot(
urn=builder.make_ml_feature_urn(feature_view_name, field.name),
aspects=[StatusClass(removed=False)],
aspects=aspects,
)

feature_sources = []
Expand Down Expand Up @@ -295,13 +323,18 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU
"""

feature_view_name = f"{self.feature_store.project}.{feature_view.name}"
aspects = (
[
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
]
+ self._get_tags(feature_view)
+ self._get_owners(feature_view)
)

feature_view_snapshot = MLFeatureTableSnapshot(
urn=builder.make_ml_feature_table_urn("feast", feature_view_name),
aspects=[
BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]),
StatusClass(removed=False),
],
aspects=aspects,
)

feature_view_snapshot.aspects.append(
Expand Down Expand Up @@ -360,6 +393,64 @@ def _get_on_demand_feature_view_workunit(

return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce)

# If a tag is specified in a Feast object, then the tag will be ingested into Datahub if enable_tag_extraction is
# True, otherwise NO tags will be ingested
def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
"""
Extracts tags from the given object and returns a list of aspects.
"""
aspects: List[Union[GlobalTagsClass]] = []

# Extract tags
if self.source_config.enable_tag_extraction:
if obj.tags.get("name"):
tag_name: str = obj.tags["name"]
tag_association = TagAssociationClass(
tag=builder.make_tag_urn(tag_name)
)
global_tags_aspect = GlobalTagsClass(tags=[tag_association])
aspects.append(global_tags_aspect)

return aspects

# If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified
# and enable_owner_extraction is True in FeastRepositorySourceConfig, otherwise NO owners will be ingested
def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list:
"""
Extracts owners from the given object and returns a list of aspects.
"""
aspects: List[Union[OwnershipClass]] = []

# Extract owner
if self.source_config.enable_owner_extraction:
owner = getattr(obj, "owner", None)
if owner:
# Create owner association, skipping if None
owner_association = self._create_owner_association(owner)
if owner_association: # Only add valid owner associations
owners_aspect = OwnershipClass(owners=[owner_association])
aspects.append(owners_aspect)

return aspects

def _create_owner_association(self, owner: str) -> Optional[OwnerClass]:
"""
Create an OwnerClass instance for the given owner using the owner mappings.
"""
if self.source_config.owner_mappings is not None:
for mapping in self.source_config.owner_mappings:
if mapping["feast_owner_name"] == owner:
ownership_type_class: str = mapping.get(
"datahub_ownership_type", "TECHNICAL_OWNER"
)
datahub_owner_urn = mapping.get("datahub_owner_urn")
if datahub_owner_urn:
return OwnerClass(
owner=datahub_owner_urn,
type=ownership_type_class,
)
return None

@classmethod
def create(cls, config_dict, ctx):
config = FeastRepositorySourceConfig.parse_obj(config_dict)
Expand Down
Loading

0 comments on commit f3eda31

Please sign in to comment.