From be4b466c9758ac602a4d0cd5ab17605c26cb79e4 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 14 Dec 2021 14:21:21 -0800 Subject: [PATCH 01/12] Bump log4j-api from 2.15.0 to 2.16.0 in /java (#2145) Bumps log4j-api from 2.15.0 to 2.16.0. --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-api dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 4b687f4f39..4b63f5389b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -59,7 +59,7 @@ 0.26.0 - 2.15.0 + 2.16.0 2.9.9 2.0.2 2.5.0.RELEASE From 32a4cdb002ffaff427e1881847d363ef4d82b046 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 14 Dec 2021 16:30:21 -0800 Subject: [PATCH 02/12] Refactor tag methods to infer created, deleted, and kept repo objects (#2142) * Refactor tag methods to infer objects that are created, deleted, and kept Signed-off-by: Achal Shah * Fixes Signed-off-by: Achal Shah * Fixes Signed-off-by: Achal Shah * True Fixes Signed-off-by: Achal Shah * Use the same tag method Signed-off-by: Achal Shah * CR updates Signed-off-by: Achal Shah * CR updates Signed-off-by: Achal Shah --- sdk/python/feast/diff/FcoDiff.py | 76 +++++++++++++ sdk/python/feast/diff/__init__.py | 0 sdk/python/feast/feature_store.py | 34 +++--- sdk/python/feast/feature_view.py | 4 + sdk/python/feast/registry.py | 67 ++++++++--- sdk/python/feast/repo_operations.py | 169 +++++++++++++--------------- 6 files changed, 232 insertions(+), 118 deletions(-) create mode 100644 sdk/python/feast/diff/FcoDiff.py create mode 100644 sdk/python/feast/diff/__init__.py diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py new file mode 100644 index 0000000000..bb466c33e6 --- /dev/null +++ b/sdk/python/feast/diff/FcoDiff.py @@ 
-0,0 +1,76 @@ +from dataclasses import dataclass +from enum import Enum +from typing import Any, Iterable, List, Set, Tuple, TypeVar + +from feast.base_feature_view import BaseFeatureView +from feast.entity import Entity +from feast.feature_service import FeatureService +from feast.feature_table import FeatureTable +from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto + + +@dataclass +class PropertyDiff: + property_name: str + val_existing: str + val_declared: str + + +class TransitionType(Enum): + UNKNOWN = 0 + CREATE = 1 + DELETE = 2 + UPDATE = 3 + UNCHANGED = 4 + + +@dataclass +class FcoDiff: + current_fco: Any + new_fco: Any + fco_property_diffs: List[PropertyDiff] + transition_type: TransitionType + + +@dataclass +class RegistryDiff: + fco_diffs: List[FcoDiff] + + def __init__(self): + self.fco_diffs = [] + + def add_fco_diff(self, fco_diff: FcoDiff): + self.fco_diffs.append(fco_diff) + + +T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable) + + +def tag_objects_for_keep_delete_add( + existing_objs: Iterable[T], desired_objs: Iterable[T] +) -> Tuple[Set[T], Set[T], Set[T]]: + existing_obj_names = {e.name for e in existing_objs} + desired_obj_names = {e.name for e in desired_objs} + + objs_to_add = {e for e in desired_objs if e.name not in existing_obj_names} + objs_to_keep = {e for e in desired_objs if e.name in existing_obj_names} + objs_to_delete = {e for e in existing_objs if e.name not in desired_obj_names} + + return objs_to_keep, objs_to_delete, objs_to_add + + +U = TypeVar("U", EntityProto, FeatureViewProto) + + +def tag_proto_objects_for_keep_delete_add( + existing_objs: Iterable[U], desired_objs: Iterable[U] +) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]: + existing_obj_names = {e.spec.name for e in existing_objs} + desired_obj_names = {e.spec.name for e in desired_objs} + + objs_to_add = [e for e in desired_objs if e.spec.name 
not in existing_obj_names] + objs_to_keep = [e for e in desired_objs if e.spec.name in existing_obj_names] + objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names] + + return objs_to_keep, objs_to_delete, objs_to_add diff --git a/sdk/python/feast/diff/__init__.py b/sdk/python/feast/diff/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7a66bfb81d..7ab7817d61 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -38,6 +38,7 @@ from feast.feature_service import FeatureService from feast.feature_table import FeatureTable from feast.feature_view import ( + DUMMY_ENTITY, DUMMY_ENTITY_ID, DUMMY_ENTITY_NAME, DUMMY_ENTITY_VAL, @@ -61,7 +62,6 @@ from feast.request_feature_view import RequestFeatureView from feast.type_map import python_value_to_proto_value from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute -from feast.value_type import ValueType from feast.version import get_version warnings.simplefilter("once", DeprecationWarning) @@ -379,16 +379,18 @@ def apply( ] ], ], - objects_to_delete: List[ - Union[ - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - Entity, - FeatureService, - FeatureTable, + objects_to_delete: Optional[ + List[ + Union[ + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + Entity, + FeatureService, + FeatureTable, + ] ] - ] = [], + ] = None, partial: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -435,6 +437,9 @@ def apply( assert isinstance(objects, list) + if not objects_to_delete: + objects_to_delete = [] + # Separate all objects into entities, feature services, and different feature view types. 
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -484,11 +489,6 @@ def apply( odfv.infer_features() # Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity. - DUMMY_ENTITY = Entity( - name=DUMMY_ENTITY_NAME, - join_key=DUMMY_ENTITY_ID, - value_type=ValueType.INT32, - ) entities_to_update.append(DUMMY_ENTITY) # Add all objects to the registry and update the provider's infrastructure. @@ -1560,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]): case_insensitive_fv_name = fv.name.lower() if case_insensitive_fv_name in fv_names: raise ValueError( - f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file." + f"More than one feature view with name {case_insensitive_fv_name} found. " + f"Please ensure that all feature view names are case-insensitively unique. " + f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file." 
) else: fv_names.add(case_insensitive_fv_name) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index ac8abefeb0..ee22ae1266 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -21,6 +21,7 @@ from feast import utils from feast.base_feature_view import BaseFeatureView from feast.data_source import DataSource +from feast.entity import Entity from feast.feature import Feature from feast.feature_view_projection import FeatureViewProjection from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -42,6 +43,9 @@ DUMMY_ENTITY_ID = "__dummy_id" DUMMY_ENTITY_NAME = "__dummy" DUMMY_ENTITY_VAL = "" +DUMMY_ENTITY = Entity( + name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32, +) class FeatureView(BaseFeatureView): diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 7b523c9274..3a54568d45 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import logging from collections import defaultdict from datetime import datetime, timedelta from pathlib import Path @@ -24,6 +24,12 @@ from feast import importer from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import ( + FcoDiff, + RegistryDiff, + TransitionType, + tag_proto_objects_for_keep_delete_add, +) from feast.entity import Entity from feast.errors import ( ConflictingFeatureViewNames, @@ -57,6 +63,8 @@ "": "LocalRegistryStore", } +logger = logging.getLogger(__name__) + def get_registry_store_class_from_type(registry_store_type: str): if not registry_store_type.endswith("RegistryStore"): @@ -95,7 +103,9 @@ class Registry: cached_registry_proto_ttl: timedelta cache_being_updated: bool = False - def __init__(self, registry_config: RegistryConfig, repo_path: Path): + def __init__( + self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] + ): """ Create the Registry object. @@ -104,20 +114,50 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path): repo_path: Path to the base of the Feast repository or where it will be created if it does not exist yet. 
""" - registry_store_type = registry_config.registry_store_type - registry_path = registry_config.path - if registry_store_type is None: - cls = get_registry_store_class_from_scheme(registry_path) - else: - cls = get_registry_store_class_from_type(str(registry_store_type)) - self._registry_store = cls(registry_config, repo_path) - self.cached_registry_proto_ttl = timedelta( - seconds=registry_config.cache_ttl_seconds - if registry_config.cache_ttl_seconds is not None - else 0 + if registry_config: + registry_store_type = registry_config.registry_store_type + registry_path = registry_config.path + if registry_store_type is None: + cls = get_registry_store_class_from_scheme(registry_path) + else: + cls = get_registry_store_class_from_type(str(registry_store_type)) + + self._registry_store = cls(registry_config, repo_path) + self.cached_registry_proto_ttl = timedelta( + seconds=registry_config.cache_ttl_seconds + if registry_config.cache_ttl_seconds is not None + else 0 + ) + + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. 
+ @staticmethod + def diff_between( + current_registry: RegistryProto, new_registry: RegistryProto + ) -> RegistryDiff: + diff = RegistryDiff() + + # Handle Entities + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = tag_proto_objects_for_keep_delete_add( + current_registry.entities, new_registry.entities, ) + for e in entities_to_add: + diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) + for e in entities_to_delete: + diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + + # Handle Feature Views + # Handle On Demand Feature Views + # Handle Request Feature Views + # Handle Feature Services + logger.info(f"Diff: {diff}") + return diff + def _initialize_registry(self): """Explicitly initializes the registry with an empty proto if it doesn't exist.""" try: @@ -752,6 +792,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) ) ) + if allow_cache and (not expired or self.cache_being_updated): assert isinstance(self.cached_registry_proto, RegistryProto) return self.cached_registry_proto diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 8d4ecba529..ef0953feb2 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,16 +6,18 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Tuple, Union, cast +from typing import List, NamedTuple, Set, Union, cast import click from click.exceptions import BadParameter -from feast import Entity, FeatureTable from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import tag_objects_for_keep_delete_add +from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore -from feast.feature_view import DUMMY_ENTITY_NAME, FeatureView +from feast.feature_table import FeatureTable 
+from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView from feast.registry import Registry @@ -122,6 +124,7 @@ def parse_repo(repo_root: Path) -> ParsedRepo: res.on_demand_feature_views.add(obj) elif isinstance(obj, RequestFeatureView): res.request_feature_views.add(obj) + res.entities.add(DUMMY_ENTITY) return res @@ -150,22 +153,77 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation data_source.validate(store.config) # For each object in the registry, determine whether it should be kept or deleted. - entities_to_keep, entities_to_delete = _tag_registry_entities_for_keep_delete( - project, registry, repo + ( + entities_to_keep, + entities_to_delete, + entities_to_add, + ) = tag_objects_for_keep_delete_add( + set(registry.list_entities(project=project)), repo.entities + ) + # TODO(achals): This code path should be refactored to handle added & kept entities separately. 
+ entities_to_keep = set(entities_to_keep).union(entities_to_add) + + views = tag_objects_for_keep_delete_add( + set(registry.list_feature_views(project=project)), repo.feature_views + ) + views_to_keep, views_to_delete, views_to_add = ( + cast(Set[FeatureView], views[0]), + cast(Set[FeatureView], views[1]), + cast(Set[FeatureView], views[2]), + ) + + request_views = tag_objects_for_keep_delete_add( + set(registry.list_request_feature_views(project=project)), + repo.request_feature_views, + ) + request_views_to_keep: Set[RequestFeatureView] + request_views_to_delete: Set[RequestFeatureView] + request_views_to_add: Set[RequestFeatureView] + request_views_to_keep, request_views_to_delete, request_views_to_add = ( + cast(Set[RequestFeatureView], request_views[0]), + cast(Set[RequestFeatureView], request_views[1]), + cast(Set[RequestFeatureView], request_views[2]), + ) + + base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_keep, + *views_to_add, + *request_views_to_keep, + *request_views_to_add, + } + base_views_to_delete: Set[Union[RequestFeatureView, FeatureView]] = { + *views_to_delete, + *request_views_to_delete, + } + + odfvs = tag_objects_for_keep_delete_add( + set(registry.list_on_demand_feature_views(project=project)), + repo.on_demand_feature_views, ) - views_to_keep, views_to_delete = _tag_registry_views_for_keep_delete( - project, registry, repo + odfvs_to_keep, odfvs_to_delete, odfvs_to_add = ( + cast(Set[OnDemandFeatureView], odfvs[0]), + cast(Set[OnDemandFeatureView], odfvs[1]), + cast(Set[OnDemandFeatureView], odfvs[2]), ) + odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) + ( - odfvs_to_keep, - odfvs_to_delete, - ) = _tag_registry_on_demand_feature_views_for_keep_delete(project, registry, repo) - tables_to_keep, tables_to_delete = _tag_registry_tables_for_keep_delete( - project, registry, repo + tables_to_keep, + tables_to_delete, + tables_to_add, + ) = tag_objects_for_keep_delete_add( + 
set(registry.list_feature_tables(project=project)), repo.feature_tables ) - services_to_keep, services_to_delete = _tag_registry_services_for_keep_delete( - project, registry, repo + tables_to_keep = tables_to_keep.union(tables_to_add) + + ( + services_to_keep, + services_to_delete, + services_to_add, + ) = tag_objects_for_keep_delete_add( + set(registry.list_feature_services(project=project)), repo.feature_services ) + services_to_keep = services_to_keep.union(services_to_add) sys.dont_write_bytecode = False @@ -176,7 +234,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_apply.extend(entities_to_keep) - all_to_apply.extend(views_to_keep) + all_to_apply.extend(base_views_to_keep) all_to_apply.extend(services_to_keep) all_to_apply.extend(odfvs_to_keep) all_to_apply.extend(tables_to_keep) @@ -186,7 +244,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] ] = [] all_to_delete.extend(entities_to_delete) - all_to_delete.extend(views_to_delete) + all_to_delete.extend(base_views_to_delete) all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) all_to_delete.extend(tables_to_delete) @@ -197,7 +255,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" ) - for view in views_to_delete: + for view in base_views_to_delete: click.echo( f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" ) @@ -216,10 +274,11 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) for entity in entities_to_keep: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in views_to_keep: + if entity.name != DUMMY_ENTITY_NAME: + click.echo( + f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" + ) + for view 
in base_views_to_keep: click.echo( f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" ) @@ -258,74 +317,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # TODO: consider echoing also entities being deployed/removed -def _tag_registry_entities_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[Entity], Set[Entity]]: - entities_to_keep: Set[Entity] = repo.entities - entities_to_delete: Set[Entity] = set() - repo_entities_names = set([e.name for e in repo.entities]) - for registry_entity in registry.list_entities(project=project): - # Do not delete dummy entity. - if ( - registry_entity.name not in repo_entities_names - and registry_entity.name != DUMMY_ENTITY_NAME - ): - entities_to_delete.add(registry_entity) - return entities_to_keep, entities_to_delete - - -def _tag_registry_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[BaseFeatureView], Set[BaseFeatureView]]: - views_to_keep: Set[BaseFeatureView] = cast(Set[BaseFeatureView], repo.feature_views) - for request_fv in repo.request_feature_views: - views_to_keep.add(request_fv) - views_to_delete: Set[BaseFeatureView] = set() - repo_feature_view_names = set(t.name for t in repo.feature_views) - for registry_view in registry.list_feature_views(project=project): - if registry_view.name not in repo_feature_view_names: - views_to_delete.add(registry_view) - return views_to_keep, views_to_delete - - -def _tag_registry_on_demand_feature_views_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[OnDemandFeatureView], Set[OnDemandFeatureView]]: - odfvs_to_keep: Set[OnDemandFeatureView] = repo.on_demand_feature_views - odfvs_to_delete: Set[OnDemandFeatureView] = set() - repo_on_demand_feature_view_names = set( - t.name for t in repo.on_demand_feature_views - ) - for registry_odfv in registry.list_on_demand_feature_views(project=project): - if 
registry_odfv.name not in repo_on_demand_feature_view_names: - odfvs_to_delete.add(registry_odfv) - return odfvs_to_keep, odfvs_to_delete - - -def _tag_registry_tables_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureTable], Set[FeatureTable]]: - tables_to_keep: Set[FeatureTable] = repo.feature_tables - tables_to_delete: Set[FeatureTable] = set() - repo_table_names = set(t.name for t in repo.feature_tables) - for registry_table in registry.list_feature_tables(project=project): - if registry_table.name not in repo_table_names: - tables_to_delete.add(registry_table) - return tables_to_keep, tables_to_delete - - -def _tag_registry_services_for_keep_delete( - project: str, registry: Registry, repo: ParsedRepo -) -> Tuple[Set[FeatureService], Set[FeatureService]]: - services_to_keep: Set[FeatureService] = repo.feature_services - services_to_delete: Set[FeatureService] = set() - repo_feature_service_names = set(t.name for t in repo.feature_services) - for registry_service in registry.list_feature_services(project=project): - if registry_service.name not in repo_feature_service_names: - services_to_delete.add(registry_service) - return services_to_keep, services_to_delete - - @log_exceptions_and_usage def teardown(repo_config: RepoConfig, repo_path: Path): # Cannot pass in both repo_path and repo_config to FeatureStore. From 15fcb400ab970243d2bd59528abc50fe4d59e8e2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 14 Dec 2021 16:31:27 -0800 Subject: [PATCH 03/12] Bump log4j-core from 2.15.0 to 2.16.0 in /java (#2146) Bumps log4j-core from 2.15.0 to 2.16.0. --- updated-dependencies: - dependency-name: org.apache.logging.log4j:log4j-core dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> From ec4165396f70ab20b42246b093f777dfcc9f5277 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 15 Dec 2021 17:03:25 +0000 Subject: [PATCH 04/12] Remove untested and undocumented interfaces (#2084) * Remove `FeatureTable` class from Python SDK Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Remove `FeatureRef` class from Python SDK Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> * Remove out of date concepts documentation Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- docs/concepts/architecture.md | 33 -- docs/concepts/entities.md | 2 - docs/concepts/feature-tables.md | 122 ----- docs/concepts/feature-views.md | 123 ------ docs/concepts/glossary.md | 36 -- docs/concepts/sources.md | 2 - docs/concepts/stores.md | 26 -- sdk/python/feast/__init__.py | 2 - sdk/python/feast/data_source.py | 2 +- sdk/python/feast/diff/FcoDiff.py | 3 +- sdk/python/feast/errors.py | 10 - sdk/python/feast/feature.py | 84 +--- sdk/python/feast/feature_service.py | 9 +- sdk/python/feast/feature_store.py | 34 +- sdk/python/feast/feature_table.py | 415 ------------------ sdk/python/feast/infra/aws.py | 12 +- sdk/python/feast/infra/local.py | 4 +- .../feast/infra/online_stores/datastore.py | 16 +- .../feast/infra/online_stores/dynamodb.py | 14 +- .../feast/infra/online_stores/online_store.py | 18 +- sdk/python/feast/infra/online_stores/redis.py | 16 +- .../feast/infra/online_stores/sqlite.py | 16 +- .../feast/infra/passthrough_provider.py | 14 +- sdk/python/feast/infra/provider.py | 16 +- sdk/python/feast/registry.py | 104 ----- sdk/python/feast/repo_operations.py | 41 +- sdk/python/tests/foo_provider.py | 15 +- 27 files changed, 76 insertions(+), 1113 deletions(-) delete mode 100644 docs/concepts/architecture.md delete mode 100644 docs/concepts/entities.md 
delete mode 100644 docs/concepts/feature-tables.md delete mode 100644 docs/concepts/feature-views.md delete mode 100644 docs/concepts/glossary.md delete mode 100644 docs/concepts/sources.md delete mode 100644 docs/concepts/stores.md delete mode 100644 sdk/python/feast/feature_table.py diff --git a/docs/concepts/architecture.md b/docs/concepts/architecture.md deleted file mode 100644 index 568ac1aa7d..0000000000 --- a/docs/concepts/architecture.md +++ /dev/null @@ -1,33 +0,0 @@ -# Architecture - -![Feast 0.10 Architecture Diagram](../.gitbook/assets/image%20%284%29.png) - -### Functionality - -* **Create Batch Features:** ELT/ETL systems like Spark and SQL are used to transform data in the batch store. -* **Feast Apply:** The user \(or CI\) publishes versioned controlled feature definitions using `feast apply`. This CLI command updates infrastructure and persists definitions in the object store registry. -* **Feast Materialize:** The user \(or scheduler\) executes `feast materialize` which loads features from the offline store into the online store. -* **Model Training:** A model training pipeline is launched. It uses the Feast Python SDK to retrieve a training dataset and trains a model. -* **Get Historical Features:** Feast exports a point-in-time correct training dataset based on the list of features and entity dataframe provided by the model training pipeline. -* **Deploy Model:** The trained model binary \(and list of features\) are deployed into a model serving system. This step is not executed by Feast. -* **Prediction:** A backend system makes a request for a prediction from the model serving service. -* **Get Online Features:** The model serving service makes a request to the Feast Online Serving service for online features using a Feast SDK. - -### Components - -A complete Feast deployment contains the following components: - -* **Feast Online Serving:** Provides low-latency access to feature values stores in the online store. This component is optional. 
Teams can also read feature values directly from the online store if necessary. -* **Feast Registry**: An object store \(GCS, S3\) based registry used to persist feature definitions that are registered with the feature store. Systems can discover feature data by interacting with the registry through the Feast SDK. -* **Feast Python SDK/CLI:** The primary user facing SDK. Used to: - * Manage version controlled feature definitions. - * Materialize \(load\) feature values into the online store. - * Build and retrieve training datasets from the offline store. - * Retrieve online features. -* **Online Store:** The online store is a database that stores only the latest feature values for each entity. The online store is populated by materialization jobs. -* **Offline Store:** The offline store persists batch data that has been ingested into Feast. This data is used for producing training datasets. Feast does not manage the offline store directly, but runs queries against it. - -{% hint style="info" %} -Java and Go Clients are also available for online feature retrieval. See [API Reference](../feast-on-kubernetes/reference-1/api/). -{% endhint %} - diff --git a/docs/concepts/entities.md b/docs/concepts/entities.md deleted file mode 100644 index dadeac1cac..0000000000 --- a/docs/concepts/entities.md +++ /dev/null @@ -1,2 +0,0 @@ -# Entities - diff --git a/docs/concepts/feature-tables.md b/docs/concepts/feature-tables.md deleted file mode 100644 index a27e0c000b..0000000000 --- a/docs/concepts/feature-tables.md +++ /dev/null @@ -1,122 +0,0 @@ -# Feature Tables - -## Overview - -Feature tables are both a schema and a logical means of grouping features, data [sources](sources.md), and other related metadata. - -Feature tables serve the following purposes: - -* Feature tables are a means for defining the location and properties of data [sources](sources.md). -* Feature tables are used to create within Feast a database-level structure for the storage of feature values. 
-* The data sources described within feature tables allow Feast to find and ingest feature data into stores within Feast. -* Feature tables ensure data is efficiently stored during [ingestion](../user-guide/define-and-ingest-features.md) by providing a grouping mechanism of features values that occur on the same event timestamp. - -{% hint style="info" %} -Feast does not yet apply feature transformations. Transformations are currently expected to happen before data is ingested into Feast. The data sources described within feature tables should reference feature values in their already transformed form. -{% endhint %} - -### Features - -A feature is an individual measurable property observed on an entity. For example the amount of transactions \(feature\) a customer \(entity\) has completed. Features are used for both model training and scoring \(batch, online\). - -Features are defined as part of feature tables. Since Feast does not apply transformations, a feature is basically a schema that only contains a name and a type: - -```python -avg_daily_ride = Feature("average_daily_rides", ValueType.FLOAT) -``` - -Visit [FeatureSpec](https://api.docs.feast.dev/grpc/feast.core.pb.html#FeatureSpecV2) for the complete feature specification API. - -## Structure of a Feature Table - -Feature tables contain the following fields: - -* **Name:** Name of feature table. This name must be unique within a project. -* **Entities:** List of [entities](entities.md) to associate with the features defined in this feature table. Entities are used as lookup keys when retrieving features from a feature table. -* **Features:** List of features within a feature table. -* **Labels:** Labels are arbitrary key-value properties that can be defined by users. -* **Max age:** Max age affect the retrieval of features from a feature table. 
Age is measured as the duration of time between the event timestamp of a feature and the lookup time on an [entity key](glossary.md#entity-key) used to retrieve the feature. Feature values outside max age will be returned as unset values. Max age allows for eviction of keys from online stores and limits the amount of historical scanning required for historical feature values during retrieval. -* **Batch Source:** The batch data source from which Feast will ingest feature values into stores. This can either be used to back-fill stores before switching over to a streaming source, or it can be used as the primary source of data for a feature table. Visit [Sources](sources.md) to learn more about batch sources. -* **Stream Source:** The streaming data source from which you can ingest streaming feature values into Feast. Streaming sources must be paired with a batch source containing the same feature values. A streaming source is only used to populate online stores. The batch equivalent source that is paired with a streaming source is used during the generation of historical feature datasets. Visit [Sources](sources.md) to learn more about stream sources. 
- -Here is a ride-hailing example of a valid feature table specification: - -{% tabs %} -{% tab title="driver\_trips\_feature\_table.py" %} -```python -from feast import BigQuerySource, FeatureTable, Feature, ValueType -from google.protobuf.duration_pb2 import Duration - -driver_ft = FeatureTable( - name="driver_trips", - entities=["driver_id"], - features=[ - Feature("average_daily_rides", ValueType.FLOAT), - Feature("rating", ValueType.FLOAT) - ], - max_age=Duration(seconds=3600), - labels={ - "team": "driver_matching" - }, - batch_source=BigQuerySource( - table_ref="gcp_project:bq_dataset.bq_table", - event_timestamp_column="datetime", - created_timestamp_column="timestamp", - field_mapping={ - "rating": "driver_rating" - } - ) -) -``` -{% endtab %} -{% endtabs %} - -By default, Feast assumes that features specified in the feature-table specification corresponds one-to-one to the fields found in the sources. All features defined in a feature table should be available in the defined sources. - -Field mappings can be used to map features defined in Feast to fields as they occur in data sources. - -In the example feature-specification table above, we use field mappings to ensure the feature named `rating` in the batch source is mapped to the field named `driver_rating`. - -## Working with a Feature Table - -#### Creating a Feature Table - -```python -driver_ft = FeatureTable(...) -client.apply(driver_ft) -``` - -#### Updating a Feature Table - -```python -driver_ft = FeatureTable() - -client.apply(driver_ft) - -driver_ft.labels = {"team": "marketplace"} - -client.apply(driver_ft) -``` - -#### Feast currently supports the following changes to feature tables: - -* Adding new features. -* Removing features. -* Updating source, max age, and labels. - -{% hint style="warning" %} -Deleted features are archived, rather than removed completely. Importantly, new features cannot use the names of these deleted features. 
-{% endhint %} - -#### Feast currently does not support the following changes to feature tables: - -* Changes to the project or name of a feature table. -* Changes to entities related to a feature table. -* Changes to names and types of existing features. - -#### Deleting a Feature Table - -{% hint style="danger" %} -Feast currently does not support the deletion of feature tables. -{% endhint %} - diff --git a/docs/concepts/feature-views.md b/docs/concepts/feature-views.md deleted file mode 100644 index 96a930b290..0000000000 --- a/docs/concepts/feature-views.md +++ /dev/null @@ -1,123 +0,0 @@ -# Feature Views - -### Overview - -Feature views are objects used to define and productionize logical groups of features for training and serving. - -Feature views serve the following purposes: - -* Feature views are a means for defining the location and properties of data sources that contain features. -* The data sources described within feature views allow Feast to find and materialize feature data into stores. -* Feature views ensure data is efficiently stored during materialization by providing a grouping mechanism of feature values that occur on the same event timestamp. -* Features are referenced relative to their feature view during the lookup of features, e.g., `driver_feature_view:driver_rating`. - -{% hint style="info" %} -Feast does not yet apply feature transformations. Feast acts as the productionization layer for pre-existing features. The data sources described within feature views should reference feature values in their already transformed form. -{% endhint %} - -Entities, features, and sources must be defined in order to define a feature view. - -### Entity - -Define an entity for the driver. Entities can be thought of as primary keys used to retrieve features. Entities are also used to join multiple tables/views during the construction of feature vectors. - -```python -driver = Entity( - # Name of the entity. 
Must be unique within a project - name="driver", - - # The join key of an entity describes the storage level field/column on which - # features can be looked up. The join key is also used to join feature - # tables/views when building feature vectors - join_key="driver_id", - - # The storage level type for an entity - value_type=ValueType.INT64 -) -``` - -### Feature - -A feature is an individual measurable property observed on an entity. For example, the amount of transactions \(feature\) a customer \(entity\) has completed. - -Features are defined as part of feature views. Since Feast does not transform data, a feature is essentially a schema that only contains a name and a type: - -```python -conversion_rate = Feature( - # Name of the feature. Used during lookup of feautres from the feature store - # The name must be unique - name="conv_rate", - - # The type used for storage of features (both at source and when materialized - # into a store) - dtype=ValueType.FLOAT -) -``` - -### Source - -Indicates a data source from which feature values can be retrieved. Sources are queried when building training datasets or materializing features into an online store. - -```python - -driver_stats_source = BigQuerySource( - # The BigQuery table where features can be found - table_ref="feast-oss.demo_data.driver_stats", - - # The event timestamp is used for point-in-time joins and for ensuring only - # features within the TTL are returned - event_timestamp_column="datetime", - - # The (optional) created timestamp is used to ensure there are no duplicate - # feature rows in the offline store or when building training datasets - created_timestamp_column="created", -) -``` - -### Feature View - -A Feature View is a - -{% tabs %} -{% tab title="driver\_trips\_feature\_table.py" %} -```python -driver_stats_fv = FeatureView( - # The unique name of this feature view. 
Two feature views in a single - # project cannot have the same name - name="driver_stats", - - # The list of entities specifies the keys required for joining or looking - # up features from this feature view. The reference provided in this field - # correspond to the name of a defined entity (or entities) - entities=["driver"], - - # The timedelta is the maximum age that each feature value may have - # relative to its lookup time. For historical features (used in training), - # TTL is relative to each timestamp provided in the entity dataframe. - # TTL also allows for eviction of keys from online stores and limits the - # amount of historical scanning required for historical feature values - # during retrieval - ttl=timedelta(weeks=1), - - # The list of features defined below act as a schema to both define features - # for both materialization of features into a store, and are used as references - # during retrieval for building a training dataset or serving features - features=[ - Feature(name="conv_rate", dtype=ValueType.FLOAT), - Feature(name="acc_rate", dtype=ValueType.FLOAT), - Feature(name="avg_daily_trips", dtype=ValueType.INT64), - ], - - # Batch sources are used to find feature values. In the case of this feature - # view we will query a source table on BigQuery for driver statistics - # features - batch_source=driver_stats_source, - - # Tags are user defined key/value pairs that are attached to each - # feature view - tags={"team": "driver_performance"}, -) -``` -{% endtab %} -{% endtabs %} - diff --git a/docs/concepts/glossary.md b/docs/concepts/glossary.md deleted file mode 100644 index 0c458c7c4f..0000000000 --- a/docs/concepts/glossary.md +++ /dev/null @@ -1,36 +0,0 @@ -# Glossary - -#### **Entity key** - -The combination of entities that uniquely identify a row. For example, a feature table with the composite entity of \(customer, country\) might have an entity key of \(1001, 5\). 
The key is used during lookups of feature values and for deduplicating historical rows. - -#### Entity timestamp - -The timestamp on which an event occurred. The entity timestamp could describe the event time at which features were calculated, or it could describe the event timestamps at which outcomes were observed. - -Entity timestamps are commonly found on the entity dataframe and associated with the target variable \(outcome\) that needs to be predicted. These timestamps are the target on which point-in-time joins should be made. - -#### Entity rows - -A combination of a single [entity key ](glossary.md#entity-key)and a single [entity timestamp](glossary.md#entity-timestamp). - -#### Entity dataframe - -A collection of [entity rows](glossary.md#entity-rows). This dataframe is enriched with feature values before being used for model training. - -#### Feature References - -Feature references uniquely identify feature values throughout Feast. Feature references can either be defined as objects or as strings. - -The structure of a feature reference in string form is as follows: - -`feature_table:feature` - -Example: - -`drivers_stream:unique_drivers` - -Feature references are unique within a project. It is not possible to reference \(or retrieve\) features from multiple projects at the same time. - -\*\*\*\* - diff --git a/docs/concepts/sources.md b/docs/concepts/sources.md deleted file mode 100644 index a76d395d09..0000000000 --- a/docs/concepts/sources.md +++ /dev/null @@ -1,2 +0,0 @@ -# Sources - diff --git a/docs/concepts/stores.md b/docs/concepts/stores.md deleted file mode 100644 index 3695f6c37d..0000000000 --- a/docs/concepts/stores.md +++ /dev/null @@ -1,26 +0,0 @@ -# Stores - -In Feast, a store is a database that is populated with feature data that will ultimately be served to models. - -### Offline \(Historical\) Store - -The offline store maintains historical copies of feature values. These features are grouped and stored in feature tables. 
During retrieval of historical data, features are queries from these feature tables in order to produce training datasets. - -{% hint style="warning" %} -Feast 0.8 does not support offline storage. Support will be added in Feast 0.9. -{% endhint %} - -### Online Store - -The online store maintains only the latest values for a specific feature. - -* Feature values are stored based on their [entity keys](glossary.md#entity-key) -* Feast currently supports Redis as an online store. -* Online stores are meant for very high throughput writes from ingestion jobs and very low latency access to features during online serving. - -{% hint style="info" %} -Feast only supports a single online store in production -{% endhint %} - - - diff --git a/sdk/python/feast/__init__.py b/sdk/python/feast/__init__.py index cd4730efa3..eada13f995 100644 --- a/sdk/python/feast/__init__.py +++ b/sdk/python/feast/__init__.py @@ -11,7 +11,6 @@ from .feature import Feature from .feature_service import FeatureService from .feature_store import FeatureStore -from .feature_table import FeatureTable from .feature_view import FeatureView from .on_demand_feature_view import OnDemandFeatureView from .repo_config import RepoConfig @@ -36,7 +35,6 @@ "Feature", "FeatureService", "FeatureStore", - "FeatureTable", "FeatureView", "OnDemandFeatureView", "RepoConfig", diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index a8410fbf99..b30340f0d2 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -331,7 +331,7 @@ def date_partition_column(self, date_partition_column): @abstractmethod def from_proto(data_source: DataSourceProto) -> Any: """ - Converts data source config in FeatureTable spec to a DataSource class object. + Converts data source config in protobuf spec to a DataSource class object. Args: data_source: A protobuf representation of a DataSource. 
diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index bb466c33e6..b19eb713c2 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -5,7 +5,6 @@ from feast.base_feature_view import BaseFeatureView from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_table import FeatureTable from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto @@ -44,7 +43,7 @@ def add_fco_diff(self, fco_diff: FcoDiff): self.fco_diffs.append(fco_diff) -T = TypeVar("T", Entity, BaseFeatureView, FeatureService, FeatureTable) +T = TypeVar("T", Entity, BaseFeatureView, FeatureService) def tag_objects_for_keep_delete_add( diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 03dd2409a7..f6a66bea5a 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -64,16 +64,6 @@ def __init__(self, feature_names): ) -class FeatureTableNotFoundException(FeastObjectNotFoundException): - def __init__(self, name, project=None): - if project: - super().__init__( - f"Feature table {name} does not exist in project {project}" - ) - else: - super().__init__(f"Feature table {name} does not exist") - - class S3RegistryBucketNotExist(FeastObjectNotFoundException): def __init__(self, bucket): super().__init__(f"S3 bucket {bucket} for the Feast registry does not exist") diff --git a/sdk/python/feast/feature.py b/sdk/python/feast/feature.py index d022571fc2..b37e0f562b 100644 --- a/sdk/python/feast/feature.py +++ b/sdk/python/feast/feature.py @@ -12,12 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from typing import Dict, List, Optional +from typing import Dict, Optional from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FeatureSpecProto -from feast.protos.feast.serving.ServingService_pb2 import ( - FeatureReferenceV2 as FeatureRefProto, -) from feast.protos.feast.types import Value_pb2 as ValueTypeProto from feast.value_type import ValueType @@ -113,82 +110,3 @@ def from_proto(cls, feature_proto: FeatureSpecProto): ) return feature - - -class FeatureRef: - """ Feature Reference represents a reference to a specific feature. """ - - def __init__(self, name: str, feature_table: str): - self.proto = FeatureRefProto(name=name, feature_table=feature_table) - - @classmethod - def from_proto(cls, proto: FeatureRefProto): - """ - Construct a feature reference from the given FeatureReference proto - - Args: - proto: Protobuf FeatureReference to construct from - Returns: - FeatureRef that refers to the given feature - """ - return cls(name=proto.name, feature_table=proto.feature_table) - - @classmethod - def from_str(cls, feature_ref_str: str): - """ - Parse the given string feature reference into FeatureRef model - String feature reference should be in the format feature_table:feature. - Where "feature_table" and "name" are the feature_table name and feature name - respectively. - - Args: - feature_ref_str: String representation of the feature reference - Returns: - FeatureRef that refers to the given feature - """ - proto = FeatureRefProto() - - # parse feature table name if specified - if ":" in feature_ref_str: - proto.feature_table, proto.name = feature_ref_str.split(":") - else: - raise ValueError( - f"Unsupported feature reference: {feature_ref_str} - Feature reference string should be in the form [featuretable_name:featurename]" - ) - - return cls.from_proto(proto) - - def to_proto(self) -> FeatureRefProto: - """ - Convert and return this feature table reference to protobuf. 
- - Returns: - Protobuf respresentation of this feature table reference. - """ - - return self.proto - - def __repr__(self): - # return string representation of the reference - ref_str = self.proto.feature_table + ":" + self.proto.name - return ref_str - - def __str__(self): - # readable string of the reference - return f"FeatureRef<{self.__repr__()}>" - - -def _build_feature_references(feature_ref_strs: List[str]) -> List[FeatureRefProto]: - """ - Builds a list of FeatureReference protos from a list of FeatureReference strings - - Args: - feature_ref_strs: List of string feature references - Returns: - A list of FeatureReference protos parsed from args. - """ - - feature_refs = [FeatureRef.from_str(ref_str) for ref_str in feature_ref_strs] - feature_ref_protos = [ref.to_proto() for ref in feature_refs] - - return feature_ref_protos diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 46afdfff1e..9bb4fb5e5d 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -4,7 +4,6 @@ from google.protobuf.json_format import MessageToJson from feast.base_feature_view import BaseFeatureView -from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection from feast.on_demand_feature_view import OnDemandFeatureView @@ -42,7 +41,7 @@ class FeatureService: def __init__( self, name: str, - features: List[Union[FeatureTable, FeatureView, OnDemandFeatureView]], + features: List[Union[FeatureView, OnDemandFeatureView]], tags: Optional[Dict[str, str]] = None, description: Optional[str] = None, ): @@ -56,11 +55,7 @@ def __init__( self.feature_view_projections = [] for feature_grouping in features: - if isinstance(feature_grouping, FeatureTable): - self.feature_view_projections.append( - FeatureViewProjection.from_definition(feature_grouping) - ) - elif isinstance(feature_grouping, BaseFeatureView): + if 
isinstance(feature_grouping, BaseFeatureView): self.feature_view_projections.append(feature_grouping.projection) else: raise ValueError( diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 7ab7817d61..98af761931 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -36,7 +36,6 @@ RequestDataNotFoundInEntityRowsException, ) from feast.feature_service import FeatureService -from feast.feature_table import FeatureTable from feast.feature_view import ( DUMMY_ENTITY, DUMMY_ENTITY_ID, @@ -367,7 +366,6 @@ def apply( OnDemandFeatureView, RequestFeatureView, FeatureService, - FeatureTable, List[ Union[ FeatureView, @@ -375,7 +373,6 @@ def apply( RequestFeatureView, Entity, FeatureService, - FeatureTable, ] ], ], @@ -387,7 +384,6 @@ def apply( RequestFeatureView, Entity, FeatureService, - FeatureTable, ] ] ] = None, @@ -448,15 +444,10 @@ def apply( ] odfvs_to_update = [ob for ob in objects if isinstance(ob, OnDemandFeatureView)] services_to_update = [ob for ob in objects if isinstance(ob, FeatureService)] - tables_to_update = [ob for ob in objects if isinstance(ob, FeatureTable)] if len(entities_to_update) + len(views_to_update) + len( request_views_to_update - ) + len(odfvs_to_update) + len(services_to_update) + len( - tables_to_update - ) != len( - objects - ): + ) + len(odfvs_to_update) + len(services_to_update) != len(objects): raise ValueError("Unknown object type provided as part of apply() call") # Validate all types of feature views. @@ -502,10 +493,6 @@ def apply( self._registry.apply_feature_service( feature_service, project=self.project, commit=False ) - for table in tables_to_update: - self._registry.apply_feature_table( - table, project=self.project, commit=False - ) if not partial: # Delete all registry objects that should not exist. 
@@ -524,9 +511,6 @@ def apply( services_to_delete = [ ob for ob in objects_to_delete if isinstance(ob, FeatureService) ] - tables_to_delete = [ - ob for ob in objects_to_delete if isinstance(ob, FeatureTable) - ] for entity in entities_to_delete: self._registry.delete_entity( @@ -548,15 +532,11 @@ def apply( self._registry.delete_feature_service( service.name, project=self.project, commit=False ) - for table in tables_to_delete: - self._registry.delete_feature_table( - table.name, project=self.project, commit=False - ) self._get_provider().update_infra( project=self.project, - tables_to_delete=views_to_delete + tables_to_delete if not partial else [], - tables_to_keep=views_to_update + tables_to_update, + tables_to_delete=views_to_delete if not partial else [], + tables_to_keep=views_to_update, entities_to_delete=entities_to_delete if not partial else [], entities_to_keep=entities_to_update, partial=partial, @@ -567,12 +547,10 @@ def apply( @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" - tables: List[Union[FeatureView, FeatureTable]] = [] + tables: List[FeatureView] = [] feature_views = self.list_feature_views() - feature_tables = self._registry.list_feature_tables(self.project) tables.extend(feature_views) - tables.extend(feature_tables) entities = self.list_entities() @@ -926,8 +904,8 @@ def get_online_features( Args: features: List of feature references that will be returned for each entity. Each feature reference should have the following format: - "feature_table:feature" where "feature_table" & "feature" refer to - the feature and feature table names respectively. + "feature_view:feature" where "feature_view" & "feature" refer to + the Feature and FeatureView names respectively. Only the feature name is required. entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair. 
diff --git a/sdk/python/feast/feature_table.py b/sdk/python/feast/feature_table.py deleted file mode 100644 index 2c1022de22..0000000000 --- a/sdk/python/feast/feature_table.py +++ /dev/null @@ -1,415 +0,0 @@ -# Copyright 2020 The Feast Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Dict, List, MutableMapping, Optional, Union - -import yaml -from google.protobuf import json_format -from google.protobuf.duration_pb2 import Duration -from google.protobuf.json_format import MessageToDict, MessageToJson -from google.protobuf.timestamp_pb2 import Timestamp - -from feast.data_source import DataSource, KafkaSource, KinesisSource -from feast.feature import Feature -from feast.loaders import yaml as feast_yaml -from feast.protos.feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto -from feast.protos.feast.core.FeatureTable_pb2 import ( - FeatureTableMeta as FeatureTableMetaProto, -) -from feast.protos.feast.core.FeatureTable_pb2 import ( - FeatureTableSpec as FeatureTableSpecProto, -) -from feast.usage import log_exceptions -from feast.value_type import ValueType - - -class FeatureTable: - """ - Represents a collection of features and associated metadata. 
- """ - - @log_exceptions - def __init__( - self, - name: str, - entities: List[str], - features: List[Feature], - batch_source: DataSource = None, - stream_source: Optional[Union[KafkaSource, KinesisSource]] = None, - max_age: Optional[Duration] = None, - labels: Optional[MutableMapping[str, str]] = None, - ): - self._name = name - self._entities = entities - self._features = features - self._batch_source = batch_source - self._stream_source = stream_source - - self._labels: MutableMapping[str, str] - if labels is None: - self._labels = dict() - else: - self._labels = labels - - self._max_age = max_age - self._created_timestamp: Optional[Timestamp] = None - self._last_updated_timestamp: Optional[Timestamp] = None - - def __str__(self): - return str(MessageToJson(self.to_proto())) - - def __hash__(self) -> int: - return hash((id(self), self.name)) - - def __eq__(self, other): - if not isinstance(other, FeatureTable): - raise TypeError( - "Comparisons should only involve FeatureTable class objects." 
- ) - - if ( - self.labels != other.labels - or self.name != other.name - or self.max_age != other.max_age - ): - return False - - if sorted(self.entities) != sorted(other.entities): - return False - if sorted(self.features) != sorted(other.features): - return False - if self.batch_source != other.batch_source: - return False - if self.stream_source != other.stream_source: - return False - - return True - - @property - def name(self): - """ - Returns the name of this feature table - """ - return self._name - - @name.setter - def name(self, name: str): - """ - Sets the name of this feature table - """ - self._name = name - - @property - def entities(self) -> List[str]: - """ - Returns the entities of this feature table - """ - return self._entities - - @entities.setter - def entities(self, entities: List[str]): - """ - Sets the entities of this feature table - """ - self._entities = entities - - @property - def features(self): - """ - Returns the features of this feature table - """ - return self._features - - @features.setter - def features(self, features: List[Feature]): - """ - Sets the features of this feature table - """ - self._features = features - - @property - def batch_source(self): - """ - Returns the batch source of this feature table - """ - return self._batch_source - - @batch_source.setter - def batch_source(self, batch_source: DataSource): - """ - Sets the batch source of this feature table - """ - self._batch_source = batch_source - - @property - def stream_source(self): - """ - Returns the stream source of this feature table - """ - return self._stream_source - - @stream_source.setter - def stream_source(self, stream_source: Union[KafkaSource, KinesisSource]): - """ - Sets the stream source of this feature table - """ - self._stream_source = stream_source - - @property - def max_age(self): - """ - Returns the maximum age of this feature table. 
This is the total maximum - amount of staleness that will be allowed during feature retrieval for - each specific feature that is looked up. - """ - return self._max_age - - @max_age.setter - def max_age(self, max_age: Duration): - """ - Set the maximum age for this feature table - """ - self._max_age = max_age - - @property - def labels(self): - """ - Returns the labels of this feature table. This is the user defined metadata - defined as a dictionary. - """ - return self._labels - - @labels.setter - def labels(self, labels: MutableMapping[str, str]): - """ - Set the labels for this feature table - """ - self._labels = labels - - @property - def created_timestamp(self): - """ - Returns the created_timestamp of this feature table - """ - return self._created_timestamp - - @property - def last_updated_timestamp(self): - """ - Returns the last_updated_timestamp of this feature table - """ - return self._last_updated_timestamp - - def add_feature(self, feature: Feature): - """ - Adds a new feature to the feature table. - """ - self.features.append(feature) - - def is_valid(self): - """ - Validates the state of a feature table locally. Raises an exception - if feature table is invalid. 
- """ - - if not self.name: - raise ValueError("No name found in feature table.") - - if not self.entities: - raise ValueError("No entities found in feature table {self.name}.") - - @classmethod - def from_yaml(cls, yml: str): - """ - Creates a feature table from a YAML string body or a file path - - Args: - yml: Either a file path containing a yaml file or a YAML string - - Returns: - Returns a FeatureTable object based on the YAML file - """ - - return cls.from_dict(feast_yaml.yaml_loader(yml, load_single=True)) - - @classmethod - def from_dict(cls, ft_dict): - """ - Creates a feature table from a dict - - Args: - ft_dict: A dict representation of a feature table - - Returns: - Returns a FeatureTable object based on the feature table dict - """ - - feature_table_proto = json_format.ParseDict( - ft_dict, FeatureTableProto(), ignore_unknown_fields=True - ) - - return cls.from_proto(feature_table_proto) - - @classmethod - def from_proto(cls, feature_table_proto: FeatureTableProto): - """ - Creates a feature table from a protobuf representation of a feature table - - Args: - feature_table_proto: A protobuf representation of a feature table - - Returns: - Returns a FeatureTableProto object based on the feature table protobuf - """ - - feature_table = cls( - name=feature_table_proto.spec.name, - entities=[entity for entity in feature_table_proto.spec.entities], - features=[ - Feature( - name=feature.name, - dtype=ValueType(feature.value_type), - labels=dict(feature.labels), - ) - for feature in feature_table_proto.spec.features - ], - labels=feature_table_proto.spec.labels, - max_age=( - None - if feature_table_proto.spec.max_age.seconds == 0 - and feature_table_proto.spec.max_age.nanos == 0 - else feature_table_proto.spec.max_age - ), - batch_source=DataSource.from_proto(feature_table_proto.spec.batch_source), - stream_source=( - None - if not feature_table_proto.spec.stream_source.ByteSize() - else DataSource.from_proto(feature_table_proto.spec.stream_source) - ), - 
) - - feature_table._created_timestamp = feature_table_proto.meta.created_timestamp - - return feature_table - - def to_proto(self) -> FeatureTableProto: - """ - Converts an feature table object to its protobuf representation - - Returns: - FeatureTableProto protobuf - """ - - meta = FeatureTableMetaProto( - created_timestamp=self.created_timestamp, - last_updated_timestamp=self.last_updated_timestamp, - ) - - batch_source_proto = self.batch_source.to_proto() - batch_source_proto.data_source_class_type = f"{self.batch_source.__class__.__module__}.{self.batch_source.__class__.__name__}" - - stream_source_proto = None - if self.stream_source: - stream_source_proto = self.stream_source.to_proto() - stream_source_proto.data_source_class_type = f"{self.stream_source.__class__.__module__}.{self.stream_source.__class__.__name__}" - - spec = FeatureTableSpecProto( - name=self.name, - entities=self.entities, - features=[ - feature.to_proto() if type(feature) == Feature else feature - for feature in self.features - ], - labels=self.labels, - max_age=self.max_age, - batch_source=batch_source_proto, - stream_source=stream_source_proto, - ) - - return FeatureTableProto(spec=spec, meta=meta) - - def to_spec_proto(self) -> FeatureTableSpecProto: - """ - Converts an FeatureTableProto object to its protobuf representation. - Used when passing FeatureTableSpecProto object to Feast request. 
- - Returns: - FeatureTableSpecProto protobuf - """ - - spec = FeatureTableSpecProto( - name=self.name, - entities=self.entities, - features=[ - feature.to_proto() if type(feature) == Feature else feature - for feature in self.features - ], - labels=self.labels, - max_age=self.max_age, - batch_source=( - self.batch_source.to_proto() - if issubclass(type(self.batch_source), DataSource) - else self.batch_source - ), - stream_source=( - self.stream_source.to_proto() - if issubclass(type(self.stream_source), DataSource) - else self.stream_source - ), - ) - - return spec - - def to_dict(self) -> Dict: - """ - Converts feature table to dict - - :return: Dictionary object representation of feature table - """ - feature_table_dict = MessageToDict(self.to_proto()) - - # Remove meta when empty for more readable exports - if feature_table_dict["meta"] == {}: - del feature_table_dict["meta"] - - return feature_table_dict - - def to_yaml(self): - """ - Converts a feature table to a YAML string. - - :return: Feature table string returned in YAML format - """ - feature_table_dict = self.to_dict() - return yaml.dump(feature_table_dict, allow_unicode=True, sort_keys=False) - - def _update_from_feature_table(self, feature_table): - """ - Deep replaces one feature table with another - - Args: - feature_table: Feature table to use as a source of configuration - """ - - self.name = feature_table.name - self.entities = feature_table.entities - self.features = feature_table.features - self.labels = feature_table.labels - self.max_age = feature_table.max_age - self.batch_source = feature_table.batch_source - self.stream_source = feature_table.stream_source - self._created_timestamp = feature_table.created_timestamp - self._last_updated_timestamp = feature_table.last_updated_timestamp - - def __repr__(self): - return f"FeatureTable <{self.name}>" diff --git a/sdk/python/feast/infra/aws.py b/sdk/python/feast/infra/aws.py index 04b4abb48b..735b2f62e7 100644 --- 
a/sdk/python/feast/infra/aws.py +++ b/sdk/python/feast/infra/aws.py @@ -6,7 +6,7 @@ from datetime import datetime from pathlib import Path from tempfile import TemporaryFile -from typing import Optional, Sequence, Union +from typing import Optional, Sequence from urllib.parse import urlparse from colorama import Fore, Style @@ -28,7 +28,6 @@ S3RegistryBucketForbiddenAccess, S3RegistryBucketNotExist, ) -from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.flags import FLAG_AWS_LAMBDA_FEATURE_SERVER_NAME from feast.flags_helper import enable_aws_lambda_feature_server @@ -57,8 +56,8 @@ class AwsProvider(PassthroughProvider): def update_infra( self, project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -193,10 +192,7 @@ def _deploy_feature_server(self, project: str, image_uri: str): @log_exceptions_and_usage(provider="AwsProvider") def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], + self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ) -> None: self.online_store.teardown(self.repo_config, tables, entities) diff --git a/sdk/python/feast/infra/local.py b/sdk/python/feast/infra/local.py index d1dbd259ef..c5ee11e144 100644 --- a/sdk/python/feast/infra/local.py +++ b/sdk/python/feast/infra/local.py @@ -1,11 +1,9 @@ import uuid from datetime import datetime from pathlib import Path -from typing import Union import pytz -from feast import FeatureTable from feast.feature_view import FeatureView from feast.infra.passthrough_provider import PassthroughProvider from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -22,7 +20,7 @@ class 
LocalProvider(PassthroughProvider): pass -def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: +def _table_id(project: str, table: FeatureView) -> str: return f"{project}_{table.name}" diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index e9e5973edd..a9bd534a50 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -14,12 +14,12 @@ import itertools from datetime import datetime from multiprocessing.pool import ThreadPool -from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple from pydantic import PositiveInt, StrictStr from pydantic.typing import Literal -from feast import Entity, FeatureTable, utils +from feast import Entity, utils from feast.feature_view import FeatureView from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore @@ -74,8 +74,8 @@ class DatastoreOnlineStore(OnlineStore): def update( self, config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -107,7 +107,7 @@ def update( def teardown( self, config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], + tables: Sequence[FeatureView], entities: Sequence[Entity], ): """ @@ -146,7 +146,7 @@ def _get_client(self, online_config: DatastoreOnlineStoreConfig): def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -186,7 +186,7 @@ def 
_to_minibatches(data: ProtoBatch, batch_size) -> Iterator[ProtoBatch]: def _write_minibatch( client, project: str, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: Sequence[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -227,7 +227,7 @@ def _write_minibatch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index f469f09e49..d2082972a3 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -13,12 +13,12 @@ # limitations under the License. import logging from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple from pydantic import StrictStr from pydantic.typing import Literal -from feast import Entity, FeatureTable, FeatureView, utils +from feast import Entity, FeatureView, utils from feast.infra.infra_object import InfraObject from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore @@ -65,8 +65,8 @@ class DynamoDBOnlineStore(OnlineStore): def update( self, config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -104,7 +104,7 @@ def update( def teardown( self, config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], + tables: Sequence[FeatureView], 
entities: Sequence[Entity], ): online_config = config.online_store @@ -118,7 +118,7 @@ def teardown( def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -149,7 +149,7 @@ def online_write_batch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: diff --git a/sdk/python/feast/infra/online_stores/online_store.py b/sdk/python/feast/infra/online_stores/online_store.py index 8050d07f00..b2aa1e46d0 100644 --- a/sdk/python/feast/infra/online_stores/online_store.py +++ b/sdk/python/feast/infra/online_stores/online_store.py @@ -14,9 +14,9 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple -from feast import Entity, FeatureTable +from feast import Entity from feast.feature_view import FeatureView from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto @@ -33,7 +33,7 @@ class OnlineStore(ABC): def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -47,7 +47,7 @@ def online_write_batch( Args: config: The RepoConfig for the current FeatureStore. - table: Feast FeatureTable or FeatureView + table: Feast FeatureView data: a list of quadruplets containing Feature data. 
Each quadruplet contains an Entity Key, a dict containing feature values, an event timestamp for the row, and the created timestamp for the row if it exists. @@ -60,7 +60,7 @@ def online_write_batch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: @@ -70,7 +70,7 @@ def online_read( Args: config: The RepoConfig for the current FeatureStore. - table: Feast FeatureTable or FeatureView + table: Feast FeatureView entity_keys: a list of entity keys that should be read from the FeatureStore. requested_features: (Optional) A subset of the features that should be read from the FeatureStore. Returns: @@ -84,8 +84,8 @@ def online_read( def update( self, config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -96,7 +96,7 @@ def update( def teardown( self, config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], + tables: Sequence[FeatureView], entities: Sequence[Entity], ): ... 
diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index c6df0c69fa..9f20339343 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -31,7 +31,7 @@ from pydantic import StrictStr from pydantic.typing import Literal -from feast import Entity, FeatureTable, FeatureView, RepoConfig, utils +from feast import Entity, FeatureView, RepoConfig, utils from feast.infra.online_stores.helpers import _mmh3, _redis_key, _redis_key_prefix from feast.infra.online_stores.online_store import OnlineStore from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -72,9 +72,7 @@ class RedisOnlineStoreConfig(FeastConfigBaseModel): class RedisOnlineStore(OnlineStore): _client: Optional[Union[Redis, RedisCluster]] = None - def delete_table_values( - self, config: RepoConfig, table: Union[FeatureTable, FeatureView] - ): + def delete_table_values(self, config: RepoConfig, table: FeatureView): client = self._get_client(config.online_store) deleted_count = 0 pipeline = client.pipeline() @@ -93,8 +91,8 @@ def delete_table_values( def update( self, config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -108,7 +106,7 @@ def update( def teardown( self, config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], + tables: Sequence[FeatureView], entities: Sequence[Entity], ): """ @@ -166,7 +164,7 @@ def _get_client(self, online_store_config: RedisOnlineStoreConfig): def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -230,7 +228,7 
@@ def online_write_batch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 62642153f9..3e69f6213b 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -16,13 +16,13 @@ import sqlite3 from datetime import datetime from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple import pytz from pydantic import StrictStr from pydantic.schema import Literal -from feast import Entity, FeatureTable +from feast import Entity from feast.feature_view import FeatureView from feast.infra.key_encoding_utils import serialize_entity_key from feast.infra.online_stores.online_store import OnlineStore @@ -78,7 +78,7 @@ def _get_conn(self, config: RepoConfig): def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -133,7 +133,7 @@ def online_write_batch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: Optional[List[str]] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: @@ -176,8 +176,8 @@ def online_read( def update( self, config: RepoConfig, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], 
entities_to_keep: Sequence[Entity], partial: bool, @@ -199,7 +199,7 @@ def update( def teardown( self, config: RepoConfig, - tables: Sequence[Union[FeatureTable, FeatureView]], + tables: Sequence[FeatureView], entities: Sequence[Entity], ): try: @@ -208,7 +208,7 @@ def teardown( pass -def _table_id(project: str, table: Union[FeatureTable, FeatureView]) -> str: +def _table_id(project: str, table: FeatureView) -> str: return f"{project}_{table.name}" diff --git a/sdk/python/feast/infra/passthrough_provider.py b/sdk/python/feast/infra/passthrough_provider.py index c6dda62817..b42f5b0daf 100644 --- a/sdk/python/feast/infra/passthrough_provider.py +++ b/sdk/python/feast/infra/passthrough_provider.py @@ -6,7 +6,6 @@ from tqdm import tqdm from feast.entity import Entity -from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.offline_stores.offline_utils import get_offline_store_from_config @@ -41,8 +40,8 @@ def __init__(self, config: RepoConfig): def update_infra( self, project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -58,10 +57,7 @@ def update_infra( ) def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], + self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ) -> None: set_usage_attribute("provider", self.__class__.__name__) self.online_store.teardown(self.repo_config, tables, entities) @@ -69,7 +65,7 @@ def teardown_infra( def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], 
datetime, Optional[datetime]] ], @@ -82,7 +78,7 @@ def online_write_batch( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: diff --git a/sdk/python/feast/infra/provider.py b/sdk/python/feast/infra/provider.py index 540fa8bf47..8f72a48bd3 100644 --- a/sdk/python/feast/infra/provider.py +++ b/sdk/python/feast/infra/provider.py @@ -10,7 +10,6 @@ from feast import errors, importer from feast.entity import Entity -from feast.feature_table import FeatureTable from feast.feature_view import DUMMY_ENTITY_ID, FeatureView from feast.infra.offline_stores.offline_store import RetrievalJob from feast.on_demand_feature_view import OnDemandFeatureView @@ -36,8 +35,8 @@ def __init__(self, config: RepoConfig): def update_infra( self, project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], entities_to_keep: Sequence[Entity], partial: bool, @@ -62,10 +61,7 @@ def update_infra( @abc.abstractmethod def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], + self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ): """ Tear down all cloud resources for a repo. @@ -81,7 +77,7 @@ def teardown_infra( def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -95,7 +91,7 @@ def online_write_batch( Args: config: The RepoConfig for the current FeatureStore. 
- table: Feast FeatureTable + table: Feast FeatureView data: a list of quadruplets containing Feature data. Each quadruplet contains an Entity Key, a dict containing feature values, an event timestamp for the row, and the created timestamp for the row if it exists. @@ -142,7 +138,7 @@ def get_historical_features( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 3a54568d45..68d5b0bc11 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -35,12 +35,10 @@ ConflictingFeatureViewNames, EntityNotFoundException, FeatureServiceNotFoundException, - FeatureTableNotFoundException, FeatureViewNotFoundException, OnDemandFeatureViewNotFoundException, ) from feast.feature_service import FeatureService -from feast.feature_table import FeatureTable from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -305,37 +303,6 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti return Entity.from_proto(entity_proto) raise EntityNotFoundException(name, project=project) - def apply_feature_table( - self, feature_table: FeatureTable, project: str, commit: bool = True - ): - """ - Registers a single feature table with Feast - - Args: - feature_table: Feature table that will be registered - project: Feast project that this feature table belongs to - commit: Whether the change should be persisted immediately - """ - feature_table.is_valid() - feature_table_proto = feature_table.to_proto() - feature_table_proto.spec.project = project - self._prepare_registry_for_changes() - assert self.cached_registry_proto - - for idx, 
existing_feature_table_proto in enumerate( - self.cached_registry_proto.feature_tables - ): - if ( - existing_feature_table_proto.spec.name == feature_table_proto.spec.name - and existing_feature_table_proto.spec.project == project - ): - del self.cached_registry_proto.feature_tables[idx] - break - - self.cached_registry_proto.feature_tables.append(feature_table_proto) - if commit: - self.commit() - def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True ): @@ -481,23 +448,6 @@ def apply_materialization( raise FeatureViewNotFoundException(feature_view.name, project) - def list_feature_tables(self, project: str) -> List[FeatureTable]: - """ - Retrieve a list of feature tables from the registry - - Args: - project: Filter feature tables based on project name - - Returns: - List of feature tables - """ - registry_proto = self._get_registry_proto() - feature_tables = [] - for feature_table_proto in registry_proto.feature_tables: - if feature_table_proto.spec.project == project: - feature_tables.append(FeatureTable.from_proto(feature_table_proto)) - return feature_tables - def list_feature_views( self, project: str, allow_cache: bool = False ) -> List[FeatureView]: @@ -540,27 +490,6 @@ def list_request_feature_views( ) return feature_views - def get_feature_table(self, name: str, project: str) -> FeatureTable: - """ - Retrieves a feature table. 
- - Args: - name: Name of feature table - project: Feast project that this feature table belongs to - - Returns: - Returns either the specified feature table, or raises an exception if - none is found - """ - registry_proto = self._get_registry_proto() - for feature_table_proto in registry_proto.feature_tables: - if ( - feature_table_proto.spec.name == name - and feature_table_proto.spec.project == project - ): - return FeatureTable.from_proto(feature_table_proto) - raise FeatureTableNotFoundException(name, project) - def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: @@ -610,32 +539,6 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): return raise FeatureServiceNotFoundException(name, project) - def delete_feature_table(self, name: str, project: str, commit: bool = True): - """ - Deletes a feature table or raises an exception if not found. - - Args: - name: Name of feature table - project: Feast project that this feature table belongs to - commit: Whether the change should be persisted immediately - """ - self._prepare_registry_for_changes() - assert self.cached_registry_proto - - for idx, existing_feature_table_proto in enumerate( - self.cached_registry_proto.feature_tables - ): - if ( - existing_feature_table_proto.spec.name == name - and existing_feature_table_proto.spec.project == project - ): - del self.cached_registry_proto.feature_tables[idx] - if commit: - self.commit() - return - - raise FeatureTableNotFoundException(name, project) - def delete_feature_view(self, name: str, project: str, commit: bool = True): """ Deletes a feature view or raises an exception if not found. 
@@ -733,13 +636,6 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]: key=lambda feature_view: feature_view.name, ): registry_dict["featureViews"].append(MessageToDict(feature_view.to_proto())) - for feature_table in sorted( - self.list_feature_tables(project=project), - key=lambda feature_table: feature_table.name, - ): - registry_dict["featureTables"].append( - MessageToDict(feature_table.to_proto()) - ) for feature_service in sorted( self.list_feature_services(project=project), key=lambda feature_service: feature_service.name, diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index ef0953feb2..c7620c07a0 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -16,7 +16,6 @@ from feast.entity import Entity from feast.feature_service import FeatureService from feast.feature_store import FeatureStore -from feast.feature_table import FeatureTable from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView @@ -35,7 +34,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: class ParsedRepo(NamedTuple): - feature_tables: Set[FeatureTable] feature_views: Set[FeatureView] on_demand_feature_views: Set[OnDemandFeatureView] request_feature_views: Set[RequestFeatureView] @@ -99,7 +97,6 @@ def get_repo_files(repo_root: Path) -> List[Path]: def parse_repo(repo_root: Path) -> ParsedRepo: """ Collect feature table definitions from feature repo """ res = ParsedRepo( - feature_tables=set(), entities=set(), feature_views=set(), feature_services=set(), @@ -112,8 +109,6 @@ def parse_repo(repo_root: Path) -> ParsedRepo: module = importlib.import_module(module_path) for attr_name in dir(module): obj = getattr(module, attr_name) - if isinstance(obj, FeatureTable): - res.feature_tables.add(obj) if isinstance(obj, FeatureView): res.feature_views.add(obj) elif isinstance(obj, 
Entity): @@ -207,15 +202,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) - ( - tables_to_keep, - tables_to_delete, - tables_to_add, - ) = tag_objects_for_keep_delete_add( - set(registry.list_feature_tables(project=project)), repo.feature_tables - ) - tables_to_keep = tables_to_keep.union(tables_to_add) - ( services_to_keep, services_to_delete, @@ -229,25 +215,19 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation # Apply all changes to the registry and infrastructure. all_to_apply: List[ - Union[ - Entity, BaseFeatureView, FeatureService, OnDemandFeatureView, FeatureTable - ] + Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] ] = [] all_to_apply.extend(entities_to_keep) all_to_apply.extend(base_views_to_keep) all_to_apply.extend(services_to_keep) all_to_apply.extend(odfvs_to_keep) - all_to_apply.extend(tables_to_keep) all_to_delete: List[ - Union[ - Entity, BaseFeatureView, FeatureService, OnDemandFeatureView, FeatureTable - ] + Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] ] = [] all_to_delete.extend(entities_to_delete) all_to_delete.extend(base_views_to_delete) all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) - all_to_delete.extend(tables_to_delete) store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) @@ -263,10 +243,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Deleted on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL} from registry" ) - for table in tables_to_delete: - click.echo( - f"Deleted feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL} from registry" - ) for feature_service in services_to_delete: click.echo( f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL} " @@ -290,27 +266,18 @@ def 
apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation click.echo( f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" ) - # Create tables that should exist - for table in tables_to_keep: - click.echo( - f"Registered feature table {Style.BRIGHT + Fore.GREEN}{table.name}{Style.RESET_ALL}" - ) views_to_keep_in_infra = [ view for view in views_to_keep if isinstance(view, FeatureView) ] - for name in [view.name for view in repo.feature_tables] + [ - table.name for table in views_to_keep_in_infra - ]: + for name in [view.name for view in views_to_keep_in_infra]: click.echo( f"Deploying infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) views_to_delete_from_infra = [ view for view in views_to_delete if isinstance(view, FeatureView) ] - for name in [view.name for view in views_to_delete_from_infra] + [ - table.name for table in tables_to_delete - ]: + for name in [view.name for view in views_to_delete_from_infra]: click.echo( f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" ) diff --git a/sdk/python/tests/foo_provider.py b/sdk/python/tests/foo_provider.py index 1b108b7eec..8e9254cd3d 100644 --- a/sdk/python/tests/foo_provider.py +++ b/sdk/python/tests/foo_provider.py @@ -4,7 +4,7 @@ import pandas from tqdm import tqdm -from feast import Entity, FeatureTable, FeatureView, RepoConfig +from feast import Entity, FeatureView, RepoConfig from feast.infra.offline_stores.offline_store import RetrievalJob from feast.infra.provider import Provider from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto @@ -19,8 +19,8 @@ def __init__(self, config: RepoConfig): def update_infra( self, project: str, - tables_to_delete: Sequence[Union[FeatureTable, FeatureView]], - tables_to_keep: Sequence[Union[FeatureTable, FeatureView]], + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], entities_to_delete: Sequence[Entity], 
entities_to_keep: Sequence[Entity], partial: bool, @@ -28,17 +28,14 @@ def update_infra( pass def teardown_infra( - self, - project: str, - tables: Sequence[Union[FeatureTable, FeatureView]], - entities: Sequence[Entity], + self, project: str, tables: Sequence[FeatureView], entities: Sequence[Entity], ): pass def online_write_batch( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, data: List[ Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] ], @@ -73,7 +70,7 @@ def get_historical_features( def online_read( self, config: RepoConfig, - table: Union[FeatureTable, FeatureView], + table: FeatureView, entity_keys: List[EntityKeyProto], requested_features: List[str] = None, ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: From 90b842627d852ff8b33a2c7dcd49dd6550e06c45 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 15 Dec 2021 17:04:28 -0800 Subject: [PATCH 05/12] Ensure that universal CLI test tears down infrastructure (#2151) * Ensure that universal CLI test tears down infrastructure Signed-off-by: Felix Wang * Lint Signed-off-by: Felix Wang --- .../integration/registration/test_cli.py | 124 +++++++++--------- 1 file changed, 64 insertions(+), 60 deletions(-) diff --git a/sdk/python/tests/integration/registration/test_cli.py b/sdk/python/tests/integration/registration/test_cli.py index de0e5d5629..b92dc52642 100644 --- a/sdk/python/tests/integration/registration/test_cli.py +++ b/sdk/python/tests/integration/registration/test_cli.py @@ -25,68 +25,72 @@ def test_universal_cli(test_repo_config) -> None: runner = CliRunner() with tempfile.TemporaryDirectory() as repo_dir_name: - feature_store_yaml = make_feature_store_yaml( - project, test_repo_config, repo_dir_name - ) - repo_path = Path(repo_dir_name) - - repo_config = repo_path / "feature_store.yaml" - - repo_config.write_text(dedent(feature_store_yaml)) - - repo_example = repo_path / "example.py" - 
repo_example.write_text(get_example_repo("example_feature_repo_1.py")) - result = runner.run(["apply"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - - # Store registry contents, to be compared later. - fs = FeatureStore(repo_path=str(repo_path)) - registry_dict = fs.registry.to_dict(project=project) - - # entity & feature view list commands should succeed - result = runner.run(["entities", "list"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - result = runner.run(["feature-views", "list"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - result = runner.run(["feature-services", "list"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - - # entity & feature view describe commands should succeed when objects exist - result = runner.run(["entities", "describe", "driver"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - result = runner.run( - ["feature-views", "describe", "driver_locations"], cwd=repo_path - ) - assertpy.assert_that(result.returncode).is_equal_to(0) - result = runner.run( - ["feature-services", "describe", "driver_locations_service"], cwd=repo_path - ) - assertpy.assert_that(result.returncode).is_equal_to(0) - assertpy.assert_that(fs.list_feature_views()).is_length(3) - - # entity & feature view describe commands should fail when objects don't exist - result = runner.run(["entities", "describe", "foo"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(1) - result = runner.run(["feature-views", "describe", "foo"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(1) - result = runner.run(["feature-services", "describe", "foo"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(1) - - # Doing another apply should be a no op, and should not cause errors - result = runner.run(["apply"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) - 
basic_rw_test( - FeatureStore(repo_path=str(repo_path), config=None), - view_name="driver_locations", - ) + try: + feature_store_yaml = make_feature_store_yaml( + project, test_repo_config, repo_dir_name + ) + repo_path = Path(repo_dir_name) + + repo_config = repo_path / "feature_store.yaml" + + repo_config.write_text(dedent(feature_store_yaml)) + + repo_example = repo_path / "example.py" + repo_example.write_text(get_example_repo("example_feature_repo_1.py")) + result = runner.run(["apply"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + + # Store registry contents, to be compared later. + fs = FeatureStore(repo_path=str(repo_path)) + registry_dict = fs.registry.to_dict(project=project) + + # entity & feature view list commands should succeed + result = runner.run(["entities", "list"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run(["feature-views", "list"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run(["feature-services", "list"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + + # entity & feature view describe commands should succeed when objects exist + result = runner.run(["entities", "describe", "driver"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run( + ["feature-views", "describe", "driver_locations"], cwd=repo_path + ) + assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run( + ["feature-services", "describe", "driver_locations_service"], + cwd=repo_path, + ) + assertpy.assert_that(result.returncode).is_equal_to(0) + assertpy.assert_that(fs.list_feature_views()).is_length(3) + + # entity & feature view describe commands should fail when objects don't exist + result = runner.run(["entities", "describe", "foo"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(1) + result = runner.run(["feature-views", "describe", "foo"], 
cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(1) + result = runner.run(["feature-services", "describe", "foo"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(1) + + # Doing another apply should be a no op, and should not cause errors + result = runner.run(["apply"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + basic_rw_test( + FeatureStore(repo_path=str(repo_path), config=None), + view_name="driver_locations", + ) - # Confirm that registry contents have not changed. - assertpy.assert_that(registry_dict).is_equal_to( - fs.registry.to_dict(project=project) - ) + # Confirm that registry contents have not changed. + assertpy.assert_that(registry_dict).is_equal_to( + fs.registry.to_dict(project=project) + ) - result = runner.run(["teardown"], cwd=repo_path) - assertpy.assert_that(result.returncode).is_equal_to(0) + result = runner.run(["teardown"], cwd=repo_path) + assertpy.assert_that(result.returncode).is_equal_to(0) + finally: + runner.run(["teardown"], cwd=repo_path) def make_feature_store_yaml(project, test_repo_config, repo_dir_name: PosixPath): From 7eba23c2412c7ea77ac9c26464294a375819398f Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Thu, 16 Dec 2021 18:41:28 +0200 Subject: [PATCH 06/12] Use correct name when deleting dynamo db (#2154) Signed-off-by: pyalex --- .../feast/infra/online_stores/dynamodb.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index d2082972a3..377e10c308 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -79,7 +79,7 @@ def update( for table_instance in tables_to_keep: try: dynamodb_resource.create_table( - TableName=f"{config.project}.{table_instance.name}", + TableName=_get_table_name(config, table_instance), KeySchema=[{"AttributeName": "entity_id", 
"KeyType": "HASH"}], AttributeDefinitions=[ {"AttributeName": "entity_id", "AttributeType": "S"} @@ -95,11 +95,13 @@ def update( for table_instance in tables_to_keep: dynamodb_client.get_waiter("table_exists").wait( - TableName=f"{config.project}.{table_instance.name}" + TableName=_get_table_name(config, table_instance) ) for table_to_delete in tables_to_delete: - _delete_table_idempotent(dynamodb_resource, table_to_delete.name) + _delete_table_idempotent( + dynamodb_resource, _get_table_name(config, table_to_delete) + ) def teardown( self, @@ -112,7 +114,7 @@ def teardown( dynamodb_resource = self._get_dynamodb_resource(online_config.region) for table in tables: - _delete_table_idempotent(dynamodb_resource, table.name) + _delete_table_idempotent(dynamodb_resource, _get_table_name(config, table)) @log_exceptions_and_usage(online_store="dynamodb") def online_write_batch( @@ -128,7 +130,7 @@ def online_write_batch( assert isinstance(online_config, DynamoDBOnlineStoreConfig) dynamodb_resource = self._get_dynamodb_resource(online_config.region) - table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}") + table_instance = dynamodb_resource.Table(_get_table_name(config, table)) with table_instance.batch_writer() as batch: for entity_key, features, timestamp, created_ts in data: entity_id = compute_entity_id(entity_key) @@ -159,7 +161,7 @@ def online_read( result: List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]] = [] for entity_key in entity_keys: - table_instance = dynamodb_resource.Table(f"{config.project}.{table.name}") + table_instance = dynamodb_resource.Table(_get_table_name(config, table)) entity_id = compute_entity_id(entity_key) with tracing_span(name="remote_call"): response = table_instance.get_item(Key={"entity_id": entity_id}) @@ -195,6 +197,10 @@ def _initialize_dynamodb_resource(region: str): return boto3.resource("dynamodb", region_name=region) +def _get_table_name(config: RepoConfig, table: FeatureView) -> str: + return 
f"{config.project}.{table.name}" + + def _delete_table_idempotent( dynamodb_resource, table_name: str, ): From ce243a48935c583f8f27fdd5a3dafe6242e1dde2 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 16 Dec 2021 10:06:28 -0800 Subject: [PATCH 07/12] Add a feast plan command, and have CLI output differentiates between created, deleted and unchanged objects (#2147) * Print changes in the repo objects in the new style during feast apply Signed-off-by: Achal Shah * change color for deleted infra Signed-off-by: Achal Shah * Add a feast plan command Signed-off-by: Achal Shah * Add a feast plan command Signed-off-by: Achal Shah * return from apply() Signed-off-by: Achal Shah * Fix errors in doctests Signed-off-by: Achal Shah * Fix deepcopy and use a clone method instead Signed-off-by: Achal Shah * Fix registry clone Signed-off-by: Achal Shah * CR updates Signed-off-by: Achal Shah --- sdk/python/feast/cli.py | 21 +++ sdk/python/feast/diff/FcoDiff.py | 2 + sdk/python/feast/feature_store.py | 119 ++++++++++++++- sdk/python/feast/registry.py | 88 ++++++++--- sdk/python/feast/registry_store.py | 11 ++ sdk/python/feast/repo_operations.py | 137 ++++++++++-------- sdk/python/tests/doctest/test_all.py | 7 +- ..._cli_apply_duplicated_featureview_names.py | 2 +- 8 files changed, 298 insertions(+), 89 deletions(-) diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 6acd7ec55b..1997b12e3b 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -34,6 +34,7 @@ cli_check_repo, generate_project_name, init_repo, + plan, registry_dump, teardown, ) @@ -351,6 +352,26 @@ def on_demand_feature_view_list(ctx: click.Context): print(tabulate(table, headers=["NAME"], tablefmt="plain")) +@cli.command("plan", cls=NoOptionDefaultFormat) +@click.option( + "--skip-source-validation", + is_flag=True, + help="Don't validate the data sources by checking for that the tables exist.", +) +@click.pass_context +def plan_command(ctx: click.Context, skip_source_validation: 
bool): + """ + Create or update a feature store deployment + """ + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + repo_config = load_repo_config(repo) + try: + plan(repo_config, repo, skip_source_validation) + except FeastProviderLoginError as e: + print(str(e)) + + @cli.command("apply", cls=NoOptionDefaultFormat) @click.option( "--skip-source-validation", diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index b19eb713c2..4e2047d38f 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -26,6 +26,8 @@ class TransitionType(Enum): @dataclass class FcoDiff: + name: str + fco_type: str current_fco: Any new_fco: Any fco_property_diffs: List[PropertyDiff] diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 98af761931..b0b99061e9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -18,7 +18,18 @@ from collections import Counter, OrderedDict, defaultdict from datetime import datetime from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast +from typing import ( + Any, + Dict, + Iterable, + List, + NamedTuple, + Optional, + Set, + Tuple, + Union, + cast, +) import pandas as pd from colorama import Fore, Style @@ -26,6 +37,7 @@ from feast import feature_server, flags, flags_helper, utils from feast.base_feature_view import BaseFeatureView +from feast.diff.FcoDiff import RegistryDiff from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -51,6 +63,7 @@ from feast.infra.provider import Provider, RetrievalJob, get_provider from feast.on_demand_feature_view import OnDemandFeatureView from feast.online_response import OnlineResponse, _infer_online_entity_rows +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.protos.feast.serving.ServingService_pb2 import ( GetOnlineFeaturesRequestV2, GetOnlineFeaturesResponse, @@ -66,6 
+79,31 @@ warnings.simplefilter("once", DeprecationWarning) +class RepoContents(NamedTuple): + feature_views: Set[FeatureView] + on_demand_feature_views: Set[OnDemandFeatureView] + request_feature_views: Set[RequestFeatureView] + entities: Set[Entity] + feature_services: Set[FeatureService] + + def to_registry_proto(self) -> RegistryProto: + registry_proto = RegistryProto() + registry_proto.entities.extend([e.to_proto() for e in self.entities]) + registry_proto.feature_views.extend( + [fv.to_proto() for fv in self.feature_views] + ) + registry_proto.on_demand_feature_views.extend( + [fv.to_proto() for fv in self.on_demand_feature_views] + ) + registry_proto.request_feature_views.extend( + [fv.to_proto() for fv in self.request_feature_views] + ) + registry_proto.feature_services.extend( + [fs.to_proto() for fs in self.feature_services] + ) + return registry_proto + + class FeatureStore: """ A FeatureStore object is used to define, create, and retrieve features. @@ -357,6 +395,55 @@ def _get_features(self, features: Union[List[str], FeatureService],) -> List[str _feature_refs = _features return _feature_refs + @log_exceptions_and_usage + def plan(self, desired_repo_objects: RepoContents) -> RegistryDiff: + """Dry-run registering objects to metadata store. + + The plan method dry-runs registering one or more definitions (e.g., Entity, FeatureView), and produces + a list of all the changes the that would be introduced in the feature repo. The changes computed by the plan + command are for informational purpose, and are not actually applied to the registry. + + Args: + objects: A single object, or a list of objects that are intended to be registered with the Feature Store. + objects_to_delete: A list of objects to be deleted from the registry. + partial: If True, apply will only handle the specified objects; if False, apply will also delete + all the objects in objects_to_delete. + + Raises: + ValueError: The 'objects' parameter could not be parsed properly. 
+ + Examples: + Generate a plan adding an Entity and a FeatureView. + + >>> from feast import FeatureStore, Entity, FeatureView, Feature, ValueType, FileSource, RepoConfig + >>> from feast.feature_store import RepoContents + >>> from datetime import timedelta + >>> fs = FeatureStore(repo_path="feature_repo") + >>> driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + >>> driver_hourly_stats = FileSource( + ... path="feature_repo/data/driver_stats.parquet", + ... event_timestamp_column="event_timestamp", + ... created_timestamp_column="created", + ... ) + >>> driver_hourly_stats_view = FeatureView( + ... name="driver_hourly_stats", + ... entities=["driver_id"], + ... ttl=timedelta(seconds=86400 * 1), + ... batch_source=driver_hourly_stats, + ... ) + >>> diff = fs.plan(RepoContents({driver_hourly_stats_view}, set(), set(), {driver}, set())) # register entity and feature view + """ + + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + + desired_registry_proto = desired_repo_objects.to_registry_proto() + diffs = Registry.diff_between(current_registry_proto, desired_registry_proto) + return diffs + @log_exceptions_and_usage def apply( self, @@ -388,7 +475,7 @@ def apply( ] ] = None, partial: bool = True, - ): + ) -> RegistryDiff: """Register objects to metadata store and update related infrastructure. The apply method registers one or more definitions (e.g., Entity, FeatureView) and registers or updates these @@ -424,18 +511,22 @@ def apply( ... ttl=timedelta(seconds=86400 * 1), ... batch_source=driver_hourly_stats, ... 
) - >>> fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view + >>> diff = fs.apply([driver_hourly_stats_view, driver]) # register entity and feature view """ # TODO: Add locking - if not isinstance(objects, Iterable): objects = [objects] - assert isinstance(objects, list) if not objects_to_delete: objects_to_delete = [] + current_registry_proto = ( + self._registry.cached_registry_proto.__deepcopy__() + if self._registry.cached_registry_proto + else RegistryProto() + ) + # Separate all objects into entities, feature services, and different feature view types. entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] @@ -533,6 +624,22 @@ def apply( service.name, project=self.project, commit=False ) + new_registry_proto = ( + self._registry.cached_registry_proto + if self._registry.cached_registry_proto + else RegistryProto() + ) + + diffs = Registry.diff_between(current_registry_proto, new_registry_proto) + + entities_to_update = [ob for ob in objects if isinstance(ob, Entity)] + views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)] + + entities_to_delete = [ob for ob in objects_to_delete if isinstance(ob, Entity)] + views_to_delete = [ + ob for ob in objects_to_delete if isinstance(ob, FeatureView) + ] + self._get_provider().update_infra( project=self.project, tables_to_delete=views_to_delete if not partial else [], @@ -544,6 +651,8 @@ def apply( self._registry.commit() + return diffs + @log_exceptions_and_usage def teardown(self): """Tears down all local and cloud resources for the feature store.""" diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 68d5b0bc11..bfc1e9a336 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -42,6 +42,7 @@ from feast.feature_view import FeatureView from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.Registry_pb2 
import Registry as RegistryProto +from feast.registry_store import NoopRegistryStore from feast.repo_config import RegistryConfig from feast.request_feature_view import RequestFeatureView @@ -128,6 +129,18 @@ def __init__( else 0 ) + def clone(self) -> "Registry": + new_registry = Registry(None, None) + new_registry.cached_registry_proto_ttl = timedelta(seconds=0) + new_registry.cached_registry_proto = ( + self.cached_registry_proto.__deepcopy__() + if self.cached_registry_proto + else RegistryProto() + ) + new_registry.cached_registry_proto_created = datetime.utcnow() + new_registry._registry_store = NoopRegistryStore() + return new_registry + # TODO(achals): This method needs to be filled out and used in the feast plan/apply methods. @staticmethod def diff_between( @@ -135,25 +148,66 @@ def diff_between( ) -> RegistryDiff: diff = RegistryDiff() - # Handle Entities - ( - entities_to_keep, - entities_to_delete, - entities_to_add, - ) = tag_proto_objects_for_keep_delete_add( - current_registry.entities, new_registry.entities, - ) + attribute_to_object_type_str = { + "entities": "entity", + "feature_views": "feature view", + "feature_tables": "feature table", + "on_demand_feature_views": "on demand feature view", + "request_feature_views": "request feature view", + "feature_services": "feature service", + } - for e in entities_to_add: - diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE)) - for e in entities_to_delete: - diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE)) + for object_type in [ + "entities", + "feature_views", + "feature_tables", + "on_demand_feature_views", + "request_feature_views", + "feature_services", + ]: + ( + objects_to_keep, + objects_to_delete, + objects_to_add, + ) = tag_proto_objects_for_keep_delete_add( + getattr(current_registry, object_type), + getattr(new_registry, object_type), + ) + + for e in objects_to_add: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + None, + e, 
+ [], + TransitionType.CREATE, + ) + ) + for e in objects_to_delete: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + None, + [], + TransitionType.DELETE, + ) + ) + for e in objects_to_keep: + diff.add_fco_diff( + FcoDiff( + e.spec.name, + attribute_to_object_type_str[object_type], + e, + e, + [], + TransitionType.UNCHANGED, + ) + ) - # Handle Feature Views - # Handle On Demand Feature Views - # Handle Request Feature Views - # Handle Feature Services - logger.info(f"Diff: {diff}") return diff def _initialize_registry(self): diff --git a/sdk/python/feast/registry_store.py b/sdk/python/feast/registry_store.py index 22c8b0f5ed..c42a55cd9d 100644 --- a/sdk/python/feast/registry_store.py +++ b/sdk/python/feast/registry_store.py @@ -36,3 +36,14 @@ def teardown(self): Tear down the registry. """ pass + + +class NoopRegistryStore(RegistryStore): + def get_registry_proto(self) -> RegistryProto: + pass + + def update_registry_proto(self, registry_proto: RegistryProto): + pass + + def teardown(self): + pass diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index c7620c07a0..86de6d1958 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -6,16 +6,16 @@ import sys from importlib.abc import Loader from pathlib import Path -from typing import List, NamedTuple, Set, Union, cast +from typing import List, Set, Union, cast import click from click.exceptions import BadParameter from feast.base_feature_view import BaseFeatureView -from feast.diff.FcoDiff import tag_objects_for_keep_delete_add +from feast.diff.FcoDiff import TransitionType, tag_objects_for_keep_delete_add from feast.entity import Entity from feast.feature_service import FeatureService -from feast.feature_store import FeatureStore +from feast.feature_store import FeatureStore, RepoContents from feast.feature_view import DUMMY_ENTITY, DUMMY_ENTITY_NAME, FeatureView from feast.names import 
adjectives, animals from feast.on_demand_feature_view import OnDemandFeatureView @@ -33,14 +33,6 @@ def py_path_to_module(path: Path, repo_root: Path) -> str: ) -class ParsedRepo(NamedTuple): - feature_views: Set[FeatureView] - on_demand_feature_views: Set[OnDemandFeatureView] - request_feature_views: Set[RequestFeatureView] - entities: Set[Entity] - feature_services: Set[FeatureService] - - def read_feastignore(repo_root: Path) -> List[str]: """Read .feastignore in the repo root directory (if exists) and return the list of user-defined ignore paths""" feast_ignore = repo_root / ".feastignore" @@ -94,9 +86,9 @@ def get_repo_files(repo_root: Path) -> List[Path]: return sorted(repo_files) -def parse_repo(repo_root: Path) -> ParsedRepo: +def parse_repo(repo_root: Path) -> RepoContents: """ Collect feature table definitions from feature repo """ - res = ParsedRepo( + res = RepoContents( entities=set(), feature_views=set(), feature_services=set(), @@ -124,10 +116,34 @@ def parse_repo(repo_root: Path) -> ParsedRepo: @log_exceptions_and_usage -def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): - from colorama import Fore, Style +def plan(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + data_source.validate(store.config) + + diff = store.plan(repo) + views_to_delete = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" and v.transition_type == TransitionType.DELETE + ] + views_to_keep = [ + v + for v in diff.fco_diffs + if v.fco_type == "feature view" + and v.transition_type in {TransitionType.CREATE, TransitionType.UNCHANGED} + ] + + log_cli_output(diff, views_to_delete, views_to_keep) 
+ + +def _prepare_registry_and_repo(repo_config, repo_path): store = FeatureStore(config=repo_config) project = store.project if not is_valid_name(project): @@ -140,14 +156,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation registry._initialize_registry() sys.dont_write_bytecode = True repo = parse_repo(repo_path) + return project, registry, repo, store - if not skip_source_validation: - data_sources = [t.batch_source for t in repo.feature_views] - # Make sure the data source used by this feature view is supported by Feast - for data_source in data_sources: - data_source.validate(store.config) - # For each object in the registry, determine whether it should be kept or deleted. +def extract_objects_for_apply_delete(project, registry, repo): ( entities_to_keep, entities_to_delete, @@ -157,7 +169,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ) # TODO(achals): This code path should be refactored to handle added & kept entities separately. 
entities_to_keep = set(entities_to_keep).union(entities_to_add) - views = tag_objects_for_keep_delete_add( set(registry.list_feature_views(project=project)), repo.feature_views ) @@ -166,7 +177,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[FeatureView], views[1]), cast(Set[FeatureView], views[2]), ) - request_views = tag_objects_for_keep_delete_add( set(registry.list_request_feature_views(project=project)), repo.request_feature_views, @@ -179,7 +189,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[RequestFeatureView], request_views[1]), cast(Set[RequestFeatureView], request_views[2]), ) - base_views_to_keep: Set[Union[RequestFeatureView, FeatureView]] = { *views_to_keep, *views_to_add, @@ -190,7 +199,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation *views_to_delete, *request_views_to_delete, } - odfvs = tag_objects_for_keep_delete_add( set(registry.list_on_demand_feature_views(project=project)), repo.on_demand_feature_views, @@ -201,7 +209,6 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation cast(Set[OnDemandFeatureView], odfvs[2]), ) odfvs_to_keep = odfvs_to_keep.union(odfvs_to_add) - ( services_to_keep, services_to_delete, @@ -210,9 +217,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation set(registry.list_feature_services(project=project)), repo.feature_services ) services_to_keep = services_to_keep.union(services_to_add) - sys.dont_write_bytecode = False - # Apply all changes to the registry and infrastructure. 
all_to_apply: List[ Union[Entity, BaseFeatureView, FeatureService, OnDemandFeatureView] @@ -229,44 +234,49 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation all_to_delete.extend(services_to_delete) all_to_delete.extend(odfvs_to_delete) - store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + return all_to_apply, all_to_delete, views_to_delete, views_to_keep - for entity in entities_to_delete: - click.echo( - f"Deleted entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL} from registry" - ) - for view in base_views_to_delete: - click.echo( - f"Deleted feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL} from registry" - ) - for odfv in odfvs_to_delete: - click.echo( - f"Deleted on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL} from registry" - ) - for feature_service in services_to_delete: - click.echo( - f"Deleted feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL} " - f"from registry" - ) - for entity in entities_to_keep: - if entity.name != DUMMY_ENTITY_NAME: - click.echo( - f"Registered entity {Style.BRIGHT + Fore.GREEN}{entity.name}{Style.RESET_ALL}" - ) - for view in base_views_to_keep: - click.echo( - f"Registered feature view {Style.BRIGHT + Fore.GREEN}{view.name}{Style.RESET_ALL}" - ) - for odfv in odfvs_to_keep: - click.echo( - f"Registered on demand feature view {Style.BRIGHT + Fore.GREEN}{odfv.name}{Style.RESET_ALL}" - ) - for feature_service in services_to_keep: +@log_exceptions_and_usage +def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation: bool): + + os.chdir(repo_path) + project, registry, repo, store = _prepare_registry_and_repo(repo_config, repo_path) + + if not skip_source_validation: + data_sources = [t.batch_source for t in repo.feature_views] + # Make sure the data source used by this feature view is supported by Feast + for data_source in data_sources: + 
data_source.validate(store.config) + + # For each object in the registry, determine whether it should be kept or deleted. + ( + all_to_apply, + all_to_delete, + views_to_delete, + views_to_keep, + ) = extract_objects_for_apply_delete(project, registry, repo) + + diff = store.apply(all_to_apply, objects_to_delete=all_to_delete, partial=False) + + log_cli_output(diff, views_to_delete, views_to_keep) + + +def log_cli_output(diff, views_to_delete, views_to_keep): + from colorama import Fore, Style + + message_action_map = { + TransitionType.CREATE: ("Created", Fore.GREEN), + TransitionType.DELETE: ("Deleted", Fore.RED), + TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), + } + for fco_diff in diff.fco_diffs: + if fco_diff.name == DUMMY_ENTITY_NAME: + continue + action, color = message_action_map[fco_diff.transition_type] click.echo( - f"Registered feature service {Style.BRIGHT + Fore.GREEN}{feature_service.name}{Style.RESET_ALL}" + f"{action} {fco_diff.fco_type} {Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}" ) - views_to_keep_in_infra = [ view for view in views_to_keep if isinstance(view, FeatureView) ] @@ -279,9 +289,8 @@ def apply_total(repo_config: RepoConfig, repo_path: Path, skip_source_validation ] for name in [view.name for view in views_to_delete_from_infra]: click.echo( - f"Removing infrastructure for {Style.BRIGHT + Fore.GREEN}{name}{Style.RESET_ALL}" + f"Removing infrastructure for {Style.BRIGHT + Fore.RED}{name}{Style.RESET_ALL}" ) - # TODO: consider echoing also entities being deployed/removed @log_exceptions_and_usage diff --git a/sdk/python/tests/doctest/test_all.py b/sdk/python/tests/doctest/test_all.py index c406303f00..bf3a09db1e 100644 --- a/sdk/python/tests/doctest/test_all.py +++ b/sdk/python/tests/doctest/test_all.py @@ -59,6 +59,7 @@ def test_docstrings(): """ successful = True current_packages = [feast] + failed_cases = [] while current_packages: next_packages = [] @@ -94,8 +95,10 @@ def test_docstrings(): result = 
unittest.TextTestRunner(sys.stdout).run(test_suite) if not result.wasSuccessful(): successful = False - except Exception: + failed_cases.append(result.failures) + except Exception as e: successful = False + failed_cases.append((full_name, e)) finally: if teardown_function: teardown_function() @@ -103,4 +106,4 @@ def test_docstrings(): current_packages = next_packages if not successful: - raise Exception("Docstring tests failed.") + raise Exception(f"Docstring tests failed. Failed results: {failed_cases}") diff --git a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py index 2b5ba2dcb0..6987066e8d 100644 --- a/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py +++ b/sdk/python/tests/integration/registration/test_cli_apply_duplicated_featureview_names.py @@ -85,7 +85,7 @@ def test_cli_apply_imported_featureview() -> None: rc, output = runner.run_with_output(["apply"], cwd=repo_path) assert rc == 0 - assert b"Registered feature service driver_locations_service" in output + assert b"Created feature service driver_locations_service" in output def test_cli_apply_imported_featureview_with_duplication() -> None: From 107eddc836f81560cd9babe3e3f168572d233ba1 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 16 Dec 2021 12:46:19 -0800 Subject: [PATCH 08/12] Do not run benchmarks on pull requests (#2155) Signed-off-by: Felix Wang --- .github/workflows/pr_integration_tests.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 445e1a3576..8a910f943c 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -161,11 +161,3 @@ jobs: env_vars: OS,PYTHON fail_ci_if_error: true verbose: true - - name: Benchmark python - env: - FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ 
needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} - FEAST_USAGE: "False" - IS_TEST: "True" - run: pytest --verbose --color=yes sdk/python/tests --integration --benchmark --benchmark-autosave --benchmark-save-data --durations=5 - - name: Upload Benchmark Artifact to S3 - run: aws s3 cp --recursive .benchmarks s3://feast-ci-pytest-benchmarks From 3cd967807dc29d513b5637cfb9baa52cb2eeba93 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 16 Dec 2021 13:04:18 -0800 Subject: [PATCH 09/12] Add DatastoreTable infra object (#2140) * Add DatastoreTable infra object Signed-off-by: Felix Wang * Switch to StringValue Signed-off-by: Felix Wang * Initialize Datastore client in __init__ Signed-off-by: Felix Wang --- protos/feast/core/DatastoreTable.proto | 39 ++++++ protos/feast/core/InfraObject.proto | 2 + .../feast/infra/online_stores/datastore.py | 119 +++++++++++++++--- 3 files changed, 143 insertions(+), 17 deletions(-) create mode 100644 protos/feast/core/DatastoreTable.proto diff --git a/protos/feast/core/DatastoreTable.proto b/protos/feast/core/DatastoreTable.proto new file mode 100644 index 0000000000..15720ad809 --- /dev/null +++ b/protos/feast/core/DatastoreTable.proto @@ -0,0 +1,39 @@ +// +// * Copyright 2021 The Feast Authors +// * +// * Licensed under the Apache License, Version 2.0 (the "License"); +// * you may not use this file except in compliance with the License. +// * You may obtain a copy of the License at +// * +// * https://www.apache.org/licenses/LICENSE-2.0 +// * +// * Unless required by applicable law or agreed to in writing, software +// * distributed under the License is distributed on an "AS IS" BASIS, +// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// * See the License for the specific language governing permissions and +// * limitations under the License. 
+// + +syntax = "proto3"; + +package feast.core; +option java_package = "feast.proto.core"; +option java_outer_classname = "DatastoreTableProto"; +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; + +import "google/protobuf/wrappers.proto"; + +// Represents a Datastore table +message DatastoreTable { + // Feast project of the table + string project = 1; + + // Name of the table + string name = 2; + + // GCP project id + google.protobuf.StringValue project_id = 3; + + // Datastore namespace + google.protobuf.StringValue namespace = 4; +} \ No newline at end of file diff --git a/protos/feast/core/InfraObject.proto b/protos/feast/core/InfraObject.proto index ded4c3ed68..a0f3541dec 100644 --- a/protos/feast/core/InfraObject.proto +++ b/protos/feast/core/InfraObject.proto @@ -22,6 +22,7 @@ option java_outer_classname = "InfraObjectProto"; option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; import "feast/core/DynamoDBTable.proto"; +import "feast/core/DatastoreTable.proto"; // Represents a set of infrastructure objects managed by Feast message Infra { @@ -37,6 +38,7 @@ message InfraObject { // The infrastructure object oneof infra_object { DynamoDBTable dynamodb_table = 2; + DatastoreTable datastore_table = 3; CustomInfra custom_infra = 100; } diff --git a/sdk/python/feast/infra/online_stores/datastore.py b/sdk/python/feast/infra/online_stores/datastore.py index a9bd534a50..0442eda122 100644 --- a/sdk/python/feast/infra/online_stores/datastore.py +++ b/sdk/python/feast/infra/online_stores/datastore.py @@ -21,8 +21,13 @@ from feast import Entity, utils from feast.feature_view import FeatureView +from feast.infra.infra_object import InfraObject from feast.infra.online_stores.helpers import compute_entity_id from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.core.DatastoreTable_pb2 import ( + DatastoreTable as DatastoreTableProto, +) +from feast.protos.feast.core.InfraObject_pb2 import 
InfraObject as InfraObjectProto from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from feast.repo_config import FeastConfigBaseModel, RepoConfig @@ -80,8 +85,6 @@ def update( entities_to_keep: Sequence[Entity], partial: bool, ): - """ - """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) client = self._get_client(online_config) @@ -110,9 +113,6 @@ def teardown( tables: Sequence[FeatureView], entities: Sequence[Entity], ): - """ - There's currently no teardown done for Datastore. - """ online_config = config.online_store assert isinstance(online_config, DatastoreOnlineStoreConfig) client = self._get_client(online_config) @@ -128,18 +128,10 @@ def teardown( client.delete(key) def _get_client(self, online_config: DatastoreOnlineStoreConfig): - if not self._client: - try: - self._client = datastore.Client( - project=online_config.project_id, namespace=online_config.namespace, - ) - except DefaultCredentialsError as e: - raise FeastProviderLoginError( - str(e) - + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' - "local Google Cloud account " - ) + self._client = _initialize_client( + online_config.project_id, online_config.namespace + ) return self._client @log_exceptions_and_usage(online_store="datastore") @@ -267,7 +259,7 @@ def online_read( return result -def _delete_all_values(client, key) -> None: +def _delete_all_values(client, key): """ Delete all data under the key path in datastore. 
""" @@ -279,3 +271,96 @@ def _delete_all_values(client, key) -> None: for entity in entities: client.delete(entity.key) + + +def _initialize_client( + project_id: Optional[str], namespace: Optional[str] +) -> datastore.Client: + try: + client = datastore.Client(project=project_id, namespace=namespace,) + return client + except DefaultCredentialsError as e: + raise FeastProviderLoginError( + str(e) + + '\nIt may be necessary to run "gcloud auth application-default login" if you would like to use your ' + "local Google Cloud account " + ) + + +class DatastoreTable(InfraObject): + """ + A Datastore table managed by Feast. + + Attributes: + project: The Feast project of the table. + name: The name of the table. + project_id (optional): The GCP project id. + namespace (optional): Datastore namespace. + client: Datastore client. + """ + + project: str + name: str + project_id: Optional[str] + namespace: Optional[str] + client: datastore.Client + + def __init__( + self, + project: str, + name: str, + project_id: Optional[str] = None, + namespace: Optional[str] = None, + ): + self.project = project + self.name = name + self.project_id = project_id + self.namespace = namespace + self.client = _initialize_client(self.project_id, self.namespace) + + def to_proto(self) -> InfraObjectProto: + datastore_table_proto = DatastoreTableProto() + datastore_table_proto.project = self.project + datastore_table_proto.name = self.name + if self.project_id: + datastore_table_proto.project_id.value = self.project_id + if self.namespace: + datastore_table_proto.namespace.value = self.namespace + + return InfraObjectProto( + infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable", + datastore_table=datastore_table_proto, + ) + + @staticmethod + def from_proto(infra_object_proto: InfraObjectProto) -> Any: + datastore_table = DatastoreTable( + project=infra_object_proto.datastore_table.project, + 
name=infra_object_proto.datastore_table.name, + ) + + if infra_object_proto.datastore_table.HasField("project_id"): + datastore_table.project_id = ( + infra_object_proto.datastore_table.project_id.value + ) + if infra_object_proto.datastore_table.HasField("namespace"): + datastore_table.namespace = ( + infra_object_proto.datastore_table.namespace.value + ) + + return datastore_table + + def update(self): + key = self.client.key("Project", self.project, "Table", self.name) + entity = datastore.Entity( + key=key, exclude_from_indexes=("created_ts", "event_ts", "values") + ) + entity.update({"created_ts": datetime.utcnow()}) + self.client.put(entity) + + def teardown(self): + key = self.client.key("Project", self.project, "Table", self.name) + _delete_all_values(self.client, key) + + # Delete the table metadata datastore entity + self.client.delete(key) From 1cfc25cf658712b87a6633b8b13ca918af593c9b Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 16 Dec 2021 17:38:18 -0800 Subject: [PATCH 10/12] Compute property-level diffs for repo objects (#2156) * Compute property-level diffs for repo objects Signed-off-by: Achal Shah * Fix inference Signed-off-by: Achal Shah --- sdk/python/feast/diff/FcoDiff.py | 25 +++++++++++++++++++++++++ sdk/python/feast/registry.py | 15 ++++++++------- sdk/python/feast/repo_operations.py | 7 +++++++ 3 files changed, 40 insertions(+), 7 deletions(-) diff --git a/sdk/python/feast/diff/FcoDiff.py b/sdk/python/feast/diff/FcoDiff.py index 4e2047d38f..09f76d42f1 100644 --- a/sdk/python/feast/diff/FcoDiff.py +++ b/sdk/python/feast/diff/FcoDiff.py @@ -75,3 +75,28 @@ def tag_proto_objects_for_keep_delete_add( objs_to_delete = [e for e in existing_objs if e.spec.name not in desired_obj_names] return objs_to_keep, objs_to_delete, objs_to_add + + +FIELDS_TO_IGNORE = {"project"} + + +def diff_between(current: U, new: U, object_type: str) -> FcoDiff: + assert current.DESCRIPTOR.full_name == 
new.DESCRIPTOR.full_name + property_diffs = [] + transition: TransitionType = TransitionType.UNCHANGED + if current.spec != new.spec: + for _field in current.spec.DESCRIPTOR.fields: + if _field.name in FIELDS_TO_IGNORE: + continue + if getattr(current.spec, _field.name) != getattr(new.spec, _field.name): + transition = TransitionType.UPDATE + property_diffs.append( + PropertyDiff( + _field.name, + getattr(current.spec, _field.name), + getattr(new.spec, _field.name), + ) + ) + return FcoDiff( + new.spec.name, object_type, current, new, property_diffs, transition, + ) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index bfc1e9a336..cd937b2978 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -28,6 +28,7 @@ FcoDiff, RegistryDiff, TransitionType, + diff_between, tag_proto_objects_for_keep_delete_add, ) from feast.entity import Entity @@ -197,14 +198,14 @@ def diff_between( ) ) for e in objects_to_keep: + current_obj_proto = [ + _e + for _e in getattr(current_registry, object_type) + if _e.spec.name == e.spec.name + ][0] diff.add_fco_diff( - FcoDiff( - e.spec.name, - attribute_to_object_type_str[object_type], - e, - e, - [], - TransitionType.UNCHANGED, + diff_between( + current_obj_proto, e, attribute_to_object_type_str[object_type] ) ) diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 86de6d1958..9299a36123 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -269,6 +269,7 @@ def log_cli_output(diff, views_to_delete, views_to_keep): TransitionType.CREATE: ("Created", Fore.GREEN), TransitionType.DELETE: ("Deleted", Fore.RED), TransitionType.UNCHANGED: ("Unchanged", Fore.LIGHTBLUE_EX), + TransitionType.UPDATE: ("Updated", Fore.YELLOW), } for fco_diff in diff.fco_diffs: if fco_diff.name == DUMMY_ENTITY_NAME: @@ -277,6 +278,12 @@ def log_cli_output(diff, views_to_delete, views_to_keep): click.echo( f"{action} {fco_diff.fco_type} 
{Style.BRIGHT + color}{fco_diff.name}{Style.RESET_ALL}" ) + if fco_diff.transition_type == TransitionType.UPDATE: + for _p in fco_diff.fco_property_diffs: + click.echo( + f"\t{_p.property_name}: {Style.BRIGHT + color}{_p.val_existing}{Style.RESET_ALL} -> {Style.BRIGHT + Fore.LIGHTGREEN_EX}{_p.val_declared}{Style.RESET_ALL}" + ) + views_to_keep_in_infra = [ view for view in views_to_keep if isinstance(view, FeatureView) ] From a753e73312c9f01f5e8af88b2ed1b45e7717aa98 Mon Sep 17 00:00:00 2001 From: ChaitanyaKN Date: Fri, 17 Dec 2021 09:56:19 +0530 Subject: [PATCH 11/12] Fix typo in custom provider docs (#2070) Signed-off-by: Felix Wang Co-authored-by: Felix Wang --- docs/how-to-guides/creating-a-custom-provider.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/how-to-guides/creating-a-custom-provider.md b/docs/how-to-guides/creating-a-custom-provider.md index ca71acb68e..40ec20ee6a 100644 --- a/docs/how-to-guides/creating-a-custom-provider.md +++ b/docs/how-to-guides/creating-a-custom-provider.md @@ -123,7 +123,7 @@ It may also be necessary to add the module root path to your `PYTHONPATH` as fol PYTHONPATH=$PYTHONPATH:/home/my_user/my_custom_provider feast apply ``` -That's it. You should not have a fully functional custom provider! +That's it. You should now have a fully functional custom provider! 
### Next steps From 5ce0fdb48b473fd66e06834d6273d1b5d8eaa467 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Fri, 17 Dec 2021 16:19:18 +0000 Subject: [PATCH 12/12] Fix `BYTES` and `BYTES_LIST` type conversion (#2158) Signed-off-by: Judah Rand <17158624+judahrand@users.noreply.github.com> --- sdk/python/feast/type_map.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 9992517ba6..10c9b5b331 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import base64 import re from datetime import datetime from typing import Any, Dict, List, Optional, Set, Tuple, Type @@ -62,7 +63,9 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: if k == "int64Val": return int(val) if k == "bytesVal": - return bytes(val) + # MessageToDict converts the bytes object to base64 encoded string: + # https://developers.google.com/protocol-buffers/docs/proto3#json + return base64.b64decode(val) if (k == "int64ListVal") or (k == "int32ListVal"): return [int(item) for item in val] if (k == "floatListVal") or (k == "doubleListVal"): @@ -70,7 +73,9 @@ def feast_value_type_to_python_type(field_value_proto: ProtoValue) -> Any: if k == "stringListVal": return [str(item) for item in val] if k == "bytesListVal": - return [bytes(item) for item in val] + # MessageToDict converts the bytes object to base64 encoded string: + # https://developers.google.com/protocol-buffers/docs/proto3#json + return [base64.b64decode(item) for item in val] if k == "boolListVal": return [bool(item) for item in val]