From 1e09ad3b141c8dbc785a4090bc296f83eb2f4a6d Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Mon, 15 Aug 2022 17:26:45 -0700 Subject: [PATCH 01/11] Add __init__.py files to allow test files to share names Signed-off-by: Felix Wang --- sdk/python/tests/doctest/__init__.py | 0 sdk/python/tests/integration/e2e/__init__.py | 0 sdk/python/tests/integration/materialization/__init__.py | 0 sdk/python/tests/integration/offline_store/__init__.py | 0 sdk/python/tests/integration/registration/__init__.py | 0 sdk/python/tests/integration/scaffolding/__init__.py | 0 sdk/python/tests/unit/cli/__init__.py | 0 sdk/python/tests/unit/diff/__init__.py | 0 sdk/python/tests/unit/infra/__init__.py | 0 sdk/python/tests/unit/infra/online_store/__init__.py | 0 sdk/python/tests/unit/infra/scaffolding/__init__.py | 0 sdk/python/tests/unit/local_feast_tests/__init__.py | 0 sdk/python/tests/unit/online_store/__init__.py | 0 13 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 sdk/python/tests/doctest/__init__.py create mode 100644 sdk/python/tests/integration/e2e/__init__.py create mode 100644 sdk/python/tests/integration/materialization/__init__.py create mode 100644 sdk/python/tests/integration/offline_store/__init__.py create mode 100644 sdk/python/tests/integration/registration/__init__.py create mode 100644 sdk/python/tests/integration/scaffolding/__init__.py create mode 100644 sdk/python/tests/unit/cli/__init__.py create mode 100644 sdk/python/tests/unit/diff/__init__.py create mode 100644 sdk/python/tests/unit/infra/__init__.py create mode 100644 sdk/python/tests/unit/infra/online_store/__init__.py create mode 100644 sdk/python/tests/unit/infra/scaffolding/__init__.py create mode 100644 sdk/python/tests/unit/local_feast_tests/__init__.py create mode 100644 sdk/python/tests/unit/online_store/__init__.py diff --git a/sdk/python/tests/doctest/__init__.py b/sdk/python/tests/doctest/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/integration/e2e/__init__.py b/sdk/python/tests/integration/e2e/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/integration/materialization/__init__.py b/sdk/python/tests/integration/materialization/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/integration/offline_store/__init__.py b/sdk/python/tests/integration/offline_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/integration/registration/__init__.py b/sdk/python/tests/integration/registration/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/integration/scaffolding/__init__.py b/sdk/python/tests/integration/scaffolding/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/cli/__init__.py b/sdk/python/tests/unit/cli/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/diff/__init__.py b/sdk/python/tests/unit/diff/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/infra/__init__.py b/sdk/python/tests/unit/infra/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/infra/online_store/__init__.py b/sdk/python/tests/unit/infra/online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/infra/scaffolding/__init__.py b/sdk/python/tests/unit/infra/scaffolding/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/local_feast_tests/__init__.py b/sdk/python/tests/unit/local_feast_tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/python/tests/unit/online_store/__init__.py b/sdk/python/tests/unit/online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From 5a5c6f606bbb84345f8bb8ff3839bdec9fa7b662 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Fri, 12 Aug 2022 13:08:15 -0700 Subject: [PATCH 02/11] Add feature service tests Signed-off-by: Felix Wang --- ...ple_feature_repo_with_feature_service_2.py | 63 ++++++++++++ ...ple_feature_repo_with_feature_service_3.py | 52 ++++++++++ .../unit/infra/test_inference_unit_tests.py | 60 ++++++++++++ .../local_feast_tests/test_feature_service.py | 96 +++++++++++++++++++ .../test_feature_service_apply.py | 25 ----- .../test_feature_service_read.py | 17 ---- 6 files changed, 271 insertions(+), 42 deletions(-) create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py create mode 100644 sdk/python/tests/unit/local_feast_tests/test_feature_service.py delete mode 100644 sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py delete mode 100644 sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py new file mode 100644 index 0000000000..3547c3de86 --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_2.py @@ -0,0 +1,63 @@ +from datetime import timedelta + +from feast import Entity, FeatureService, FeatureView, Field, FileSource +from feast.types import Float32, Int32, Int64 + +driver_hourly_stats = FileSource( + path="data/driver_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +driver = Entity( + name="driver_id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + Field(name="driver_id", dtype=Int32), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) + +global_daily_stats = FileSource( + path="data/global_stats.parquet", # Fake path + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +global_stats_feature_view = FeatureView( + name="global_daily_stats", + entities=[], + ttl=timedelta(days=1), + schema=[ + Field(name="num_rides", dtype=Int32), + Field(name="avg_ride_length", dtype=Float32), + ], + online=True, + source=global_daily_stats, + tags={}, +) + +all_stats_service = FeatureService( + name="all_stats", + features=[driver_hourly_stats_view, global_stats_feature_view], + tags={"release": "production"}, +) + +some_stats_service = FeatureService( + name="some_stats", + features=[ + driver_hourly_stats_view[["conv_rate"]], + global_stats_feature_view[["num_rides"]], + ], + tags={"release": "production"}, +) diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py new file mode 100644 index 0000000000..c16a5d4abc --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_feature_service_3.py @@ -0,0 +1,52 @@ +from datetime import timedelta + +from feast import Entity, FeatureService, FeatureView, FileSource + +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +driver = Entity( + name="driver_id", +) + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + online=True, + source=driver_hourly_stats, + tags={}, +) + +global_daily_stats = FileSource( + path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +global_stats_feature_view = FeatureView( + name="global_daily_stats", + entities=[], + ttl=timedelta(days=1), + online=True, + source=global_daily_stats, + tags={}, +) + +all_stats_service = FeatureService( + name="all_stats", + features=[driver_hourly_stats_view, global_stats_feature_view], + tags={"release": "production"}, +) + +some_stats_service = FeatureService( + name="some_stats", + features=[ + driver_hourly_stats_view[["conv_rate"]], + global_stats_feature_view[["num_rides"]], + ], + tags={"release": "production"}, +) diff --git a/sdk/python/tests/unit/infra/test_inference_unit_tests.py b/sdk/python/tests/unit/infra/test_inference_unit_tests.py index 68b6ee70f6..c5ed83c12f 100644 --- a/sdk/python/tests/unit/infra/test_inference_unit_tests.py +++ b/sdk/python/tests/unit/infra/test_inference_unit_tests.py @@ -294,6 +294,10 @@ def test_feature_view_inference_on_feature_columns(simple_dataset_1): def test_update_feature_services_with_inferred_features(simple_dataset_1): + """ + Tests that a feature service that references feature views without specified features will + be updated with the correct projections after feature inference. + """ with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: entity1 = Entity(name="test1", join_keys=["id_join_key"]) feature_view_1 = FeatureView( @@ -338,4 +342,60 @@ def test_update_feature_services_with_inferred_features(simple_dataset_1): assert len(feature_service.feature_view_projections[1].features) == 3 +def test_update_feature_services_with_specified_features(simple_dataset_1): + """ + Tests that a feature service that references feature views with specified features will + have the correct projections both before and after feature inference. + """ + with prep_file_source(df=simple_dataset_1, timestamp_field="ts_1") as file_source: + entity1 = Entity(name="test1", join_keys=["id_join_key"]) + feature_view_1 = FeatureView( + name="test1", + entities=[entity1], + schema=[ + Field(name="float_col", dtype=Float32), + Field(name="id_join_key", dtype=Int64), + ], + source=file_source, + ) + feature_view_2 = FeatureView( + name="test2", + entities=[entity1], + schema=[ + Field(name="int64_col", dtype=Int64), + Field(name="id_join_key", dtype=Int64), + ], + source=file_source, + ) + + feature_service = FeatureService( + name="fs_1", features=[feature_view_1[["float_col"]], feature_view_2] + ) + assert len(feature_service.feature_view_projections) == 2 + assert len(feature_service.feature_view_projections[0].features) == 1 + assert len(feature_service.feature_view_projections[0].desired_features) == 0 + assert len(feature_service.feature_view_projections[1].features) == 1 + assert len(feature_service.feature_view_projections[1].desired_features) == 0 + + update_feature_views_with_inferred_features_and_entities( + [feature_view_1, feature_view_2], + [entity1], + RepoConfig( + provider="local", project="test", entity_key_serialization_version=2 + ), + ) + assert len(feature_view_1.features) == 1 + assert len(feature_view_2.features) == 1 + + feature_service.infer_features( + fvs_to_update={ + feature_view_1.name: feature_view_1, + feature_view_2.name: feature_view_2, + } + ) + + assert len(feature_service.feature_view_projections[0].features) == 1 + assert len(feature_service.feature_view_projections[1].features) == 1 + + # TODO(felixwang9817): Add tests that interact with field mapping. diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service.py new file mode 100644 index 0000000000..82c1dd2a1d --- /dev/null +++ b/sdk/python/tests/unit/local_feast_tests/test_feature_service.py @@ -0,0 +1,96 @@ +import os +import tempfile +from datetime import datetime, timedelta + +from feast.driver_test_data import ( + create_driver_hourly_stats_df, + create_global_daily_stats_df, +) +from tests.utils.basic_read_write_test import basic_rw_test +from tests.utils.cli_repo_creator import CliRunner, get_example_repo + + +def test_apply_without_fv_inference() -> None: + """ + Tests that feature services based on feature views that do not require inference can be applied correctly. + """ + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service_2.py"), "file" + ) as store: + assert len(store.list_feature_services()) == 2 + + fs = store.get_feature_service("all_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 3 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[1].features) == 2 + assert len(fs.feature_view_projections[1].desired_features) == 0 + assert len(fs.tags) == 1 + assert fs.tags["release"] == "production" + + fs = store.get_feature_service("some_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + + +def test_apply_with_fv_inference() -> None: + """ + Tests that feature services based on feature views that require inference can be applied correctly. + """ + runner = CliRunner() + with tempfile.TemporaryDirectory() as data_dir: + # Generate test data. + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + + driver_entities = [1001, 1002, 1003, 1004, 1005] + driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date) + driver_stats_path = os.path.join(data_dir, "driver_stats.parquet") + driver_df.to_parquet(path=driver_stats_path, allow_truncated_timestamps=True) + + global_df = create_global_daily_stats_df(start_date, end_date) + global_stats_path = os.path.join(data_dir, "global_stats.parquet") + global_df.to_parquet(path=global_stats_path, allow_truncated_timestamps=True) + + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service_3.py") + .replace("%PARQUET_PATH%", driver_stats_path) + .replace("%PARQUET_PATH_GLOBAL%", global_stats_path), + "file", + ) as store: + assert len(store.list_feature_services()) == 2 + + fs = store.get_feature_service("all_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 3 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[1].features) == 2 + assert len(fs.feature_view_projections[1].desired_features) == 0 + assert len(fs.tags) == 1 + assert fs.tags["release"] == "production" + + fs = store.get_feature_service("some_stats") + assert len(fs.feature_view_projections) == 2 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + assert len(fs.feature_view_projections[0].features) == 1 + assert len(fs.feature_view_projections[0].desired_features) == 0 + + +def test_read() -> None: + """ + Test that feature values are correctly read through a feature service. + """ + runner = CliRunner() + with runner.local_repo( + get_example_repo("example_feature_repo_with_feature_service.py"), "file" + ) as store: + basic_rw_test( + store, + view_name="driver_locations", + feature_service_name="driver_locations_service", + ) diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py deleted file mode 100644 index dc642a6e3c..0000000000 --- a/sdk/python/tests/unit/local_feast_tests/test_feature_service_apply.py +++ /dev/null @@ -1,25 +0,0 @@ -from feast.feature_service import FeatureService -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - - -def test_read_pre_applied() -> None: - """ - Read feature values from the FeatureStore using a FeatureService. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_with_feature_service.py"), "file" - ) as store: - assert len(store.list_feature_services()) == 1 - fs = store.get_feature_service("driver_locations_service") - assert len(fs.tags) == 1 - assert fs.tags["release"] == "production" - - fv = store.get_feature_view("driver_locations") - - fs = FeatureService(name="new_feature_service", features=[fv[["lon"]]]) - - store.apply([fs]) - - assert len(store.list_feature_services()) == 2 - store.get_feature_service("new_feature_service") diff --git a/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py b/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py deleted file mode 100644 index 2b5b311dc9..0000000000 --- a/sdk/python/tests/unit/local_feast_tests/test_feature_service_read.py +++ /dev/null @@ -1,17 +0,0 @@ -from tests.utils.basic_read_write_test import basic_rw_test -from tests.utils.cli_repo_creator import CliRunner, get_example_repo - - -def test_feature_service_read() -> None: - """ - Read feature values from the FeatureStore using a FeatureService. - """ - runner = CliRunner() - with runner.local_repo( - get_example_repo("example_feature_repo_with_feature_service.py"), "file" - ) as store: - basic_rw_test( - store, - view_name="driver_locations", - feature_service_name="driver_locations_service", - ) From 07eaba657c958078b2b2b46d6ddfcc2c097234ce Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Tue, 16 Aug 2022 00:26:11 -0700 Subject: [PATCH 03/11] Fix feature service inference logic Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 27 ++++++++++++++------- sdk/python/feast/feature_view_projection.py | 4 +++ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 7c946b1d0b..134634bbc6 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -86,14 +86,21 @@ def __init__( self.feature_view_projections.append(feature_grouping.projection) def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None): + """ + Infers the features for the projections of this feature service, and updates this feature + service in place. + + This method is necessary since feature services may rely on feature views which require + feature inference. + """ for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): - # For feature services that depend on an unspecified feature view, apply inferred schema + projection = feature_grouping.projection + if fvs_to_update and feature_grouping.name in fvs_to_update: - if feature_grouping.projection.desired_features: - desired_features = set( - feature_grouping.projection.desired_features - ) + if projection.desired_features: + # Select the specific desired features. + desired_features = set(projection.desired_features) actual_features = set( [ f.name @@ -101,14 +108,16 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) ] ) assert desired_features.issubset(actual_features) + # We need to set the features for the projection at this point so we ensure we're starting with # an empty list. - feature_grouping.projection.features = [] + projection.features = [] for f in fvs_to_update[feature_grouping.name].features: if f.name in desired_features: - feature_grouping.projection.features.append(f) - else: - feature_grouping.projection.features = fvs_to_update[ + projection.features.append(f) + elif not projection.features: + # No features have been specifically selected, so all features will be added to the projection. + projection.features = fvs_to_update[ feature_grouping.name ].features else: diff --git a/sdk/python/feast/feature_view_projection.py b/sdk/python/feast/feature_view_projection.py index a862e5f08d..2960996a10 100644 --- a/sdk/python/feast/feature_view_projection.py +++ b/sdk/python/feast/feature_view_projection.py @@ -21,6 +21,10 @@ class FeatureViewProjection: name: The unique name of the feature view from which this projection is created. name_alias: An optional alias for the name. features: The list of features represented by the feature view projection. + desired_features: The list of features that this feature view projection intends to select. + If empty, the projection intends to select all features. This attribute is only used + for feature service inference. It should only be set if the underlying feature view + is not ready to be projected, i.e. still needs to go through feature inference. join_key_map: A map to modify join key columns during retrieval of this feature view projection. """ From 51174b31a944ed689bcb5090b55519299d7d1ca4 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Tue, 16 Aug 2022 09:50:02 -0700 Subject: [PATCH 04/11] Remove stray `__init__.py` file Signed-off-by: Felix Wang --- sdk/python/tests/unit/online_store/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 sdk/python/tests/unit/online_store/__init__.py diff --git a/sdk/python/tests/unit/online_store/__init__.py b/sdk/python/tests/unit/online_store/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 From 45ec63cbe5dc4b0c3985bab4efe2c6d5f5e06797 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 17 Aug 2022 10:59:42 -0700 Subject: [PATCH 05/11] Fix comments Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 134634bbc6..3fead9cbc0 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -98,8 +98,15 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) projection = feature_grouping.projection if fvs_to_update and feature_grouping.name in fvs_to_update: + # There are three situations to be handled. First, the projection specifies + # desired features, in which case we should select those desired features. + # Second, the projection does not specify any desired features but has + # already selected features, in which case nothing needs to be done. And + # third, the projection does not specify any desired features but has not + # yet selected features (since the original feature view did not yet have + # features), in which case we should select all possible inferred features. if projection.desired_features: - # Select the specific desired features. + # First case, so we select the specific desired features. desired_features = set(projection.desired_features) actual_features = set( [ @@ -115,8 +122,11 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) for f in fvs_to_update[feature_grouping.name].features: if f.name in desired_features: projection.features.append(f) - elif not projection.features: - # No features have been specifically selected, so all features will be added to the projection. + elif not projection.desired_features and projection.features: + # Second cass, so nothing needs to be done. + pass + else: + # Third case, so all inferred features will be selected. projection.features = fvs_to_update[ feature_grouping.name ].features From 7a43b8fc0bab35deff9af8cdadd2695e6feae9d2 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 17 Aug 2022 11:24:13 -0700 Subject: [PATCH 06/11] Add check Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 3fead9cbc0..c7b2ae9b3d 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -123,8 +123,15 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) if f.name in desired_features: projection.features.append(f) elif not projection.desired_features and projection.features: - # Second cass, so nothing needs to be done. - pass + # Second cass, so nothing needs to be done. In case something went wrong + # during feature inference, we check that the selected features still exist. + actual_features = set( + [ + f.name + for f in fvs_to_update[feature_grouping.name].features + ] + ) + assert projection.features.issubset(actual_features) else: # Third case, so all inferred features will be selected. projection.features = fvs_to_update[ From 40d7965a752fd673c194156945c004af6b1961c2 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 17 Aug 2022 15:39:40 -0700 Subject: [PATCH 07/11] Temp Signed-off-by: Felix Wang --- sdk/python/feast/errors.py | 7 ++++ sdk/python/feast/feature_service.py | 62 ++++++++++++++++------------- 2 files changed, 42 insertions(+), 27 deletions(-) diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index f8a288940a..a13f1a387f 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -310,6 +310,13 @@ def __init__(self, expected_column_name: str): ) +class FeatureViewMissingDuringFeatureServiceInference(Exception): + def __init__(self, feature_view_name: str, feature_service_name: str): + super().__init__( + f"Missing {feature_view_name} feature view during inference for {feature_service_name} feature service." + ) + + class InvalidEntityType(Exception): def __init__(self, entity_type: type): super().__init__( diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index c7b2ae9b3d..1eaa8a97b0 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -5,6 +5,7 @@ from typeguard import typechecked from feast.base_feature_view import BaseFeatureView +from feast.errors import FeatureViewMissingDuringFeatureServiceInference from feast.feature_logging import LoggingConfig from feast.feature_view import FeatureView from feast.feature_view_projection import FeatureViewProjection @@ -85,28 +86,30 @@ def __init__( if isinstance(feature_grouping, BaseFeatureView): self.feature_view_projections.append(feature_grouping.projection) - def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None): + def infer_features(self, fvs_to_update: Dict[str, FeatureView]): """ Infers the features for the projections of this feature service, and updates this feature service in place. This method is necessary since feature services may rely on feature views which require feature inference. + + Args: + fvs_to_update: A mapping of feature view names to corresponding feature views that + contains all the feature views necessary to run inference. """ for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): projection = feature_grouping.projection - if fvs_to_update and feature_grouping.name in fvs_to_update: - # There are three situations to be handled. First, the projection specifies - # desired features, in which case we should select those desired features. - # Second, the projection does not specify any desired features but has - # already selected features, in which case nothing needs to be done. And - # third, the projection does not specify any desired features but has not - # yet selected features (since the original feature view did not yet have - # features), in which case we should select all possible inferred features. - if projection.desired_features: - # First case, so we select the specific desired features. + + + if projection.desired_features: + # The projection wants to select a specific set of inferred features. + # Example: FeatureService(features=[fv[["inferred_feature"]]]), where + # 'fv' is a feature view that was defined without a schema. + if feature_grouping.name in fvs_to_update: + # Validate that the selected features have actually been inferred. desired_features = set(projection.desired_features) actual_features = set( [ @@ -116,27 +119,32 @@ def infer_features(self, fvs_to_update: Optional[Dict[str, FeatureView]] = None) ) assert desired_features.issubset(actual_features) - # We need to set the features for the projection at this point so we ensure we're starting with - # an empty list. + # Extract the selected features and add them to the projection. projection.features = [] for f in fvs_to_update[feature_grouping.name].features: if f.name in desired_features: projection.features.append(f) - elif not projection.desired_features and projection.features: - # Second cass, so nothing needs to be done. In case something went wrong - # during feature inference, we check that the selected features still exist. - actual_features = set( - [ - f.name - for f in fvs_to_update[feature_grouping.name].features - ] - ) - assert projection.features.issubset(actual_features) else: - # Third case, so all inferred features will be selected. - projection.features = fvs_to_update[ - feature_grouping.name - ].features + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) + + continue + + if projection.features: + # The projection has already selected features from a feature view with a + # known schema, so no action needs to be taken. + # Example: FeatureService(features=[fv[["existing_feature"]]]), where + # 'existing_feature' was defined as part of the schema of 'fv'. + continue + + # The projection wants to select all possible inferred features. + # Example: FeatureService(features=[fv]), where 'fv' is a feature view that + # was defined without a schema. + projection.features = fvs_to_update[ + feature_grouping.name + ].features else: raise ValueError( f"The feature service {self.name} has been provided with an invalid type " From 8dfb6dba0c8b2cf35debf75e2dd1efe158126ba0 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 17 Aug 2022 16:04:43 -0700 Subject: [PATCH 08/11] Address comments Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 46 ++++++++++++----------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 1eaa8a97b0..a6dcb9ad53 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -100,35 +100,29 @@ def infer_features(self, fvs_to_update: Dict[str, FeatureView]): """ for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): - projection = feature_grouping.projection - - + if feature_grouping.name not in fvs_to_update: + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) + projection = feature_grouping.projection if projection.desired_features: # The projection wants to select a specific set of inferred features. # Example: FeatureService(features=[fv[["inferred_feature"]]]), where # 'fv' is a feature view that was defined without a schema. - if feature_grouping.name in fvs_to_update: - # Validate that the selected features have actually been inferred. - desired_features = set(projection.desired_features) - actual_features = set( - [ - f.name - for f in fvs_to_update[feature_grouping.name].features - ] - ) - assert desired_features.issubset(actual_features) - - # Extract the selected features and add them to the projection. - projection.features = [] - for f in fvs_to_update[feature_grouping.name].features: - if f.name in desired_features: - projection.features.append(f) - else: - raise FeatureViewMissingDuringFeatureServiceInference( - feature_view_name=feature_grouping.name, - feature_service_name=self.name, - ) + # First we validate that the selected features have actually been inferred. + desired_features = set(projection.desired_features) + actual_features = set( + [f.name for f in fvs_to_update[feature_grouping.name].features] + ) + assert desired_features.issubset(actual_features) + + # Then we extract the selected features and add them to the projection. + projection.features = [] + for f in fvs_to_update[feature_grouping.name].features: + if f.name in desired_features: + projection.features.append(f) continue @@ -142,9 +136,7 @@ def infer_features(self, fvs_to_update: Dict[str, FeatureView]): # The projection wants to select all possible inferred features. # Example: FeatureService(features=[fv]), where 'fv' is a feature view that # was defined without a schema. - projection.features = fvs_to_update[ - feature_grouping.name - ].features + projection.features = fvs_to_update[feature_grouping.name].features else: raise ValueError( f"The feature service {self.name} has been provided with an invalid type " From c4d9c5a83fe5ce57931db7014334d2b342089975 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 17 Aug 2022 17:09:27 -0700 Subject: [PATCH 09/11] Fix Signed-off-by: Felix Wang --- sdk/python/feast/feature_service.py | 48 +++++++++++++++++------------ 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index a6dcb9ad53..3b6f1e7a90 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -100,29 +100,33 @@ def infer_features(self, fvs_to_update: Dict[str, FeatureView]): """ for feature_grouping in self._features: if isinstance(feature_grouping, BaseFeatureView): - if feature_grouping.name not in fvs_to_update: - raise FeatureViewMissingDuringFeatureServiceInference( - feature_view_name=feature_grouping.name, - feature_service_name=self.name, - ) - projection = feature_grouping.projection + if projection.desired_features: # The projection wants to select a specific set of inferred features. # Example: FeatureService(features=[fv[["inferred_feature"]]]), where # 'fv' is a feature view that was defined without a schema. - # First we validate that the selected features have actually been inferred. - desired_features = set(projection.desired_features) - actual_features = set( - [f.name for f in fvs_to_update[feature_grouping.name].features] - ) - assert desired_features.issubset(actual_features) - - # Then we extract the selected features and add them to the projection. - projection.features = [] - for f in fvs_to_update[feature_grouping.name].features: - if f.name in desired_features: - projection.features.append(f) + if feature_grouping.name in fvs_to_update: + # First we validate that the selected features have actually been inferred. + desired_features = set(projection.desired_features) + actual_features = set( + [ + f.name + for f in fvs_to_update[feature_grouping.name].features + ] + ) + assert desired_features.issubset(actual_features) + + # Then we extract the selected features and add them to the projection. + projection.features = [] + for f in fvs_to_update[feature_grouping.name].features: + if f.name in desired_features: + projection.features.append(f) + else: + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) continue @@ -136,7 +140,13 @@ def infer_features(self, fvs_to_update: Dict[str, FeatureView]): # The projection wants to select all possible inferred features. # Example: FeatureService(features=[fv]), where 'fv' is a feature view that # was defined without a schema. - projection.features = fvs_to_update[feature_grouping.name].features + if feature_grouping.name in fvs_to_update: + projection.features = fvs_to_update[feature_grouping.name].features + else: + raise FeatureViewMissingDuringFeatureServiceInference( + feature_view_name=feature_grouping.name, + feature_service_name=self.name, + ) else: raise ValueError( f"The feature service {self.name} has been provided with an invalid type " From 64dfdef1706b712fb82e92445c2b3a0554ec7853 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 18 Aug 2022 10:34:48 -0700 Subject: [PATCH 10/11] Address comments Signed-off-by: Felix Wang --- sdk/python/feast/base_feature_view.py | 10 +++++++--- sdk/python/feast/feature_service.py | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index 5feb1d7d89..fe7f8229cc 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -117,10 +117,14 @@ def __getitem__(self, item): cp = self.__copy__() if self.features: + feature_names = [feature.name for feature in self.features] referenced_features = [] - for feature in self.features: - if feature.name in item: - referenced_features.append(feature) + for feature in item: + if feature not in feature_names: + raise ValueError( + f"Feature {feature} does not exist in this feature view." + ) + referenced_features.append(feature) cp.projection.features = referenced_features else: cp.projection.desired_features = item diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 3b6f1e7a90..c3037a55da 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -135,6 +135,7 @@ def infer_features(self, fvs_to_update: Dict[str, FeatureView]): # known schema, so no action needs to be taken. # Example: FeatureService(features=[fv[["existing_feature"]]]), where # 'existing_feature' was defined as part of the schema of 'fv'. + # Example: FeatureService(features=[fv]), where 'fv' was defined with a schema. continue # The projection wants to select all possible inferred features. From 31115acc82417d427e3f93be2be5f65d4efc5119 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Thu, 18 Aug 2022 13:40:04 -0700 Subject: [PATCH 11/11] Fix Signed-off-by: Felix Wang --- sdk/python/feast/base_feature_view.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/base_feature_view.py b/sdk/python/feast/base_feature_view.py index fe7f8229cc..01a4b39704 100644 --- a/sdk/python/feast/base_feature_view.py +++ b/sdk/python/feast/base_feature_view.py @@ -117,14 +117,16 @@ def __getitem__(self, item): cp = self.__copy__() if self.features: - feature_names = [feature.name for feature in self.features] + feature_name_to_feature = { + feature.name: feature for feature in self.features + } referenced_features = [] for feature in item: - if feature not in feature_names: + if feature not in feature_name_to_feature: raise ValueError( f"Feature {feature} does not exist in this feature view." ) - referenced_features.append(feature) + referenced_features.append(feature_name_to_feature[feature]) cp.projection.features = referenced_features else: cp.projection.desired_features = item