diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py new file mode 100644 index 0000000000..bb466c33e6 --- /dev/null +++ b/sdk/python/feast/diff/FcoDiff.py @@ -0,0 +1,76 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Any, Iterable, List, Set, Tuple, TypeVar + +from feast.base_feature_view import BaseFeatureView +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_table import FeatureTable +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto + + +@dataclass +class PropertyDiff: + property_name: str + val_existing: str + val_declared: str + + +class TransitionType(Enum): + UNKNOWN = 0 + CREATE = 1 + DELETE = 2 + UPDATE = 3 + UNCHANGED = 4 + + +@dataclass +class FcoDiff: + current_fco: Any + new_fco: Any + fco_property_diffs: List[PropertyDiff] + transition_type: TransitionType + + +@dataclass +class RegistryDiff: + fco_diffs: List[FcoDiff] + + def __init__(self): + self.fco_diffs = [] + + def add_fco_diff(self, fco_diff: FcoDiff): + self.fco_diffs.append(fco_diff) + + +T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable) + + +def tag_objects_for_keep_delete_add( + existing_objs: Iterable[T], desired_objs: Iterable[T] +) -> Tuple[Set[T], Set[T], Set[T]]: + existing_obj_names = {e.name for e in existing_objs} + desired_obj_names = {e.name for e in desired_objs} + + objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} + objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names} + + return objs_to_keep, objs_to_delete, objs_to_add + + +U = TypeVar("U", EntityProto, FeatureViewProto) + + +def tag_proto_objects_for_keep_delete_add( + existing_objs: Iterable[U], desired_objs: Iterable[U] +) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: + existing_obj_names = {e.spec.name for e in existing_objs} + desired_obj_names = {e.spec.name for e in desired_objs} + + objs_to_add = [e for e in desired_objs if e.spec.name not in existing_obj_names] + objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names] + objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names] + + return objs_to_keep, objs_to_delete, objs_to_add diff --git a/sdk/python/feast/diff/__init__.py b/sdk/python/feast/diff/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7a66bfb81d..7ab7817d61 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -38,6 +38,7 @@ from feast.feature_service import FeatureService from feast.feature_table import FeatureTable from feast.feature_view import ( + DUMMY_ENTITY, DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, DUMMY_ENTITY_VAL, @@ -61,7 +62,6 @@ from feast.request_feature_view import RequestFeatureView from feast.type_map import python_value_to_proto_value from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute -from feast.value_type import ValueType from feast.version import get_version warnings.simplefilter("once", DeprecationWarning) @@ -379,16 +379,18 @@ def apply( ] ], ], - objects_to_delete: List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] ] - ] = [], + ] = None, partial: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -435,6 +437,9 @@ def apply( assert isinstance(objects, list) + if not objects_to_delete: + objects_to_delete = [] + # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -484,11 +489,6 @@ def apply( odfv.infer_features() # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - DUMMY_ENTITY = Entity( - name=DUMMY_ENTITY_NAME, - join_key=DUMMY_ENTITY_ID, - value_type=ValueType.INT32, - ) entities_to_update.append(DUMMY_ENTITY) # Add all objects to the registry and update the provider's infrastructure. @@ -1560,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]): case_insensitive_fv_name = fv.name.lower() if case_insensitive_fv_name in fv_names: raise ValueError( - f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file." + f"More than one feature view with name {case_insensitive_fv_name} found. " + f"Please ensure that all feature view names are case-insensitively unique. " + f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file." ) else: fv_names.add(case_insensitive_fv_name) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index ac8abefeb0..ee22ae1266 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -21,6 +21,7 @@ from feast import utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource +from feast.entity import Entity from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -42,6 +43,9 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" +DUMMY_ENTITY = Entity( + name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32, +) class FeatureView(BaseFeatureView): diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7b523c9274..3a54568d45 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logging from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path @@ -24,6 +24,12 @@ from feast import importer from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import ( + FcoDiff, + RegistryDiff, + TransitionType, + tag_proto_objects_for_keep_delete_add, +) from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -57,6 +63,8 @@ "": "LocalRegistryStore", } +logger = logging.getLogger(__name__) + def get_registry_store_class_from_type(registry_store_type: str): if not registry_store_type.endswith("RegistryStore"): @@ -95,7 +103,9 @@ class Registry: cached_registry_proto_ttl: timedelta cache_being_updated: bool = False - def __init__(self, registry_config: RegistryConfig, repo_path: Path): + def __init__( + self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + ): """ Create the Registry object. @@ -104,20 +114,50 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): repo_path: Path to the base of the Feast repository or where it will be created if it does not exist yet. """ - registry_store_type = registry_config.registry_store_type - registry_path = registry_config.path - if registry_store_type is None: - cls = get_registry_store_class_from_scheme(registry_path) - else: - cls = get_registry_store_class_from_type(str(registry_store_type)) - self._registry_store = cls(registry_config, repo_path) - self.cached_registry_proto_ttl = timedelta( - seconds=registry_config.cache_ttl_seconds - if registry_config.cache_ttl_seconds is not None - else 0 + if registry_config: + registry_store_type = registry_config.registry_store_type + registry_path = registry_config.path + if registry_store_type is None: + cls = get_registry_store_class_from_scheme(registry_path) + else: + cls = get_registry_store_class_from_type(str(registry_store_type)) + + self._registry_store = cls(registry_config, repo_path) + self.cached_registry_proto_ttl = timedelta( + seconds=registry_config.cache_ttl_seconds + if registry_config.cache_ttl_seconds is not None + else 0 + ) + + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. + @staticmethod + def diff_between( + current_registry: RegistryProto, new_registry: RegistryProto + ) -> RegistryDiff: + diff = RegistryDiff() + + # Handle Entities + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = tag_proto_objects_for_keep_delete_add( + current_registry.entities, new_registry.entities, ) + for e in entities_to_add: + diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) + for e in entities_to_delete: + diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + + # Handle Feature Views + # Handle On Demand Feature Views + # Handle Request Feature Views + # Handle Feature Services + logger.info(f"Diff: {diff}") + return diff + def _initialize_registry(self): """Explicitly initializes the registry with an empty proto if it doesn't exist.""" try: @@ -752,6 +792,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) ) ) + if allow_cache and (not expired or self.cache_being_updated): assert isinstance(self.cached_registry_proto, RegistryProto) return self.cached_registry_proto diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 8d4ecba529..ef0953feb2 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,16 +6,18 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Tuple, Union, cast +from typing import List, NamedTuple, Set, Union, cast import click from click.exceptions import BadParameter -from feast import Entity, FeatureTable from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import tag_objects_for_keep_delete_add +from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY_NAME, FeatureView +from feast.feature_table import FeatureTable +from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry @@ -122,6 +124,7 @@ def parse_repo(repo_root: Path) -> ParsedRepo: res.on_demand_feature_views.add(obj) elif isinstance(obj, RequestFeatureView): res.request_feature_views.add(obj) + res.entities.add(DUMMY_ENTITY) return res @@ -150,22 +153,77 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation data_source.validate(store.config) # For each object in the registry, determine whether it should be kept or deleted. - entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete( - project, registry, repo + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = tag_objects_for_keep_delete_add( + set(registry.list_entities(project=project)), repo.entities + ) + # TODO(achals): This code path should be refactored to handle added & kept entities separately. + entities_to_keep = set(entities_to_keep).union(entities_to_add) + + views = tag_objects_for_keep_delete_add( + set(registry.list_feature_views(project=project)), repo.feature_views + ) + views_to_keep, views_to_delete, views_to_add = ( + cast(Set[FeatureView], views[0]), + cast(Set[FeatureView], views[1]), + cast(Set[FeatureView], views[2]), + ) + + request_views = tag_objects_for_keep_delete_add( + set(registry.list_request_feature_views(project=project)), + repo.request_feature_views, + ) + request_views_to_keep: Set[RequestFeatureView] + request_views_to_delete: Set[RequestFeatureView] + request_views_to_add: Set[RequestFeatureView] + request_views_to_keep, request_views_to_delete, request_views_to_add = ( + cast(Set[RequestFeatureView], request_views[0]), + cast(Set[RequestFeatureView], request_views[1]), + cast(Set[RequestFeatureView], request_views[2]), + ) + + base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_keep, + *views_to_add, + *request_views_to_keep, + *request_views_to_add, + } + base_views_to_delete: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_delete, + *request_views_to_delete, + } + + odfvs = tag_objects_for_keep_delete_add( + set(registry.list_on_demand_feature_views(project=project)), + repo.on_demand_feature_views, ) - views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete( - project, registry, repo + odfvs_to_keep, odfvs_to_delete, odfvs_to_add = ( + cast(Set[OnDemandFeatureView], odfvs[0]), + cast(Set[OnDemandFeatureView], odfvs[1]), + cast(Set[OnDemandFeatureView], odfvs[2]), ) + odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) + ( - odfvs_to_keep, - odfvs_to_delete, - ) = _tag_registry_on_demand_feature_views_for_keep_delete(project, registry, repo) - tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete( - project, registry, repo + tables_to_keep, + tables_to_delete, + tables_to_add, + ) = tag_objects_for_keep_delete_add( + set(registry.list_feature_tables(project=project)), repo.feature_tables ) - services_to_keep, services_to_delete = _tag_registry_services_for_keep_delete( - project, registry, repo + tables_to_keep = tables_to_keep.union(tables_to_add) + + ( + services_to_keep, + services_to_delete, + services_to_add, + ) = tag_objects_for_keep_delete_add( + set(registry.list_feature_services(project=project)), repo.feature_services ) + services_to_keep = services_to_keep.union(services_to_add) sys.dont_write_bytecode = False @@ -176,7 +234,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_apply.extend(entities_to_keep) - all_to_apply.extend(views_to_keep) + all_to_apply.extend(base_views_to_keep) all_to_apply.extend(services_to_keep) all_to_apply.extend(odfvs_to_keep) all_to_apply.extend(tables_to_keep) @@ -186,7 +244,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_delete.extend(entities_to_delete) - all_to_delete.extend(views_to_delete) + all_to_delete.extend(base_views_to_delete) all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) all_to_delete.extend(tables_to_delete) @@ -197,7 +255,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" ) - for view in views_to_delete: + for view in base_views_to_delete: click.echo( f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" ) @@ -216,10 +274,11 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) for entity in entities_to_keep: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in views_to_keep: + if entity.name != DUMMY_ENTITY_NAME: + click.echo( + f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" + ) + for view in base_views_to_keep: click.echo( f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) @@ -258,74 +317,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # TODO: consider echoing also entities being deployed/removed -def _tag_registry_entities_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[Entity], Set[Entity]]: - entities_to_keep: Set[Entity] = repo.entities - entities_to_delete: Set[Entity] = set() - repo_entities_names = set([e.name for e in repo.entities]) - for registry_entity in registry.list_entities(project=project): - # Do not delete dummy entity. - if ( - registry_entity.name not in repo_entities_names - and registry_entity.name != DUMMY_ENTITY_NAME - ): - entities_to_delete.add(registry_entity) - return entities_to_keep, entities_to_delete - - -def _tag_registry_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[BaseFeatureView], Set[BaseFeatureView]]: - views_to_keep: Set[BaseFeatureView] = cast(Set[BaseFeatureView], repo.feature_views) - for request_fv in repo.request_feature_views: - views_to_keep.add(request_fv) - views_to_delete: Set[BaseFeatureView] = set() - repo_feature_view_names = set(t.name for t in repo.feature_views) - for registry_view in registry.list_feature_views(project=project): - if registry_view.name not in repo_feature_view_names: - views_to_delete.add(registry_view) - return views_to_keep, views_to_delete - - -def _tag_registry_on_demand_feature_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]: - odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views - odfvs_to_delete: Set[OnDemandFeatureView] = set() - repo_on_demand_feature_view_names = set( - t.name for t in repo.on_demand_feature_views - ) - for registry_odfv in registry.list_on_demand_feature_views(project=project): - if registry_odfv.name not in repo_on_demand_feature_view_names: - odfvs_to_delete.add(registry_odfv) - return odfvs_to_keep, odfvs_to_delete - - -def _tag_registry_tables_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureTable], Set[FeatureTable]]: - tables_to_keep: Set[FeatureTable] = repo.feature_tables - tables_to_delete: Set[FeatureTable] = set() - repo_table_names = set(t.name for t in repo.feature_tables) - for registry_table in registry.list_feature_tables(project=project): - if registry_table.name not in repo_table_names: - tables_to_delete.add(registry_table) - return tables_to_keep, tables_to_delete - - -def _tag_registry_services_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureService], Set[FeatureService]]: - services_to_keep: Set[FeatureService] = repo.feature_services - services_to_delete: Set[FeatureService] = set() - repo_feature_service_names = set(t.name for t in repo.feature_services) - for registry_service in registry.list_feature_services(project=project): - if registry_service.name not in repo_feature_service_names: - services_to_delete.add(registry_service) - return services_to_keep, services_to_delete - - @log_exceptions_and_usage def teardown(repo_config: RepoConfig, repo_path: Path): # Cannot pass in both repo_path and repo_config to FeatureStore.