From a5b62b3b3e89699735d428119492b2828a26b641 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Tue, 18 Jan 2022 17:45:36 -0800 Subject: [PATCH 01/17] Compare Python objects instead of proto objects Signed-off-by: Felix Wang --- sdk/python/feast/diff/FcoDiff.py | 57 +++--- sdk/python/feast/feature_store.py | 68 +------ sdk/python/feast/registry.py | 188 ++++++++++++++++---- sdk/python/feast/repo_operations.py | 107 ++++------- sdk/python/tests/unit/diff/test_fco_diff.py | 4 +- 5 files changed, 225 insertions(+), 199 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index b85897019f..da70ce71d4 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -17,22 +17,15 @@ RequestFeatureView as RequestFeatureViewProto, ) -FcoProto = TypeVar( - "FcoProto", - EntityProto, - FeatureViewProto, - FeatureServiceProto, - OnDemandFeatureViewProto, - RequestFeatureViewProto, -) +Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) @dataclass -class FcoDiff(Generic[FcoProto]): +class FcoDiff(Generic[Fco]): name: str fco_type: str - current_fco: FcoProto - new_fco: FcoProto + current_fco: Fco + new_fco: Fco fco_property_diffs: List[PropertyDiff] transition_type: TransitionType @@ -48,20 +41,28 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) - - -def tag_objects_for_keep_delete_add( +def tag_objects_for_keep_delete_update_add( existing_objs: Iterable[Fco], desired_objs: Iterable[Fco] -) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]: +) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} - objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_update = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_keep = {e for e in existing_objs if e.name in desired_obj_names} objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names} - return objs_to_keep, objs_to_delete, objs_to_add + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add + + +FcoProto = TypeVar( + "FcoProto", + EntityProto, + FeatureViewProto, + FeatureServiceProto, + OnDemandFeatureViewProto, + RequestFeatureViewProto, +) def tag_proto_objects_for_keep_delete_add( @@ -80,23 +81,27 @@ def tag_proto_objects_for_keep_delete_add( FIELDS_TO_IGNORE = {"project"} -def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff: - assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name +def diff_between(current: Fco, new: Fco, object_type: str) -> FcoDiff: + current_proto = current.to_proto() + new_proto = new.to_proto() + assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED - if current.spec != new.spec: - for _field in current.spec.DESCRIPTOR.fields: + if current_proto.spec != new_proto.spec: + for _field in current_proto.spec.DESCRIPTOR.fields: if _field.name in FIELDS_TO_IGNORE: continue - if getattr(current.spec, _field.name) != getattr(new.spec, _field.name): + if getattr(current_proto.spec, _field.name) != getattr( + new_proto.spec, _field.name + ): transition = TransitionType.UPDATE property_diffs.append( PropertyDiff( _field.name, - getattr(current.spec, _field.name), - getattr(new.spec, _field.name), + getattr(current_proto.spec, _field.name), + getattr(new_proto.spec, _field.name), ) ) return FcoDiff( - new.spec.name, object_type, current, new, property_diffs, transition, + new_proto.spec.name, object_type, current, new, property_diffs, transition, ) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 39273b56c2..996f96c78c 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -24,7 +24,6 @@ Iterable, List, Mapping, - NamedTuple, Optional, Sequence, Set, @@ -68,14 +67,13 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto -from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, GetOnlineFeaturesResponse, ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value -from feast.registry import Registry +from feast.registry import Registry, RepoContents from feast.repo_config import RepoConfig, load_repo_config from feast.request_feature_view import RequestFeatureView from feast.type_map import python_values_to_proto_values @@ -86,31 +84,6 @@ warnings.simplefilter("once", DeprecationWarning) -class RepoContents(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def to_registry_proto(self) -> RegistryProto: - registry_proto = RegistryProto() - registry_proto.entities.extend([e.to_proto() for e in self.entities]) - registry_proto.feature_views.extend( - [fv.to_proto() for fv in self.feature_views] - ) - registry_proto.on_demand_feature_views.extend( - [fv.to_proto() for fv in self.on_demand_feature_views] - ) - registry_proto.request_feature_views.extend( - [fv.to_proto() for fv in self.request_feature_views] - ) - registry_proto.feature_services.extend( - [fs.to_proto() for fs in self.feature_services] - ) - return registry_proto - - class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. @@ -415,7 +388,7 @@ def _get_features( @log_exceptions_and_usage def plan( - self, desired_repo_objects: RepoContents + self, desired_repo_contents: RepoContents ) -> Tuple[RegistryDiff, InfraDiff]: """Dry-run registering objects to metadata store. @@ -453,16 +426,9 @@ def plan( ... ) >>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ - - current_registry_proto = ( - self._registry.cached_registry_proto.__deepcopy__() - if self._registry.cached_registry_proto - else RegistryProto() - ) - - desired_registry_proto = desired_repo_objects.to_registry_proto() + current_repo_contents = self._registry.to_repo_contents(project=self.project) registry_diff = Registry.diff_between( - current_registry_proto, desired_registry_proto + current_repo_contents, desired_repo_contents ) current_infra_proto = ( @@ -470,6 +436,7 @@ def plan( if self._registry.cached_registry_proto else InfraProto() ) + desired_registry_proto = desired_repo_contents.to_registry_proto() new_infra_proto = self._provider.plan_infra( self.config, desired_registry_proto ).to_proto() @@ -554,11 +521,7 @@ def apply( if not objects_to_delete: objects_to_delete = [] - current_registry_proto = ( - self._registry.cached_registry_proto.__deepcopy__() - if self._registry.cached_registry_proto - else RegistryProto() - ) + current_repo_contents = self._registry.to_repo_contents(project=self.project) # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] @@ -657,22 +620,6 @@ def apply( service.name, project=self.project, commit=False ) - new_registry_proto = ( - self._registry.cached_registry_proto - if self._registry.cached_registry_proto - else RegistryProto() - ) - - diffs = Registry.diff_between(current_registry_proto, new_registry_proto) - - entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] - views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] - - entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] - views_to_delete = [ - ob for ob in objects_to_delete if isinstance(ob, FeatureView) - ] - self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete if not partial else [], @@ -684,7 +631,8 @@ def apply( self._registry.commit() - return diffs + new_repo_contents = self._registry.to_repo_contents(project=self.project) + return Registry.diff_between(current_repo_contents, new_repo_contents) @log_exceptions_and_usage def teardown(self): diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 0c058a0d46..60e9edb026 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -16,7 +16,7 @@ from datetime import datetime, timedelta from pathlib import Path from threading import Lock -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, NamedTuple, Optional, Set from urllib.parse import urlparse from google.protobuf.internal.containers import RepeatedCompositeFieldContainer @@ -29,7 +29,7 @@ RegistryDiff, TransitionType, diff_between, - tag_proto_objects_for_keep_delete_add, + tag_objects_for_keep_delete_update_add, ) from feast.entity import Entity from feast.errors import ( @@ -65,9 +65,109 @@ "": "LocalRegistryStore", } +REGISTRY_OBJECT_TYPE_TO_STR = { + "entities": "entity", + "feature_views": "feature view", + "on_demand_feature_views": "on demand feature view", + "request_feature_views": "request feature view", + "feature_services": "feature service", +} + +REGISTRY_OBJECT_TYPES = REGISTRY_OBJECT_TYPE_TO_STR.keys() + logger = logging.getLogger(__name__) +class RepoContents(NamedTuple): + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto + + @classmethod + def from_registry_proto(cls, project: str, registry_proto: RegistryProto): + repo_contents = cls( + entities=set(), + feature_views=set(), + feature_services=set(), + on_demand_feature_views=set(), + request_feature_views=set(), + ) + + for feature_view_proto in registry_proto.feature_views: + if feature_view_proto.spec.project == project: + repo_contents.feature_views.add( + FeatureView.from_proto(feature_view_proto) + ) + for on_demand_feature_view_proto in registry_proto.on_demand_feature_views: + if on_demand_feature_view_proto.spec.project == project: + repo_contents.on_demand_feature_views.add( + OnDemandFeatureView.from_proto(on_demand_feature_view_proto) + ) + for request_feature_view_proto in registry_proto.request_feature_views: + if request_feature_view_proto.spec.project == project: + repo_contents.request_feature_views.add( + RequestFeatureView.from_proto(request_feature_view_proto) + ) + for entity_proto in registry_proto.entities: + if entity_proto.spec.project == project: + repo_contents.entities.add(Entity.from_proto(entity_proto)) + for feature_service_proto in registry_proto.feature_services: + if feature_service_proto.spec.project == project: + repo_contents.feature_services.add( + FeatureService.from_proto(feature_service_proto) + ) + + return repo_contents + + +def extract_objects_for_keep_delete_update_add( + current_repo_contents: RepoContents, new_repo_contents: RepoContents +): + """ + Extracts the objects to be kept, deleted, updated, and added to achieve the desired end state. + + Args: + current_repo_contents: The current repo state. + new_repo_contents: The desired repo state. + """ + objs_to_keep = {} + objs_to_delete = {} + objs_to_update = {} + objs_to_add = {} + + for object_type in REGISTRY_OBJECT_TYPES: + to_keep, to_delete, to_update, to_add = tag_objects_for_keep_delete_update_add( + getattr(current_repo_contents, object_type), + getattr(new_repo_contents, object_type), + ) + + objs_to_keep[object_type] = to_keep + objs_to_delete[object_type] = to_delete + objs_to_update[object_type] = to_update + objs_to_add[object_type] = to_add + + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add + + def get_registry_store_class_from_type(registry_store_type: str): if not registry_store_type.endswith("RegistryStore"): raise Exception('Registry store class name should end with "RegistryStore"') @@ -143,44 +243,60 @@ def clone(self) -> "Registry": new_registry._registry_store = NoopRegistryStore() return new_registry + def to_repo_contents(self, project: str) -> RepoContents: + """ + Convert the contents of the registry for the given project into a RepoContents object. + + Args: + project: The Feast project to be converted. + """ + return RepoContents( + entities=set(self.list_entities(project=project)), + feature_views=set(self.list_feature_views(project=project)), + request_feature_views=set(self.list_request_feature_views(project=project)), + on_demand_feature_views=set( + self.list_on_demand_feature_views(project=project) + ), + feature_services=set(self.list_feature_services(project=project)), + ) + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @staticmethod def diff_between( - current_registry: RegistryProto, new_registry: RegistryProto + current_registry_contents: RepoContents, new_registry_contents: RepoContents ) -> RegistryDiff: + """ + Computes the difference between the two repos. + + Args: + current_registry_contents: The current repo. + new_registry_contents: The new repo. + + Returns: + A RegistryDiff object containing the difference between the two repos. + """ diff = RegistryDiff() - attribute_to_object_type_str = { - "entities": "entity", - "feature_views": "feature view", - "feature_tables": "feature table", - "on_demand_feature_views": "on demand feature view", - "request_feature_views": "request feature view", - "feature_services": "feature service", - } + ( + objs_to_keep, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add( + current_registry_contents, new_registry_contents + ) - for object_type in [ - "entities", - "feature_views", - "feature_tables", - "on_demand_feature_views", - "request_feature_views", - "feature_services", - ]: - ( - objects_to_keep, - objects_to_delete, - objects_to_add, - ) = tag_proto_objects_for_keep_delete_add( - getattr(current_registry, object_type), - getattr(new_registry, object_type), - ) + for object_type in REGISTRY_OBJECT_TYPES: + objects_to_keep = objs_to_keep[object_type] + objects_to_delete = objs_to_delete[object_type] + objects_to_update = objs_to_update[object_type] + objects_to_add = objs_to_add[object_type] for e in objects_to_add: diff.add_fco_diff( FcoDiff( - e.spec.name, - attribute_to_object_type_str[object_type], + e.name, + REGISTRY_OBJECT_TYPE_TO_STR[object_type], None, e, [], @@ -190,23 +306,19 @@ def diff_between( for e in objects_to_delete: diff.add_fco_diff( FcoDiff( - e.spec.name, - attribute_to_object_type_str[object_type], + e.name, + REGISTRY_OBJECT_TYPE_TO_STR[object_type], e, None, [], TransitionType.DELETE, ) ) - for e in objects_to_keep: - current_obj_proto = [ - _e - for _e in getattr(current_registry, object_type) - if _e.spec.name == e.spec.name - ][0] + for e in objects_to_update: + current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] diff.add_fco_diff( diff_between( - current_obj_proto, e, attribute_to_object_type_str[object_type] + current_obj, e, REGISTRY_OBJECT_TYPE_TO_STR[object_type] ) ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 0638ca589a..a3be0110a7 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,20 +6,24 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, Set, Union, cast +from typing import List, Set, Union import click from click.exceptions import BadParameter -from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add +from feast.diff.FcoDiff import TransitionType from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore, RepoContents +from feast.feature_store import FeatureStore from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import Registry +from feast.registry import ( + REGISTRY_OBJECT_TYPES, + Registry, + RepoContents, + extract_objects_for_keep_delete_update_add, +) from feast.repo_config import RepoConfig from feast.request_feature_view import RequestFeatureView from feast.usage import log_exceptions_and_usage @@ -160,81 +164,38 @@ def _prepare_registry_and_repo(repo_config, repo_path): def extract_objects_for_apply_delete(project, registry, repo): - ( - entities_to_keep, - entities_to_delete, - entities_to_add, - ) = tag_objects_for_keep_delete_add( - set(registry.list_entities(project=project)), repo.entities - ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. - entities_to_keep = set(entities_to_keep).union(entities_to_add) - views = tag_objects_for_keep_delete_add( - set(registry.list_feature_views(project=project)), repo.feature_views - ) - views_to_keep, views_to_delete, views_to_add = ( - cast(Set[FeatureView], views[0]), - cast(Set[FeatureView], views[1]), - cast(Set[FeatureView], views[2]), - ) - request_views = tag_objects_for_keep_delete_add( - set(registry.list_request_feature_views(project=project)), - repo.request_feature_views, - ) - request_views_to_keep: Set[RequestFeatureView] - request_views_to_delete: Set[RequestFeatureView] - request_views_to_add: Set[RequestFeatureView] - request_views_to_keep, request_views_to_delete, request_views_to_add = ( - cast(Set[RequestFeatureView], request_views[0]), - cast(Set[RequestFeatureView], request_views[1]), - cast(Set[RequestFeatureView], request_views[2]), - ) - base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { - *views_to_keep, - *views_to_add, - *request_views_to_keep, - *request_views_to_add, - } - base_views_to_delete: Set[Union[RequestFeatureView, FeatureView]] = { - *views_to_delete, - *request_views_to_delete, - } - odfvs = tag_objects_for_keep_delete_add( - set(registry.list_on_demand_feature_views(project=project)), - repo.on_demand_feature_views, - ) - odfvs_to_keep, odfvs_to_delete, odfvs_to_add = ( - cast(Set[OnDemandFeatureView], odfvs[0]), - cast(Set[OnDemandFeatureView], odfvs[1]), - cast(Set[OnDemandFeatureView], odfvs[2]), - ) - odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) + current_repo_contents = registry.to_repo_contents(project=project) ( - services_to_keep, - services_to_delete, - services_to_add, - ) = tag_objects_for_keep_delete_add( - set(registry.list_feature_services(project=project)), repo.feature_services - ) - services_to_keep = services_to_keep.union(services_to_add) - sys.dont_write_bytecode = False - # Apply all changes to the registry and infrastructure. + _, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add(current_repo_contents, repo) + all_to_apply: List[ - Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] + Union[ + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + ] ] = [] - all_to_apply.extend(entities_to_keep) - all_to_apply.extend(base_views_to_keep) - all_to_apply.extend(services_to_keep) - all_to_apply.extend(odfvs_to_keep) + for object_type in REGISTRY_OBJECT_TYPES: + to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) + all_to_apply.extend(to_apply) + all_to_delete: List[ - Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] + Union[ + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + ] ] = [] - all_to_delete.extend(entities_to_delete) - all_to_delete.extend(base_views_to_delete) - all_to_delete.extend(services_to_delete) - all_to_delete.extend(odfvs_to_delete) + for object_type in REGISTRY_OBJECT_TYPES: + all_to_delete.extend(objs_to_delete[object_type]) - return all_to_apply, all_to_delete, views_to_delete, views_to_keep + return ( + all_to_apply, + all_to_delete, + set(objs_to_add["feature_views"].union(objs_to_update["feature_views"])), + objs_to_delete["feature_views"], + ) def apply_total_with_repo_instance( diff --git a/sdk/python/tests/unit/diff/test_fco_diff.py b/sdk/python/tests/unit/diff/test_fco_diff.py index 802a6438c3..1b4a3f9b72 100644 --- a/sdk/python/tests/unit/diff/test_fco_diff.py +++ b/sdk/python/tests/unit/diff/test_fco_diff.py @@ -55,14 +55,14 @@ def test_diff_between_feature_views(simple_dataset_1): batch_source=file_source, ttl=None, tags={"when": "before"}, - ).to_proto() + ) post_changed = FeatureView( name="fv2", entities=["id"], batch_source=file_source, ttl=None, tags={"when": "after"}, - ).to_proto() + ) fco_diffs = diff_between(pre_changed, pre_changed, "feature view") assert len(fco_diffs.fco_property_diffs) == 0 From 26ee03dfd6b993b0355df65af8609461e216e72d Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 19 Jan 2022 16:51:42 -0800 Subject: [PATCH 02/17] Remove unnecessary helper method Signed-off-by: Felix Wang --- sdk/python/feast/registry.py | 36 ------------------------------------ 1 file changed, 36 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 60e9edb026..7938410ce6 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -102,42 +102,6 @@ def to_registry_proto(self) -> RegistryProto: ) return registry_proto - @classmethod - def from_registry_proto(cls, project: str, registry_proto: RegistryProto): - repo_contents = cls( - entities=set(), - feature_views=set(), - feature_services=set(), - on_demand_feature_views=set(), - request_feature_views=set(), - ) - - for feature_view_proto in registry_proto.feature_views: - if feature_view_proto.spec.project == project: - repo_contents.feature_views.add( - FeatureView.from_proto(feature_view_proto) - ) - for on_demand_feature_view_proto in registry_proto.on_demand_feature_views: - if on_demand_feature_view_proto.spec.project == project: - repo_contents.on_demand_feature_views.add( - OnDemandFeatureView.from_proto(on_demand_feature_view_proto) - ) - for request_feature_view_proto in registry_proto.request_feature_views: - if request_feature_view_proto.spec.project == project: - repo_contents.request_feature_views.add( - RequestFeatureView.from_proto(request_feature_view_proto) - ) - for entity_proto in registry_proto.entities: - if entity_proto.spec.project == project: - repo_contents.entities.add(Entity.from_proto(entity_proto)) - for feature_service_proto in registry_proto.feature_services: - if feature_service_proto.spec.project == project: - repo_contents.feature_services.add( - FeatureService.from_proto(feature_service_proto) - ) - - return repo_contents - def extract_objects_for_keep_delete_update_add( current_repo_contents: RepoContents, new_repo_contents: RepoContents From b27df77fbb521d31626091c0f08d9bbf7a22a35d Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 19 Jan 2022 17:13:38 -0800 Subject: [PATCH 03/17] Fix docstring test Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 996f96c78c..646734b05e 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -122,6 +122,7 @@ def __init__( registry_config = self.config.get_registry_config() self._registry = Registry(registry_config, repo_path=self.repo_path) + self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) @log_exceptions From c2e3e822c7a4372b9014406632181a62b83ed6a2 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 19 Jan 2022 17:15:51 -0800 Subject: [PATCH 04/17] Add docstring to RepoContents Signed-off-by: Felix Wang --- sdk/python/feast/registry.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7938410ce6..33b5291192 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -79,6 +79,11 @@ class RepoContents(NamedTuple): + """ + Represents the objects in a Feast feature repo. + + Equivalently, represents the contents of a registry corresponding to a specific Feas project. + """ feature_views: Set[FeatureView] on_demand_feature_views: Set[OnDemandFeatureView] request_feature_views: Set[RequestFeatureView] From f5fa131c770f97a395a684d3cc0e40f809c145e3 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 19 Jan 2022 17:16:08 -0800 Subject: [PATCH 05/17] Lint Signed-off-by: Felix Wang --- sdk/python/feast/registry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 33b5291192..b83f095a35 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -84,6 +84,7 @@ class RepoContents(NamedTuple): Equivalently, represents the contents of a registry corresponding to a specific Feas project. """ + feature_views: Set[FeatureView] on_demand_feature_views: Set[OnDemandFeatureView] request_feature_views: Set[RequestFeatureView] From 2104830a5780cfb19f53970951f7402deefcecd8 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 19 Jan 2022 23:58:30 -0800 Subject: [PATCH 06/17] Update usage test Signed-off-by: Felix Wang --- sdk/python/tests/integration/e2e/test_usage_e2e.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index f55fbce55c..0bae973063 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -66,10 +66,16 @@ def test_usage_on(dummy_exporter, enabling_toggle): test_feature_store.apply([entity]) - assert len(dummy_exporter) == 1 + assert len(dummy_exporter) == 3 assert { - "entrypoint": "feast.feature_store.FeatureStore.apply" + "entrypoint": "feast.infra.local.LocalRegistryStore.get_registry_proto" }.items() <= dummy_exporter[0].items() + assert { + "entrypoint": "feast.infra.local.LocalRegistryStore.update_registry_proto" + }.items() <= dummy_exporter[1].items() + assert { + "entrypoint": "feast.feature_store.FeatureStore.apply" + }.items() <= dummy_exporter[2].items() @pytest.mark.integration From e661e0101518393085a5452f483c134fba668ade Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 21 Jan 2022 11:54:01 -0800 Subject: [PATCH 07/17] Set cache ttl to 1 second in tests for local feature server tests Signed-off-by: Felix Wang --- .../tests/integration/feature_repos/repo_configuration.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 63ee4fe7bc..0003a7a8a1 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -12,10 +12,11 @@ import pandas as pd import yaml -from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data +from feast import FeatureStore, FeatureView, driver_test_data from feast.constants import FULL_REPO_CONFIGS_MODULE_ENV_NAME from feast.data_source import DataSource from feast.errors import FeastModuleImportError +from feast.repo_config import RepoConfig, RegistryConfig from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) @@ -286,7 +287,10 @@ def construct_test_environment( else: # Note: even if it's a local feature server, the repo config does not have this configured feature_server = None - registry = str(Path(repo_dir_name) / "registry.db") + registry = RegistryConfig( + path=str(Path(repo_dir_name) / "registry.db"), + cache_ttl_seconds=1, + ) config = RepoConfig( registry=registry, From b29c4cbbb977ba211282b3336b85e0d600e863a8 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 21 Jan 2022 11:54:42 -0800 Subject: [PATCH 08/17] Add FCO test Signed-off-by: Felix Wang --- sdk/python/tests/unit/diff/test_fco_diff.py | 52 ++++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/diff/test_fco_diff.py b/sdk/python/tests/unit/diff/test_fco_diff.py index 1b4a3f9b72..04e2d877da 100644 --- a/sdk/python/tests/unit/diff/test_fco_diff.py +++ b/sdk/python/tests/unit/diff/test_fco_diff.py @@ -1,4 +1,8 @@ -from feast.diff.FcoDiff import diff_between, tag_proto_objects_for_keep_delete_add +from feast.diff.FcoDiff import ( + diff_between, + tag_objects_for_keep_delete_update_add, + tag_proto_objects_for_keep_delete_add, +) from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source @@ -45,6 +49,52 @@ def test_tag_proto_objects_for_keep_delete_add(simple_dataset_1): assert to_add in add +def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): + with prep_file_source( + df=simple_dataset_1, event_timestamp_column="ts_1" + ) as file_source: + to_delete = FeatureView( + name="to_delete", entities=["id"], batch_source=file_source, ttl=None, + ) + unchanged_fv = FeatureView( + name="fv1", entities=["id"], batch_source=file_source, ttl=None, + ) + pre_changed = FeatureView( + name="fv2", + entities=["id"], + batch_source=file_source, + ttl=None, + tags={"when": "before"}, + ) + post_changed = FeatureView( + name="fv2", + entities=["id"], + batch_source=file_source, + ttl=None, + tags={"when": "after"}, + ) + to_add = FeatureView( + name="to_add", entities=["id"], batch_source=file_source, ttl=None, + ) + + keep, delete, update, add = tag_objects_for_keep_delete_update_add( + [unchanged_fv, pre_changed, to_delete], [unchanged_fv, post_changed, to_add] + ) + + assert len(list(keep)) == 2 + assert unchanged_fv in keep + assert pre_changed in keep + assert post_changed not in keep + assert len(list(delete)) == 1 + assert to_delete in delete + assert len(list(update)) == 2 + assert unchanged_fv in update + assert post_changed in update + assert pre_changed not in update + assert len(list(add)) == 1 + assert to_add in add + + def test_diff_between_feature_views(simple_dataset_1): with prep_file_source( df=simple_dataset_1, event_timestamp_column="ts_1" From 812689dbbd061ad7d678c2ad304e4a91123718ff Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 21 Jan 2022 12:03:11 -0800 Subject: [PATCH 09/17] Add properties to feature service Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 76 ++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 9bb4fb5e5d..16815531a3 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -30,12 +30,12 @@ class FeatureService: Services. """ - name: str - feature_view_projections: List[FeatureViewProjection] - tags: Dict[str, str] - description: Optional[str] = None - created_timestamp: Optional[datetime] = None - last_updated_timestamp: Optional[datetime] = None + _name: str + _feature_view_projections: List[FeatureViewProjection] + _tags: Dict[str, str] + _description: Optional[str] = None + _created_timestamp: Optional[datetime] = None + _last_updated_timestamp: Optional[datetime] = None @log_exceptions def __init__( @@ -51,22 +51,22 @@ def __init__( Raises: ValueError: If one of the specified features is not a valid type. """ - self.name = name - self.feature_view_projections = [] + self._name = name + self._feature_view_projections = [] for feature_grouping in features: if isinstance(feature_grouping, BaseFeatureView): - self.feature_view_projections.append(feature_grouping.projection) + self._feature_view_projections.append(feature_grouping.projection) else: raise ValueError( "The FeatureService {fs_name} has been provided with an invalid type" f'{type(feature_grouping)} as part of the "features" argument.)' ) - self.tags = tags or {} - self.description = description - self.created_timestamp = None - self.last_updated_timestamp = None + self._tags = tags or {} + self._description = description + self._created_timestamp = None + self._last_updated_timestamp = None def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) @@ -93,6 +93,56 @@ def __eq__(self, other): return True + @property + def name(self) -> str: + return self._name + + @name.setter + def name(self, name: str): + self._name = name + + @property + def feature_view_projections(self) -> List[FeatureViewProjection]: + return self._feature_view_projections + + @feature_view_projections.setter + def feature_view_projections( + self, feature_view_projections: List[FeatureViewProjection] + ): + self._feature_view_projections = feature_view_projections + + @property + def tags(self) -> Dict[str, str]: + return self._tags + + @tags.setter + def tags(self, tags: Dict[str, str]): + self._tags = tags + + @property + def description(self) -> Optional[str]: + return self._description + + @description.setter + def description(self, description: str): + self._description = description + + @property + def created_timestamp(self) -> Optional[datetime]: + return self._created_timestamp + + @created_timestamp.setter + def created_timestamp(self, created_timestamp: datetime): + self._created_timestamp = created_timestamp + + @property + def last_updated_timestamp(self) -> Optional[datetime]: + return self._last_updated_timestamp + + @last_updated_timestamp.setter + def last_updated_timestamp(self, last_updated_timestamp: datetime): + self._last_updated_timestamp = last_updated_timestamp + @staticmethod def from_proto(feature_service_proto: FeatureServiceProto): """ From e66d6561b5930a4c48b02888010535fb5809f87e Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 21 Jan 2022 12:03:33 -0800 Subject: [PATCH 10/17] Lint Signed-off-by: Felix Wang --- .../tests/integration/feature_repos/repo_configuration.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 0003a7a8a1..f6f2b5d2bc 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -16,7 +16,7 @@ from feast.constants import FULL_REPO_CONFIGS_MODULE_ENV_NAME from feast.data_source import DataSource from feast.errors import FeastModuleImportError -from feast.repo_config import RepoConfig, RegistryConfig +from feast.repo_config import RegistryConfig, RepoConfig from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) @@ -288,8 +288,7 @@ def construct_test_environment( # Note: even if it's a local feature server, the repo config does not have this configured feature_server = None registry = RegistryConfig( - path=str(Path(repo_dir_name) / "registry.db"), - cache_ttl_seconds=1, + path=str(Path(repo_dir_name) / "registry.db"), cache_ttl_seconds=1, ) config = RepoConfig( From e076a6bb14a10329ec2ddc8be088ceb3b6924792 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 21 Jan 2022 18:56:31 -0800 Subject: [PATCH 11/17] Remove logic that converts Registry to RepoContents Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 14 +-- sdk/python/feast/registry.py | 133 ++++++++++------------------ sdk/python/feast/repo_contents.py | 50 +++++++++++ sdk/python/feast/repo_operations.py | 17 ++-- 4 files changed, 109 insertions(+), 105 deletions(-) create mode 100644 sdk/python/feast/repo_contents.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 646734b05e..dbd0bc2e67 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -427,10 +427,7 @@ def plan( ... ) >>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ - current_repo_contents = self._registry.to_repo_contents(project=self.project) - registry_diff = Registry.diff_between( - current_repo_contents, desired_repo_contents - ) + registry_diff = self._registry.diff_between(self.project, desired_repo_contents) current_infra_proto = ( self._registry.cached_registry_proto.infra.__deepcopy__() @@ -476,7 +473,7 @@ def apply( ] ] = None, partial: bool = True, - ) -> RegistryDiff: + ): """Register objects to metadata store and update related infrastructure. The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these @@ -512,7 +509,7 @@ def apply( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking if not isinstance(objects, Iterable): @@ -522,8 +519,6 @@ def apply( if not objects_to_delete: objects_to_delete = [] - current_repo_contents = self._registry.to_repo_contents(project=self.project) - # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -632,9 +627,6 @@ def apply( self._registry.commit() - new_repo_contents = self._registry.to_repo_contents(project=self.project) - return Registry.diff_between(current_repo_contents, new_repo_contents) - @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index b83f095a35..3d73b39402 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -16,7 +16,7 @@ from datetime import datetime, timedelta from pathlib import Path from threading import Lock -from typing import Any, Dict, List, NamedTuple, Optional, Set +from typing import Any, Dict, List, Optional from urllib.parse import urlparse from google.protobuf.internal.containers import RepeatedCompositeFieldContainer @@ -47,6 +47,7 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig +from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView REGISTRY_SCHEMA_VERSION = "1" @@ -78,66 +79,6 @@ logger = logging.getLogger(__name__) -class RepoContents(NamedTuple): - """ - Represents the objects in a Feast feature repo. - - Equivalently, represents the contents of a registry corresponding to a specific Feas project. - """ - - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def to_registry_proto(self) -> RegistryProto: - registry_proto = RegistryProto() - registry_proto.entities.extend([e.to_proto() for e in self.entities]) - registry_proto.feature_views.extend( - [fv.to_proto() for fv in self.feature_views] - ) - registry_proto.on_demand_feature_views.extend( - [fv.to_proto() for fv in self.on_demand_feature_views] - ) - registry_proto.request_feature_views.extend( - [fv.to_proto() for fv in self.request_feature_views] - ) - registry_proto.feature_services.extend( - [fs.to_proto() for fs in self.feature_services] - ) - return registry_proto - - -def extract_objects_for_keep_delete_update_add( - current_repo_contents: RepoContents, new_repo_contents: RepoContents -): - """ - Extracts the objects to be kept, deleted, updated, and added to achieve the desired end state. - - Args: - current_repo_contents: The current repo state. - new_repo_contents: The desired repo state. - """ - objs_to_keep = {} - objs_to_delete = {} - objs_to_update = {} - objs_to_add = {} - - for object_type in REGISTRY_OBJECT_TYPES: - to_keep, to_delete, to_update, to_add = tag_objects_for_keep_delete_update_add( - getattr(current_repo_contents, object_type), - getattr(new_repo_contents, object_type), - ) - - objs_to_keep[object_type] = to_keep - objs_to_delete[object_type] = to_delete - objs_to_update[object_type] = to_update - objs_to_add[object_type] = to_add - - return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add - - def get_registry_store_class_from_type(registry_store_type: str): if not registry_store_type.endswith("RegistryStore"): raise Exception('Registry store class name should end with "RegistryStore"') @@ -213,37 +154,61 @@ def clone(self) -> "Registry": new_registry._registry_store = NoopRegistryStore() return new_registry - def to_repo_contents(self, project: str) -> RepoContents: + def extract_objects_for_keep_delete_update_add( + self, current_project: str, desired_repo_contents: RepoContents, + ): """ - Convert the contents of the registry for the given project into a RepoContents object. + Returns the objects that must be modified to achieve the desired repo state. Args: - project: The Feast project to be converted. - """ - return RepoContents( - entities=set(self.list_entities(project=project)), - feature_views=set(self.list_feature_views(project=project)), - request_feature_views=set(self.list_request_feature_views(project=project)), - on_demand_feature_views=set( - self.list_on_demand_feature_views(project=project) + current_project: The Feast project whose objects should be compared. + desired_repo_contents: The desired repo state. + """ + objs_to_keep = {} + objs_to_delete = {} + objs_to_update = {} + objs_to_add = {} + + registry_object_type_to_objects: Dict[str, List[Any]] + registry_object_type_to_objects = { + "entities": self.list_entities(project=current_project), + "feature_views": self.list_feature_views(project=current_project), + "on_demand_feature_views": self.list_on_demand_feature_views( + project=current_project ), - feature_services=set(self.list_feature_services(project=project)), - ) + "request_feature_views": self.list_request_feature_views( + project=current_project + ), + "feature_services": self.list_feature_services(project=current_project), + } + + for object_type in REGISTRY_OBJECT_TYPES: + ( + to_keep, + to_delete, + to_update, + to_add, + ) = tag_objects_for_keep_delete_update_add( + registry_object_type_to_objects[object_type], + getattr(desired_repo_contents, object_type), + ) + + objs_to_keep[object_type] = to_keep + objs_to_delete[object_type] = to_delete + objs_to_update[object_type] = to_update + objs_to_add[object_type] = to_add + + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add - # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. - @staticmethod def diff_between( - current_registry_contents: RepoContents, new_registry_contents: RepoContents + self, current_project: str, desired_repo_contents: RepoContents, ) -> RegistryDiff: """ - Computes the difference between the two repos. + Returns the difference between the current and desired repo states. Args: - current_registry_contents: The current repo. - new_registry_contents: The new repo. - - Returns: - A RegistryDiff object containing the difference between the two repos. + current_project: The Feast project for which the diff is being computed. + desired_repo_contents: The desired repo state. """ diff = RegistryDiff() @@ -252,8 +217,8 @@ def diff_between( objs_to_delete, objs_to_update, objs_to_add, - ) = extract_objects_for_keep_delete_update_add( - current_registry_contents, new_registry_contents + ) = self.extract_objects_for_keep_delete_update_add( + current_project, desired_repo_contents ) for object_type in REGISTRY_OBJECT_TYPES: diff --git a/sdk/python/feast/repo_contents.py b/sdk/python/feast/repo_contents.py new file mode 100644 index 0000000000..9190af11ee --- /dev/null +++ b/sdk/python/feast/repo_contents.py @@ -0,0 +1,50 @@ +# Copyright 2022 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import NamedTuple, Set + +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.request_feature_view import RequestFeatureView + + +class RepoContents(NamedTuple): + """ + Represents the objects in a Feast feature repo. + """ + + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index a3be0110a7..c8260fe2a5 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -18,13 +18,9 @@ from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import ( - REGISTRY_OBJECT_TYPES, - Registry, - RepoContents, - extract_objects_for_keep_delete_update_add, -) +from feast.registry import REGISTRY_OBJECT_TYPES, Registry from feast.repo_config import RepoConfig +from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.usage import log_exceptions_and_usage @@ -165,13 +161,12 @@ def _prepare_registry_and_repo(repo_config, repo_path): def extract_objects_for_apply_delete(project, registry, repo): # TODO(achals): This code path should be refactored to handle added & kept entities separately. - current_repo_contents = registry.to_repo_contents(project=project) ( _, objs_to_delete, objs_to_update, objs_to_add, - ) = extract_objects_for_keep_delete_update_add(current_repo_contents, repo) + ) = registry.extract_objects_for_keep_delete_update_add(project, repo) all_to_apply: List[ Union[ @@ -211,6 +206,8 @@ def apply_total_with_repo_instance( for data_source in data_sources: data_source.validate(store.config) + registry_diff, _ = store.plan(repo) + # For each object in the registry, determine whether it should be kept or deleted. ( all_to_apply, @@ -219,9 +216,9 @@ def apply_total_with_repo_instance( views_to_keep, ) = extract_objects_for_apply_delete(project, registry, repo) - diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) - log_cli_output(diff, views_to_delete, views_to_keep) + log_cli_output(registry_diff, views_to_delete, views_to_keep) @log_exceptions_and_usage From c19fc42737aa29214a7235cc7921769545274027 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 24 Jan 2022 10:09:52 -0800 Subject: [PATCH 12/17] Always initialize registry Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 1 - sdk/python/feast/registry.py | 2 ++ sdk/python/feast/repo_operations.py | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index dbd0bc2e67..d6180c2028 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -122,7 +122,6 @@ def __init__( registry_config = self.config.get_registry_config() self._registry = Registry(registry_config, repo_path=self.repo_path) - self._registry._initialize_registry() self._provider = get_provider(self.config, self.repo_path) @log_exceptions diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 3d73b39402..c42c4cc7df 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -142,6 +142,8 @@ def __init__( else 0 ) + self._initialize_registry() + def clone(self) -> "Registry": new_registry = Registry(None, None) new_registry.cached_registry_proto_ttl = timedelta(seconds=0) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c8260fe2a5..c02988d561 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -153,7 +153,6 @@ def _prepare_registry_and_repo(repo_config, repo_path): ) sys.exit(1) registry = store.registry - registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) return project, registry, repo, store From ab2c9cb03c0e093e8e12c0523786ad41f9fadf6e Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 24 Jan 2022 10:36:34 -0800 Subject: [PATCH 13/17] Move diffing methods from Registry into FcoDiff.py Signed-off-by: Felix Wang --- sdk/python/feast/diff/FcoDiff.py | 117 +++++++++++++++++++++++++++- sdk/python/feast/feature_store.py | 9 ++- sdk/python/feast/registry.py | 113 --------------------------- sdk/python/feast/repo_operations.py | 7 +- 4 files changed, 126 insertions(+), 120 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index da70ce71d4..f1cbf55c41 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Generic, Iterable, List, Set, Tuple, TypeVar +from typing import Any, Dict, Generic, Iterable, List, Set, Tuple, TypeVar from feast.base_feature_view import BaseFeatureView from feast.diff.property_diff import PropertyDiff, TransitionType @@ -16,6 +16,8 @@ from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) +from feast.registry import REGISTRY_OBJECT_TYPE_TO_STR, REGISTRY_OBJECT_TYPES, Registry +from feast.repo_contents import RepoContents Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) @@ -81,7 +83,7 @@ def tag_proto_objects_for_keep_delete_add( FIELDS_TO_IGNORE = {"project"} -def diff_between(current: Fco, new: Fco, object_type: str) -> FcoDiff: +def diff_registry_objects(current: Fco, new: Fco, object_type: str) -> FcoDiff: current_proto = current.to_proto() new_proto = new.to_proto() assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name @@ -105,3 +107,114 @@ def diff_between(current: Fco, new: Fco, object_type: str) -> FcoDiff: return FcoDiff( new_proto.spec.name, object_type, current, new, property_diffs, transition, ) + + +def extract_objects_for_keep_delete_update_add( + registry: Registry, current_project: str, desired_repo_contents: RepoContents, +) -> Tuple[ + Dict[str, Set[Fco]], Dict[str, Set[Fco]], Dict[str, Set[Fco]], Dict[str, Set[Fco]] +]: + """ + Returns the objects in the registry that must be modified to achieve the desired repo state. + + Args: + registry: The registry storing the current repo state. + current_project: The Feast project whose objects should be compared. + desired_repo_contents: The desired repo state. + """ + objs_to_keep = {} + objs_to_delete = {} + objs_to_update = {} + objs_to_add = {} + + registry_object_type_to_objects: Dict[str, List[Any]] + registry_object_type_to_objects = { + "entities": registry.list_entities(project=current_project), + "feature_views": registry.list_feature_views(project=current_project), + "on_demand_feature_views": registry.list_on_demand_feature_views( + project=current_project + ), + "request_feature_views": registry.list_request_feature_views( + project=current_project + ), + "feature_services": registry.list_feature_services(project=current_project), + } + + for object_type in REGISTRY_OBJECT_TYPES: + ( + to_keep, + to_delete, + to_update, + to_add, + ) = tag_objects_for_keep_delete_update_add( + registry_object_type_to_objects[object_type], + getattr(desired_repo_contents, object_type), + ) + + objs_to_keep[object_type] = to_keep + objs_to_delete[object_type] = to_delete + objs_to_update[object_type] = to_update + objs_to_add[object_type] = to_add + + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add + + +def diff_between( + registry: Registry, current_project: str, desired_repo_contents: RepoContents, +) -> RegistryDiff: + """ + Returns the difference between the current and desired repo states. + + Args: + registry: The registry storing the current repo state. + current_project: The Feast project for which the diff is being computed. + desired_repo_contents: The desired repo state. + """ + diff = RegistryDiff() + + ( + objs_to_keep, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add( + registry, current_project, desired_repo_contents + ) + + for object_type in REGISTRY_OBJECT_TYPES: + objects_to_keep = objs_to_keep[object_type] + objects_to_delete = objs_to_delete[object_type] + objects_to_update = objs_to_update[object_type] + objects_to_add = objs_to_add[object_type] + + for e in objects_to_add: + diff.add_fco_diff( + FcoDiff( + e.name, + REGISTRY_OBJECT_TYPE_TO_STR[object_type], + None, + e, + [], + TransitionType.CREATE, + ) + ) + for e in objects_to_delete: + diff.add_fco_diff( + FcoDiff( + e.name, + REGISTRY_OBJECT_TYPE_TO_STR[object_type], + e, + None, + [], + TransitionType.DELETE, + ) + ) + for e in objects_to_update: + current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] + diff.add_fco_diff( + diff_registry_objects( + current_obj, e, REGISTRY_OBJECT_TYPE_TO_STR[object_type] + ) + ) + + return diff diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d6180c2028..c9dae9063a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -39,7 +39,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import RegistryDiff +from feast.diff.FcoDiff import RegistryDiff, diff_between from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.entity import Entity from feast.errors import ( @@ -73,8 +73,9 @@ ) from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value -from feast.registry import Registry, RepoContents +from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config +from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.type_map import python_values_to_proto_values from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute @@ -426,7 +427,9 @@ def plan( ... ) >>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ - registry_diff = self._registry.diff_between(self.project, desired_repo_contents) + registry_diff = diff_between( + self._registry, self.project, desired_repo_contents + ) current_infra_proto = ( self._registry.cached_registry_proto.infra.__deepcopy__() diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index c42c4cc7df..836c9ac3b3 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -24,13 +24,6 @@ from proto import Message from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import ( - FcoDiff, - RegistryDiff, - TransitionType, - diff_between, - tag_objects_for_keep_delete_update_add, -) from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -47,7 +40,6 @@ from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig -from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView REGISTRY_SCHEMA_VERSION = "1" @@ -156,111 +148,6 @@ def clone(self) -> "Registry": new_registry._registry_store = NoopRegistryStore() return new_registry - def extract_objects_for_keep_delete_update_add( - self, current_project: str, desired_repo_contents: RepoContents, - ): - """ - Returns the objects that must be modified to achieve the desired repo state. - - Args: - current_project: The Feast project whose objects should be compared. - desired_repo_contents: The desired repo state. - """ - objs_to_keep = {} - objs_to_delete = {} - objs_to_update = {} - objs_to_add = {} - - registry_object_type_to_objects: Dict[str, List[Any]] - registry_object_type_to_objects = { - "entities": self.list_entities(project=current_project), - "feature_views": self.list_feature_views(project=current_project), - "on_demand_feature_views": self.list_on_demand_feature_views( - project=current_project - ), - "request_feature_views": self.list_request_feature_views( - project=current_project - ), - "feature_services": self.list_feature_services(project=current_project), - } - - for object_type in REGISTRY_OBJECT_TYPES: - ( - to_keep, - to_delete, - to_update, - to_add, - ) = tag_objects_for_keep_delete_update_add( - registry_object_type_to_objects[object_type], - getattr(desired_repo_contents, object_type), - ) - - objs_to_keep[object_type] = to_keep - objs_to_delete[object_type] = to_delete - objs_to_update[object_type] = to_update - objs_to_add[object_type] = to_add - - return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add - - def diff_between( - self, current_project: str, desired_repo_contents: RepoContents, - ) -> RegistryDiff: - """ - Returns the difference between the current and desired repo states. - - Args: - current_project: The Feast project for which the diff is being computed. - desired_repo_contents: The desired repo state. - """ - diff = RegistryDiff() - - ( - objs_to_keep, - objs_to_delete, - objs_to_update, - objs_to_add, - ) = self.extract_objects_for_keep_delete_update_add( - current_project, desired_repo_contents - ) - - for object_type in REGISTRY_OBJECT_TYPES: - objects_to_keep = objs_to_keep[object_type] - objects_to_delete = objs_to_delete[object_type] - objects_to_update = objs_to_update[object_type] - objects_to_add = objs_to_add[object_type] - - for e in objects_to_add: - diff.add_fco_diff( - FcoDiff( - e.name, - REGISTRY_OBJECT_TYPE_TO_STR[object_type], - None, - e, - [], - TransitionType.CREATE, - ) - ) - for e in objects_to_delete: - diff.add_fco_diff( - FcoDiff( - e.name, - REGISTRY_OBJECT_TYPE_TO_STR[object_type], - e, - None, - [], - TransitionType.DELETE, - ) - ) - for e in objects_to_update: - current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] - diff.add_fco_diff( - diff_between( - current_obj, e, REGISTRY_OBJECT_TYPE_TO_STR[object_type] - ) - ) - - return diff - def _initialize_registry(self): """Explicitly initializes the registry with an empty proto if it doesn't exist.""" try: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c02988d561..54662a5fde 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -11,7 +11,10 @@ import click from click.exceptions import BadParameter -from feast.diff.FcoDiff import TransitionType +from feast.diff.FcoDiff import ( + TransitionType, + extract_objects_for_keep_delete_update_add, +) from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore @@ -165,7 +168,7 @@ def extract_objects_for_apply_delete(project, registry, repo): objs_to_delete, objs_to_update, objs_to_add, - ) = registry.extract_objects_for_keep_delete_update_add(project, repo) + ) = extract_objects_for_keep_delete_update_add(registry, project, repo) all_to_apply: List[ Union[ From d94e9188b6af8f7bb87bc1e1f7604b3fbcdc3b6a Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 24 Jan 2022 10:45:24 -0800 Subject: [PATCH 14/17] Fix unit test Signed-off-by: Felix Wang --- sdk/python/tests/unit/diff/test_fco_diff.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/python/tests/unit/diff/test_fco_diff.py b/sdk/python/tests/unit/diff/test_fco_diff.py index 04e2d877da..fa3c84d035 100644 --- a/sdk/python/tests/unit/diff/test_fco_diff.py +++ b/sdk/python/tests/unit/diff/test_fco_diff.py @@ -1,5 +1,5 @@ from feast.diff.FcoDiff import ( - diff_between, + diff_registry_objects, tag_objects_for_keep_delete_update_add, tag_proto_objects_for_keep_delete_add, ) @@ -95,7 +95,7 @@ def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): assert to_add in add -def test_diff_between_feature_views(simple_dataset_1): +def test_diff_registry_objects_feature_views(simple_dataset_1): with prep_file_source( df=simple_dataset_1, event_timestamp_column="ts_1" ) as file_source: @@ -114,10 +114,10 @@ def test_diff_between_feature_views(simple_dataset_1): tags={"when": "after"}, ) - fco_diffs = diff_between(pre_changed, pre_changed, "feature view") + fco_diffs = diff_registry_objects(pre_changed, pre_changed, "feature view") assert len(fco_diffs.fco_property_diffs) == 0 - fco_diffs = diff_between(pre_changed, post_changed, "feature view") + fco_diffs = diff_registry_objects(pre_changed, post_changed, "feature view") assert len(fco_diffs.fco_property_diffs) == 1 assert fco_diffs.fco_property_diffs[0].property_name == "tags" From 8e0b5dd85a178be1936cdaeb340c19403e3118d8 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 24 Jan 2022 11:08:21 -0800 Subject: [PATCH 15/17] Put registry initialization back in repo_operations.py Signed-off-by: Felix Wang --- sdk/python/feast/registry.py | 2 -- sdk/python/feast/repo_operations.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 836c9ac3b3..3254990adc 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -134,8 +134,6 @@ def __init__( else 0 ) - self._initialize_registry() - def clone(self) -> "Registry": new_registry = Registry(None, None) new_registry.cached_registry_proto_ttl = timedelta(seconds=0) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 54662a5fde..99004eac36 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -156,6 +156,7 @@ def _prepare_registry_and_repo(repo_config, repo_path): ) sys.exit(1) registry = store.registry + registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) return project, registry, repo, store From 2e8961fecb7478c4f7ab2bc7b5ce6417693c1ccf Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 24 Jan 2022 11:22:43 -0800 Subject: [PATCH 16/17] Fix usage test Signed-off-by: Felix Wang --- sdk/python/tests/integration/e2e/test_usage_e2e.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/sdk/python/tests/integration/e2e/test_usage_e2e.py b/sdk/python/tests/integration/e2e/test_usage_e2e.py index 0bae973063..f55fbce55c 100644 --- a/sdk/python/tests/integration/e2e/test_usage_e2e.py +++ b/sdk/python/tests/integration/e2e/test_usage_e2e.py @@ -66,16 +66,10 @@ def test_usage_on(dummy_exporter, enabling_toggle): test_feature_store.apply([entity]) - assert len(dummy_exporter) == 3 - assert { - "entrypoint": "feast.infra.local.LocalRegistryStore.get_registry_proto" - }.items() <= dummy_exporter[0].items() - assert { - "entrypoint": "feast.infra.local.LocalRegistryStore.update_registry_proto" - }.items() <= dummy_exporter[1].items() + assert len(dummy_exporter) == 1 assert { "entrypoint": "feast.feature_store.FeatureStore.apply" - }.items() <= dummy_exporter[2].items() + }.items() <= dummy_exporter[0].items() @pytest.mark.integration From 4da806e6594f55ed373a740668e6dbdbed4191b1 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Tue, 25 Jan 2022 13:56:33 -0800 Subject: [PATCH 17/17] Switch from hardcoded names to enum Signed-off-by: Felix Wang --- sdk/python/feast/diff/FcoDiff.py | 80 ++++++++++++++++++++--------- sdk/python/feast/registry.py | 16 +++--- sdk/python/feast/repo_operations.py | 17 +++--- 3 files changed, 74 insertions(+), 39 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index f1cbf55c41..e2aac16bf5 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -16,9 +16,19 @@ from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) -from feast.registry import REGISTRY_OBJECT_TYPE_TO_STR, REGISTRY_OBJECT_TYPES, Registry +from feast.registry import FeastObjectType, Registry from feast.repo_contents import RepoContents +FEAST_OBJECT_TYPE_TO_STR = { + FeastObjectType.ENTITY: "entity", + FeastObjectType.FEATURE_VIEW: "feature view", + FeastObjectType.ON_DEMAND_FEATURE_VIEW: "on demand feature view", + FeastObjectType.REQUEST_FEATURE_VIEW: "request feature view", + FeastObjectType.FEATURE_SERVICE: "feature service", +} + +FEAST_OBJECT_TYPES = FEAST_OBJECT_TYPE_TO_STR.keys() + Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) @@ -105,14 +115,22 @@ def diff_registry_objects(current: Fco, new: Fco, object_type: str) -> FcoDiff: ) ) return FcoDiff( - new_proto.spec.name, object_type, current, new, property_diffs, transition, + name=new_proto.spec.name, + fco_type=object_type, + current_fco=current, + new_fco=new, + fco_property_diffs=property_diffs, + transition_type=transition, ) def extract_objects_for_keep_delete_update_add( registry: Registry, current_project: str, desired_repo_contents: RepoContents, ) -> Tuple[ - Dict[str, Set[Fco]], Dict[str, Set[Fco]], Dict[str, Set[Fco]], Dict[str, Set[Fco]] + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], ]: """ Returns the objects in the registry that must be modified to achieve the desired repo state. @@ -127,20 +145,32 @@ def extract_objects_for_keep_delete_update_add( objs_to_update = {} objs_to_add = {} - registry_object_type_to_objects: Dict[str, List[Any]] + registry_object_type_to_objects: Dict[FeastObjectType, List[Any]] registry_object_type_to_objects = { - "entities": registry.list_entities(project=current_project), - "feature_views": registry.list_feature_views(project=current_project), - "on_demand_feature_views": registry.list_on_demand_feature_views( + FeastObjectType.ENTITY: registry.list_entities(project=current_project), + FeastObjectType.FEATURE_VIEW: registry.list_feature_views( + project=current_project + ), + FeastObjectType.ON_DEMAND_FEATURE_VIEW: registry.list_on_demand_feature_views( project=current_project ), - "request_feature_views": registry.list_request_feature_views( + FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views( project=current_project ), - "feature_services": registry.list_feature_services(project=current_project), + FeastObjectType.FEATURE_SERVICE: registry.list_feature_services( + project=current_project + ), + } + registry_object_type_to_repo_contents: Dict[FeastObjectType, Set[Any]] + registry_object_type_to_repo_contents = { + FeastObjectType.ENTITY: desired_repo_contents.entities, + FeastObjectType.FEATURE_VIEW: desired_repo_contents.feature_views, + FeastObjectType.ON_DEMAND_FEATURE_VIEW: desired_repo_contents.on_demand_feature_views, + FeastObjectType.REQUEST_FEATURE_VIEW: desired_repo_contents.request_feature_views, + FeastObjectType.FEATURE_SERVICE: desired_repo_contents.feature_services, } - for object_type in REGISTRY_OBJECT_TYPES: + for object_type in FEAST_OBJECT_TYPES: ( to_keep, to_delete, @@ -148,7 +178,7 @@ def extract_objects_for_keep_delete_update_add( to_add, ) = tag_objects_for_keep_delete_update_add( registry_object_type_to_objects[object_type], - getattr(desired_repo_contents, object_type), + registry_object_type_to_repo_contents[object_type], ) objs_to_keep[object_type] = to_keep @@ -181,7 +211,7 @@ def diff_between( registry, current_project, desired_repo_contents ) - for object_type in REGISTRY_OBJECT_TYPES: + for object_type in FEAST_OBJECT_TYPES: objects_to_keep = objs_to_keep[object_type] objects_to_delete = objs_to_delete[object_type] objects_to_update = objs_to_update[object_type] @@ -190,30 +220,30 @@ def diff_between( for e in objects_to_add: diff.add_fco_diff( FcoDiff( - e.name, - REGISTRY_OBJECT_TYPE_TO_STR[object_type], - None, - e, - [], - TransitionType.CREATE, + name=e.name, + fco_type=FEAST_OBJECT_TYPE_TO_STR[object_type], + current_fco=None, + new_fco=e, + fco_property_diffs=[], + transition_type=TransitionType.CREATE, ) ) for e in objects_to_delete: diff.add_fco_diff( FcoDiff( - e.name, - REGISTRY_OBJECT_TYPE_TO_STR[object_type], - e, - None, - [], - TransitionType.DELETE, + name=e.name, + fco_type=FEAST_OBJECT_TYPE_TO_STR[object_type], + current_fco=e, + new_fco=None, + fco_property_diffs=[], + transition_type=TransitionType.DELETE, ) ) for e in objects_to_update: current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] diff.add_fco_diff( diff_registry_objects( - current_obj, e, REGISTRY_OBJECT_TYPE_TO_STR[object_type] + current_obj, e, FEAST_OBJECT_TYPE_TO_STR[object_type] ) ) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 3254990adc..f05abc6d9a 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -14,6 +14,7 @@ import logging from collections import defaultdict from datetime import datetime, timedelta +from enum import Enum from pathlib import Path from threading import Lock from typing import Any, Dict, List, Optional @@ -58,15 +59,14 @@ "": "LocalRegistryStore", } -REGISTRY_OBJECT_TYPE_TO_STR = { - "entities": "entity", - "feature_views": "feature view", - "on_demand_feature_views": "on demand feature view", - "request_feature_views": "request feature view", - "feature_services": "feature service", -} -REGISTRY_OBJECT_TYPES = REGISTRY_OBJECT_TYPE_TO_STR.keys() +class FeastObjectType(Enum): + ENTITY = 0 + FEATURE_VIEW = 1 + ON_DEMAND_FEATURE_VIEW = 2 + REQUEST_FEATURE_VIEW = 3 + FEATURE_SERVICE = 4 + logger = logging.getLogger(__name__) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 99004eac36..17d0530d4e 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -12,16 +12,17 @@ from click.exceptions import BadParameter from feast.diff.FcoDiff import ( - TransitionType, + FEAST_OBJECT_TYPES, extract_objects_for_keep_delete_update_add, ) +from feast.diff.property_diff import TransitionType from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import REGISTRY_OBJECT_TYPES, Registry +from feast.registry import FeastObjectType, Registry from feast.repo_config import RepoConfig from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView @@ -176,7 +177,7 @@ def extract_objects_for_apply_delete(project, registry, repo): Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService ] ] = [] - for object_type in REGISTRY_OBJECT_TYPES: + for object_type in FEAST_OBJECT_TYPES: to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) all_to_apply.extend(to_apply) @@ -185,14 +186,18 @@ def extract_objects_for_apply_delete(project, registry, repo): Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService ] ] = [] - for object_type in REGISTRY_OBJECT_TYPES: + for object_type in FEAST_OBJECT_TYPES: all_to_delete.extend(objs_to_delete[object_type]) return ( all_to_apply, all_to_delete, - set(objs_to_add["feature_views"].union(objs_to_update["feature_views"])), - objs_to_delete["feature_views"], + set( + objs_to_add[FeastObjectType.FEATURE_VIEW].union( + objs_to_update[FeastObjectType.FEATURE_VIEW] + ) + ), + objs_to_delete[FeastObjectType.FEATURE_VIEW], )