From ef24dc4bfc302e201a820fb889cbe4feb39539c8 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Mon, 4 Nov 2024 18:11:36 +0000 Subject: [PATCH 01/23] feat(ingestion): extend feast plugin to ingest tags for features --- .../scripts/datahub_preflight.sh | 4 +-- .../src/datahub/ingestion/source/feast.py | 36 ++++++++++++++++++- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/scripts/datahub_preflight.sh b/metadata-ingestion/scripts/datahub_preflight.sh index 9676964f4d49d1..9e259ca944cc8a 100755 --- a/metadata-ingestion/scripts/datahub_preflight.sh +++ b/metadata-ingestion/scripts/datahub_preflight.sh @@ -1,4 +1,4 @@ -#!/bin/bash -e + #!/bin/bash -e #From https://stackoverflow.com/questions/4023830/how-to-compare-two-strings-in-dot-separated-version-format-in-bash verlte() { @@ -45,7 +45,7 @@ arm64_darwin_preflight() { pip3 install --no-use-pep517 scipy fi - brew_install "openssl@1.1" + brew_install "openssl@3.0.14" brew install "postgresql@14" # postgresql installs libs in a strange way diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index e097fd1f221ea5..cccfd0f8ddb368 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -19,6 +19,8 @@ from feast.data_source import DataSource from pydantic import Field +from datahub.emitter.mcp import MetadataChangeProposalWrapper + import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import DEFAULT_ENV @@ -48,6 +50,9 @@ StatusClass, ) +from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass + + # FIXME: ValueType module cannot be used as a type _field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = { ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN, @@ -365,13 +370,41 @@ def create(cls, config_dict, ctx): config = FeastRepositorySourceConfig.parse_obj(config_dict) return cls(config, ctx) + def _add_tags_to_feature(self, feature_urn: str, tag_data: dict): + """ + Attach tags to a feature in DataHub using tag data from a Field. + + Args: + feature_urn (str): The URN of the feature to attach tags to. + tag_data (dict): Tag data with "name" for tag name. + """ + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + + tag_name = tag_data.get("name") + if tag_name: + # Create tag association + tag_association = TagAssociationClass(tag=make_tag_urn(tag_name)) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + + # Create and emit the MetadataChangeProposalWrapper for tags + tag_event = MetadataChangeProposalWrapper( + entityUrn=feature_urn, + aspect=global_tags_aspect, + ) + + return MetadataWorkUnit(id=feature_view_name, mcp=tag_event) + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + + yield from self._add_tags_to_feature( + 'urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.visitor_actions_view)', + tag_data={"name": "Legacy"}) + for feature_view in self.feature_store.list_feature_views(): for entity_name in feature_view.entities: entity = self.feature_store.get_entity(entity_name) yield self._get_entity_workunit(feature_view, entity) - for field in feature_view.features: yield self._get_feature_workunit(feature_view, field) yield self._get_feature_view_workunit(feature_view) @@ -384,3 +417,4 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_report(self) -> SourceReport: return self.report + From 0494431a71802f569a62f07732d9bd3c99a73809 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Mon, 4 Nov 2024 19:01:33 +0000 Subject: [PATCH 02/23] revert(ingestion): change in get_workunits_internal --- metadata-ingestion/src/datahub/ingestion/source/feast.py | 1 + 1 file changed, 1 insertion(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index cccfd0f8ddb368..d38ae4949158e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -405,6 +405,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: entity = self.feature_store.get_entity(entity_name) yield self._get_entity_workunit(feature_view, entity) + for field in feature_view.features: yield self._get_feature_workunit(feature_view, field) yield self._get_feature_view_workunit(feature_view) From 429f66309e94af5d2bd16763fe18eaa859f807d3 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Thu, 7 Nov 2024 15:06:37 +0000 Subject: [PATCH 03/23] feat(ingestion): add tags, owners metadata ingestion --- .../src/datahub/ingestion/source/feast.py | 105 +++--- .../feast/feast_repository_mces_golden.json | 303 ------------------ .../feast/feature_store/features.py | 7 +- 3 files changed, 66 insertions(+), 349 deletions(-) delete mode 100644 metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index d38ae4949158e8..fb136495a11372 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -51,7 +51,7 @@ ) from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass - +from datahub.metadata._schema_classes import OwnerClass, OwnershipClass, OwnershipTypeClass # FIXME: ValueType module cannot be used as a type _field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = { @@ -131,7 +131,7 @@ def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext): ) def _get_field_type( - self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str + self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str ) -> str: """ Maps types encountered in Feast to corresponding schema types. @@ -213,7 +213,7 @@ def _get_data_sources(self, feature_view: FeatureView) -> List[str]: return sources def _get_entity_workunit( - self, feature_view: FeatureView, entity: Entity + self, feature_view: FeatureView, entity: Entity ) -> MetadataWorkUnit: """ Generate an MLPrimaryKey work unit for a Feast entity. @@ -221,9 +221,28 @@ def _get_entity_workunit( feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = [ + StatusClass(removed=False) + ] + + if entity.tags.get("name"): + tag = entity.tags.get("name") + tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag)) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + aspects.append(global_tags_aspect) + + if entity.owner: + owner = entity.owner + owner_association = OwnerClass( + owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), + type=OwnershipTypeClass.TECHNICAL_OWNER + ) + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) + entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) entity_snapshot.aspects.append( @@ -239,19 +258,29 @@ def _get_entity_workunit( return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( - self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type - feature_view: Union[FeatureView, OnDemandFeatureView], - field: FeastField, + self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type + feature_view: Union[FeatureView, OnDemandFeatureView], + field: FeastField, ) -> MetadataWorkUnit: """ Generate an MLFeature work unit for a Feast feature. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + global_tags_aspect = None + + if field.tags.get("name"): + tag_name = field.tags.get("name") + tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name)) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + + aspects = [StatusClass(removed=False)] + if global_tags_aspect is not None: + aspects.append(global_tags_aspect) feature_snapshot = MLFeatureSnapshot( urn=builder.make_ml_feature_urn(feature_view_name, field.name), - aspects=[StatusClass(removed=False)], + aspects=aspects, ) feature_sources = [] @@ -274,7 +303,7 @@ def _get_feature_workunit( if feature_view.source_feature_view_projections is not None: for ( - feature_view_projection + feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name @@ -301,12 +330,29 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU feature_view_name = f"{self.feature_store.project}.{feature_view.name}" + aspects = [ + BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), + StatusClass(removed=False) + ] + + if feature_view.tags.get("name"): + tag = feature_view.tags.get("name") + tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag)) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + aspects.append(global_tags_aspect) + + if feature_view.owner: + owner = feature_view.owner + owner_association = OwnerClass( + owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), + type=OwnershipTypeClass.TECHNICAL_OWNER + ) + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) + feature_view_snapshot = MLFeatureTableSnapshot( urn=builder.make_ml_feature_table_urn("feast", feature_view_name), - aspects=[ - BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), - StatusClass(removed=False), - ], + aspects=aspects, ) feature_view_snapshot.aspects.append( @@ -330,7 +376,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( - self, on_demand_feature_view: OnDemandFeatureView + self, on_demand_feature_view: OnDemandFeatureView ) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast on-demand feature view. @@ -370,36 +416,8 @@ def create(cls, config_dict, ctx): config = FeastRepositorySourceConfig.parse_obj(config_dict) return cls(config, ctx) - def _add_tags_to_feature(self, feature_urn: str, tag_data: dict): - """ - Attach tags to a feature in DataHub using tag data from a Field. - - Args: - feature_urn (str): The URN of the feature to attach tags to. - tag_data (dict): Tag data with "name" for tag name. - """ - feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - - tag_name = tag_data.get("name") - if tag_name: - # Create tag association - tag_association = TagAssociationClass(tag=make_tag_urn(tag_name)) - global_tags_aspect = GlobalTagsClass(tags=[tag_association]) - - # Create and emit the MetadataChangeProposalWrapper for tags - tag_event = MetadataChangeProposalWrapper( - entityUrn=feature_urn, - aspect=global_tags_aspect, - ) - - return MetadataWorkUnit(id=feature_view_name, mcp=tag_event) - def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - yield from self._add_tags_to_feature( - 'urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.visitor_actions_view)', - tag_data={"name": "Legacy"}) - for feature_view in self.feature_store.list_feature_views(): for entity_name in feature_view.entities: entity = self.feature_store.get_entity(entity_name) @@ -418,4 +436,3 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_report(self) -> SourceReport: return self.report - diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json deleted file mode 100644 index 1b91925289845b..00000000000000 --- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json +++ /dev/null @@ -1,303 +0,0 @@ -[ -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { - "urn": "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { - "description": "Driver ID", - "dataType": "ORDINAL", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "description": "Conv rate", - "dataType": "CONTINUOUS", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "description": "Acc rate", - "dataType": "CONTINUOUS", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "description": "Avg daily trips", - "dataType": "ORDINAL", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "description": "String feature", - "dataType": "TEXT", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { - "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/feast/feature_store" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { - "customProperties": {}, - "mlFeatures": [ - "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", - "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", - "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", - "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)" - ], - "mlPrimaryKeys": [ - "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "entityType": "mlFeatureTable", - "entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [ - { - "id": "feature_store" - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "dataType": "CONTINUOUS", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { - "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { - "dataType": "CONTINUOUS", - "sources": [ - "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", - "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" - ] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { - "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)", - "aspects": [ - { - "com.linkedin.pegasus2avro.common.BrowsePaths": { - "paths": [ - "/feast/feature_store" - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Status": { - "removed": false - } - }, - { - "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { - "customProperties": {}, - "mlFeatures": [ - "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", - "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)" - ], - "mlPrimaryKeys": [] - } - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -}, -{ - "entityType": "mlFeatureTable", - "entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)", - "changeType": "UPSERT", - "aspectName": "browsePathsV2", - "aspect": { - "json": { - "path": [ - { - "id": "feature_store" - } - ] - } - }, - "systemMetadata": { - "lastObserved": 1586847600000, - "runId": "feast-repository-test" - } -} -] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index a6e6cd3616e924..b928ab5aa9ce00 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -19,6 +19,8 @@ join_keys=["driver_id"], value_type=ValueType.INT64, description="Driver ID", + owner="Datahub", + tags={"name": "deprecated"}, ) driver_hourly_stats_view = FeatureView( @@ -29,7 +31,7 @@ Field( name="conv_rate", dtype=feast.types.Float64, - tags=dict(description="Conv rate"), + tags={"name": "needs_documentation", "description": "Conv rate"}, ), Field( name="acc_rate", @@ -49,7 +51,8 @@ ], online=True, source=driver_hourly_stats_source, - tags={}, + tags={"name": "deprecated"}, + owner="Datahub" ) input_request = RequestSource( From 94fc26abe156f9f2a85dc0708c4a82fa638eea8c Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Thu, 7 Nov 2024 15:10:02 +0000 Subject: [PATCH 04/23] feat(ingestion): add updated mces golden json --- .../feast/feast_repository_mces_golden.json | 410 ++++++++++++++++++ 1 file changed, 410 insertions(+) create mode 100644 metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json new file mode 100644 index 00000000000000..aa08e1bcbbe7d7 --- /dev/null +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -0,0 +1,410 @@ +[ +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLPrimaryKeySnapshot": { + "urn": "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:Datahub", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { + "customProperties": {}, + "description": "Driver ID", + "dataType": "ORDINAL", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841325, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:needs_documentation" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "description": "Conv rate", + "dataType": "CONTINUOUS", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841325, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "description": "Acc rate", + "dataType": "CONTINUOUS", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841326, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "description": "Avg daily trips", + "dataType": "ORDINAL", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841326, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "description": "String feature", + "dataType": "TEXT", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841326, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/feast/feature_store" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpuser:Datahub", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, + "mlFeatures": [ + "urn:li:mlFeature:(feature_store.driver_hourly_stats,conv_rate)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,acc_rate)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,avg_daily_trips)", + "urn:li:mlFeature:(feature_store.driver_hourly_stats,string_feature)" + ], + "mlPrimaryKeys": [ + "urn:li:mlPrimaryKey:(feature_store.driver_hourly_stats,driver_id)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841327, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlFeatureTable", + "entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.driver_hourly_stats)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "feature_store" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841328, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "dataType": "CONTINUOUS", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841328, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureSnapshot": { + "urn": "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { + "customProperties": {}, + "dataType": "CONTINUOUS", + "sources": [ + "urn:li:dataset:(urn:li:dataPlatform:request,vals_to_add,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:file,data.driver_stats_with_string.parquet,PROD)" + ] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841328, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.MLFeatureTableSnapshot": { + "urn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/feast/feature_store" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { + "customProperties": {}, + "mlFeatures": [ + "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val1)", + "urn:li:mlFeature:(feature_store.transformed_conv_rate,conv_rate_plus_val2)" + ], + "mlPrimaryKeys": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841328, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlFeatureTable", + "entityUrn": "urn:li:mlFeatureTable:(urn:li:dataPlatform:feast,feature_store.transformed_conv_rate)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "feature_store" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1730988841328, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:deprecated", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "deprecated" + } + }, + "systemMetadata": { + "lastObserved": 1730988841329, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:needs_documentation", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "needs_documentation" + } + }, + "systemMetadata": { + "lastObserved": 1730988841329, + "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file From 33535bb2cbd05f2430701049049d4d2bee974aab Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Thu, 7 Nov 2024 17:24:29 +0000 Subject: [PATCH 05/23] fix(ingestion): static tests --- .../src/datahub/ingestion/source/feast.py | 40 +++++++++---------- .../feast/feature_store/features.py | 2 +- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index fb136495a11372..d87456fb74abf9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -19,8 +19,6 @@ from feast.data_source import DataSource from pydantic import Field -from datahub.emitter.mcp import MetadataChangeProposalWrapper - import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import DEFAULT_ENV @@ -35,6 +33,11 @@ ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata._schema_classes import ( + OwnerClass, + OwnershipClass, + OwnershipTypeClass, +) from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( MLFeatureSnapshot, @@ -44,15 +47,14 @@ from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, + GlobalTagsClass, MLFeaturePropertiesClass, MLFeatureTablePropertiesClass, MLPrimaryKeyPropertiesClass, StatusClass, + TagAssociationClass, ) -from datahub.metadata.schema_classes import GlobalTagsClass, TagAssociationClass -from datahub.metadata._schema_classes import OwnerClass, OwnershipClass, OwnershipTypeClass - # FIXME: ValueType module cannot be used as a type _field_type_mapping: Dict[Union[ValueType, feast.types.FeastType], str] = { ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN, @@ -131,7 +133,7 @@ def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext): ) def _get_field_type( - self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str + self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str ) -> str: """ Maps types encountered in Feast to corresponding schema types. @@ -213,7 +215,7 @@ def _get_data_sources(self, feature_view: FeatureView) -> List[str]: return sources def _get_entity_workunit( - self, feature_view: FeatureView, entity: Entity + self, feature_view: FeatureView, entity: Entity ) -> MetadataWorkUnit: """ Generate an MLPrimaryKey work unit for a Feast entity. @@ -221,12 +223,10 @@ def _get_entity_workunit( feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [ - StatusClass(removed=False) - ] + aspects = [StatusClass(removed=False)] if entity.tags.get("name"): - tag = entity.tags.get("name") + tag: str = entity.tags.get("name") tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag)) global_tags_aspect = GlobalTagsClass(tags=[tag_association]) aspects.append(global_tags_aspect) @@ -235,7 +235,7 @@ def _get_entity_workunit( owner = entity.owner owner_association = OwnerClass( owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), - type=OwnershipTypeClass.TECHNICAL_OWNER + type=OwnershipTypeClass.TECHNICAL_OWNER, ) owners_aspect = OwnershipClass(owners=[owner_association]) aspects.append(owners_aspect) @@ -258,10 +258,10 @@ def _get_entity_workunit( return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( - self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type - feature_view: Union[FeatureView, OnDemandFeatureView], - field: FeastField, + self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type + feature_view: Union[FeatureView, OnDemandFeatureView], + field: FeastField, ) -> MetadataWorkUnit: """ Generate an MLFeature work unit for a Feast feature. @@ -303,7 +303,7 @@ def _get_feature_workunit( if feature_view.source_feature_view_projections is not None: for ( - feature_view_projection + feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name @@ -332,7 +332,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU aspects = [ BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), - StatusClass(removed=False) + StatusClass(removed=False), ] if feature_view.tags.get("name"): @@ -345,7 +345,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU owner = feature_view.owner owner_association = OwnerClass( owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), - type=OwnershipTypeClass.TECHNICAL_OWNER + type=OwnershipTypeClass.TECHNICAL_OWNER, ) owners_aspect = OwnershipClass(owners=[owner_association]) aspects.append(owners_aspect) @@ -376,7 +376,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( - self, on_demand_feature_view: OnDemandFeatureView + self, on_demand_feature_view: OnDemandFeatureView ) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast on-demand feature view. diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index b928ab5aa9ce00..f6cb12414bd5d1 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -52,7 +52,7 @@ online=True, source=driver_hourly_stats_source, tags={"name": "deprecated"}, - owner="Datahub" + owner="Datahub", ) input_request = RequestSource( From 6a2cc71b8d1c8ace4d45582cb809143bcdb77773 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Thu, 7 Nov 2024 18:12:00 +0000 Subject: [PATCH 06/23] fix(ingestion): update golden file --- .../feast/feast_repository_mces_golden.json | 133 +++--------------- 1 file changed, 22 insertions(+), 111 deletions(-) diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json index aa08e1bcbbe7d7..149d94e46e93c8 100644 --- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -9,30 +9,6 @@ "removed": false } }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [ - { - "tag": "urn:li:tag:deprecated" - } - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:Datahub", - "type": "TECHNICAL_OWNER" - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { "customProperties": {}, @@ -47,8 +23,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841325, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -62,15 +38,6 @@ "removed": false } }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [ - { - "tag": "urn:li:tag:needs_documentation" - } - ] - } - }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { "customProperties": {}, @@ -85,8 +52,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841325, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -114,8 +81,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841326, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -143,8 +110,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841326, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -172,8 +139,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841326, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -194,30 +161,6 @@ "removed": false } }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [ - { - "tag": "urn:li:tag:deprecated" - } - ] - } - }, - { - "com.linkedin.pegasus2avro.common.Ownership": { - "owners": [ - { - "owner": "urn:li:corpuser:Datahub", - "type": "TECHNICAL_OWNER" - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - } - } - }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { "customProperties": {}, @@ -236,8 +179,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841327, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -256,8 +199,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841328, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -285,8 +228,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841328, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -314,8 +257,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841328, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -350,8 +293,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841328, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } }, @@ -370,40 +313,8 @@ } }, "systemMetadata": { - "lastObserved": 1730988841328, - "runId": "feast-2024_11_07-14_14_00-j90obu", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:deprecated", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "deprecated" - } - }, - "systemMetadata": { - "lastObserved": 1730988841329, - "runId": "feast-2024_11_07-14_14_00-j90obu", - "lastRunId": "no-run-id-provided" - } -}, -{ - "entityType": "tag", - "entityUrn": "urn:li:tag:needs_documentation", - "changeType": "UPSERT", - "aspectName": "tagKey", - "aspect": { - "json": { - "name": "needs_documentation" - } - }, - "systemMetadata": { - "lastObserved": 1730988841329, - "runId": "feast-2024_11_07-14_14_00-j90obu", + "lastObserved": 1586847600000, + "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } } From 4e8916a750db90b61a49fb71dabddcb6f9bd6a42 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Thu, 7 Nov 2024 19:01:45 +0000 Subject: [PATCH 07/23] feat(ingestion): solve discussions --- metadata-ingestion/scripts/datahub_preflight.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/scripts/datahub_preflight.sh b/metadata-ingestion/scripts/datahub_preflight.sh index 9e259ca944cc8a..9676964f4d49d1 100755 --- a/metadata-ingestion/scripts/datahub_preflight.sh +++ b/metadata-ingestion/scripts/datahub_preflight.sh @@ -1,4 +1,4 @@ - #!/bin/bash -e +#!/bin/bash -e #From https://stackoverflow.com/questions/4023830/how-to-compare-two-strings-in-dot-separated-version-format-in-bash verlte() { @@ -45,7 +45,7 @@ arm64_darwin_preflight() { pip3 install --no-use-pep517 scipy fi - brew_install "openssl@3.0.14" + brew_install "openssl@1.1" brew install "postgresql@14" # postgresql installs libs in a strange way From 7620c985471475084cc4ef2c5267dcc57c0db19a Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Fri, 8 Nov 2024 14:42:15 +0000 Subject: [PATCH 08/23] feat(ingestion): add reusable method for tags and owners --- .../src/datahub/ingestion/source/feast.py | 113 ++++++++---------- 1 file changed, 49 insertions(+), 64 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index d87456fb74abf9..3635fe386653a0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Dict, Iterable, List, Optional, Tuple, Union +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union import feast.types from feast import ( @@ -88,6 +88,14 @@ feast.types.Invalid: MLFeatureDataType.UNKNOWN, } +# FIXME: Update to have more owners +_owner_mapping: Dict[str, Dict[str, Any]] = { + "Datahub": { + "owner_type": builder.OwnerType.USER, + "owner_ship_type_class": OwnershipTypeClass.DATAOWNER, + } +} + class FeastRepositorySourceConfig(ConfigModel): path: str = Field(description="Path to Feast repository") @@ -220,31 +228,13 @@ def _get_entity_workunit( """ Generate an MLPrimaryKey work unit for a Feast entity. """ - feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - - aspects = [StatusClass(removed=False)] - - if entity.tags.get("name"): - tag: str = entity.tags.get("name") - tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag)) - global_tags_aspect = GlobalTagsClass(tags=[tag_association]) - aspects.append(global_tags_aspect) - - if entity.owner: - owner = entity.owner - owner_association = OwnerClass( - owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), - type=OwnershipTypeClass.TECHNICAL_OWNER, - ) - owners_aspect = OwnershipClass(owners=[owner_association]) - aspects.append(owners_aspect) + aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(entity) entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), aspects=aspects, ) - entity_snapshot.aspects.append( MLPrimaryKeyPropertiesClass( description=entity.description, @@ -254,12 +244,10 @@ def _get_entity_workunit( ) mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot) - return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type feature_view: Union[FeatureView, OnDemandFeatureView], field: FeastField, ) -> MetadataWorkUnit: @@ -267,16 +255,7 @@ def _get_feature_workunit( Generate an MLFeature work unit for a Feast feature. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - global_tags_aspect = None - - if field.tags.get("name"): - tag_name = field.tags.get("name") - tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name)) - global_tags_aspect = GlobalTagsClass(tags=[tag_association]) - - aspects = [StatusClass(removed=False)] - if global_tags_aspect is not None: - aspects.append(global_tags_aspect) + aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(field) feature_snapshot = MLFeatureSnapshot( urn=builder.make_ml_feature_urn(feature_view_name, field.name), @@ -287,28 +266,23 @@ def _get_feature_workunit( if isinstance(feature_view, FeatureView): feature_sources = self._get_data_sources(feature_view) elif isinstance(feature_view, OnDemandFeatureView): - if feature_view.source_request_sources is not None: + if feature_view.source_request_sources: for request_source in feature_view.source_request_sources.values(): source_platform, source_name = self._get_data_source_details( request_source ) - feature_sources.append( builder.make_dataset_urn( - source_platform, - source_name, - self.source_config.environment, + source_platform, source_name, self.source_config.environment ) ) - - if feature_view.source_feature_view_projections is not None: + if feature_view.source_feature_view_projections: for ( feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name ) - feature_sources.extend(self._get_data_sources(feature_view_source)) feature_snapshot.aspects.append( @@ -320,35 +294,17 @@ def _get_feature_workunit( ) mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) - return MetadataWorkUnit(id=field.name, mce=mce) def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast feature view. """ - feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [ BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), StatusClass(removed=False), - ] - - if feature_view.tags.get("name"): - tag = feature_view.tags.get("name") - tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag)) - global_tags_aspect = GlobalTagsClass(tags=[tag_association]) - aspects.append(global_tags_aspect) - - if feature_view.owner: - owner = feature_view.owner - owner_association = OwnerClass( - owner=builder.make_owner_urn(owner, owner_type=builder.OwnerType.USER), - type=OwnershipTypeClass.TECHNICAL_OWNER, - ) - owners_aspect = OwnershipClass(owners=[owner_association]) - aspects.append(owners_aspect) + ] + self._get_tags_and_owners(feature_view) feature_view_snapshot = MLFeatureTableSnapshot( urn=builder.make_ml_feature_table_urn("feast", feature_view_name), @@ -358,10 +314,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU feature_view_snapshot.aspects.append( MLFeatureTablePropertiesClass( mlFeatures=[ - builder.make_ml_feature_urn( - feature_view_name, - feature.name, - ) + builder.make_ml_feature_urn(feature_view_name, feature.name) for feature in feature_view.features ], mlPrimaryKeys=[ @@ -372,7 +325,6 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU ) mce = MetadataChangeEvent(proposedSnapshot=feature_view_snapshot) - return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( @@ -411,6 +363,39 @@ def _get_on_demand_feature_view_workunit( return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce) + def _get_tags_and_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: + """ + Extracts tags and owners from the given object and returns a list of aspects. + """ + aspects: List[Union[GlobalTagsClass, OwnershipClass]] = [] + + # Extract tags + tag_name = obj.tags.get("name") if obj.tags else None + if tag_name: + tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name)) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + aspects.append(global_tags_aspect) + + # Extract owner + owner = getattr(obj, "owner", None) + if owner: + owner_association = self._create_owner_association(owner) + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) + + return aspects + + def _create_owner_association(self, owner: str) -> OwnerClass: + + owner_type: builder.OwnerType = _owner_mapping[owner]["owner_type"] + owner_ship_type_class: OwnershipTypeClass = _owner_mapping[owner][ + "owner_ship_type_class" + ] + return OwnerClass( + owner=builder.make_owner_urn(owner, owner_type=owner_type), + type=owner_ship_type_class, + ) + @classmethod def create(cls, config_dict, ctx): config = FeastRepositorySourceConfig.parse_obj(config_dict) From f21d1a0e645d8d3b11c018571a45a20b527066b2 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Fri, 8 Nov 2024 17:19:40 +0000 Subject: [PATCH 09/23] fix(ingestion): remove import --- .../src/datahub/ingestion/source/feast.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 3635fe386653a0..835c64770f2b60 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -33,11 +33,7 @@ ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit -from datahub.metadata._schema_classes import ( - OwnerClass, - OwnershipClass, - OwnershipTypeClass, -) + from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( MLFeatureSnapshot, @@ -53,6 +49,9 @@ MLPrimaryKeyPropertiesClass, StatusClass, TagAssociationClass, + OwnerClass, + OwnershipClass, + OwnershipTypeClass, ) # FIXME: ValueType module cannot be used as a type @@ -91,7 +90,7 @@ # FIXME: Update to have more owners _owner_mapping: Dict[str, Dict[str, Any]] = { "Datahub": { - "owner_type": builder.OwnerType.USER, + "owner_type": builder.OwnerType.GROUP, "owner_ship_type_class": OwnershipTypeClass.DATAOWNER, } } From c066e6acbcc44aa29fff795b069f0402a267e6c5 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Fri, 8 Nov 2024 17:22:24 +0000 Subject: [PATCH 10/23] feat(ingestion): fmt --- metadata-ingestion/src/datahub/ingestion/source/feast.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 835c64770f2b60..ff4e9a48fe7b22 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -33,7 +33,6 @@ ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit - from datahub.metadata.com.linkedin.pegasus2avro.common import MLFeatureDataType from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import ( MLFeatureSnapshot, @@ -47,11 +46,11 @@ MLFeaturePropertiesClass, MLFeatureTablePropertiesClass, MLPrimaryKeyPropertiesClass, - StatusClass, - TagAssociationClass, OwnerClass, OwnershipClass, OwnershipTypeClass, + StatusClass, + TagAssociationClass, ) # FIXME: ValueType module cannot be used as a type From 4598f407d40b3756dfc821aeb5324cfeed26ea1f Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Mon, 18 Nov 2024 17:57:49 +0000 Subject: [PATCH 11/23] feat(ingestion): add MLOPs and ML owners to owner_mapping --- .../src/datahub/ingestion/source/feast.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index ff4e9a48fe7b22..1ea962cd05c577 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -88,10 +88,14 @@ # FIXME: Update to have more owners _owner_mapping: Dict[str, Dict[str, Any]] = { - "Datahub": { + "MLOPs": { "owner_type": builder.OwnerType.GROUP, - "owner_ship_type_class": OwnershipTypeClass.DATAOWNER, - } + "owner_ship_type_class": OwnershipTypeClass.TECHNICAL_OWNER, + }, + "ML": { + "owner_type": builder.OwnerType.GROUP, + "owner_ship_type_class": OwnershipTypeClass.TECHNICAL_OWNER, + }, } From 1b85fb564100ad54bb795896d878a574e858cd16 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 19 Nov 2024 12:10:27 +0000 Subject: [PATCH 12/23] feat(ingestion): add owner mappings to FeastRepositorySourceConfig --- .../src/datahub/ingestion/source/feast.py | 30 +++++++------------ 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 1ea962cd05c577..74190eb3df60ef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -86,18 +86,6 @@ feast.types.Invalid: MLFeatureDataType.UNKNOWN, } -# FIXME: Update to have more owners -_owner_mapping: Dict[str, Dict[str, Any]] = { - "MLOPs": { - "owner_type": builder.OwnerType.GROUP, - "owner_ship_type_class": OwnershipTypeClass.TECHNICAL_OWNER, - }, - "ML": { - "owner_type": builder.OwnerType.GROUP, - "owner_ship_type_class": OwnershipTypeClass.TECHNICAL_OWNER, - }, -} - class FeastRepositorySourceConfig(ConfigModel): path: str = Field(description="Path to Feast repository") @@ -108,6 +96,9 @@ class FeastRepositorySourceConfig(ConfigModel): environment: str = Field( default=DEFAULT_ENV, description="Environment to use when constructing URNs" ) + owner_mappings: List[Dict[str, str]] = Field( + default={}, description="Mapping of owner names to owner types" + ) @platform_name("Feast") @@ -389,14 +380,13 @@ def _get_tags_and_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> l def _create_owner_association(self, owner: str) -> OwnerClass: - owner_type: builder.OwnerType = _owner_mapping[owner]["owner_type"] - owner_ship_type_class: OwnershipTypeClass = _owner_mapping[owner][ - "owner_ship_type_class" - ] - return OwnerClass( - owner=builder.make_owner_urn(owner, owner_type=owner_type), - type=owner_ship_type_class, - ) + for mapping in self.source_config.owner_mappings: + if mapping["feast_owner_name"] == owner: + ownership_type_class: OwnershipTypeClass = mapping["ownership_type"] + return OwnerClass( + owner=mapping["datahub_owner_urn"], + type=ownership_type_class, + ) @classmethod def create(cls, config_dict, ctx): From 0ee059298dbfc6c041caf3759b5b8ffd82dfea21 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 19 Nov 2024 16:37:38 +0000 Subject: [PATCH 13/23] revert(ingestion): remove added spaces --- .../src/datahub/ingestion/source/feast.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 74190eb3df60ef..3cd8ae02ecc5e9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -237,10 +237,12 @@ def _get_entity_workunit( ) mce = MetadataChangeEvent(proposedSnapshot=entity_snapshot) + return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type feature_view: Union[FeatureView, OnDemandFeatureView], field: FeastField, ) -> MetadataWorkUnit: @@ -259,23 +261,28 @@ def _get_feature_workunit( if isinstance(feature_view, FeatureView): feature_sources = self._get_data_sources(feature_view) elif isinstance(feature_view, OnDemandFeatureView): - if feature_view.source_request_sources: + if feature_view.source_request_sources is not None: for request_source in feature_view.source_request_sources.values(): source_platform, source_name = self._get_data_source_details( request_source ) + feature_sources.append( builder.make_dataset_urn( - source_platform, source_name, self.source_config.environment + source_platform, + source_name, + self.source_config.environment ) ) - if feature_view.source_feature_view_projections: + + if feature_view.source_feature_view_projections is not None: for ( feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name ) + feature_sources.extend(self._get_data_sources(feature_view_source)) feature_snapshot.aspects.append( @@ -287,12 +294,14 @@ def _get_feature_workunit( ) mce = MetadataChangeEvent(proposedSnapshot=feature_snapshot) + return MetadataWorkUnit(id=field.name, mce=mce) def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast feature view. """ + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" aspects = [ BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), @@ -318,6 +327,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU ) mce = MetadataChangeEvent(proposedSnapshot=feature_view_snapshot) + return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( @@ -394,7 +404,6 @@ def create(cls, config_dict, ctx): return cls(config, ctx) def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - for feature_view in self.feature_store.list_feature_views(): for entity_name in feature_view.entities: entity = self.feature_store.get_entity(entity_name) From 701b602d1b9bf751ddc6fcfd8d4423de46113606 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 19 Nov 2024 16:39:00 +0000 Subject: [PATCH 14/23] revert(ingestion): remove added spaces --- metadata-ingestion/src/datahub/ingestion/source/feast.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 3cd8ae02ecc5e9..c2c5464f793306 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -221,6 +221,7 @@ def _get_entity_workunit( """ Generate an MLPrimaryKey work unit for a Feast entity. """ + feature_view_name = f"{self.feature_store.project}.{feature_view.name}" aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(entity) @@ -228,6 +229,7 @@ def _get_entity_workunit( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), aspects=aspects, ) + entity_snapshot.aspects.append( MLPrimaryKeyPropertiesClass( description=entity.description, @@ -271,7 +273,7 @@ def _get_feature_workunit( builder.make_dataset_urn( source_platform, source_name, - self.source_config.environment + self.source_config.environment, ) ) From 6dd35fc2605b559c33f7628d029d2353acf9022f Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 19 Nov 2024 16:42:30 +0000 Subject: [PATCH 15/23] feat(ingestion): fmt --- metadata-ingestion/src/datahub/ingestion/source/feast.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index c2c5464f793306..d633c233fcf862 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -318,7 +318,10 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU feature_view_snapshot.aspects.append( MLFeatureTablePropertiesClass( mlFeatures=[ - builder.make_ml_feature_urn(feature_view_name, feature.name) + builder.make_ml_feature_urn( + feature_view_name, + feature.name, + ) for feature in feature_view.features ], mlPrimaryKeys=[ From b2f4d3bc9352863c81c6eba2254618df6e2dba5d Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Mon, 25 Nov 2024 20:10:37 +0000 Subject: [PATCH 16/23] feat(ingestion): update integration tests --- .../src/datahub/ingestion/source/feast.py | 2 +- .../feast/feast_repository_mces_golden.json | 89 +++++++++++++++++++ .../feast/feature_store/features.py | 4 +- .../feast/test_feast_repository.py | 7 ++ 4 files changed, 99 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index d633c233fcf862..8baa53bb43d6c0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -96,7 +96,7 @@ class FeastRepositorySourceConfig(ConfigModel): environment: str = Field( default=DEFAULT_ENV, description="Environment to use when constructing URNs" ) - owner_mappings: List[Dict[str, str]] = Field( + owner_mappings: List[Dict[str, str]] = Field( default={}, description="Mapping of owner names to owner types" ) diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json index 149d94e46e93c8..a77b0d16a83ca4 100644 --- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -9,6 +9,30 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpGroup:DataHub", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLPrimaryKeyProperties": { "customProperties": {}, @@ -38,6 +62,15 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:needs_documentation" + } + ] + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureProperties": { "customProperties": {}, @@ -161,6 +194,30 @@ "removed": false } }, + { + "com.linkedin.pegasus2avro.common.GlobalTags": { + "tags": [ + { + "tag": "urn:li:tag:deprecated" + } + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Ownership": { + "owners": [ + { + "owner": "urn:li:corpGroup:DataHub", + "type": "TECHNICAL_OWNER" + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + } + } + }, { "com.linkedin.pegasus2avro.ml.metadata.MLFeatureTableProperties": { "customProperties": {}, @@ -317,5 +374,37 @@ "runId": "feast-repository-test", "lastRunId": "no-run-id-provided" } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:deprecated", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "deprecated" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:needs_documentation", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "needs_documentation" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "feast-repository-test", + "lastRunId": "no-run-id-provided" + } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index f6cb12414bd5d1..c23cebc672fad6 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -19,7 +19,7 @@ join_keys=["driver_id"], value_type=ValueType.INT64, description="Driver ID", - owner="Datahub", + owner="DataHub", tags={"name": "deprecated"}, ) @@ -52,7 +52,7 @@ online=True, source=driver_hourly_stats_source, tags={"name": "deprecated"}, - owner="Datahub", + owner="DataHub", ) input_request = RequestSource( diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index a6bdce67222896..d04d710f0ab876 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -19,6 +19,13 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): "config": { "path": str(test_resources_dir / "feature_store"), "environment": "PROD", + "owner_mappings": [ + { + "feast_owner_name": "DataHub", + "datahub_owner_urn": "urn:li:corpGroup:DataHub", + "ownership_type": "TECHNICAL_OWNER" + } + ], }, }, "sink": { From d73a4d7220946e71d45945655048a8d373907237 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 26 Nov 2024 15:05:19 +0000 Subject: [PATCH 17/23] feat(ingestion): solve discussions --- .../src/datahub/ingestion/source/feast.py | 91 ++++++++++++------- .../feast/feast_repository_mces_golden.json | 8 +- .../feast/feature_store/features.py | 4 +- .../feast/test_feast_repository.py | 12 +-- 4 files changed, 71 insertions(+), 44 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 8baa53bb43d6c0..bd4b239a996924 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -96,8 +96,15 @@ class FeastRepositorySourceConfig(ConfigModel): environment: str = Field( default=DEFAULT_ENV, description="Environment to use when constructing URNs" ) - owner_mappings: List[Dict[str, str]] = Field( - default={}, description="Mapping of owner names to owner types" + # owner_mappings example: + # This must be added to the recipe in order to extract owners, otherwise NO owners will be extracted + # owner_mappings: + # - feast_owner_name: "" + # datahub_owner_urn: "urn:li:corpGroup:" + # datahub_ownership_type: "BUSINESS_OWNER" + owner_mappings: Optional[List[Dict[str, str]]] = Field( + default=None, + description="Mapping of owner names to owner types" ) @@ -134,7 +141,7 @@ def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext): ) def _get_field_type( - self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str + self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str ) -> str: """ Maps types encountered in Feast to corresponding schema types. @@ -216,14 +223,14 @@ def _get_data_sources(self, feature_view: FeatureView) -> List[str]: return sources def _get_entity_workunit( - self, feature_view: FeatureView, entity: Entity + self, feature_view: FeatureView, entity: Entity ) -> MetadataWorkUnit: """ Generate an MLPrimaryKey work unit for a Feast entity. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(entity) + aspects = [StatusClass(removed=False)] + self._get_tags(entity) + self._get_owners(entity) entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), @@ -243,16 +250,16 @@ def _get_entity_workunit( return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( - self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type - feature_view: Union[FeatureView, OnDemandFeatureView], - field: FeastField, + self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type + feature_view: Union[FeatureView, OnDemandFeatureView], + field: FeastField, ) -> MetadataWorkUnit: """ Generate an MLFeature work unit for a Feast feature. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [StatusClass(removed=False)] + self._get_tags_and_owners(field) + aspects = [StatusClass(removed=False)] + self._get_tags(field) feature_snapshot = MLFeatureSnapshot( urn=builder.make_ml_feature_urn(feature_view_name, field.name), @@ -279,7 +286,7 @@ def _get_feature_workunit( if feature_view.source_feature_view_projections is not None: for ( - feature_view_projection + feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name @@ -306,9 +313,9 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU feature_view_name = f"{self.feature_store.project}.{feature_view.name}" aspects = [ - BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), - StatusClass(removed=False), - ] + self._get_tags_and_owners(feature_view) + BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), + StatusClass(removed=False), + ] + self._get_tags(feature_view) + self._get_owners(feature_view) feature_view_snapshot = MLFeatureTableSnapshot( urn=builder.make_ml_feature_table_urn("feast", feature_view_name), @@ -336,7 +343,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( - self, on_demand_feature_view: OnDemandFeatureView + self, on_demand_feature_view: OnDemandFeatureView ) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast on-demand feature view. @@ -371,37 +378,59 @@ def _get_on_demand_feature_view_workunit( return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce) - def _get_tags_and_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: + # If a tag is specified in a Feast object, then the tag will be ingested into Datahub + def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: """ - Extracts tags and owners from the given object and returns a list of aspects. + Extracts tags from the given object and returns a list of aspects. """ - aspects: List[Union[GlobalTagsClass, OwnershipClass]] = [] + aspects: List[Union[GlobalTagsClass]] = [] # Extract tags - tag_name = obj.tags.get("name") if obj.tags else None - if tag_name: + if obj.tags.get("name"): + tag_name = obj.tags.get("name") tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name)) global_tags_aspect = GlobalTagsClass(tags=[tag_association]) aspects.append(global_tags_aspect) + return aspects + + # If a owner is specified in a Feast object, it will only be ingested into Datahub if owner_mapping is specified, + # otherwise NO owners will be ingested + def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: + """ + Extracts owners from the given object and returns a list of aspects. + """ + aspects: List[Union[OwnershipClass]] = [] + # Extract owner owner = getattr(obj, "owner", None) if owner: + # Create owner association, skipping if None owner_association = self._create_owner_association(owner) - owners_aspect = OwnershipClass(owners=[owner_association]) - aspects.append(owners_aspect) + if owner_association: # Only add valid owner associations + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) return aspects - def _create_owner_association(self, owner: str) -> OwnerClass: - - for mapping in self.source_config.owner_mappings: - if mapping["feast_owner_name"] == owner: - ownership_type_class: OwnershipTypeClass = mapping["ownership_type"] - return OwnerClass( - owner=mapping["datahub_owner_urn"], - type=ownership_type_class, - ) + def _create_owner_association(self, owner: str) -> Optional[OwnerClass]: + """ + Create an OwnerClass instance for the given owner using the owner mappings. + """ + if self.source_config.owner_mappings is not None: + for mapping in self.source_config.owner_mappings: + # Match the provided Feast owner name + if mapping["feast_owner_name"] == owner: + ownership_type_class: Optional[OwnershipTypeClass] = mapping.get( + "datahub_ownership_type", "TECHNICAL_OWNER" + ) + datahub_owner_urn = mapping.get("datahub_owner_urn") + return OwnerClass( + owner=datahub_owner_urn, + type=ownership_type_class, + ) + # Return None if no matching mapping is found + return None @classmethod def create(cls, config_dict, ctx): diff --git a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json index a77b0d16a83ca4..a4fd9843c5cf49 100644 --- a/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json +++ b/metadata-ingestion/tests/integration/feast/feast_repository_mces_golden.json @@ -22,8 +22,8 @@ "com.linkedin.pegasus2avro.common.Ownership": { "owners": [ { - "owner": "urn:li:corpGroup:DataHub", - "type": "TECHNICAL_OWNER" + "owner": "urn:li:corpGroup:MOCK_OWNER", + "type": "BUSINESS_OWNER" } ], "ownerTypes": {}, @@ -207,8 +207,8 @@ "com.linkedin.pegasus2avro.common.Ownership": { "owners": [ { - "owner": "urn:li:corpGroup:DataHub", - "type": "TECHNICAL_OWNER" + "owner": "urn:li:corpGroup:MOCK_OWNER", + "type": "BUSINESS_OWNER" } ], "ownerTypes": {}, diff --git a/metadata-ingestion/tests/integration/feast/feature_store/features.py b/metadata-ingestion/tests/integration/feast/feature_store/features.py index c23cebc672fad6..dcfd417637958c 100644 --- a/metadata-ingestion/tests/integration/feast/feature_store/features.py +++ b/metadata-ingestion/tests/integration/feast/feature_store/features.py @@ -19,7 +19,7 @@ join_keys=["driver_id"], value_type=ValueType.INT64, description="Driver ID", - owner="DataHub", + owner="MOCK_OWNER", tags={"name": "deprecated"}, ) @@ -52,7 +52,7 @@ online=True, source=driver_hourly_stats_source, tags={"name": "deprecated"}, - owner="DataHub", + owner="MOCK_OWNER", ) input_request = RequestSource( diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index d04d710f0ab876..ecda3271b308da 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -19,13 +19,11 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): "config": { "path": str(test_resources_dir / "feature_store"), "environment": "PROD", - "owner_mappings": [ - { - "feast_owner_name": "DataHub", - "datahub_owner_urn": "urn:li:corpGroup:DataHub", - "ownership_type": "TECHNICAL_OWNER" - } - ], + "owner_mappings": [{ + "feast_owner_name": "MOCK_OWNER", + "datahub_owner_urn": "urn:li:corpGroup:MOCK_OWNER", + "datahub_ownership_type": "BUSINESS_OWNER", + }] }, }, "sink": { From 22afe012a8ebac55054659601d7577794513c4d3 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 26 Nov 2024 15:35:56 +0000 Subject: [PATCH 18/23] feat(ingestion): solve discussions --- metadata-ingestion/src/datahub/ingestion/source/feast.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index bd4b239a996924..a81a40c00fd42a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -394,8 +394,8 @@ def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: return aspects - # If a owner is specified in a Feast object, it will only be ingested into Datahub if owner_mapping is specified, - # otherwise NO owners will be ingested + # If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified + # in FeastRepositorySourceConfig, otherwise NO owners will be ingested def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: """ Extracts owners from the given object and returns a list of aspects. From 069300055a0cdb6e9f59aed94d8b05938b8b1160 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 26 Nov 2024 16:20:18 +0000 Subject: [PATCH 19/23] feat(ingestion): fmt --- .../src/datahub/ingestion/source/feast.py | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index a81a40c00fd42a..6167a4ec8dd655 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union +from typing import Dict, Iterable, List, Optional, Tuple, Union import feast.types from feast import ( @@ -103,8 +103,7 @@ class FeastRepositorySourceConfig(ConfigModel): # datahub_owner_urn: "urn:li:corpGroup:" # datahub_ownership_type: "BUSINESS_OWNER" owner_mappings: Optional[List[Dict[str, str]]] = Field( - default=None, - description="Mapping of owner names to owner types" + default=None, description="Mapping of owner names to owner types" ) @@ -141,7 +140,7 @@ def __init__(self, config: FeastRepositorySourceConfig, ctx: PipelineContext): ) def _get_field_type( - self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str + self, field_type: Union[ValueType, feast.types.FeastType], parent_name: str ) -> str: """ Maps types encountered in Feast to corresponding schema types. @@ -223,14 +222,18 @@ def _get_data_sources(self, feature_view: FeatureView) -> List[str]: return sources def _get_entity_workunit( - self, feature_view: FeatureView, entity: Entity + self, feature_view: FeatureView, entity: Entity ) -> MetadataWorkUnit: """ Generate an MLPrimaryKey work unit for a Feast entity. """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [StatusClass(removed=False)] + self._get_tags(entity) + self._get_owners(entity) + aspects = ( + [StatusClass(removed=False)] + + self._get_tags(entity) + + self._get_owners(entity) + ) entity_snapshot = MLPrimaryKeySnapshot( urn=builder.make_ml_primary_key_urn(feature_view_name, entity.name), @@ -250,10 +253,10 @@ def _get_entity_workunit( return MetadataWorkUnit(id=entity.name, mce=mce) def _get_feature_workunit( - self, - # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type - feature_view: Union[FeatureView, OnDemandFeatureView], - field: FeastField, + self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type + feature_view: Union[FeatureView, OnDemandFeatureView], + field: FeastField, ) -> MetadataWorkUnit: """ Generate an MLFeature work unit for a Feast feature. @@ -286,7 +289,7 @@ def _get_feature_workunit( if feature_view.source_feature_view_projections is not None: for ( - feature_view_projection + feature_view_projection ) in feature_view.source_feature_view_projections.values(): feature_view_source = self.feature_store.get_feature_view( feature_view_projection.name @@ -312,10 +315,14 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU """ feature_view_name = f"{self.feature_store.project}.{feature_view.name}" - aspects = [ - BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), - StatusClass(removed=False), - ] + self._get_tags(feature_view) + self._get_owners(feature_view) + aspects = ( + [ + BrowsePathsClass(paths=[f"/feast/{self.feature_store.project}"]), + StatusClass(removed=False), + ] + + self._get_tags(feature_view) + + self._get_owners(feature_view) + ) feature_view_snapshot = MLFeatureTableSnapshot( urn=builder.make_ml_feature_table_urn("feast", feature_view_name), @@ -343,7 +350,7 @@ def _get_feature_view_workunit(self, feature_view: FeatureView) -> MetadataWorkU return MetadataWorkUnit(id=feature_view_name, mce=mce) def _get_on_demand_feature_view_workunit( - self, on_demand_feature_view: OnDemandFeatureView + self, on_demand_feature_view: OnDemandFeatureView ) -> MetadataWorkUnit: """ Generate an MLFeatureTable work unit for a Feast on-demand feature view. From a851ad8f0e04c3eb1df01b851347f25e17960b5c Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Tue, 26 Nov 2024 16:20:47 +0000 Subject: [PATCH 20/23] feat(ingestion): fmt --- .../tests/integration/feast/test_feast_repository.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index ecda3271b308da..52d779a6c79f7e 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -19,11 +19,13 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): "config": { "path": str(test_resources_dir / "feature_store"), "environment": "PROD", - "owner_mappings": [{ - "feast_owner_name": "MOCK_OWNER", - "datahub_owner_urn": "urn:li:corpGroup:MOCK_OWNER", - "datahub_ownership_type": "BUSINESS_OWNER", - }] + "owner_mappings": [ + { + "feast_owner_name": "MOCK_OWNER", + "datahub_owner_urn": "urn:li:corpGroup:MOCK_OWNER", + "datahub_ownership_type": "BUSINESS_OWNER", + } + ], }, }, "sink": { From fcfb79deb38b9f19e17f4619a4d1572ef78dda74 Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Wed, 27 Nov 2024 17:58:52 +0000 Subject: [PATCH 21/23] feat(ingestion): add flags enable_owner_extraction and enable_tag_extraction --- .../src/datahub/ingestion/source/feast.py | 42 ++++++++++++------- .../feast/test_feast_repository.py | 2 + 2 files changed, 30 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index 6167a4ec8dd655..e91945f87dab19 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -105,6 +105,15 @@ class FeastRepositorySourceConfig(ConfigModel): owner_mappings: Optional[List[Dict[str, str]]] = Field( default=None, description="Mapping of owner names to owner types" ) + enable_owner_extraction: bool = Field( + default=False, + description="If this is disabled, then we NEVER try to map owners. " + "If this is enabled, then owner_mappings is REQUIRED to extract ownership.", + ) + enable_tag_extraction: bool = Field( + default=False, + description="If this is disabled, then we NEVER try to extract tags.", + ) @platform_name("Feast") @@ -385,7 +394,8 @@ def _get_on_demand_feature_view_workunit( return MetadataWorkUnit(id=on_demand_feature_view_name, mce=mce) - # If a tag is specified in a Feast object, then the tag will be ingested into Datahub + # If a tag is specified in a Feast object, then the tag will be ingested into Datahub if enable_tag_extraction is + # True, otherwise NO tags will be ingested def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: """ Extracts tags from the given object and returns a list of aspects. @@ -393,16 +403,19 @@ def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: aspects: List[Union[GlobalTagsClass]] = [] # Extract tags - if obj.tags.get("name"): - tag_name = obj.tags.get("name") - tag_association = TagAssociationClass(tag=builder.make_tag_urn(tag_name)) - global_tags_aspect = GlobalTagsClass(tags=[tag_association]) - aspects.append(global_tags_aspect) + if self.source_config.enable_tag_extraction: + if obj.tags.get("name"): + tag_name = obj.tags.get("name") + tag_association = TagAssociationClass( + tag=builder.make_tag_urn(tag_name) + ) + global_tags_aspect = GlobalTagsClass(tags=[tag_association]) + aspects.append(global_tags_aspect) return aspects # If an owner is specified in a Feast object, it will only be ingested into Datahub if owner_mappings is specified - # in FeastRepositorySourceConfig, otherwise NO owners will be ingested + # and enable_owner_extraction is True in FeastRepositorySourceConfig, otherwise NO owners will be ingested def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: """ Extracts owners from the given object and returns a list of aspects. @@ -410,13 +423,14 @@ def _get_owners(self, obj: Union[Entity, FeatureView, FeastField]) -> list: aspects: List[Union[OwnershipClass]] = [] # Extract owner - owner = getattr(obj, "owner", None) - if owner: - # Create owner association, skipping if None - owner_association = self._create_owner_association(owner) - if owner_association: # Only add valid owner associations - owners_aspect = OwnershipClass(owners=[owner_association]) - aspects.append(owners_aspect) + if self.source_config.enable_owner_extraction: + owner = getattr(obj, "owner", None) + if owner: + # Create owner association, skipping if None + owner_association = self._create_owner_association(owner) + if owner_association: # Only add valid owner associations + owners_aspect = OwnershipClass(owners=[owner_association]) + aspects.append(owners_aspect) return aspects diff --git a/metadata-ingestion/tests/integration/feast/test_feast_repository.py b/metadata-ingestion/tests/integration/feast/test_feast_repository.py index 52d779a6c79f7e..7f04337145dc36 100644 --- a/metadata-ingestion/tests/integration/feast/test_feast_repository.py +++ b/metadata-ingestion/tests/integration/feast/test_feast_repository.py @@ -19,6 +19,8 @@ def test_feast_repository_ingest(pytestconfig, tmp_path, mock_time): "config": { "path": str(test_resources_dir / "feature_store"), "environment": "PROD", + "enable_tag_extraction": True, + "enable_owner_extraction": True, "owner_mappings": [ { "feast_owner_name": "MOCK_OWNER", From 2da5e2a6ec17a61fb2a1cb2986c8dd685e9b04da Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Wed, 27 Nov 2024 22:05:19 +0000 Subject: [PATCH 22/23] fix(ingestion): static tests --- .../src/datahub/ingestion/source/feast.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index e91945f87dab19..6330fe0291660d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -48,7 +48,6 @@ MLPrimaryKeyPropertiesClass, OwnerClass, OwnershipClass, - OwnershipTypeClass, StatusClass, TagAssociationClass, ) @@ -405,7 +404,7 @@ def _get_tags(self, obj: Union[Entity, FeatureView, FeastField]) -> list: # Extract tags if self.source_config.enable_tag_extraction: if obj.tags.get("name"): - tag_name = obj.tags.get("name") + tag_name: str = obj.tags["name"] tag_association = TagAssociationClass( tag=builder.make_tag_urn(tag_name) ) @@ -440,17 +439,16 @@ def _create_owner_association(self, owner: str) -> Optional[OwnerClass]: """ if self.source_config.owner_mappings is not None: for mapping in self.source_config.owner_mappings: - # Match the provided Feast owner name if mapping["feast_owner_name"] == owner: - ownership_type_class: Optional[OwnershipTypeClass] = mapping.get( + ownership_type_class: str = mapping.get( "datahub_ownership_type", "TECHNICAL_OWNER" ) datahub_owner_urn = mapping.get("datahub_owner_urn") - return OwnerClass( - owner=datahub_owner_urn, - type=ownership_type_class, - ) - # Return None if no matching mapping is found + if datahub_owner_urn: + return OwnerClass( + owner=datahub_owner_urn, + type=ownership_type_class, + ) return None @classmethod From 63d67d94c8c38c7c4a71e913c3ef7f82bfbd4def Mon Sep 17 00:00:00 2001 From: Margarida Fernandes Date: Wed, 27 Nov 2024 22:45:34 +0000 Subject: [PATCH 23/23] feat(ingestion): add registry.db --- .../feast/feature_store/data/registry.db | Bin 2715 -> 2991 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db b/metadata-ingestion/tests/integration/feast/feature_store/data/registry.db index a511ff56c97705ff3e98d4b90b756e67523d6af5..5dca29d92afe53ffd713ad01267c58f844037f48 100644 GIT binary patch delta 813 zcmbO&x?X&OuI&EC>ATnjE*zTnib;rv1Bu5~#1%BrOS#@jh>ImJF*j9+Dq_ikCzPKd6C^d-7*WcMY-ap*WHAqN?i-+URF_57%n!X-k1R9FOlVUVf zQb{s2wM;WlOw~0?Of%6nF*P#PwKPvO)-_EsvM@DDHZ`+MO3~usII({o&=Bs9AAK_! zjlME-U16Raz$`ZLt(3M47iV&QURit*&@BS2QqoWd3g@M!rWD7g>bV3!f0u@f)$B-|xnkyZQNgRh=>ogVMHbl`2${GmWq_ikCzPKd6C{;*? zi-)6kHAvHs!;_CO12rM>q!MRZ^3|FjTy2JrAOoDATO9RVPfyp~q>}9!}n*?L^GxBp&^;46Jlk|&=lJ)Z^ zzvXnA9K)u<<8AHC!NASIB*G}dH2ET%qK~kjU&)l9DYZb@!yg}?n3tCi471GqyyEzH zpqNGme~(~6VqQvOv0gGT?)1`%5_40hWbjU&(!*Ai3XIgeDH)2T#cflPGQ>CAvHLK| zK@z})1nnhw#l6i`uB>?}5(l7u3