diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index b85897019f..e2aac16bf5 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import Generic, Iterable, List, Set, Tuple, TypeVar +from typing import Any, Dict, Generic, Iterable, List, Set, Tuple, TypeVar from feast.base_feature_view import BaseFeatureView from feast.diff.property_diff import PropertyDiff, TransitionType @@ -16,23 +16,28 @@ from feast.protos.feast.core.RequestFeatureView_pb2 import ( RequestFeatureView as RequestFeatureViewProto, ) +from feast.registry import FeastObjectType, Registry +from feast.repo_contents import RepoContents -FcoProto = TypeVar( - "FcoProto", - EntityProto, - FeatureViewProto, - FeatureServiceProto, - OnDemandFeatureViewProto, - RequestFeatureViewProto, -) +FEAST_OBJECT_TYPE_TO_STR = { + FeastObjectType.ENTITY: "entity", + FeastObjectType.FEATURE_VIEW: "feature view", + FeastObjectType.ON_DEMAND_FEATURE_VIEW: "on demand feature view", + FeastObjectType.REQUEST_FEATURE_VIEW: "request feature view", + FeastObjectType.FEATURE_SERVICE: "feature service", +} + +FEAST_OBJECT_TYPES = FEAST_OBJECT_TYPE_TO_STR.keys() + +Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) @dataclass -class FcoDiff(Generic[FcoProto]): +class FcoDiff(Generic[Fco]): name: str fco_type: str - current_fco: FcoProto - new_fco: FcoProto + current_fco: Fco + new_fco: Fco fco_property_diffs: List[PropertyDiff] transition_type: TransitionType @@ -48,20 +53,28 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -Fco = TypeVar("Fco", Entity, BaseFeatureView, FeatureService) - - -def tag_objects_for_keep_delete_add( +def tag_objects_for_keep_delete_update_add( existing_objs: Iterable[Fco], desired_objs: Iterable[Fco] -) -> Tuple[Set[Fco], Set[Fco], Set[Fco]]: +) -> Tuple[Set[Fco], Set[Fco], Set[Fco], Set[Fco]]: existing_obj_names = {e.name for e in existing_objs} desired_obj_names = {e.name for e in desired_objs} objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} - objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_update = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_keep = {e for e in existing_objs if e.name in desired_obj_names} objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names} - return objs_to_keep, objs_to_delete, objs_to_add + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add + + +FcoProto = TypeVar( + "FcoProto", + EntityProto, + FeatureViewProto, + FeatureServiceProto, + OnDemandFeatureViewProto, + RequestFeatureViewProto, +) def tag_proto_objects_for_keep_delete_add( @@ -80,23 +93,158 @@ def tag_proto_objects_for_keep_delete_add( FIELDS_TO_IGNORE = {"project"} -def diff_between(current: FcoProto, new: FcoProto, object_type: str) -> FcoDiff: - assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name +def diff_registry_objects(current: Fco, new: Fco, object_type: str) -> FcoDiff: + current_proto = current.to_proto() + new_proto = new.to_proto() + assert current_proto.DESCRIPTOR.full_name == new_proto.DESCRIPTOR.full_name property_diffs = [] transition: TransitionType = TransitionType.UNCHANGED - if current.spec != new.spec: - for _field in current.spec.DESCRIPTOR.fields: + if current_proto.spec != new_proto.spec: + for _field in current_proto.spec.DESCRIPTOR.fields: if _field.name in FIELDS_TO_IGNORE: continue - if getattr(current.spec, _field.name) != getattr(new.spec, _field.name): + if getattr(current_proto.spec, _field.name) != getattr( + new_proto.spec, _field.name + ): transition = TransitionType.UPDATE property_diffs.append( PropertyDiff( _field.name, - getattr(current.spec, _field.name), - getattr(new.spec, _field.name), + getattr(current_proto.spec, _field.name), + getattr(new_proto.spec, _field.name), ) ) return FcoDiff( - new.spec.name, object_type, current, new, property_diffs, transition, + name=new_proto.spec.name, + fco_type=object_type, + current_fco=current, + new_fco=new, + fco_property_diffs=property_diffs, + transition_type=transition, ) + + +def extract_objects_for_keep_delete_update_add( + registry: Registry, current_project: str, desired_repo_contents: RepoContents, +) -> Tuple[ + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], + Dict[FeastObjectType, Set[Fco]], +]: + """ + Returns the objects in the registry that must be modified to achieve the desired repo state. + + Args: + registry: The registry storing the current repo state. + current_project: The Feast project whose objects should be compared. + desired_repo_contents: The desired repo state. + """ + objs_to_keep = {} + objs_to_delete = {} + objs_to_update = {} + objs_to_add = {} + + registry_object_type_to_objects: Dict[FeastObjectType, List[Any]] + registry_object_type_to_objects = { + FeastObjectType.ENTITY: registry.list_entities(project=current_project), + FeastObjectType.FEATURE_VIEW: registry.list_feature_views( + project=current_project + ), + FeastObjectType.ON_DEMAND_FEATURE_VIEW: registry.list_on_demand_feature_views( + project=current_project + ), + FeastObjectType.REQUEST_FEATURE_VIEW: registry.list_request_feature_views( + project=current_project + ), + FeastObjectType.FEATURE_SERVICE: registry.list_feature_services( + project=current_project + ), + } + registry_object_type_to_repo_contents: Dict[FeastObjectType, Set[Any]] + registry_object_type_to_repo_contents = { + FeastObjectType.ENTITY: desired_repo_contents.entities, + FeastObjectType.FEATURE_VIEW: desired_repo_contents.feature_views, + FeastObjectType.ON_DEMAND_FEATURE_VIEW: desired_repo_contents.on_demand_feature_views, + FeastObjectType.REQUEST_FEATURE_VIEW: desired_repo_contents.request_feature_views, + FeastObjectType.FEATURE_SERVICE: desired_repo_contents.feature_services, + } + + for object_type in FEAST_OBJECT_TYPES: + ( + to_keep, + to_delete, + to_update, + to_add, + ) = tag_objects_for_keep_delete_update_add( + registry_object_type_to_objects[object_type], + registry_object_type_to_repo_contents[object_type], + ) + + objs_to_keep[object_type] = to_keep + objs_to_delete[object_type] = to_delete + objs_to_update[object_type] = to_update + objs_to_add[object_type] = to_add + + return objs_to_keep, objs_to_delete, objs_to_update, objs_to_add + + +def diff_between( + registry: Registry, current_project: str, desired_repo_contents: RepoContents, +) -> RegistryDiff: + """ + Returns the difference between the current and desired repo states. + + Args: + registry: The registry storing the current repo state. + current_project: The Feast project for which the diff is being computed. + desired_repo_contents: The desired repo state. + """ + diff = RegistryDiff() + + ( + objs_to_keep, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add( + registry, current_project, desired_repo_contents + ) + + for object_type in FEAST_OBJECT_TYPES: + objects_to_keep = objs_to_keep[object_type] + objects_to_delete = objs_to_delete[object_type] + objects_to_update = objs_to_update[object_type] + objects_to_add = objs_to_add[object_type] + + for e in objects_to_add: + diff.add_fco_diff( + FcoDiff( + name=e.name, + fco_type=FEAST_OBJECT_TYPE_TO_STR[object_type], + current_fco=None, + new_fco=e, + fco_property_diffs=[], + transition_type=TransitionType.CREATE, + ) + ) + for e in objects_to_delete: + diff.add_fco_diff( + FcoDiff( + name=e.name, + fco_type=FEAST_OBJECT_TYPE_TO_STR[object_type], + current_fco=e, + new_fco=None, + fco_property_diffs=[], + transition_type=TransitionType.DELETE, + ) + ) + for e in objects_to_update: + current_obj = [_e for _e in objects_to_keep if _e.name == e.name][0] + diff.add_fco_diff( + diff_registry_objects( + current_obj, e, FEAST_OBJECT_TYPE_TO_STR[object_type] + ) + ) + + return diff diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 9bb4fb5e5d..16815531a3 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -30,12 +30,12 @@ class FeatureService: Services. """ - name: str - feature_view_projections: List[FeatureViewProjection] - tags: Dict[str, str] - description: Optional[str] = None - created_timestamp: Optional[datetime] = None - last_updated_timestamp: Optional[datetime] = None + _name: str + _feature_view_projections: List[FeatureViewProjection] + _tags: Dict[str, str] + _description: Optional[str] = None + _created_timestamp: Optional[datetime] = None + _last_updated_timestamp: Optional[datetime] = None @log_exceptions def __init__( @@ -51,22 +51,22 @@ def __init__( Raises: ValueError: If one of the specified features is not a valid type. """ - self.name = name - self.feature_view_projections = [] + self._name = name + self._feature_view_projections = [] for feature_grouping in features: if isinstance(feature_grouping, BaseFeatureView): - self.feature_view_projections.append(feature_grouping.projection) + self._feature_view_projections.append(feature_grouping.projection) else: raise ValueError( "The FeatureService {fs_name} has been provided with an invalid type" f'{type(feature_grouping)} as part of the "features" argument.)' ) - self.tags = tags or {} - self.description = description - self.created_timestamp = None - self.last_updated_timestamp = None + self._tags = tags or {} + self._description = description + self._created_timestamp = None + self._last_updated_timestamp = None def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) @@ -93,6 +93,56 @@ def __eq__(self, other): return True + @property + def name(self) -> str: + return self._name + + @name.setter + def name(self, name: str): + self._name = name + + @property + def feature_view_projections(self) -> List[FeatureViewProjection]: + return self._feature_view_projections + + @feature_view_projections.setter + def feature_view_projections( + self, feature_view_projections: List[FeatureViewProjection] + ): + self._feature_view_projections = feature_view_projections + + @property + def tags(self) -> Dict[str, str]: + return self._tags + + @tags.setter + def tags(self, tags: Dict[str, str]): + self._tags = tags + + @property + def description(self) -> Optional[str]: + return self._description + + @description.setter + def description(self, description: str): + self._description = description + + @property + def created_timestamp(self) -> Optional[datetime]: + return self._created_timestamp + + @created_timestamp.setter + def created_timestamp(self, created_timestamp: datetime): + self._created_timestamp = created_timestamp + + @property + def last_updated_timestamp(self) -> Optional[datetime]: + return self._last_updated_timestamp + + @last_updated_timestamp.setter + def last_updated_timestamp(self, last_updated_timestamp: datetime): + self._last_updated_timestamp = last_updated_timestamp + @staticmethod def from_proto(feature_service_proto: FeatureServiceProto): """ diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 39273b56c2..c9dae9063a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -24,7 +24,6 @@ Iterable, List, Mapping, - NamedTuple, Optional, Sequence, Set, @@ -40,7 +39,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import RegistryDiff +from feast.diff.FcoDiff import RegistryDiff, diff_between from feast.diff.infra_diff import InfraDiff, diff_infra_protos from feast.entity import Entity from feast.errors import ( @@ -68,7 +67,6 @@ from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto -from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( FieldStatus, GetOnlineFeaturesResponse, @@ -77,6 +75,7 @@ from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config +from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.type_map import python_values_to_proto_values from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute @@ -86,31 +85,6 @@ warnings.simplefilter("once", DeprecationWarning) -class RepoContents(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def to_registry_proto(self) -> RegistryProto: - registry_proto = RegistryProto() - registry_proto.entities.extend([e.to_proto() for e in self.entities]) - registry_proto.feature_views.extend( - [fv.to_proto() for fv in self.feature_views] - ) - registry_proto.on_demand_feature_views.extend( - [fv.to_proto() for fv in self.on_demand_feature_views] - ) - registry_proto.request_feature_views.extend( - [fv.to_proto() for fv in self.request_feature_views] - ) - registry_proto.feature_services.extend( - [fs.to_proto() for fs in self.feature_services] - ) - return registry_proto - - class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. @@ -415,7 +389,7 @@ def _get_features( @log_exceptions_and_usage def plan( - self, desired_repo_objects: RepoContents + self, desired_repo_contents: RepoContents ) -> Tuple[RegistryDiff, InfraDiff]: """Dry-run registering objects to metadata store. @@ -453,16 +427,8 @@ def plan( ... ) >>> registry_diff, infra_diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view """ - - current_registry_proto = ( - self._registry.cached_registry_proto.__deepcopy__() - if self._registry.cached_registry_proto - else RegistryProto() - ) - - desired_registry_proto = desired_repo_objects.to_registry_proto() - registry_diff = Registry.diff_between( - current_registry_proto, desired_registry_proto + registry_diff = diff_between( + self._registry, self.project, desired_repo_contents ) current_infra_proto = ( @@ -470,6 +436,7 @@ def plan( if self._registry.cached_registry_proto else InfraProto() ) + desired_registry_proto = desired_repo_contents.to_registry_proto() new_infra_proto = self._provider.plan_infra( self.config, desired_registry_proto ).to_proto() @@ -508,7 +475,7 @@ def apply( ] ] = None, partial: bool = True, - ) -> RegistryDiff: + ): """Register objects to metadata store and update related infrastructure. The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these @@ -544,7 +511,7 @@ def apply( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... ) - >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking if not isinstance(objects, Iterable): @@ -554,12 +521,6 @@ def apply( if not objects_to_delete: objects_to_delete = [] - current_registry_proto = ( - self._registry.cached_registry_proto.__deepcopy__() - if self._registry.cached_registry_proto - else RegistryProto() - ) - # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -657,22 +618,6 @@ def apply( service.name, project=self.project, commit=False ) - new_registry_proto = ( - self._registry.cached_registry_proto - if self._registry.cached_registry_proto - else RegistryProto() - ) - - diffs = Registry.diff_between(current_registry_proto, new_registry_proto) - - entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] - views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] - - entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] - views_to_delete = [ - ob for ob in objects_to_delete if isinstance(ob, FeatureView) - ] - self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete if not partial else [], @@ -684,8 +629,6 @@ def apply( self._registry.commit() - return diffs - @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 0c058a0d46..f05abc6d9a 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -14,6 +14,7 @@ import logging from collections import defaultdict from datetime import datetime, timedelta +from enum import Enum from pathlib import Path from threading import Lock from typing import Any, Dict, List, Optional @@ -24,13 +25,6 @@ from proto import Message from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import ( - FcoDiff, - RegistryDiff, - TransitionType, - diff_between, - tag_proto_objects_for_keep_delete_add, -) from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -65,6 +59,15 @@ "": "LocalRegistryStore", } + +class FeastObjectType(Enum): + ENTITY = 0 + FEATURE_VIEW = 1 + ON_DEMAND_FEATURE_VIEW = 2 + REQUEST_FEATURE_VIEW = 3 + FEATURE_SERVICE = 4 + + logger = logging.getLogger(__name__) @@ -143,75 +146,6 @@ def clone(self) -> "Registry": new_registry._registry_store = NoopRegistryStore() return new_registry - # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. - @staticmethod - def diff_between( - current_registry: RegistryProto, new_registry: RegistryProto - ) -> RegistryDiff: - diff = RegistryDiff() - - attribute_to_object_type_str = { - "entities": "entity", - "feature_views": "feature view", - "feature_tables": "feature table", - "on_demand_feature_views": "on demand feature view", - "request_feature_views": "request feature view", - "feature_services": "feature service", - } - - for object_type in [ - "entities", - "feature_views", - "feature_tables", - "on_demand_feature_views", - "request_feature_views", - "feature_services", - ]: - ( - objects_to_keep, - objects_to_delete, - objects_to_add, - ) = tag_proto_objects_for_keep_delete_add( - getattr(current_registry, object_type), - getattr(new_registry, object_type), - ) - - for e in objects_to_add: - diff.add_fco_diff( - FcoDiff( - e.spec.name, - attribute_to_object_type_str[object_type], - None, - e, - [], - TransitionType.CREATE, - ) - ) - for e in objects_to_delete: - diff.add_fco_diff( - FcoDiff( - e.spec.name, - attribute_to_object_type_str[object_type], - e, - None, - [], - TransitionType.DELETE, - ) - ) - for e in objects_to_keep: - current_obj_proto = [ - _e - for _e in getattr(current_registry, object_type) - if _e.spec.name == e.spec.name - ][0] - diff.add_fco_diff( - diff_between( - current_obj_proto, e, attribute_to_object_type_str[object_type] - ) - ) - - return diff - def _initialize_registry(self): """Explicitly initializes the registry with an empty proto if it doesn't exist.""" try: diff --git a/sdk/python/feast/repo_contents.py b/sdk/python/feast/repo_contents.py new file mode 100644 index 0000000000..9190af11ee --- /dev/null +++ b/sdk/python/feast/repo_contents.py @@ -0,0 +1,50 @@ +# Copyright 2022 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import NamedTuple, Set + +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.request_feature_view import RequestFeatureView + + +class RepoContents(NamedTuple): + """ + Represents the objects in a Feast feature repo. + """ + + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 0638ca589a..17d0530d4e 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,21 +6,25 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, Set, Union, cast +from typing import List, Set, Union import click from click.exceptions import BadParameter -from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add +from feast.diff.FcoDiff import ( + FEAST_OBJECT_TYPES, + extract_objects_for_keep_delete_update_add, +) +from feast.diff.property_diff import TransitionType from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore, RepoContents +from feast.feature_store import FeatureStore from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView -from feast.registry import Registry +from feast.registry import FeastObjectType, Registry from feast.repo_config import RepoConfig +from feast.repo_contents import RepoContents from feast.request_feature_view import RequestFeatureView from feast.usage import log_exceptions_and_usage @@ -160,81 +164,41 @@ def _prepare_registry_and_repo(repo_config, repo_path): def extract_objects_for_apply_delete(project, registry, repo): - ( - entities_to_keep, - entities_to_delete, - entities_to_add, - ) = tag_objects_for_keep_delete_add( - set(registry.list_entities(project=project)), repo.entities - ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. - entities_to_keep = set(entities_to_keep).union(entities_to_add) - views = tag_objects_for_keep_delete_add( - set(registry.list_feature_views(project=project)), repo.feature_views - ) - views_to_keep, views_to_delete, views_to_add = ( - cast(Set[FeatureView], views[0]), - cast(Set[FeatureView], views[1]), - cast(Set[FeatureView], views[2]), - ) - request_views = tag_objects_for_keep_delete_add( - set(registry.list_request_feature_views(project=project)), - repo.request_feature_views, - ) - request_views_to_keep: Set[RequestFeatureView] - request_views_to_delete: Set[RequestFeatureView] - request_views_to_add: Set[RequestFeatureView] - request_views_to_keep, request_views_to_delete, request_views_to_add = ( - cast(Set[RequestFeatureView], request_views[0]), - cast(Set[RequestFeatureView], request_views[1]), - cast(Set[RequestFeatureView], request_views[2]), - ) - base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { - *views_to_keep, - *views_to_add, - *request_views_to_keep, - *request_views_to_add, - } - base_views_to_delete: Set[Union[RequestFeatureView, FeatureView]] = { - *views_to_delete, - *request_views_to_delete, - } - odfvs = tag_objects_for_keep_delete_add( - set(registry.list_on_demand_feature_views(project=project)), - repo.on_demand_feature_views, - ) - odfvs_to_keep, odfvs_to_delete, odfvs_to_add = ( - cast(Set[OnDemandFeatureView], odfvs[0]), - cast(Set[OnDemandFeatureView], odfvs[1]), - cast(Set[OnDemandFeatureView], odfvs[2]), - ) - odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) ( - services_to_keep, - services_to_delete, - services_to_add, - ) = tag_objects_for_keep_delete_add( - set(registry.list_feature_services(project=project)), repo.feature_services - ) - services_to_keep = services_to_keep.union(services_to_add) - sys.dont_write_bytecode = False - # Apply all changes to the registry and infrastructure. + _, + objs_to_delete, + objs_to_update, + objs_to_add, + ) = extract_objects_for_keep_delete_update_add(registry, project, repo) + all_to_apply: List[ - Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] + Union[ + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + ] ] = [] - all_to_apply.extend(entities_to_keep) - all_to_apply.extend(base_views_to_keep) - all_to_apply.extend(services_to_keep) - all_to_apply.extend(odfvs_to_keep) + for object_type in FEAST_OBJECT_TYPES: + to_apply = set(objs_to_add[object_type]).union(objs_to_update[object_type]) + all_to_apply.extend(to_apply) + all_to_delete: List[ - Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] + Union[ + Entity, FeatureView, RequestFeatureView, OnDemandFeatureView, FeatureService + ] ] = [] - all_to_delete.extend(entities_to_delete) - all_to_delete.extend(base_views_to_delete) - all_to_delete.extend(services_to_delete) - all_to_delete.extend(odfvs_to_delete) + for object_type in FEAST_OBJECT_TYPES: + all_to_delete.extend(objs_to_delete[object_type]) - return all_to_apply, all_to_delete, views_to_delete, views_to_keep + return ( + all_to_apply, + all_to_delete, + set( + objs_to_add[FeastObjectType.FEATURE_VIEW].union( + objs_to_update[FeastObjectType.FEATURE_VIEW] + ) + ), + objs_to_delete[FeastObjectType.FEATURE_VIEW], + ) def apply_total_with_repo_instance( @@ -250,6 +214,8 @@ def apply_total_with_repo_instance( for data_source in data_sources: data_source.validate(store.config) + registry_diff, _ = store.plan(repo) + # For each object in the registry, determine whether it should be kept or deleted. ( all_to_apply, @@ -258,9 +224,9 @@ def apply_total_with_repo_instance( views_to_keep, ) = extract_objects_for_apply_delete(project, registry, repo) - diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) - log_cli_output(diff, views_to_delete, views_to_keep) + log_cli_output(registry_diff, views_to_delete, views_to_keep) @log_exceptions_and_usage diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 63ee4fe7bc..f6f2b5d2bc 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -12,10 +12,11 @@ import pandas as pd import yaml -from feast import FeatureStore, FeatureView, RepoConfig, driver_test_data +from feast import FeatureStore, FeatureView, driver_test_data from feast.constants import FULL_REPO_CONFIGS_MODULE_ENV_NAME from feast.data_source import DataSource from feast.errors import FeastModuleImportError +from feast.repo_config import RegistryConfig, RepoConfig from tests.integration.feature_repos.integration_test_repo_config import ( IntegrationTestRepoConfig, ) @@ -286,7 +287,9 @@ def construct_test_environment( else: # Note: even if it's a local feature server, the repo config does not have this configured feature_server = None - registry = str(Path(repo_dir_name) / "registry.db") + registry = RegistryConfig( + path=str(Path(repo_dir_name) / "registry.db"), cache_ttl_seconds=1, + ) config = RepoConfig( registry=registry, diff --git a/sdk/python/tests/unit/diff/test_fco_diff.py b/sdk/python/tests/unit/diff/test_fco_diff.py index 802a6438c3..fa3c84d035 100644 --- a/sdk/python/tests/unit/diff/test_fco_diff.py +++ b/sdk/python/tests/unit/diff/test_fco_diff.py @@ -1,4 +1,8 @@ -from feast.diff.FcoDiff import diff_between, tag_proto_objects_for_keep_delete_add +from feast.diff.FcoDiff import ( + diff_registry_objects, + tag_objects_for_keep_delete_update_add, + tag_proto_objects_for_keep_delete_add, +) from feast.feature_view import FeatureView from tests.utils.data_source_utils import prep_file_source @@ -45,29 +49,75 @@ def test_tag_proto_objects_for_keep_delete_add(simple_dataset_1): assert to_add in add -def test_diff_between_feature_views(simple_dataset_1): +def test_tag_objects_for_keep_delete_update_add(simple_dataset_1): with prep_file_source( df=simple_dataset_1, event_timestamp_column="ts_1" ) as file_source: + to_delete = FeatureView( + name="to_delete", entities=["id"], batch_source=file_source, ttl=None, + ) + unchanged_fv = FeatureView( + name="fv1", entities=["id"], batch_source=file_source, ttl=None, + ) pre_changed = FeatureView( name="fv2", entities=["id"], batch_source=file_source, ttl=None, tags={"when": "before"}, - ).to_proto() + ) post_changed = FeatureView( name="fv2", entities=["id"], batch_source=file_source, ttl=None, tags={"when": "after"}, - ).to_proto() + ) + to_add = FeatureView( + name="to_add", entities=["id"], batch_source=file_source, ttl=None, + ) + + keep, delete, update, add = tag_objects_for_keep_delete_update_add( + [unchanged_fv, pre_changed, to_delete], [unchanged_fv, post_changed, to_add] + ) + + assert len(list(keep)) == 2 + assert unchanged_fv in keep + assert pre_changed in keep + assert post_changed not in keep + assert len(list(delete)) == 1 + assert to_delete in delete + assert len(list(update)) == 2 + assert unchanged_fv in update + assert post_changed in update + assert pre_changed not in update + assert len(list(add)) == 1 + assert to_add in add + + +def test_diff_registry_objects_feature_views(simple_dataset_1): + with prep_file_source( + df=simple_dataset_1, event_timestamp_column="ts_1" + ) as file_source: + pre_changed = FeatureView( + name="fv2", + entities=["id"], + batch_source=file_source, + ttl=None, + tags={"when": "before"}, + ) + post_changed = FeatureView( + name="fv2", + entities=["id"], + batch_source=file_source, + ttl=None, + tags={"when": "after"}, + ) - fco_diffs = diff_between(pre_changed, pre_changed, "feature view") + fco_diffs = diff_registry_objects(pre_changed, pre_changed, "feature view") assert len(fco_diffs.fco_property_diffs) == 0 - fco_diffs = diff_between(pre_changed, post_changed, "feature view") + fco_diffs = diff_registry_objects(pre_changed, post_changed, "feature view") assert len(fco_diffs.fco_property_diffs) == 1 assert fco_diffs.fco_property_diffs[0].property_name == "tags"