diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index f649320749..0254f4187b 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -17,6 +17,7 @@ import os import time from collections import OrderedDict +from math import ceil from typing import Dict, Union from typing import List from urllib.parse import urlparse @@ -36,7 +37,6 @@ GetFeatureSetResponse, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub -from feast.core.FeatureSet_pb2 import FeatureSetStatus from feast.feature_set import FeatureSet, Entity from feast.job import Job from feast.loaders.abstract_producer import get_producer @@ -336,11 +336,15 @@ def get_batch_features( "feature_set_name:version:feature_name". entity_rows (Union[pd.DataFrame, str]): + Either: Pandas dataframe containing entities and a 'datetime' column. Each entity in a feature set must be present as a column in this dataframe. The datetime column must contain timestamps in datetime64 format. + Or: + A file path to a file containing the entity rows in Avro format. 
+ Returns: feast.job.Job: Returns a job object that can be used to monitor retrieval @@ -794,7 +798,7 @@ def _read_table_from_source( # Write table as parquet file with a specified row_group_size tmp_table_name = f"{int(time.time())}.parquet" - row_group_size = min(int(table.num_rows/max_workers), chunk_size) + row_group_size = min(ceil(table.num_rows / max_workers), chunk_size) pq.write_table(table=table, where=tmp_table_name, row_group_size=row_group_size) diff --git a/sdk/python/feast/feature_set.py b/sdk/python/feast/feature_set.py index 28381891f5..42979ea911 100644 --- a/sdk/python/feast/feature_set.py +++ b/sdk/python/feast/feature_set.py @@ -19,8 +19,6 @@ import pandas as pd import pyarrow as pa -from feast.core.FeatureSet_pb2 import FeatureSet as FeatureSetProto -from feast.core.FeatureSet_pb2 import FeatureSetMeta as FeatureSetMetaProto from feast.core.FeatureSet_pb2 import FeatureSetSpec as FeatureSetSpecProto from feast.entity import Entity from feast.feature import Feature, Field diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 9ac7225e80..420d1a39af 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -32,7 +32,7 @@ "googleapis-common-protos==1.*", "google-cloud-bigquery-storage==0.7.*", "grpcio==1.*", - "pandas==0.*", + "pandas>=0.25.0", "pandavro==1.5.*", "protobuf>=3.10", "PyYAML==5.1.*", diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index f979c5a55d..9ef6e3d56f 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -385,36 +385,6 @@ def test_feature_set_ingest_success(self, dataframe, client, mocker): # Ingest data into Feast client.ingest("driver-feature-set", dataframe) - @pytest.mark.parametrize("dataframe,exception", [(dataframes.GOOD, TimeoutError)]) - def test_feature_set_ingest_fail_if_pending( - self, dataframe, exception, client, mocker - ): - with pytest.raises(exception): - driver_fs = FeatureSet( - "driver-feature-set", - 
source=KafkaSource(brokers="kafka:9092", topic="test"), - ) - driver_fs.add(Feature(name="feature_1", dtype=ValueType.FLOAT)) - driver_fs.add(Feature(name="feature_2", dtype=ValueType.STRING)) - driver_fs.add(Feature(name="feature_3", dtype=ValueType.INT64)) - driver_fs.add(Entity(name="entity_id", dtype=ValueType.INT64)) - - # Register with Feast core - client.apply(driver_fs) - driver_fs = driver_fs.to_proto() - driver_fs.meta.status = FeatureSetStatus.STATUS_PENDING - - mocker.patch.object( - client._core_service_stub, - "GetFeatureSet", - return_value=GetFeatureSetResponse(feature_set=driver_fs), - ) - - # Need to create a mock producer - with patch("feast.client.get_producer") as mocked_queue: - # Ingest data into Feast - client.ingest("driver-feature-set", dataframe, timeout=1) - @pytest.mark.parametrize( "dataframe,exception", [