From 93b165eb352ef1d38510d1bd363a121773e04e8b Mon Sep 17 00:00:00 2001
From: Danny Chiao
Date: Sun, 17 Apr 2022 16:25:54 -0400
Subject: [PATCH 01/18] bug: Fixing push API endpoint to include a way to
 specify whether the registry should be looked up from a cache. Adding docs
 for feature server usage

Signed-off-by: Danny Chiao
---
 .../feature-servers/local-feature-server.md   | 59 ++++++++++++++++++-
 sdk/python/feast/feature_store.py             | 10 +++-
 2 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/docs/reference/feature-servers/local-feature-server.md b/docs/reference/feature-servers/local-feature-server.md
index 4ea37d4f1e..8b2c17f6a5 100644
--- a/docs/reference/feature-servers/local-feature-server.md
+++ b/docs/reference/feature-servers/local-feature-server.md
@@ -1,15 +1,23 @@
-# Local feature server
+# Python feature server
 
 ## Overview
 
-The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed.
+The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write to and read from Feast online stores using any programming language that can make HTTP requests.
 
 ## CLI
 
-There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port be overridden by a `--port` flag.
+There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port can be overridden with a `--port` flag.
+
+## Deploying as a service
+
+One can also deploy a feature server by building a Docker image that bundles in the project's `feature_store.yaml`. See the [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for an example, or the sketch below.
+
+A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.
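+For reference, here is a minimal sketch of building and running the Docker image described above. This is illustrative only; the base image, Feast version, and `feast serve` flags are assumptions, and a production image would typically also bundle the registry file that `feature_store.yaml` points to:
+
+```bash
+# Write a throwaway Dockerfile next to the project's feature_store.yaml.
+cat > Dockerfile <<'EOF'
+FROM python:3.8-slim
+RUN pip install feast
+COPY feature_store.yaml /feature_repo/feature_store.yaml
+WORKDIR /feature_repo
+EXPOSE 6566
+# Bind to 0.0.0.0 so the endpoint is reachable from outside the container.
+CMD ["feast", "serve", "--host", "0.0.0.0", "--port", "6566"]
+EOF
+
+# Build the image and expose the server on the host.
+docker build -t feast-python-server .
+docker run -p 6566:6566 feast-python-server
+```
+
+Once the container is up, the same requests shown in the example below can be pointed at it.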
+
 ## Example
 
+### Initializing a feature server
 Here's the local feature server usage example with the local template:
 
 ```bash
@@ -41,6 +49,7 @@ INFO:     Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
 09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
 ```
 
+### Retrieving features from the online store
 After the server starts, we can execute cURL commands from another terminal tab:
 
 ```bash
@@ -142,3 +151,47 @@ curl -X POST \
   }
 }' | jq
 ```
+
+### Pushing features to the online store
+You can push data corresponding to a push source to the online store (note that timestamps need to be strings):
+
+```text
+curl -X POST "http://localhost:6566/write-to-online-store" -d '{
+    "push_source_name": "driver_hourly_stats_push_source",
+    "df": {
+      "driver_id": [1001],
+      "event_timestamp": ["2022-05-13 10:59:42"],
+      "created": ["2022-05-13 10:59:42"],
+      "conv_rate": [1.0],
+      "acc_rate": [1.0],
+      "avg_daily_trips": [1000]
+    }
+  }' | jq
+```
+
+or equivalently from Python:
+```python
+import requests
+import pandas as pd
+from datetime import datetime
+
+event_df = pd.DataFrame.from_dict(
+    {
+        "driver_id": [1001],
+        "event_timestamp": [datetime(2022, 5, 13, 10, 59, 42)],
+        "created": [datetime(2022, 5, 13, 10, 59, 42)],
+        "conv_rate": [1.0],
+        "acc_rate": [1.0],
+        "avg_daily_trips": [1000],
+    }
+)
+event_df['event_timestamp'] = event_df['event_timestamp'].astype(str)
+event_df['created'] = event_df['created'].astype(str)
+requests.post(
+    "http://localhost:6566/push",
+    json={
+        "push_source_name": "driver_hourly_stats_push_source",
+        "df": event_df.to_dict()
+    })
+```

diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py
index 33d297f3ca..01ace319a7 100644
--- a/sdk/python/feast/feature_store.py
+++ b/sdk/python/feast/feature_store.py
@@ -1186,7 +1186,9 @@ def tqdm_builder(length):
         )
 
     @log_exceptions_and_usage
-    def push(self, push_source_name: str, df: pd.DataFrame):
+    def push(
+        self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
+    ):
         """
         Push features to a push source. This updates all the feature views that have the push source as stream source.
Args: @@ -1195,7 +1197,7 @@ def push(self, push_source_name: str, df: pd.DataFrame): """ from feast.data_source import PushSource - all_fvs = self.list_feature_views(allow_cache=True) + all_fvs = self.list_feature_views(allow_cache=allow_registry_cache) fvs_with_push_sources = { fv @@ -1208,7 +1210,9 @@ def push(self, push_source_name: str, df: pd.DataFrame): } for fv in fvs_with_push_sources: - self.write_to_online_store(fv.name, df, allow_registry_cache=True) + self.write_to_online_store( + fv.name, df, allow_registry_cache=allow_registry_cache + ) @log_exceptions_and_usage def write_to_online_store( From ddba2067245d5d87b28518d1b8f9c8df56902d21 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 16:28:04 -0400 Subject: [PATCH 02/18] fix Signed-off-by: Danny Chiao --- docs/SUMMARY.md | 2 +- docs/reference/data-sources/push.md | 4 +++- docs/reference/feature-servers/README.md | 2 +- docs/reference/feature-servers/go-feature-retrieval.md | 2 +- .../{local-feature-server.md => python-feature-server.md} | 0 5 files changed, 6 insertions(+), 4 deletions(-) rename docs/reference/feature-servers/{local-feature-server.md => python-feature-server.md} (100%) diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index c6a3422ecd..11e20ab831 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -81,7 +81,7 @@ * [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md) * [.feastignore](reference/feature-repository/feast-ignore.md) * [Feature servers](reference/feature-servers/README.md) - * [Local feature server](reference/feature-servers/local-feature-server.md) + * [Python feature server](reference/feature-servers/python-feature-server.md) * [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md) * [\[Alpha\] Data quality monitoring](reference/dqm.md) * [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md) diff --git a/docs/reference/data-sources/push.md b/docs/reference/data-sources/push.md index 9f377d2099..89ebfcd5b0 100644 --- a/docs/reference/data-sources/push.md +++ b/docs/reference/data-sources/push.md @@ -53,6 +53,8 @@ import pandas as pd fs = FeatureStore(...) feature_data_frame = pd.DataFrame() -fs.push("push_source", feature_data_frame) +fs.push("push_source_name", feature_data_frame) ``` +See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server. + diff --git a/docs/reference/feature-servers/README.md b/docs/reference/feature-servers/README.md index e9e3afa4c0..301cea372c 100644 --- a/docs/reference/feature-servers/README.md +++ b/docs/reference/feature-servers/README.md @@ -2,4 +2,4 @@ Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK. -{% page-ref page="local-feature-server.md" %} +{% page-ref page="python-feature-server.md" %} diff --git a/docs/reference/feature-servers/go-feature-retrieval.md b/docs/reference/feature-servers/go-feature-retrieval.md index 415817dd85..05411a7f8c 100644 --- a/docs/reference/feature-servers/go-feature-retrieval.md +++ b/docs/reference/feature-servers/go-feature-retrieval.md @@ -2,7 +2,7 @@ ## Overview -The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md). 
+The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md). Currently, this component only supports online serving and does not have an offline component including APIs to create feast feature repositories or apply configuration to the registry to facilitate online materialization. It also does not expose its own dedicated cli to perform feast actions. Furthermore, this component is only meant to expose an online serving API that can be called through the python SDK to facilitate faster online feature retrieval. diff --git a/docs/reference/feature-servers/local-feature-server.md b/docs/reference/feature-servers/python-feature-server.md similarity index 100% rename from docs/reference/feature-servers/local-feature-server.md rename to docs/reference/feature-servers/python-feature-server.md From 23c99890c410ba03c433dd9742b3fa223d862f88 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:47:34 -0400 Subject: [PATCH 03/18] prune out unneeded fields in push source Signed-off-by: Danny Chiao --- .../feature-servers/python-feature-server.md | 2 +- sdk/python/feast/data_source.py | 4 +- sdk/python/feast/feature_store.py | 262 +++++++++--------- .../feast/infra/online_stores/sqlite.py | 4 +- .../feature_repos/universal/feature_views.py | 18 +- .../test_python_feature_server.py | 110 ++++++++ 6 files changed, 258 insertions(+), 142 deletions(-) create mode 100644 sdk/python/tests/integration/online_store/test_python_feature_server.py diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 8b2c17f6a5..71c75ab104 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -156,7 +156,7 @@ curl -X POST \ You can push data corresponding to a push source to the online store (note that timestamps need to be strings): ```text -curl -X POST "http://localhost:6566/write-to-online-store" -d '{ +curl -X POST "http://localhost:6566/push" -d '{ "push_source_name": "driver_hourly_stats_push_source", "df": { "driver_id": [1001], diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 0e264117ae..2f7a7b7d71 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -714,10 +714,10 @@ class PushSource(DataSource): A source that can be used to ingest features on request """ - name: str schema: List[Field] batch_source: DataSource timestamp_field: str + # TODO(adchia): remove schema + timestamp_field? def __init__( self, @@ -728,7 +728,7 @@ def __init__( description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", - timestamp_field: Optional[str] = "", + timestamp_field: Optional[str] = None, ): """ Creates a PushSource object. 
diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 01ace319a7..0a373b94ed 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -92,7 +92,6 @@ warnings.simplefilter("once", DeprecationWarning) - if TYPE_CHECKING: from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer @@ -115,7 +114,7 @@ class FeatureStore: @log_exceptions def __init__( - self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): """ Creates a FeatureStore object. @@ -194,7 +193,7 @@ def list_entities(self, allow_cache: bool = False) -> List[Entity]: return self._list_entities(allow_cache) def _list_entities( - self, allow_cache: bool = False, hide_dummy_entity: bool = True + self, allow_cache: bool = False, hide_dummy_entity: bool = True ) -> List[Entity]: all_entities = self._registry.list_entities( self.project, allow_cache=allow_cache @@ -230,7 +229,7 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]: @log_exceptions_and_usage def list_request_feature_views( - self, allow_cache: bool = False + self, allow_cache: bool = False ) -> List[RequestFeatureView]: """ Retrieves the list of feature views from the registry. @@ -246,11 +245,11 @@ def list_request_feature_views( ) def _list_feature_views( - self, allow_cache: bool = False, hide_dummy_entity: bool = True, + self, allow_cache: bool = False, hide_dummy_entity: bool = True, ) -> List[FeatureView]: feature_views = [] for fv in self._registry.list_feature_views( - self.project, allow_cache=allow_cache + self.project, allow_cache=allow_cache ): if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: fv.entities = [] @@ -259,7 +258,7 @@ def _list_feature_views( @log_exceptions_and_usage def list_on_demand_feature_views( - self, allow_cache: bool = False + self, allow_cache: bool = False ) -> List[OnDemandFeatureView]: """ Retrieves the list of on demand feature views from the registry. @@ -305,7 +304,7 @@ def get_entity(self, name: str, allow_registry_cache: bool = False) -> Entity: @log_exceptions_and_usage def get_feature_service( - self, name: str, allow_cache: bool = False + self, name: str, allow_cache: bool = False ) -> FeatureService: """ Retrieves a feature service. @@ -324,7 +323,7 @@ def get_feature_service( @log_exceptions_and_usage def get_feature_view( - self, name: str, allow_registry_cache: bool = False + self, name: str, allow_registry_cache: bool = False ) -> FeatureView: """ Retrieves a feature view. 
@@ -342,10 +341,10 @@ def get_feature_view( return self._get_feature_view(name, allow_registry_cache=allow_registry_cache) def _get_feature_view( - self, - name: str, - hide_dummy_entity: bool = True, - allow_registry_cache: bool = False, + self, + name: str, + hide_dummy_entity: bool = True, + allow_registry_cache: bool = False, ) -> FeatureView: feature_view = self._registry.get_feature_view( name, self.project, allow_cache=allow_registry_cache @@ -413,7 +412,7 @@ def delete_feature_service(self, name: str): return self._registry.delete_feature_service(name, self.project) def _get_features( - self, features: Union[List[str], FeatureService], allow_cache: bool = False, + self, features: Union[List[str], FeatureService], allow_cache: bool = False, ) -> List[str]: _features = features @@ -447,19 +446,19 @@ def _should_use_plan(self): """Returns True if _plan and _apply_diffs should be used, False otherwise.""" # Currently only the local provider with sqlite online store supports _plan and _apply_diffs. return self.config.provider == "local" and ( - self.config.online_store and self.config.online_store.type == "sqlite" + self.config.online_store and self.config.online_store.type == "sqlite" ) def _validate_all_feature_views( - self, - views_to_update: List[FeatureView], - odfvs_to_update: List[OnDemandFeatureView], - request_views_to_update: List[RequestFeatureView], + self, + views_to_update: List[FeatureView], + odfvs_to_update: List[OnDemandFeatureView], + request_views_to_update: List[RequestFeatureView], ): """Validates all feature views.""" if ( - not flags_helper.enable_on_demand_feature_views(self.config) - and len(odfvs_to_update) > 0 + not flags_helper.enable_on_demand_feature_views(self.config) + and len(odfvs_to_update) > 0 ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) @@ -470,11 +469,11 @@ def _validate_all_feature_views( ) def _make_inferences( - self, - data_sources_to_update: List[DataSource], - entities_to_update: List[Entity], - views_to_update: List[FeatureView], - odfvs_to_update: List[OnDemandFeatureView], + self, + data_sources_to_update: List[DataSource], + entities_to_update: List[Entity], + views_to_update: List[FeatureView], + odfvs_to_update: List[OnDemandFeatureView], ): """Makes inferences for entities, feature views, and odfvs.""" update_entities_with_inferred_types_from_feature_views( @@ -500,7 +499,7 @@ def _make_inferences( @log_exceptions_and_usage def _plan( - self, desired_repo_contents: RepoContents + self, desired_repo_contents: RepoContents ) -> Tuple[RegistryDiff, InfraDiff, Infra]: """Dry-run registering objects to metadata store. @@ -578,7 +577,7 @@ def _plan( @log_exceptions_and_usage def _apply_diffs( - self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra + self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra ): """Applies the given diffs to the metadata store and infrastructure. 
@@ -596,18 +595,18 @@ def _apply_diffs( @log_exceptions_and_usage def apply( - self, - objects: Union[ - DataSource, - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - List[FeastObject], - ], - objects_to_delete: Optional[List[FeastObject]] = None, - partial: bool = True, + self, + objects: Union[ + DataSource, + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + List[FeastObject], + ], + objects_to_delete: Optional[List[FeastObject]] = None, + partial: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -702,7 +701,7 @@ def apply( for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) for view in itertools.chain( - views_to_update, odfvs_to_update, request_views_to_update + views_to_update, odfvs_to_update, request_views_to_update ): self._registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: @@ -790,10 +789,10 @@ def teardown(self): @log_exceptions_and_usage def get_historical_features( - self, - entity_df: Union[pd.DataFrame, str], - features: Union[List[str], FeatureService], - full_feature_names: bool = False, + self, + entity_df: Union[pd.DataFrame, str], + features: Union[List[str], FeatureService], + full_feature_names: bool = False, ) -> RetrievalJob: """Enrich an entity dataframe with historical feature values for either training or batch scoring. @@ -918,12 +917,12 @@ def get_historical_features( @log_exceptions_and_usage def create_saved_dataset( - self, - from_: RetrievalJob, - name: str, - storage: SavedDatasetStorage, - tags: Optional[Dict[str, str]] = None, - feature_service: Optional[FeatureService] = None, + self, + from_: RetrievalJob, + name: str, + storage: SavedDatasetStorage, + tags: Optional[Dict[str, str]] = None, + feature_service: Optional[FeatureService] = None, ) -> SavedDataset: """ Execute provided retrieval job and persist its outcome in given storage. @@ -1008,7 +1007,7 @@ def get_saved_dataset(self, name: str) -> SavedDataset: @log_exceptions_and_usage def materialize_incremental( - self, end_date: datetime, feature_views: Optional[List[str]] = None, + self, end_date: datetime, feature_views: Optional[List[str]] = None, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -1100,10 +1099,10 @@ def tqdm_builder(length): @log_exceptions_and_usage def materialize( - self, - start_date: datetime, - end_date: datetime, - feature_views: Optional[List[str]] = None, + self, + start_date: datetime, + end_date: datetime, + feature_views: Optional[List[str]] = None, ) -> None: """ Materialize data from the offline store into the online store. @@ -1187,7 +1186,7 @@ def tqdm_builder(length): @log_exceptions_and_usage def push( - self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True + self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. 
@@ -1203,23 +1202,28 @@ def push( fv for fv in all_fvs if ( - fv.stream_source is not None - and isinstance(fv.stream_source, PushSource) - and fv.stream_source.name == push_source_name + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == push_source_name ) } for fv in fvs_with_push_sources: + push_source = cast(PushSource, fv.stream_source) + # If the push source's timestamp field doesn't match the batch source's timestamp field, do the mapping + renamed_df = df + if len(push_source.timestamp_field) > 0: + renamed_df = df.rename(columns={push_source.timestamp_field: push_source.batch_source.timestamp_field}) self.write_to_online_store( - fv.name, df, allow_registry_cache=allow_registry_cache + fv.name, renamed_df, allow_registry_cache=allow_registry_cache ) @log_exceptions_and_usage def write_to_online_store( - self, - feature_view_name: str, - df: pd.DataFrame, - allow_registry_cache: bool = True, + self, + feature_view_name: str, + df: pd.DataFrame, + allow_registry_cache: bool = True, ): """ ingests data directly into the Online store @@ -1238,10 +1242,10 @@ def write_to_online_store( @log_exceptions_and_usage def get_online_features( - self, - features: Union[List[str], FeatureService], - entity_rows: List[Dict[str, Any]], - full_feature_names: bool = False, + self, + features: Union[List[str], FeatureService], + entity_rows: List[Dict[str, Any]], + full_feature_names: bool = False, ) -> OnlineResponse: """ Retrieves the latest online feature data. @@ -1300,13 +1304,13 @@ def get_online_features( ) def _get_online_features( - self, - features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] - ], - full_feature_names: bool = False, - native_entity_values: bool = True, + self, + features: Union[List[str], FeatureService], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + ], + full_feature_names: bool = False, + native_entity_values: bool = True, ): # Extract Sequence from RepeatedValue Protobuf. entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = { @@ -1422,8 +1426,8 @@ def _get_online_features( for join_key_or_entity_name, values in entity_proto_values.items(): # Found request data if ( - join_key_or_entity_name in needed_request_data - or join_key_or_entity_name in needed_request_fv_features + join_key_or_entity_name in needed_request_data + or join_key_or_entity_name in needed_request_fv_features ): if join_key_or_entity_name in needed_request_fv_features: # If the data was requested as a feature then @@ -1509,10 +1513,10 @@ def _get_online_features( @staticmethod def _get_columnar_entity_values( - rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] + rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] ) -> Dict[str, List[Any]]: if (rowise is None and columnar is None) or ( - rowise is not None and columnar is not None + rowise is not None and columnar is not None ): raise ValueError( "Exactly one of `columnar_entity_values` and `rowise_entity_values` must be set." 
@@ -1528,7 +1532,7 @@ def _get_columnar_entity_values( return cast(Dict[str, List[Any]], columnar) def _get_entity_maps( - self, feature_views + self, feature_views ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) entity_name_to_join_key_map: Dict[str, str] = {} @@ -1559,9 +1563,9 @@ def _get_entity_maps( @staticmethod def _get_table_entity_values( - table: FeatureView, - entity_name_to_join_key_map: Dict[str, str], - join_key_proto_values: Dict[str, List[Value]], + table: FeatureView, + entity_name_to_join_key_map: Dict[str, str], + join_key_proto_values: Dict[str, List[Value]], ) -> Dict[str, List[Value]]: # The correct join_keys expected by the OnlineStore for this Feature View. table_join_keys = [ @@ -1582,8 +1586,8 @@ def _get_table_entity_values( @staticmethod def _populate_result_rows_from_columnar( - online_features_response: GetOnlineFeaturesResponse, - data: Dict[str, List[Value]], + online_features_response: GetOnlineFeaturesResponse, + data: Dict[str, List[Value]], ): timestamp = Timestamp() # Only initialize this timestamp once. # Add more values to the existing result rows @@ -1599,8 +1603,8 @@ def _populate_result_rows_from_columnar( @staticmethod def get_needed_request_data( - grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], - grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], + grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], + grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], ) -> Tuple[Set[str], Set[str]]: needed_request_data: Set[str] = set() needed_request_fv_features: Set[str] = set() @@ -1614,12 +1618,12 @@ def get_needed_request_data( @staticmethod def ensure_request_data_values_exist( - needed_request_data: Set[str], - needed_request_fv_features: Set[str], - request_data_features: Dict[str, List[Any]], + needed_request_data: Set[str], + needed_request_fv_features: Set[str], + request_data_features: Dict[str, List[Any]], ): if len(needed_request_data) + len(needed_request_fv_features) != len( - request_data_features.keys() + request_data_features.keys() ): missing_features = [ x @@ -1633,10 +1637,10 @@ def ensure_request_data_values_exist( ) def _get_unique_entities( - self, - table: FeatureView, - join_key_values: Dict[str, List[Value]], - entity_name_to_join_key_map: Dict[str, str], + self, + table: FeatureView, + join_key_values: Dict[str, List[Value]], + entity_name_to_join_key_map: Dict[str, str], ) -> Tuple[Tuple[Dict[str, Value], ...], Tuple[List[int], ...]]: """Return the set of unique composite Entities for a Feature View and the indexes at which they appear. @@ -1672,11 +1676,11 @@ def _get_unique_entities( return unique_entities, indexes def _read_from_online_store( - self, - entity_rows: Iterable[Mapping[str, Value]], - provider: Provider, - requested_features: List[str], - table: FeatureView, + self, + entity_rows: Iterable[Mapping[str, Value]], + provider: Provider, + requested_features: List[str], + table: FeatureView, ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: """Read and process data from the OnlineStore for a given FeatureView. 
@@ -1731,16 +1735,16 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( - feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] - ] - ], - indexes: Iterable[List[int]], - online_features_response: GetOnlineFeaturesResponse, - full_feature_names: bool, - requested_features: Iterable[str], - table: FeatureView, + feature_data: Iterable[ + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + ] + ], + indexes: Iterable[List[int]], + online_features_response: GetOnlineFeaturesResponse, + full_feature_names: bool, + requested_features: Iterable[str], + table: FeatureView, ): """Populate the GetOnlineFeaturesResponse with feature data. @@ -1776,8 +1780,8 @@ def _populate_response_from_feature_data( # Populate the result with data fetched from the OnlineStore # which is guaranteed to be aligned with `requested_features`. for ( - feature_idx, - (timestamp_vector, statuses_vector, values_vector), + feature_idx, + (timestamp_vector, statuses_vector, values_vector), ) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))): online_features_response.results.append( GetOnlineFeaturesResponse.FeatureVector( @@ -1789,10 +1793,10 @@ def _populate_response_from_feature_data( @staticmethod def _augment_response_with_on_demand_transforms( - online_features_response: GetOnlineFeaturesResponse, - feature_refs: List[str], - requested_on_demand_feature_views: List[OnDemandFeatureView], - full_feature_names: bool, + online_features_response: GetOnlineFeaturesResponse, + feature_refs: List[str], + requested_on_demand_feature_views: List[OnDemandFeatureView], + full_feature_names: bool, ): """Computes on demand feature values and adds them to the result rows. 
@@ -1858,8 +1862,8 @@ def _augment_response_with_on_demand_transforms( @staticmethod def _drop_unneeded_columns( - online_features_response: GetOnlineFeaturesResponse, - requested_result_row_names: Set[str], + online_features_response: GetOnlineFeaturesResponse, + requested_result_row_names: Set[str], ): """ Unneeded feature values such as request data and unrequested input feature views will @@ -1884,10 +1888,10 @@ def _drop_unneeded_columns( del online_features_response.results[idx] def _get_feature_views_to_use( - self, - features: Optional[Union[List[str], FeatureService]], - allow_cache=False, - hide_dummy_entity: bool = True, + self, + features: Optional[Union[List[str], FeatureService]], + allow_cache=False, + hide_dummy_entity: bool = True, ) -> Tuple[List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView]]: fvs = { @@ -2015,10 +2019,10 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( - features: List[str], - all_feature_views: List[FeatureView], - all_request_feature_views: List[RequestFeatureView], - all_on_demand_feature_views: List[OnDemandFeatureView], + features: List[str], + all_feature_views: List[FeatureView], + all_request_feature_views: List[RequestFeatureView], + all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]], @@ -2080,7 +2084,7 @@ def _group_feature_refs( def _print_materialization_log( - start_date, end_date, num_feature_views: int, online_store: str + start_date, end_date, num_feature_views: int, online_store: str ): if start_date: print( @@ -2131,7 +2135,7 @@ def _validate_data_sources(data_sources: List[DataSource]): def apply_list_mapping( - lst: Iterable[Any], mapping_indexes: Iterable[List[int]] + lst: Iterable[Any], mapping_indexes: Iterable[List[int]] ) -> Iterable[Any]: output_len = sum(len(item) for item in mapping_indexes) output = [None] * output_len diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 710f4c386a..5657fbe372 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -230,7 +230,9 @@ def teardown( def _initialize_conn(db_path: str): Path(db_path).parent.mkdir(exist_ok=True) return sqlite3.connect( - db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + db_path, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + check_same_thread=False, ) diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 5918e36753..d885bde369 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -65,19 +65,19 @@ def conv_rate_plus_100(features_df: pd.DataFrame) -> pd.DataFrame: def conv_rate_plus_100_feature_view( sources: Dict[str, Union[RequestSource, FeatureView]], infer_features: bool = False, - features: Optional[List[Feature]] = None, + features: Optional[List[Field]] = None, ) -> OnDemandFeatureView: # Test that positional arguments and Features still work for ODFVs. 
_features = features or [ - Feature(name="conv_rate_plus_100", dtype=ValueType.DOUBLE), - Feature(name="conv_rate_plus_val_to_add", dtype=ValueType.DOUBLE), - Feature(name="conv_rate_plus_100_rounded", dtype=ValueType.INT32), + Field(name="conv_rate_plus_100", dtype=Float64), + Field(name="conv_rate_plus_val_to_add", dtype=Float64), + Field(name="conv_rate_plus_100_rounded", dtype=Int32), ] return OnDemandFeatureView( - conv_rate_plus_100.__name__, - [] if infer_features else _features, - sources, - conv_rate_plus_100, + name=conv_rate_plus_100.__name__, + schema=[] if infer_features else _features, + sources=sources, + udf=conv_rate_plus_100, ) @@ -251,5 +251,5 @@ def create_pushable_feature_view(batch_source: DataSource): # Test that Features still work for FeatureViews. features=[Feature(name="temperature", dtype=ValueType.INT32)], ttl=timedelta(days=2), - source=push_source, + stream_source=push_source, ) diff --git a/sdk/python/tests/integration/online_store/test_python_feature_server.py b/sdk/python/tests/integration/online_store/test_python_feature_server.py new file mode 100644 index 0000000000..5b40d22d33 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_python_feature_server.py @@ -0,0 +1,110 @@ +import json +from typing import List + +import pytest +from fastapi.testclient import TestClient + +from feast.feast_object import FeastObject +from feast.feature_server import get_app +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, + construct_universal_feature_views, + construct_universal_test_data, +) +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) + + +@pytest.mark.integration +@pytest.mark.universal +def test_get_online_features(): + client = setup_python_fs_client() + request_data_dict = { + "features": [ + "driver_stats:conv_rate", + "driver_stats:acc_rate", + "driver_stats:avg_daily_trips", + ], + "entities": {"driver_id": [5001, 5002]}, + } + response = client.post("/get-online-features", data=json.dumps(request_data_dict)) + parsed_response = json.loads(response.text) + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"] + response_feature_names = metadata["feature_names"] + assert len(response_feature_names) == len(expected_features) + for expected_feature in expected_features: + assert expected_feature in response_feature_names + assert "results" in parsed_response + results = parsed_response["results"] + for result in results: + # Same order as in metadata + assert len(result["statuses"]) == 2 # Requested two entities + for status in result["statuses"]: + assert status == "PRESENT" + + results_driver_id_index = response_feature_names.index("driver_id") + assert ( + results[results_driver_id_index]["values"] + == request_data_dict["entities"]["driver_id"] + ) + + +@pytest.mark.integration +@pytest.mark.universal +def test_push(): + client = setup_python_fs_client() + initial_temp = get_temperatures(client, location_ids=[1])[0] + # Note that the timestamp field in the push df is different from the batch source ("event_timestamp") + json_data = json.dumps( + { + "push_source_name": "location_stats_push_source", + "df": { + "location_id": [1], + "temperature": [initial_temp * 100], + "event_timestamp": ["2022-05-13 10:59:42"], + "created": 
["2022-05-13 10:59:42"], + }, + } + ) + response = client.post("/push", data=json_data,) + assert response.status_code == 200 + assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100] + + +def get_temperatures(client, location_ids: List[int]): + get_request_data = { + "features": ["pushable_location_stats:temperature",], + "entities": {"location_id": location_ids}, + } + response = client.post("/get-online-features", data=json.dumps(get_request_data)) + parsed_response = json.loads(response.text) + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + response_feature_names = metadata["feature_names"] + assert "results" in parsed_response + results = parsed_response["results"] + results_temperature_index = response_feature_names.index("temperature") + return results[results_temperature_index]["values"] + + +def setup_python_fs_client(): + config = IntegrationTestRepoConfig() + environment = construct_test_environment(config) + fs = environment.feature_store + entities, datasets, data_sources = construct_universal_test_data(environment) + feature_views = construct_universal_feature_views(data_sources) + feast_objects: List[FeastObject] = [] + feast_objects.extend(feature_views.values()) + feast_objects.extend([driver(), customer(), location()]) + fs.apply(feast_objects) + fs.materialize(environment.start_date, environment.end_date) + client = TestClient(get_app(fs)) + return client From 315f2d84259326e34be4d4e57f1f79fbe4e3c3ce Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:49:42 -0400 Subject: [PATCH 04/18] prune out unneeded fields in push source Signed-off-by: Danny Chiao --- sdk/python/feast/feature_store.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 0a373b94ed..50cd8ca2a9 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1209,13 +1209,8 @@ def push( } for fv in fvs_with_push_sources: - push_source = cast(PushSource, fv.stream_source) - # If the push source's timestamp field doesn't match the batch source's timestamp field, do the mapping - renamed_df = df - if len(push_source.timestamp_field) > 0: - renamed_df = df.rename(columns={push_source.timestamp_field: push_source.batch_source.timestamp_field}) self.write_to_online_store( - fv.name, renamed_df, allow_registry_cache=allow_registry_cache + fv.name, df, allow_registry_cache=allow_registry_cache ) @log_exceptions_and_usage From ec3d5c23c221d249d7446d313ec1840235292add Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:50:26 -0400 Subject: [PATCH 05/18] prune out unneeded fields in push source Signed-off-by: Danny Chiao --- sdk/python/feast/feature_store.py | 254 +++++++++++++++--------------- 1 file changed, 127 insertions(+), 127 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 50cd8ca2a9..82543efff4 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -114,7 +114,7 @@ class FeatureStore: @log_exceptions def __init__( - self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): """ Creates a FeatureStore object. 
@@ -193,7 +193,7 @@ def list_entities(self, allow_cache: bool = False) -> List[Entity]: return self._list_entities(allow_cache) def _list_entities( - self, allow_cache: bool = False, hide_dummy_entity: bool = True + self, allow_cache: bool = False, hide_dummy_entity: bool = True ) -> List[Entity]: all_entities = self._registry.list_entities( self.project, allow_cache=allow_cache @@ -229,7 +229,7 @@ def list_feature_views(self, allow_cache: bool = False) -> List[FeatureView]: @log_exceptions_and_usage def list_request_feature_views( - self, allow_cache: bool = False + self, allow_cache: bool = False ) -> List[RequestFeatureView]: """ Retrieves the list of feature views from the registry. @@ -245,11 +245,11 @@ def list_request_feature_views( ) def _list_feature_views( - self, allow_cache: bool = False, hide_dummy_entity: bool = True, + self, allow_cache: bool = False, hide_dummy_entity: bool = True, ) -> List[FeatureView]: feature_views = [] for fv in self._registry.list_feature_views( - self.project, allow_cache=allow_cache + self.project, allow_cache=allow_cache ): if hide_dummy_entity and fv.entities[0] == DUMMY_ENTITY_NAME: fv.entities = [] @@ -258,7 +258,7 @@ def _list_feature_views( @log_exceptions_and_usage def list_on_demand_feature_views( - self, allow_cache: bool = False + self, allow_cache: bool = False ) -> List[OnDemandFeatureView]: """ Retrieves the list of on demand feature views from the registry. @@ -304,7 +304,7 @@ def get_entity(self, name: str, allow_registry_cache: bool = False) -> Entity: @log_exceptions_and_usage def get_feature_service( - self, name: str, allow_cache: bool = False + self, name: str, allow_cache: bool = False ) -> FeatureService: """ Retrieves a feature service. @@ -323,7 +323,7 @@ def get_feature_service( @log_exceptions_and_usage def get_feature_view( - self, name: str, allow_registry_cache: bool = False + self, name: str, allow_registry_cache: bool = False ) -> FeatureView: """ Retrieves a feature view. @@ -341,10 +341,10 @@ def get_feature_view( return self._get_feature_view(name, allow_registry_cache=allow_registry_cache) def _get_feature_view( - self, - name: str, - hide_dummy_entity: bool = True, - allow_registry_cache: bool = False, + self, + name: str, + hide_dummy_entity: bool = True, + allow_registry_cache: bool = False, ) -> FeatureView: feature_view = self._registry.get_feature_view( name, self.project, allow_cache=allow_registry_cache @@ -412,7 +412,7 @@ def delete_feature_service(self, name: str): return self._registry.delete_feature_service(name, self.project) def _get_features( - self, features: Union[List[str], FeatureService], allow_cache: bool = False, + self, features: Union[List[str], FeatureService], allow_cache: bool = False, ) -> List[str]: _features = features @@ -446,19 +446,19 @@ def _should_use_plan(self): """Returns True if _plan and _apply_diffs should be used, False otherwise.""" # Currently only the local provider with sqlite online store supports _plan and _apply_diffs. 
return self.config.provider == "local" and ( - self.config.online_store and self.config.online_store.type == "sqlite" + self.config.online_store and self.config.online_store.type == "sqlite" ) def _validate_all_feature_views( - self, - views_to_update: List[FeatureView], - odfvs_to_update: List[OnDemandFeatureView], - request_views_to_update: List[RequestFeatureView], + self, + views_to_update: List[FeatureView], + odfvs_to_update: List[OnDemandFeatureView], + request_views_to_update: List[RequestFeatureView], ): """Validates all feature views.""" if ( - not flags_helper.enable_on_demand_feature_views(self.config) - and len(odfvs_to_update) > 0 + not flags_helper.enable_on_demand_feature_views(self.config) + and len(odfvs_to_update) > 0 ): raise ExperimentalFeatureNotEnabled(flags.FLAG_ON_DEMAND_TRANSFORM_NAME) @@ -469,11 +469,11 @@ def _validate_all_feature_views( ) def _make_inferences( - self, - data_sources_to_update: List[DataSource], - entities_to_update: List[Entity], - views_to_update: List[FeatureView], - odfvs_to_update: List[OnDemandFeatureView], + self, + data_sources_to_update: List[DataSource], + entities_to_update: List[Entity], + views_to_update: List[FeatureView], + odfvs_to_update: List[OnDemandFeatureView], ): """Makes inferences for entities, feature views, and odfvs.""" update_entities_with_inferred_types_from_feature_views( @@ -499,7 +499,7 @@ def _make_inferences( @log_exceptions_and_usage def _plan( - self, desired_repo_contents: RepoContents + self, desired_repo_contents: RepoContents ) -> Tuple[RegistryDiff, InfraDiff, Infra]: """Dry-run registering objects to metadata store. @@ -577,7 +577,7 @@ def _plan( @log_exceptions_and_usage def _apply_diffs( - self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra + self, registry_diff: RegistryDiff, infra_diff: InfraDiff, new_infra: Infra ): """Applies the given diffs to the metadata store and infrastructure. @@ -595,18 +595,18 @@ def _apply_diffs( @log_exceptions_and_usage def apply( - self, - objects: Union[ - DataSource, - Entity, - FeatureView, - OnDemandFeatureView, - RequestFeatureView, - FeatureService, - List[FeastObject], - ], - objects_to_delete: Optional[List[FeastObject]] = None, - partial: bool = True, + self, + objects: Union[ + DataSource, + Entity, + FeatureView, + OnDemandFeatureView, + RequestFeatureView, + FeatureService, + List[FeastObject], + ], + objects_to_delete: Optional[List[FeastObject]] = None, + partial: bool = True, ): """Register objects to metadata store and update related infrastructure. @@ -701,7 +701,7 @@ def apply( for ds in data_sources_to_update: self._registry.apply_data_source(ds, project=self.project, commit=False) for view in itertools.chain( - views_to_update, odfvs_to_update, request_views_to_update + views_to_update, odfvs_to_update, request_views_to_update ): self._registry.apply_feature_view(view, project=self.project, commit=False) for ent in entities_to_update: @@ -789,10 +789,10 @@ def teardown(self): @log_exceptions_and_usage def get_historical_features( - self, - entity_df: Union[pd.DataFrame, str], - features: Union[List[str], FeatureService], - full_feature_names: bool = False, + self, + entity_df: Union[pd.DataFrame, str], + features: Union[List[str], FeatureService], + full_feature_names: bool = False, ) -> RetrievalJob: """Enrich an entity dataframe with historical feature values for either training or batch scoring. 
@@ -917,12 +917,12 @@ def get_historical_features( @log_exceptions_and_usage def create_saved_dataset( - self, - from_: RetrievalJob, - name: str, - storage: SavedDatasetStorage, - tags: Optional[Dict[str, str]] = None, - feature_service: Optional[FeatureService] = None, + self, + from_: RetrievalJob, + name: str, + storage: SavedDatasetStorage, + tags: Optional[Dict[str, str]] = None, + feature_service: Optional[FeatureService] = None, ) -> SavedDataset: """ Execute provided retrieval job and persist its outcome in given storage. @@ -1007,7 +1007,7 @@ def get_saved_dataset(self, name: str) -> SavedDataset: @log_exceptions_and_usage def materialize_incremental( - self, end_date: datetime, feature_views: Optional[List[str]] = None, + self, end_date: datetime, feature_views: Optional[List[str]] = None, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -1099,10 +1099,10 @@ def tqdm_builder(length): @log_exceptions_and_usage def materialize( - self, - start_date: datetime, - end_date: datetime, - feature_views: Optional[List[str]] = None, + self, + start_date: datetime, + end_date: datetime, + feature_views: Optional[List[str]] = None, ) -> None: """ Materialize data from the offline store into the online store. @@ -1186,7 +1186,7 @@ def tqdm_builder(length): @log_exceptions_and_usage def push( - self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True + self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. @@ -1202,9 +1202,9 @@ def push( fv for fv in all_fvs if ( - fv.stream_source is not None - and isinstance(fv.stream_source, PushSource) - and fv.stream_source.name == push_source_name + fv.stream_source is not None + and isinstance(fv.stream_source, PushSource) + and fv.stream_source.name == push_source_name ) } @@ -1215,10 +1215,10 @@ def push( @log_exceptions_and_usage def write_to_online_store( - self, - feature_view_name: str, - df: pd.DataFrame, - allow_registry_cache: bool = True, + self, + feature_view_name: str, + df: pd.DataFrame, + allow_registry_cache: bool = True, ): """ ingests data directly into the Online store @@ -1237,10 +1237,10 @@ def write_to_online_store( @log_exceptions_and_usage def get_online_features( - self, - features: Union[List[str], FeatureService], - entity_rows: List[Dict[str, Any]], - full_feature_names: bool = False, + self, + features: Union[List[str], FeatureService], + entity_rows: List[Dict[str, Any]], + full_feature_names: bool = False, ) -> OnlineResponse: """ Retrieves the latest online feature data. @@ -1299,13 +1299,13 @@ def get_online_features( ) def _get_online_features( - self, - features: Union[List[str], FeatureService], - entity_values: Mapping[ - str, Union[Sequence[Any], Sequence[Value], RepeatedValue] - ], - full_feature_names: bool = False, - native_entity_values: bool = True, + self, + features: Union[List[str], FeatureService], + entity_values: Mapping[ + str, Union[Sequence[Any], Sequence[Value], RepeatedValue] + ], + full_feature_names: bool = False, + native_entity_values: bool = True, ): # Extract Sequence from RepeatedValue Protobuf. 
entity_value_lists: Dict[str, Union[List[Any], List[Value]]] = { @@ -1421,8 +1421,8 @@ def _get_online_features( for join_key_or_entity_name, values in entity_proto_values.items(): # Found request data if ( - join_key_or_entity_name in needed_request_data - or join_key_or_entity_name in needed_request_fv_features + join_key_or_entity_name in needed_request_data + or join_key_or_entity_name in needed_request_fv_features ): if join_key_or_entity_name in needed_request_fv_features: # If the data was requested as a feature then @@ -1508,10 +1508,10 @@ def _get_online_features( @staticmethod def _get_columnar_entity_values( - rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] + rowise: Optional[List[Dict[str, Any]]], columnar: Optional[Dict[str, List[Any]]] ) -> Dict[str, List[Any]]: if (rowise is None and columnar is None) or ( - rowise is not None and columnar is not None + rowise is not None and columnar is not None ): raise ValueError( "Exactly one of `columnar_entity_values` and `rowise_entity_values` must be set." @@ -1527,7 +1527,7 @@ def _get_columnar_entity_values( return cast(Dict[str, List[Any]], columnar) def _get_entity_maps( - self, feature_views + self, feature_views ) -> Tuple[Dict[str, str], Dict[str, ValueType], Set[str]]: entities = self._list_entities(allow_cache=True, hide_dummy_entity=False) entity_name_to_join_key_map: Dict[str, str] = {} @@ -1558,9 +1558,9 @@ def _get_entity_maps( @staticmethod def _get_table_entity_values( - table: FeatureView, - entity_name_to_join_key_map: Dict[str, str], - join_key_proto_values: Dict[str, List[Value]], + table: FeatureView, + entity_name_to_join_key_map: Dict[str, str], + join_key_proto_values: Dict[str, List[Value]], ) -> Dict[str, List[Value]]: # The correct join_keys expected by the OnlineStore for this Feature View. table_join_keys = [ @@ -1581,8 +1581,8 @@ def _get_table_entity_values( @staticmethod def _populate_result_rows_from_columnar( - online_features_response: GetOnlineFeaturesResponse, - data: Dict[str, List[Value]], + online_features_response: GetOnlineFeaturesResponse, + data: Dict[str, List[Value]], ): timestamp = Timestamp() # Only initialize this timestamp once. 
# Add more values to the existing result rows @@ -1598,8 +1598,8 @@ def _populate_result_rows_from_columnar( @staticmethod def get_needed_request_data( - grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], - grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], + grouped_odfv_refs: List[Tuple[OnDemandFeatureView, List[str]]], + grouped_request_fv_refs: List[Tuple[RequestFeatureView, List[str]]], ) -> Tuple[Set[str], Set[str]]: needed_request_data: Set[str] = set() needed_request_fv_features: Set[str] = set() @@ -1613,12 +1613,12 @@ def get_needed_request_data( @staticmethod def ensure_request_data_values_exist( - needed_request_data: Set[str], - needed_request_fv_features: Set[str], - request_data_features: Dict[str, List[Any]], + needed_request_data: Set[str], + needed_request_fv_features: Set[str], + request_data_features: Dict[str, List[Any]], ): if len(needed_request_data) + len(needed_request_fv_features) != len( - request_data_features.keys() + request_data_features.keys() ): missing_features = [ x @@ -1632,10 +1632,10 @@ def ensure_request_data_values_exist( ) def _get_unique_entities( - self, - table: FeatureView, - join_key_values: Dict[str, List[Value]], - entity_name_to_join_key_map: Dict[str, str], + self, + table: FeatureView, + join_key_values: Dict[str, List[Value]], + entity_name_to_join_key_map: Dict[str, str], ) -> Tuple[Tuple[Dict[str, Value], ...], Tuple[List[int], ...]]: """Return the set of unique composite Entities for a Feature View and the indexes at which they appear. @@ -1671,11 +1671,11 @@ def _get_unique_entities( return unique_entities, indexes def _read_from_online_store( - self, - entity_rows: Iterable[Mapping[str, Value]], - provider: Provider, - requested_features: List[str], - table: FeatureView, + self, + entity_rows: Iterable[Mapping[str, Value]], + provider: Provider, + requested_features: List[str], + table: FeatureView, ) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]: """Read and process data from the OnlineStore for a given FeatureView. @@ -1730,16 +1730,16 @@ def _read_from_online_store( @staticmethod def _populate_response_from_feature_data( - feature_data: Iterable[ - Tuple[ - Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] - ] - ], - indexes: Iterable[List[int]], - online_features_response: GetOnlineFeaturesResponse, - full_feature_names: bool, - requested_features: Iterable[str], - table: FeatureView, + feature_data: Iterable[ + Tuple[ + Iterable[Timestamp], Iterable["FieldStatus.ValueType"], Iterable[Value] + ] + ], + indexes: Iterable[List[int]], + online_features_response: GetOnlineFeaturesResponse, + full_feature_names: bool, + requested_features: Iterable[str], + table: FeatureView, ): """Populate the GetOnlineFeaturesResponse with feature data. @@ -1775,8 +1775,8 @@ def _populate_response_from_feature_data( # Populate the result with data fetched from the OnlineStore # which is guaranteed to be aligned with `requested_features`. 
for ( - feature_idx, - (timestamp_vector, statuses_vector, values_vector), + feature_idx, + (timestamp_vector, statuses_vector, values_vector), ) in enumerate(zip(zip(*timestamps), zip(*statuses), zip(*values))): online_features_response.results.append( GetOnlineFeaturesResponse.FeatureVector( @@ -1788,10 +1788,10 @@ def _populate_response_from_feature_data( @staticmethod def _augment_response_with_on_demand_transforms( - online_features_response: GetOnlineFeaturesResponse, - feature_refs: List[str], - requested_on_demand_feature_views: List[OnDemandFeatureView], - full_feature_names: bool, + online_features_response: GetOnlineFeaturesResponse, + feature_refs: List[str], + requested_on_demand_feature_views: List[OnDemandFeatureView], + full_feature_names: bool, ): """Computes on demand feature values and adds them to the result rows. @@ -1857,8 +1857,8 @@ def _augment_response_with_on_demand_transforms( @staticmethod def _drop_unneeded_columns( - online_features_response: GetOnlineFeaturesResponse, - requested_result_row_names: Set[str], + online_features_response: GetOnlineFeaturesResponse, + requested_result_row_names: Set[str], ): """ Unneeded feature values such as request data and unrequested input feature views will @@ -1883,10 +1883,10 @@ def _drop_unneeded_columns( del online_features_response.results[idx] def _get_feature_views_to_use( - self, - features: Optional[Union[List[str], FeatureService]], - allow_cache=False, - hide_dummy_entity: bool = True, + self, + features: Optional[Union[List[str], FeatureService]], + allow_cache=False, + hide_dummy_entity: bool = True, ) -> Tuple[List[FeatureView], List[RequestFeatureView], List[OnDemandFeatureView]]: fvs = { @@ -2014,10 +2014,10 @@ def _validate_feature_refs(feature_refs: List[str], full_feature_names: bool = F def _group_feature_refs( - features: List[str], - all_feature_views: List[FeatureView], - all_request_feature_views: List[RequestFeatureView], - all_on_demand_feature_views: List[OnDemandFeatureView], + features: List[str], + all_feature_views: List[FeatureView], + all_request_feature_views: List[RequestFeatureView], + all_on_demand_feature_views: List[OnDemandFeatureView], ) -> Tuple[ List[Tuple[FeatureView, List[str]]], List[Tuple[OnDemandFeatureView, List[str]]], @@ -2079,7 +2079,7 @@ def _group_feature_refs( def _print_materialization_log( - start_date, end_date, num_feature_views: int, online_store: str + start_date, end_date, num_feature_views: int, online_store: str ): if start_date: print( @@ -2130,7 +2130,7 @@ def _validate_data_sources(data_sources: List[DataSource]): def apply_list_mapping( - lst: Iterable[Any], mapping_indexes: Iterable[List[int]] + lst: Iterable[Any], mapping_indexes: Iterable[List[int]] ) -> Iterable[Any]: output_len = sum(len(item) for item in mapping_indexes) output = [None] * output_len From de905af459754a5c2077e0a3893075853f25b2b9 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:51:16 -0400 Subject: [PATCH 06/18] fix comment Signed-off-by: Danny Chiao --- .../tests/integration/online_store/test_python_feature_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/python/tests/integration/online_store/test_python_feature_server.py b/sdk/python/tests/integration/online_store/test_python_feature_server.py index 5b40d22d33..b8bb942b19 100644 --- a/sdk/python/tests/integration/online_store/test_python_feature_server.py +++ b/sdk/python/tests/integration/online_store/test_python_feature_server.py @@ -62,7 +62,6 @@ def test_get_online_features(): def test_push(): 
client = setup_python_fs_client() initial_temp = get_temperatures(client, location_ids=[1])[0] - # Note that the timestamp field in the push df is different from the batch source ("event_timestamp") json_data = json.dumps( { "push_source_name": "location_stats_push_source", From e6df1c3ab0d92e9010f1b1578bbe93f267fafb49 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:55:16 -0400 Subject: [PATCH 07/18] fix comment Signed-off-by: Danny Chiao --- .../integration/online_store/test_python_feature_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/online_store/test_python_feature_server.py b/sdk/python/tests/integration/online_store/test_python_feature_server.py index b8bb942b19..3f92fbf75f 100644 --- a/sdk/python/tests/integration/online_store/test_python_feature_server.py +++ b/sdk/python/tests/integration/online_store/test_python_feature_server.py @@ -80,7 +80,7 @@ def test_push(): def get_temperatures(client, location_ids: List[int]): get_request_data = { - "features": ["pushable_location_stats:temperature",], + "features": ["pushable_location_stats:temperature"], "entities": {"location_id": location_ids}, } response = client.post("/get-online-features", data=json.dumps(get_request_data)) From ffc743672f2107624262974f0f37da3a67c81a29 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:56:48 -0400 Subject: [PATCH 08/18] fix comment Signed-off-by: Danny Chiao --- .../{online_store => e2e}/test_python_feature_server.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sdk/python/tests/integration/{online_store => e2e}/test_python_feature_server.py (100%) diff --git a/sdk/python/tests/integration/online_store/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py similarity index 100% rename from sdk/python/tests/integration/online_store/test_python_feature_server.py rename to sdk/python/tests/integration/e2e/test_python_feature_server.py From 42f6e2727b12eae0bf00376e41d235056c4ddd64 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Sun, 17 Apr 2022 23:58:26 -0400 Subject: [PATCH 09/18] fix comment Signed-off-by: Danny Chiao --- docs/reference/data-sources/push.md | 2 +- .../tests/integration/feature_repos/universal/feature_views.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/reference/data-sources/push.md b/docs/reference/data-sources/push.md index 89ebfcd5b0..153134d04c 100644 --- a/docs/reference/data-sources/push.md +++ b/docs/reference/data-sources/push.md @@ -42,7 +42,7 @@ fv = FeatureView( name="feature view", entities=["user_id"], schema=[Field(name="life_time_value", dtype=Int64)], - stream_source=push_source, + source=push_source, ) ``` diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index d885bde369..8ad38b2c4d 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -251,5 +251,5 @@ def create_pushable_feature_view(batch_source: DataSource): # Test that Features still work for FeatureViews. 
features=[Feature(name="temperature", dtype=ValueType.INT32)], ttl=timedelta(days=2), - stream_source=push_source, + source=push_source, ) From 5b37bb6658e2e8aeabb25dbf7aa0978bf61eb672 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 00:06:32 -0400 Subject: [PATCH 10/18] fix generator Signed-off-by: Danny Chiao --- .../e2e/test_python_feature_server.py | 117 ++++++++++-------- 1 file changed, 62 insertions(+), 55 deletions(-) diff --git a/sdk/python/tests/integration/e2e/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py index 3f92fbf75f..62512f96a5 100644 --- a/sdk/python/tests/integration/e2e/test_python_feature_server.py +++ b/sdk/python/tests/integration/e2e/test_python_feature_server.py @@ -1,3 +1,4 @@ +import contextlib import json from typing import List @@ -24,58 +25,60 @@ @pytest.mark.integration @pytest.mark.universal def test_get_online_features(): - client = setup_python_fs_client() - request_data_dict = { - "features": [ - "driver_stats:conv_rate", - "driver_stats:acc_rate", - "driver_stats:avg_daily_trips", - ], - "entities": {"driver_id": [5001, 5002]}, - } - response = client.post("/get-online-features", data=json.dumps(request_data_dict)) - parsed_response = json.loads(response.text) - assert "metadata" in parsed_response - metadata = parsed_response["metadata"] - expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"] - response_feature_names = metadata["feature_names"] - assert len(response_feature_names) == len(expected_features) - for expected_feature in expected_features: - assert expected_feature in response_feature_names - assert "results" in parsed_response - results = parsed_response["results"] - for result in results: - # Same order as in metadata - assert len(result["statuses"]) == 2 # Requested two entities - for status in result["statuses"]: - assert status == "PRESENT" + with setup_python_fs_client() as client: + request_data_dict = { + "features": [ + "driver_stats:conv_rate", + "driver_stats:acc_rate", + "driver_stats:avg_daily_trips", + ], + "entities": {"driver_id": [5001, 5002]}, + } + response = client.post( + "/get-online-features", data=json.dumps(request_data_dict) + ) + parsed_response = json.loads(response.text) + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"] + response_feature_names = metadata["feature_names"] + assert len(response_feature_names) == len(expected_features) + for expected_feature in expected_features: + assert expected_feature in response_feature_names + assert "results" in parsed_response + results = parsed_response["results"] + for result in results: + # Same order as in metadata + assert len(result["statuses"]) == 2 # Requested two entities + for status in result["statuses"]: + assert status == "PRESENT" - results_driver_id_index = response_feature_names.index("driver_id") - assert ( - results[results_driver_id_index]["values"] - == request_data_dict["entities"]["driver_id"] - ) + results_driver_id_index = response_feature_names.index("driver_id") + assert ( + results[results_driver_id_index]["values"] + == request_data_dict["entities"]["driver_id"] + ) @pytest.mark.integration @pytest.mark.universal def test_push(): - client = setup_python_fs_client() - initial_temp = get_temperatures(client, location_ids=[1])[0] - json_data = json.dumps( - { - "push_source_name": "location_stats_push_source", - "df": { - "location_id": [1], - 
"temperature": [initial_temp * 100], - "event_timestamp": ["2022-05-13 10:59:42"], - "created": ["2022-05-13 10:59:42"], - }, - } - ) - response = client.post("/push", data=json_data,) - assert response.status_code == 200 - assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100] + with setup_python_fs_client() as client: + initial_temp = get_temperatures(client, location_ids=[1])[0] + json_data = json.dumps( + { + "push_source_name": "location_stats_push_source", + "df": { + "location_id": [1], + "temperature": [initial_temp * 100], + "event_timestamp": ["2022-05-13 10:59:42"], + "created": ["2022-05-13 10:59:42"], + }, + } + ) + response = client.post("/push", data=json_data,) + assert response.status_code == 200 + assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100] def get_temperatures(client, location_ids: List[int]): @@ -94,16 +97,20 @@ def get_temperatures(client, location_ids: List[int]): return results[results_temperature_index]["values"] +@contextlib.contextmanager def setup_python_fs_client(): config = IntegrationTestRepoConfig() environment = construct_test_environment(config) fs = environment.feature_store - entities, datasets, data_sources = construct_universal_test_data(environment) - feature_views = construct_universal_feature_views(data_sources) - feast_objects: List[FeastObject] = [] - feast_objects.extend(feature_views.values()) - feast_objects.extend([driver(), customer(), location()]) - fs.apply(feast_objects) - fs.materialize(environment.start_date, environment.end_date) - client = TestClient(get_app(fs)) - return client + try: + entities, datasets, data_sources = construct_universal_test_data(environment) + feature_views = construct_universal_feature_views(data_sources) + feast_objects: List[FeastObject] = [] + feast_objects.extend(feature_views.values()) + feast_objects.extend([driver(), customer(), location()]) + fs.apply(feast_objects) + fs.materialize(environment.start_date, environment.end_date) + client = TestClient(get_app(fs)) + yield client + finally: + fs.teardown() From cf39fc332f6b57ba15ec35a4aa72e695b82ff7a1 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 00:08:05 -0400 Subject: [PATCH 11/18] add data source creator teardown Signed-off-by: Danny Chiao --- sdk/python/tests/integration/e2e/test_python_feature_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/integration/e2e/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py index 62512f96a5..9e64e3d29f 100644 --- a/sdk/python/tests/integration/e2e/test_python_feature_server.py +++ b/sdk/python/tests/integration/e2e/test_python_feature_server.py @@ -114,3 +114,4 @@ def setup_python_fs_client(): yield client finally: fs.teardown() + environment.data_source_creator.teardown() From ea6aaed906dabdadebc279bcd4238003dda9c3aa Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 00:10:08 -0400 Subject: [PATCH 12/18] add data source creator teardown Signed-off-by: Danny Chiao --- .../tests/integration/e2e/test_python_feature_server.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/e2e/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py index 9e64e3d29f..6f1fe018db 100644 --- a/sdk/python/tests/integration/e2e/test_python_feature_server.py +++ b/sdk/python/tests/integration/e2e/test_python_feature_server.py @@ -37,6 +37,8 @@ def test_get_online_features(): response = client.post( 
"/get-online-features", data=json.dumps(request_data_dict) ) + + # Check entities and features are present parsed_response = json.loads(response.text) assert "metadata" in parsed_response metadata = parsed_response["metadata"] @@ -52,7 +54,6 @@ def test_get_online_features(): assert len(result["statuses"]) == 2 # Requested two entities for status in result["statuses"]: assert status == "PRESENT" - results_driver_id_index = response_feature_names.index("driver_id") assert ( results[results_driver_id_index]["values"] @@ -77,6 +78,8 @@ def test_push(): } ) response = client.post("/push", data=json_data,) + + # Check new pushed temperature is fetched assert response.status_code == 200 assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100] From 22dc1a41a95b545a6b747e0039f6e100c2d29683 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 18:47:58 -0400 Subject: [PATCH 13/18] update push source to alpha Signed-off-by: Danny Chiao --- docs/reference/data-sources/push.md | 6 ++--- protos/feast/core/DataSource.proto | 3 +-- sdk/python/feast/data_source.py | 27 ++----------------- sdk/python/feast/feature_server.py | 10 ++++--- sdk/python/feast/feature_store.py | 7 +++++ sdk/python/feast/inference.py | 4 ++- .../feature_repos/universal/feature_views.py | 8 +----- 7 files changed, 23 insertions(+), 42 deletions(-) diff --git a/docs/reference/data-sources/push.md b/docs/reference/data-sources/push.md index 153134d04c..e6eff312ec 100644 --- a/docs/reference/data-sources/push.md +++ b/docs/reference/data-sources/push.md @@ -1,5 +1,7 @@ # Push source +**Warning**: This is an _experimental_ feature. It's intended for early testing and feedback, and could change without warnings in future releases. + ## Description Push sources allow feature values to be pushed to the online store in real time. This allows fresh feature values to be made available to applications. Push sources supercede the @@ -31,10 +33,6 @@ from feast.types import Int64 push_source = PushSource( name="push_source", - schema=[ - Field(name="user_id", dtype=Int64), - Field(name="life_time_value", dtype=Int64) - ], batch_source=BigQuerySource(table="test.test"), ) diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index d958281ca2..9e6028ccfa 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -222,8 +222,7 @@ message DataSource { // Defines options for DataSource that supports pushing data to it. This allows data to be pushed to // the online store on-demand, such as by stream consumers. message PushOptions { - // Mapping of feature name to type - map schema = 1; + reserved 1; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 2f7a7b7d71..f5efa0335b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -714,45 +714,35 @@ class PushSource(DataSource): A source that can be used to ingest features on request """ - schema: List[Field] + # TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store + # TODO(adchia): consider a "mode" to support pushing raw vs transformed events batch_source: DataSource - timestamp_field: str - # TODO(adchia): remove schema + timestamp_field? def __init__( self, *, name: str, - schema: List[Field], batch_source: DataSource, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", - timestamp_field: Optional[str] = None, ): """ Creates a PushSource object. 
Args: name: Name of the push source - schema: Schema mapping from the input feature name to a ValueType batch_source: The batch source that backs this push source. It's used when materializing from the offline store to the online store, and when retrieving historical features. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the data source, typically the email of the primary maintainer. - timestamp_field (optional): Event timestamp foe;d used for point in time - joins of feature values. """ super().__init__(name=name, description=description, tags=tags, owner=owner) - self.schema = sorted(schema) # TODO: add schema inference from a batch source self.batch_source = batch_source if not self.batch_source: raise ValueError(f"batch_source is needed for push source {self.name}") - if not timestamp_field: - raise ValueError(f"timestamp field is needed for push source {self.name}") - self.timestamp_field = timestamp_field def validate(self, config: RepoConfig): pass @@ -764,38 +754,25 @@ def get_table_column_names_and_types( @staticmethod def from_proto(data_source: DataSourceProto): - schema_pb = data_source.push_options.schema - schema = [] - for key, val in schema_pb.items(): - schema.append(Field(name=key, dtype=from_value_type(ValueType(val)))) - assert data_source.HasField("batch_source") batch_source = DataSource.from_proto(data_source.batch_source) return PushSource( name=data_source.name, - schema=sorted(schema), batch_source=batch_source, - timestamp_field=data_source.timestamp_field, description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: - schema_pb = {} - for field in self.schema: - schema_pb[field.name] = field.dtype.to_value_type().value batch_source_proto = None if self.batch_source: batch_source_proto = self.batch_source.to_proto() - options = DataSourceProto.PushOptions(schema=schema_pb,) data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.PUSH_SOURCE, - push_options=options, - timestamp_field=self.timestamp_field, description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 20fcd410c2..1b682ce264 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -77,6 +77,12 @@ def get_online_features(body=Depends(get_body)): @app.post("/push") def push(body=Depends(get_body)): + warnings.warn( + "push is an experimental feature. " + "This API is unstable and it could be changed in the future. " + "We do not guarantee that future changes will maintain backward compatibility.", + RuntimeWarning, + ) try: request = PushFeaturesRequest(**json.loads(body)) df = pd.DataFrame(request.df) @@ -94,9 +100,7 @@ def push(body=Depends(get_body)): @app.post("/write-to-online-store") def write_to_online_store(body=Depends(get_body)): warnings.warn( - "write_to_online_store is an experimental feature. " - "This API is unstable and it could be changed in the future. " - "We do not guarantee that future changes will maintain backward compatibility.", + "write_to_online_store is deprecated. 
Please consider using /push instead", RuntimeWarning, ) try: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 82543efff4..2311f78e9b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -1193,7 +1193,14 @@ def push( Args: push_source_name: The name of the push source we want to push data to. df: the data being pushed. + allow_registry_cache: whether to allow cached versions of the registry. """ + warnings.warn( + "Push source is an experimental feature. " + "This API is unstable and it could and might change in the future. " + "We do not guarantee that future changes will maintain backward compatibility.", + RuntimeWarning, + ) from feast.data_source import PushSource all_fvs = self.list_feature_views(allow_cache=allow_registry_cache) diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 9d15a6a25f..2ff252b00c 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -2,7 +2,7 @@ from typing import List from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource -from feast.data_source import DataSource, RequestSource +from feast.data_source import DataSource, PushSource, RequestSource from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.field import Field, from_value_type @@ -74,6 +74,8 @@ def update_data_sources_with_inferred_event_timestamp_col( for data_source in data_sources: if isinstance(data_source, RequestSource): continue + if isinstance(data_source, PushSource): + data_source = data_source.batch_source if data_source.timestamp_field is None or data_source.timestamp_field == "": # prepare right match pattern for data source ts_column_type_regex_pattern = "" diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 8ad38b2c4d..4ca1cd0471 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -237,13 +237,7 @@ def create_field_mapping_feature_view(source): def create_pushable_feature_view(batch_source: DataSource): push_source = PushSource( - name="location_stats_push_source", - schema=[ - Field(name="location_id", dtype=Int64), - Field(name="temperature", dtype=Int32), - ], - timestamp_field="timestamp", - batch_source=batch_source, + name="location_stats_push_source", batch_source=batch_source, ) return FeatureView( name="pushable_location_stats", From 47c1697823b7e82698f13e47365b90c3a7392420 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 18:49:36 -0400 Subject: [PATCH 14/18] lint Signed-off-by: Danny Chiao --- sdk/python/feast/data_source.py | 2 +- sdk/python/tests/example_repos/example_feature_repo_1.py | 9 +-------- .../integration/feature_repos/universal/feature_views.py | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index f5efa0335b..4a3762031e 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -21,7 +21,7 @@ from feast import type_map from feast.data_format import StreamFormat -from feast.field import Field, from_value_type +from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.repo_config import RepoConfig, get_data_source_class_from_type from feast.types 
import VALUE_TYPES_TO_FEAST_TYPES diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index 76b42b2241..bd07100af8 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -40,14 +40,7 @@ ) driver_locations_push_source = PushSource( - name="driver_locations_push", - schema=[ - Field(name="driver_id", dtype=String), - Field(name="driver_lat", dtype=Float32), - Field(name="driver_long", dtype=String), - ], - batch_source=driver_locations_source, - timestamp_field="event_timestamp", + name="driver_locations_push", batch_source=driver_locations_source, ) driver = Entity( diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 4ca1cd0471..a6786528e1 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -13,7 +13,7 @@ ValueType, ) from feast.data_source import DataSource, RequestSource -from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 +from feast.types import Array, FeastType, Float32, Float64, Int32 from tests.integration.feature_repos.universal.entities import location From 1e3e5fff558921cf9a48f213fc61127a7b87ecf3 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 19:00:24 -0400 Subject: [PATCH 15/18] lint Signed-off-by: Danny Chiao --- .../feature-servers/python-feature-server.md | 32 +++++++++---------- .../e2e/test_python_feature_server.py | 5 +-- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/docs/reference/feature-servers/python-feature-server.md b/docs/reference/feature-servers/python-feature-server.md index 71c75ab104..352f0edc16 100644 --- a/docs/reference/feature-servers/python-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -171,27 +171,25 @@ curl -X POST "http://localhost:6566/push" -d '{ or equivalently from Python: ```python +import json import requests import pandas as pd from datetime import datetime -event_df = pd.DataFrame.from_dict( - { - "driver_id": [1001], - "event_timestamp": [datetime(2021, 5, 13, 10, 59, 42),], - "created": [datetime(2021, 5, 13, 10, 59, 42),], - "conv_rate": [1.0], - "acc_rate": [1.0], - "avg_daily_trips": [1000], - "string_feature": "test2", - } -) -event_df['event_timestamp'] = event_df['event_timestamp'].astype(str) -event_df['created'] = event_df['created'].astype(str) +event_dict = { + "driver_id": [1001], + "event_timestamp": [str(datetime(2021, 5, 13, 10, 59, 42))], + "created": [str(datetime(2021, 5, 13, 10, 59, 42))], + "conv_rate": [1.0], + "acc_rate": [1.0], + "avg_daily_trips": [1000], + "string_feature": "test2", +} +push_data = { + "push_source_name":"driver_stats_push_source", + "df":event_dict +} requests.post( "http://localhost:6566/push", - json={ - "push_source_name":"driver_stats_push_source", - "df":event_df.to_dict() - }) + data=json.dumps(push_data)) ``` diff --git a/sdk/python/tests/integration/e2e/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py index 6f1fe018db..a3048300a3 100644 --- a/sdk/python/tests/integration/e2e/test_python_feature_server.py +++ b/sdk/python/tests/integration/e2e/test_python_feature_server.py @@ -1,5 +1,6 @@ import contextlib import json +from datetime import datetime from typing import List import pytest @@ 
-72,8 +73,8 @@ def test_push(): "df": { "location_id": [1], "temperature": [initial_temp * 100], - "event_timestamp": ["2022-05-13 10:59:42"], - "created": ["2022-05-13 10:59:42"], + "event_timestamp": [str(datetime.utcnow())], + "created": [str(datetime.utcnow())], }, } ) From 152686e59c631cbe473c52fcfd3784fe5d1bbf8d Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 19:02:43 -0400 Subject: [PATCH 16/18] lint Signed-off-by: Danny Chiao --- sdk/python/feast/feature_server.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 1b682ce264..8347bed6da 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -77,12 +77,6 @@ def get_online_features(body=Depends(get_body)): @app.post("/push") def push(body=Depends(get_body)): - warnings.warn( - "push is an experimental feature. " - "This API is unstable and it could be changed in the future. " - "We do not guarantee that future changes will maintain backward compatibility.", - RuntimeWarning, - ) try: request = PushFeaturesRequest(**json.loads(body)) df = pd.DataFrame(request.df) From 05bc6a6d5eaab6b75a4cdc5c636cd385cecce876 Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 19:11:09 -0400 Subject: [PATCH 17/18] lint Signed-off-by: Danny Chiao --- sdk/python/tests/unit/test_data_sources.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index ceb9ff4ce6..6a8baaee2c 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -10,11 +10,6 @@ def test_push_with_batch(): push_source = PushSource( name="test", - schema=[ - Field(name="f1", dtype=PrimitiveFeastType.FLOAT32), - Field(name="f2", dtype=PrimitiveFeastType.BOOL), - ], - timestamp_field="event_timestamp", batch_source=BigQuerySource(table="test.test"), ) push_source_proto = push_source.to_proto() @@ -25,8 +20,6 @@ def test_push_with_batch(): push_source_unproto = PushSource.from_proto(push_source_proto) assert push_source.name == push_source_unproto.name - assert push_source.schema == push_source_unproto.schema - assert push_source.timestamp_field == push_source_unproto.timestamp_field assert push_source.batch_source.name == push_source_unproto.batch_source.name From 396492162f3f3156edcc1a678f8c0b9dd6a190df Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Mon, 18 Apr 2022 19:23:26 -0400 Subject: [PATCH 18/18] lint Signed-off-by: Danny Chiao --- sdk/python/tests/unit/test_data_sources.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 6a8baaee2c..a0de42e1e2 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -9,8 +9,7 @@ def test_push_with_batch(): push_source = PushSource( - name="test", - batch_source=BigQuerySource(table="test.test"), + name="test", batch_source=BigQuerySource(table="test.test"), ) push_source_proto = push_source.to_proto() assert push_source_proto.HasField("batch_source")
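
Taken together, patches 13–18 reduce `PushSource` to a name plus a batch source and route writes through `FeatureStore.push`. Below is a minimal end-to-end sketch of the post-series API — not code from this series — assuming a local file-based feature repo; the parquet path, entity, and feature names are hypothetical stand-ins:

```python
from datetime import datetime, timedelta

import pandas as pd

from feast import (
    Entity,
    FeatureStore,
    FeatureView,
    Field,
    FileSource,
    PushSource,
    ValueType,
)
from feast.types import Int32

# A PushSource now takes only a name and a batch source; the schema and
# timestamp_field arguments removed in patch 13 are deferred to the wrapped
# batch source (see the inference change in patch 13).
location_stats_batch = FileSource(
    path="data/location_stats.parquet",  # hypothetical parquet file
    timestamp_field="event_timestamp",
)
location_stats_push_source = PushSource(
    name="location_stats_push_source",
    batch_source=location_stats_batch,
)

location = Entity(name="location_id", value_type=ValueType.INT64)

pushable_location_stats = FeatureView(
    name="pushable_location_stats",
    entities=["location_id"],
    schema=[Field(name="temperature", dtype=Int32)],
    ttl=timedelta(days=2),
    source=location_stats_push_source,  # `source`, not the old `stream_source`
)

store = FeatureStore(repo_path=".")  # assumes a feature_store.yaml in cwd
store.apply([location, pushable_location_stats])

# Pushing updates every feature view backed by this push source. The new
# allow_registry_cache flag (default True) skips a registry refresh on the
# hot path; pass False to pick up objects just applied by another process.
store.push(
    "location_stats_push_source",
    pd.DataFrame(
        {
            "location_id": [1],
            "temperature": [72],
            "event_timestamp": [datetime.utcnow()],
            "created": [datetime.utcnow()],
        }
    ),
    allow_registry_cache=False,
)
```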
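
Patches 10–12 swap the test's return-based setup for a `contextlib.contextmanager` so that `fs.teardown()` and `environment.data_source_creator.teardown()` run even when an assertion fails mid-test. A self-contained sketch of that pattern, using stand-in state instead of the real environment objects:

```python
import contextlib


@contextlib.contextmanager
def client_with_teardown():
    # Stand-in for construct_test_environment / TestClient; the real objects
    # live in test_python_feature_server.py above.
    state = {"torn_down": False}
    try:
        yield state
    finally:
        # Mirrors fs.teardown() and data_source_creator.teardown(): reached on
        # normal exit *and* when the with-body raises.
        state["torn_down"] = True


state = None
try:
    with client_with_teardown() as client:
        state = client
        raise AssertionError("simulated failing test")
except AssertionError:
    pass
assert state["torn_down"]  # cleanup ran despite the failure
```

With the old return-based `setup_python_fs_client()`, any failing assertion would skip the cleanup entirely and leak the provisioned test resources.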