diff --git a/Makefile b/Makefile
index 26e79cf6a9..813a27f4e3 100644
--- a/Makefile
+++ b/Makefile
@@ -176,18 +176,6 @@ test-python-universal-athena:
 			not s3_registry and \
 			not test_snowflake" \
 		sdk/python/tests
-
-test-python-universal-duckdb:
-	PYTHONPATH='.' \
-	FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.duckdb_repo_configuration \
-	python -m pytest -n 8 --integration \
-		-k "not test_nullable_online_store and \
-			not gcs_registry and \
-			not s3_registry and \
-			not test_snowflake and \
-			not bigquery and \
-			not test_spark_materialization_consistency" \
-		sdk/python/tests
 
 test-python-universal-postgres-offline:
 	PYTHONPATH='.' \
diff --git a/docs/project/development-guide.md b/docs/project/development-guide.md
index 28baa789bb..2d4ab0c7c6 100644
--- a/docs/project/development-guide.md
+++ b/docs/project/development-guide.md
@@ -187,7 +187,7 @@ make lint-python
 
 ### Unit Tests
 Unit tests (`pytest`) for the Feast Python SDK / CLI can run as follows:
 ```sh
-make test-python
+make test-python-unit
 ```
 > :warning: Local configuration can interfere with Unit tests and cause them to fail:
diff --git a/environment-setup.md b/environment-setup.md
index a6c30c2aa2..5dde9dfd94 100644
--- a/environment-setup.md
+++ b/environment-setup.md
@@ -19,5 +19,5 @@ make install-python-ci-dependencies PYTHON=3.9
 4. start the docker daemon
 5. run unit tests:
 ```bash
-make test-python
+make test-python-unit
 ```
\ No newline at end of file
diff --git a/sdk/python/feast/infra/offline_stores/contrib/duckdb_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/duckdb_offline_store/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/sdk/python/feast/infra/offline_stores/contrib/duckdb_repo_configuration.py b/sdk/python/feast/infra/offline_stores/contrib/duckdb_repo_configuration.py
deleted file mode 100644
index 263ae97466..0000000000
--- a/sdk/python/feast/infra/offline_stores/contrib/duckdb_repo_configuration.py
+++ /dev/null
@@ -1,19 +0,0 @@
-from feast.infra.offline_stores.contrib.duckdb_offline_store.duckdb import (
-    DuckDBOfflineStoreConfig,
-)
-from tests.integration.feature_repos.universal.data_sources.file import (  # noqa: E402
-    FileDataSourceCreator,
-)
-
-
-class DuckDBDataSourceCreator(FileDataSourceCreator):
-    def create_offline_store_config(self):
-        self.duckdb_offline_store_config = DuckDBOfflineStoreConfig()
-        return self.duckdb_offline_store_config
-
-
-AVAILABLE_OFFLINE_STORES = [
-    ("local", DuckDBDataSourceCreator),
-]
-
-AVAILABLE_ONLINE_STORES = {"sqlite": ({"type": "sqlite"}, None)}
diff --git a/sdk/python/feast/infra/offline_stores/contrib/ibis_offline_store/__init__.py b/sdk/python/feast/infra/offline_stores/contrib/ibis_offline_store/__init__.py
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/sdk/python/feast/infra/offline_stores/contrib/duckdb_offline_store/duckdb.py b/sdk/python/feast/infra/offline_stores/duckdb.py
similarity index 82%
rename from sdk/python/feast/infra/offline_stores/contrib/duckdb_offline_store/duckdb.py
rename to sdk/python/feast/infra/offline_stores/duckdb.py
index f927f2ff92..d43286f371 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/duckdb_offline_store/duckdb.py
+++ b/sdk/python/feast/infra/offline_stores/duckdb.py
@@ -1,7 +1,7 @@
 import ibis
 from pydantic import StrictStr
 
-from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import IbisOfflineStore
+from feast.infra.offline_stores.ibis import IbisOfflineStore
 from feast.repo_config import FeastConfigBaseModel
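Note: with the DuckDB store promoted out of contrib, user code imports it from the top-level `offline_stores` package. A minimal sketch of the new import path; the no-argument construction matches how the patch's own test creator builds the config, and assuming defaults are sufficient is the only leap here:

```python
# New location after the move; the old path under
# feast.infra.offline_stores.contrib.duckdb_offline_store no longer exists.
from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig

# Constructed with defaults, mirroring DuckDBDataSourceCreator in this patch.
config = DuckDBOfflineStoreConfig()
```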
diff --git a/sdk/python/feast/infra/offline_stores/contrib/ibis_offline_store/ibis.py b/sdk/python/feast/infra/offline_stores/ibis.py
similarity index 95%
rename from sdk/python/feast/infra/offline_stores/contrib/ibis_offline_store/ibis.py
rename to sdk/python/feast/infra/offline_stores/ibis.py
index 37fc6a4718..f9c6b2d20b 100644
--- a/sdk/python/feast/infra/offline_stores/contrib/ibis_offline_store/ibis.py
+++ b/sdk/python/feast/infra/offline_stores/ibis.py
@@ -134,7 +134,9 @@ def get_historical_features(
             entity_table, feature_views, event_timestamp_col
         )
 
-        def read_fv(feature_view, feature_refs, full_feature_names):
+        def read_fv(
+            feature_view: FeatureView, feature_refs: List[str], full_feature_names: bool
+        ) -> Tuple:
             fv_table: Table = ibis.read_parquet(feature_view.batch_source.name)
 
             for old_name, new_name in feature_view.batch_source.field_mapping.items():
@@ -175,6 +177,7 @@ def read_fv(feature_view, feature_refs, full_feature_names):
             return (
                 fv_table,
                 feature_view.batch_source.timestamp_field,
+                feature_view.batch_source.created_timestamp_column,
                 feature_view.projection.join_key_map
                 or {e.name: e.name for e in feature_view.entity_columns},
                 feature_refs,
@@ -351,12 +354,19 @@ def metadata(self) -> Optional[RetrievalMetadata]:
 
 def point_in_time_join(
     entity_table: Table,
-    feature_tables: List[Tuple[Table, str, Dict[str, str], List[str], timedelta]],
+    feature_tables: List[Tuple[Table, str, str, Dict[str, str], List[str], timedelta]],
     event_timestamp_col="event_timestamp",
 ):
     # TODO handle ttl
     all_entities = [event_timestamp_col]
-    for feature_table, timestamp_field, join_key_map, _, _ in feature_tables:
+    for (
+        feature_table,
+        timestamp_field,
+        created_timestamp_field,
+        join_key_map,
+        _,
+        _,
+    ) in feature_tables:
         all_entities.extend(join_key_map.values())
 
     r = ibis.literal("")
@@ -371,6 +381,7 @@ def point_in_time_join(
     for (
         feature_table,
         timestamp_field,
+        created_timestamp_field,
        join_key_map,
         feature_refs,
         ttl,
@@ -395,9 +406,13 @@ def point_in_time_join(
 
         feature_table = feature_table.drop(s.endswith("_y"))
 
+        order_by_fields = [ibis.desc(feature_table[timestamp_field])]
+        if created_timestamp_field:
+            order_by_fields.append(ibis.desc(feature_table[created_timestamp_field]))
+
         feature_table = (
             feature_table.group_by(by="entity_row_id")
-            .order_by(ibis.desc(feature_table[timestamp_field]))
+            .order_by(order_by_fields)
             .mutate(rn=ibis.row_number())
         )
diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py
index 5708754622..fe3491c6fe 100644
--- a/sdk/python/feast/repo_config.py
+++ b/sdk/python/feast/repo_config.py
@@ -75,7 +75,7 @@
     "postgres": "feast.infra.offline_stores.contrib.postgres_offline_store.postgres.PostgreSQLOfflineStore",
     "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore",
     "mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore",
-    "duckdb": "feast.infra.offline_stores.contrib.duckdb_offline_store.duckdb.DuckDBOfflineStore",
+    "duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore",
 }
 
 FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = {
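The behavioral change in the `ibis.py` hunks above is a two-key dedup in `point_in_time_join`: rows are ranked per entity by event timestamp descending, then by created timestamp descending, so ties on event time resolve to the most recently created row. A standalone sketch of that idiom with illustrative data; the `group_by`/`order_by`/`row_number` calls mirror the patch, while the trailing filter and execution step are assumptions (a default ibis backend such as DuckDB must be installed to execute):

```python
import ibis

# Two rows for entity "a" share an event timestamp but differ in `created`.
t = ibis.memtable(
    {
        "entity_row_id": ["a", "a", "b"],
        "event_timestamp": ["2024-01-01", "2024-01-01", "2024-01-01"],
        "created": ["2024-01-02", "2024-01-03", "2024-01-01"],
        "feature1": [10, 20, 30],
    }
)

# Mirrors the patched order_by: event time first, created time as tie-breaker.
order_by_fields = [ibis.desc(t["event_timestamp"]), ibis.desc(t["created"])]
ranked = (
    t.group_by(by="entity_row_id")
    .order_by(order_by_fields)
    .mutate(rn=ibis.row_number())
)
latest = ranked.filter(ranked["rn"] == 0)
# latest.execute() should keep feature1 == 20 for entity "a",
# i.e. the row with the greater `created` timestamp wins the tie.
```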
diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py
index d2450bf868..6eb5204161 100644
--- a/sdk/python/tests/integration/feature_repos/repo_configuration.py
+++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py
@@ -31,6 +31,7 @@
     BigQueryDataSourceCreator,
 )
 from tests.integration.feature_repos.universal.data_sources.file import (
+    DuckDBDataSourceCreator,
     FileDataSourceCreator,
 )
 from tests.integration.feature_repos.universal.data_sources.redshift import (
@@ -108,6 +109,7 @@
 AVAILABLE_OFFLINE_STORES: List[Tuple[str, Type[DataSourceCreator]]] = [
     ("local", FileDataSourceCreator),
+    ("local", DuckDBDataSourceCreator),
 ]
 
 AVAILABLE_ONLINE_STORES: Dict[
diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
index c70dae9863..6d4baa19ed 100644
--- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
+++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py
@@ -15,6 +15,7 @@
 from feast.data_format import ParquetFormat
 from feast.data_source import DataSource
 from feast.feature_logging import LoggingDestination
+from feast.infra.offline_stores.duckdb import DuckDBOfflineStoreConfig
 from feast.infra.offline_stores.file import FileOfflineStoreConfig
 from feast.infra.offline_stores.file_source import (
     FileLoggingDestination,
@@ -214,3 +215,10 @@ def create_offline_store_config(self) -> FeastConfigBaseModel:
     def teardown(self):
         self.minio.stop()
         self.f.close()
+
+
+# TODO split up DataSourceCreator and OfflineStoreCreator
+class DuckDBDataSourceCreator(FileDataSourceCreator):
+    def create_offline_store_config(self):
+        self.duckdb_offline_store_config = DuckDBOfflineStoreConfig()
+        return self.duckdb_offline_store_config
diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
index 9baba2397b..7e106b3e2a 100644
--- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
+++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py
@@ -518,7 +518,7 @@ def test_historical_features_with_no_ttl(
 
 @pytest.mark.integration
 @pytest.mark.universal_offline_stores
-def test_historical_features_from_bigquery_sources_containing_backfills(environment):
+def test_historical_features_containing_backfills(environment):
     store = environment.feature_store
 
     now = datetime.now().replace(microsecond=0, second=0, minute=0)
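With the dedicated Makefile target and the contrib repo-config module gone, DuckDB coverage now comes from this registration in the universal test matrix (hence the backfills test also loses its BigQuery-specific name). The creator inherits file-based parquet sources and only swaps the offline store config. A rough usage sketch; the constructor argument follows the `DataSourceCreator` pattern and is an assumption here:

```python
from tests.integration.feature_repos.universal.data_sources.file import (
    DuckDBDataSourceCreator,
)

# Project name argument assumed from the DataSourceCreator base class.
creator = DuckDBDataSourceCreator("test_project")
offline_config = creator.create_offline_store_config()
# offline_config is a DuckDBOfflineStoreConfig; data source creation and
# teardown are inherited unchanged from FileDataSourceCreator.
```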
diff --git a/sdk/python/tests/unit/infra/offline_stores/test_ibis.py b/sdk/python/tests/unit/infra/offline_stores/test_ibis.py
index 5f105e2af7..fea1399552 100644
--- a/sdk/python/tests/unit/infra/offline_stores/test_ibis.py
+++ b/sdk/python/tests/unit/infra/offline_stores/test_ibis.py
@@ -3,10 +3,9 @@
 
 import ibis
 import pyarrow as pa
+import pyarrow.compute as pc
 
-from feast.infra.offline_stores.contrib.ibis_offline_store.ibis import (
-    point_in_time_join,
-)
+from feast.infra.offline_stores.ibis import point_in_time_join
 
 
 def pa_datetime(year, month, day):
@@ -16,12 +15,13 @@ def pa_datetime(year, month, day):
 def customer_table():
     return pa.Table.from_arrays(
         arrays=[
-            pa.array([1, 1, 2]),
+            pa.array([1, 1, 2, 3]),
             pa.array(
                 [
                     pa_datetime(2024, 1, 1),
                     pa_datetime(2024, 1, 2),
                     pa_datetime(2024, 1, 1),
+                    pa_datetime(2024, 1, 3),
                 ]
             ),
         ],
@@ -32,24 +32,38 @@ def customer_table():
 def features_table_1():
     return pa.Table.from_arrays(
         arrays=[
-            pa.array([1, 1, 1, 2]),
+            pa.array([1, 1, 1, 2, 3, 3]),
             pa.array(
                 [
                     pa_datetime(2023, 12, 31),
                     pa_datetime(2024, 1, 2),
                     pa_datetime(2024, 1, 3),
                     pa_datetime(2023, 1, 3),
+                    pa_datetime(2024, 1, 1),
+                    pa_datetime(2024, 1, 1),
                 ]
             ),
-            pa.array([11, 22, 33, 22]),
+            pa.array(
+                [
+                    pa_datetime(2023, 12, 31),
+                    pa_datetime(2024, 1, 2),
+                    pa_datetime(2024, 1, 3),
+                    pa_datetime(2023, 1, 3),
+                    pa_datetime(2024, 1, 3),
+                    pa_datetime(2024, 1, 2),
+                ]
+            ),
+            pa.array([11, 22, 33, 22, 10, 20]),
         ],
-        names=["customer_id", "event_timestamp", "feature1"],
+        names=["customer_id", "event_timestamp", "created", "feature1"],
     )
 
 
 def point_in_time_join_brute(
     entity_table: pa.Table,
-    feature_tables: List[Tuple[pa.Table, str, Dict[str, str], List[str], timedelta]],
+    feature_tables: List[
+        Tuple[pa.Table, str, str, Dict[str, str], List[str], timedelta]
+    ],
     event_timestamp_col="event_timestamp",
 ):
     ret_fields = [entity_table.schema.field(n) for n in entity_table.schema.names]
@@ -63,6 +77,7 @@ def point_in_time_join_brute(
         for (
             feature_table,
             timestamp_key,
+            created_timestamp_key,
             join_key_map,
             feature_refs,
             ttl,
@@ -72,7 +87,9 @@ def check_equality(ft_dict, batch_dict, x, y):
             [
                 feature_table.schema.field(f)
                 for f in feature_table.schema.names
-                if f not in join_key_map.values() and f != timestamp_key
+                if f not in join_key_map.values()
+                and f != timestamp_key
+                and f != created_timestamp_key
             ]
         )
 
@@ -82,9 +99,11 @@ def check_equality(ft_dict, batch_dict, x, y):
             )
 
             ft_dict = feature_table.to_pydict()
+
             found_matches = [
-                (j, ft_dict[timestamp_key][j])
-                for j in range(entity_table.num_rows)
+                (j, (ft_dict[timestamp_key][j], ft_dict[created_timestamp_key][j]))
+                # (j, ft_dict[timestamp_key][j])
+                for j in range(feature_table.num_rows)
                 if check_equality(ft_dict, batch_dict, j, i)
                 and ft_dict[timestamp_key][j] <= row_timestmap
                 and ft_dict[timestamp_key][j] >= row_timestmap - ttl
@@ -93,6 +112,7 @@ def check_equality(ft_dict, batch_dict, x, y):
             index_found = (
                 max(found_matches, key=itemgetter(1))[0] if found_matches else None
            )
+
             for col in ft_dict.keys():
                 if col not in feature_refs:
                     continue
@@ -108,6 +128,18 @@ def check_equality(ft_dict, batch_dict, x, y):
     return pa.Table.from_pydict(ret, schema=pa.schema(ret_fields))
 
 
+def tables_equal_ignore_order(actual: pa.Table, expected: pa.Table):
+    sort_keys = [(name, "ascending") for name in actual.column_names]
+    sort_indices = pc.sort_indices(actual, sort_keys)
+    actual = pc.take(actual, sort_indices)
+
+    sort_keys = [(name, "ascending") for name in expected.column_names]
+    sort_indices = pc.sort_indices(expected, sort_keys)
+    expected = pc.take(expected, sort_indices)
+
+    return actual.equals(expected)
+
+
 def test_point_in_time_join():
     expected = point_in_time_join_brute(
         customer_table(),
@@ -115,6 +147,7 @@ def test_point_in_time_join():
             (
                 features_table_1(),
                 "event_timestamp",
+                "created",
                 {"customer_id": "customer_id"},
                 ["feature1"],
                 timedelta(days=10),
@@ -128,6 +161,7 @@ def test_point_in_time_join():
             (
                 ibis.memtable(features_table_1()),
                 "event_timestamp",
+                "created",
                 {"customer_id": "customer_id"},
                 ["feature1"],
                 timedelta(days=10),
@@ -135,4 +169,4 @@ def test_point_in_time_join():
         ],
     ).to_pyarrow()
 
-    assert actual.equals(expected)
+    assert tables_equal_ignore_order(actual, expected)
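Since the grouped dedup in the ibis join no longer guarantees row order, the test now compares tables order-insensitively. A self-contained demo of the helper added above, with the same sort-then-compare logic duplicated here so the snippet runs on its own:

```python
import pyarrow as pa
import pyarrow.compute as pc


def tables_equal_ignore_order(actual: pa.Table, expected: pa.Table) -> bool:
    # Sort both tables on all columns, then do an exact comparison.
    actual = pc.take(
        actual,
        pc.sort_indices(actual, [(n, "ascending") for n in actual.column_names]),
    )
    expected = pc.take(
        expected,
        pc.sort_indices(expected, [(n, "ascending") for n in expected.column_names]),
    )
    return actual.equals(expected)


t1 = pa.table({"id": [1, 2], "v": ["a", "b"]})
t2 = pa.table({"id": [2, 1], "v": ["b", "a"]})
assert not t1.equals(t2)  # Table.equals is order-sensitive
assert tables_equal_ignore_order(t1, t2)  # sorting first ignores row order
```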