From 0a32568db636eaf5e9e443877f890c54089844b9 Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 11 May 2022 11:24:01 -0700 Subject: [PATCH 1/2] Fix materialization with ttl=0 bug Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 11 ++++- .../example_feature_repo_with_ttl_0.py | 48 +++++++++++++++++++ .../online_store/test_e2e_local.py | 20 ++++++-- 3 files changed, 72 insertions(+), 7 deletions(-) create mode 100644 sdk/python/tests/example_repos/example_feature_repo_with_ttl_0.py diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 115f0fd971..ae48c4c2c1 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -16,7 +16,7 @@ import os import warnings from collections import Counter, defaultdict -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import ( TYPE_CHECKING, @@ -1080,7 +1080,14 @@ def materialize_incremental( f"No start time found for feature view {feature_view.name}. materialize_incremental() requires" f" either a ttl to be set or for materialize() to have been run at least once." ) - start_date = datetime.utcnow() - feature_view.ttl + elif feature_view.ttl.total_seconds() > 0: + start_date = datetime.utcnow() - feature_view.ttl + else: + print( + f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, " + "the start date will be set to 1 year before the current time." + ) + start_date = datetime.utcnow() - timedelta(weeks=52) provider = self._get_provider() print( f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}" diff --git a/sdk/python/tests/example_repos/example_feature_repo_with_ttl_0.py b/sdk/python/tests/example_repos/example_feature_repo_with_ttl_0.py new file mode 100644 index 0000000000..e2bec03f8f --- /dev/null +++ b/sdk/python/tests/example_repos/example_feature_repo_with_ttl_0.py @@ -0,0 +1,48 @@ +from datetime import timedelta + +from feast import Entity, FeatureView, Field, FileSource, ValueType +from feast.types import Float32, Int32, Int64 + +driver_hourly_stats = FileSource( + path="%PARQUET_PATH%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id") + + +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=0), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_hourly_stats, + tags={}, +) + + +global_daily_stats = FileSource( + path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + + +global_stats_feature_view = FeatureView( + name="global_daily_stats", + entities=[], + ttl=timedelta(days=0), + schema=[ + Field(name="num_rides", dtype=Int32), + Field(name="avg_ride_length", dtype=Float32), + ], + online=True, + source=global_daily_stats, + tags={}, +) diff --git a/sdk/python/tests/integration/online_store/test_e2e_local.py b/sdk/python/tests/integration/online_store/test_e2e_local.py index 59ea0777a4..d3cb86716a 100644 --- a/sdk/python/tests/integration/online_store/test_e2e_local.py +++ b/sdk/python/tests/integration/online_store/test_e2e_local.py @@ -101,12 +101,12 @@ def _test_materialize_and_online_retrieval( def test_e2e_local() -> None: """ - A more comprehensive than "basic" test, using local provider. + Tests the end-to-end workflow of apply, materialize, and online retrieval. - 1. Create a repo. - 2. Apply - 3. Ingest some data to online store from parquet - 4. Read from the online store to make sure it made it there. + This test runs against several different types of repos: + 1. A repo with a normal FV and an entity-less FV. + 2. A repo using the SDK from version 0.19.0. + 3. A repo with a FV with a ttl of 0. """ runner = CliRunner() with tempfile.TemporaryDirectory() as data_dir: @@ -143,6 +143,16 @@ def test_e2e_local() -> None: runner, store, start_date, end_date, driver_df ) + with runner.local_repo( + get_example_repo("example_feature_repo_with_ttl_0.py") + .replace("%PARQUET_PATH%", driver_stats_path) + .replace("%PARQUET_PATH_GLOBAL%", global_stats_path), + "file", + ) as store: + _test_materialize_and_online_retrieval( + runner, store, start_date, end_date, driver_df + ) + # Test a failure case when the parquet file doesn't include a join key with runner.local_repo( get_example_repo("example_feature_repo_with_entity_join_key.py").replace( From 68b5c134e358e835e2f4ce9b9e8e9a636a0bafbf Mon Sep 17 00:00:00 2001 From: Felix Wang Date: Wed, 11 May 2022 13:00:24 -0700 Subject: [PATCH 2/2] Add TODO Signed-off-by: Felix Wang --- sdk/python/feast/feature_store.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ae48c4c2c1..8be1510d51 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1083,6 +1083,8 @@ def materialize_incremental( elif feature_view.ttl.total_seconds() > 0: start_date = datetime.utcnow() - feature_view.ttl else: + # TODO(felixwang9817): Find the earliest timestamp for this specific feature + # view from the offline store, and set the start date to that timestamp. print( f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, " "the start date will be set to 1 year before the current time."