diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index c6a3422ecd..11e20ab831 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -81,7 +81,7 @@ * [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md) * [.feastignore](reference/feature-repository/feast-ignore.md) * [Feature servers](reference/feature-servers/README.md) - * [Local feature server](reference/feature-servers/local-feature-server.md) + * [Python feature server](reference/feature-servers/python-feature-server.md) * [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md) * [\[Alpha\] Data quality monitoring](reference/dqm.md) * [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md) diff --git a/docs/reference/data-sources/push.md b/docs/reference/data-sources/push.md index 9f377d2099..e6eff312ec 100644 --- a/docs/reference/data-sources/push.md +++ b/docs/reference/data-sources/push.md @@ -1,5 +1,7 @@ # Push source +**Warning**: This is an _experimental_ feature. It's intended for early testing and feedback, and could change without warning in future releases. + ## Description Push sources allow feature values to be pushed to the online store in real time. This allows fresh feature values to be made available to applications. Push sources supersede the @@ -31,10 +33,6 @@ from feast.types import Int64 push_source = PushSource( name="push_source", - schema=[ - Field(name="user_id", dtype=Int64), - Field(name="life_time_value", dtype=Int64) - ], batch_source=BigQuerySource(table="test.test"), ) @@ -42,7 +40,7 @@ fv = FeatureView( name="feature view", entities=["user_id"], schema=[Field(name="life_time_value", dtype=Int64)], - stream_source=push_source, + source=push_source, ) ``` @@ -53,6 +51,8 @@ import pandas as pd fs = FeatureStore(...)
feature_data_frame = pd.DataFrame() -fs.push("push_source", feature_data_frame) +fs.push("push_source_name", feature_data_frame) ``` +See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server. + diff --git a/docs/reference/feature-servers/README.md b/docs/reference/feature-servers/README.md index e9e3afa4c0..301cea372c 100644 --- a/docs/reference/feature-servers/README.md +++ b/docs/reference/feature-servers/README.md @@ -2,4 +2,4 @@ Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK. -{% page-ref page="local-feature-server.md" %} +{% page-ref page="python-feature-server.md" %} diff --git a/docs/reference/feature-servers/go-feature-retrieval.md b/docs/reference/feature-servers/go-feature-retrieval.md index 415817dd85..05411a7f8c 100644 --- a/docs/reference/feature-servers/go-feature-retrieval.md +++ b/docs/reference/feature-servers/go-feature-retrieval.md @@ -2,7 +2,7 @@ ## Overview -The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md). +The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md). Currently, this component only supports online serving and does not have an offline component, including APIs to create Feast feature repositories or apply configuration to the registry to facilitate online materialization. It also does not expose its own dedicated CLI to perform Feast actions.
Furthermore, this component is only meant to expose an online serving API that can be called through the Python SDK to facilitate faster online feature retrieval. diff --git a/docs/reference/feature-servers/local-feature-server.md b/docs/reference/feature-servers/python-feature-server.md similarity index 64% rename from docs/reference/feature-servers/local-feature-server.md rename to docs/reference/feature-servers/python-feature-server.md index 4ea37d4f1e..352f0edc16 100644 --- a/docs/reference/feature-servers/local-feature-server.md +++ b/docs/reference/feature-servers/python-feature-server.md @@ -1,15 +1,23 @@ -# Local feature server +# Python feature server ## Overview -The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed. +The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write and read features from Feast online stores using any programming language that can make HTTP requests. ## CLI -There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port be overridden by a `--port` flag. +There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port can be overridden with a `--port` flag. + +## Deploying as a service + +One can also deploy a feature server by building a Docker image that bundles in the project's `feature_store.yaml`. See the [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for an example. + +A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.
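Because the server speaks plain JSON over HTTP, the request contract can be previewed without a running server. A minimal sketch of the body the `/get-online-features` endpoint expects, using only the standard library — the feature and entity names here are illustrative, not part of any particular repo:

```python
import json

# Feature references use the "<feature_view>:<feature>" form; entity keys
# map to lists of values so several rows can be requested in one call.
request_body = {
    "features": [
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    "entities": {"driver_id": [1001, 1002]},
}

# This serialized string is what gets POSTed to http://localhost:6566/get-online-features.
payload = json.dumps(request_body)
```

Any HTTP client (cURL, `requests`, a browser fetch) can then send `payload` to the server started by `feast serve`.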
+ ## Example +### Initializing a feature server Here's a usage example of the feature server with the local template: ```bash @@ -41,6 +49,7 @@ INFO: Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit) 09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit) ``` +### Retrieving features from the online store After the server starts, we can execute cURL commands from another terminal tab: ```bash @@ -142,3 +151,45 @@ curl -X POST \ } }' | jq ``` + +### Pushing features to the online store +You can push data corresponding to a push source to the online store (note that timestamps need to be strings): + +```text +curl -X POST "http://localhost:6566/push" -d '{ + "push_source_name": "driver_hourly_stats_push_source", + "df": { + "driver_id": [1001], + "event_timestamp": ["2022-05-13 10:59:42"], + "created": ["2022-05-13 10:59:42"], + "conv_rate": [1.0], + "acc_rate": [1.0], + "avg_daily_trips": [1000] + } + }' | jq ``` + +or equivalently from Python: +```python +import json +import requests +import pandas as pd +from datetime import datetime + +event_dict = { + "driver_id": [1001], + "event_timestamp": [str(datetime(2021, 5, 13, 10, 59, 42))], + "created": [str(datetime(2021, 5, 13, 10, 59, 42))], + "conv_rate": [1.0], + "acc_rate": [1.0], + "avg_daily_trips": [1000], + "string_feature": "test2", +} +push_data = { + "push_source_name": "driver_stats_push_source", + "df": event_dict +} +requests.post( + "http://localhost:6566/push", + data=json.dumps(push_data)) ``` diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index d958281ca2..9e6028ccfa 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -222,8 +222,7 @@ message DataSource { // Defines options for DataSource that supports pushing data to it. This allows data to be pushed to // the online store on-demand, such as by stream consumers.
message PushOptions { - // Mapping of feature name to type - map schema = 1; + reserved 1; } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 0e264117ae..4a3762031e 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -21,7 +21,7 @@ from feast import type_map from feast.data_format import StreamFormat -from feast.field import Field, from_value_type +from feast.field import Field from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.repo_config import RepoConfig, get_data_source_class_from_type from feast.types import VALUE_TYPES_TO_FEAST_TYPES @@ -714,45 +714,35 @@ class PushSource(DataSource): A source that can be used to ingest features on request """ - name: str - schema: List[Field] + # TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store + # TODO(adchia): consider a "mode" to support pushing raw vs transformed events batch_source: DataSource - timestamp_field: str def __init__( self, *, name: str, - schema: List[Field], batch_source: DataSource, description: Optional[str] = "", tags: Optional[Dict[str, str]] = None, owner: Optional[str] = "", - timestamp_field: Optional[str] = "", ): """ Creates a PushSource object. Args: name: Name of the push source - schema: Schema mapping from the input feature name to a ValueType batch_source: The batch source that backs this push source. It's used when materializing from the offline store to the online store, and when retrieving historical features. description (optional): A human-readable description. tags (optional): A dictionary of key-value pairs to store arbitrary metadata. owner (optional): The owner of the data source, typically the email of the primary maintainer. - timestamp_field (optional): Event timestamp foe;d used for point in time - joins of feature values. 
""" super().__init__(name=name, description=description, tags=tags, owner=owner) - self.schema = sorted(schema) # TODO: add schema inference from a batch source self.batch_source = batch_source if not self.batch_source: raise ValueError(f"batch_source is needed for push source {self.name}") - if not timestamp_field: - raise ValueError(f"timestamp field is needed for push source {self.name}") - self.timestamp_field = timestamp_field def validate(self, config: RepoConfig): pass @@ -764,38 +754,25 @@ def get_table_column_names_and_types( @staticmethod def from_proto(data_source: DataSourceProto): - schema_pb = data_source.push_options.schema - schema = [] - for key, val in schema_pb.items(): - schema.append(Field(name=key, dtype=from_value_type(ValueType(val)))) - assert data_source.HasField("batch_source") batch_source = DataSource.from_proto(data_source.batch_source) return PushSource( name=data_source.name, - schema=sorted(schema), batch_source=batch_source, - timestamp_field=data_source.timestamp_field, description=data_source.description, tags=dict(data_source.tags), owner=data_source.owner, ) def to_proto(self) -> DataSourceProto: - schema_pb = {} - for field in self.schema: - schema_pb[field.name] = field.dtype.to_value_type().value batch_source_proto = None if self.batch_source: batch_source_proto = self.batch_source.to_proto() - options = DataSourceProto.PushOptions(schema=schema_pb,) data_source_proto = DataSourceProto( name=self.name, type=DataSourceProto.PUSH_SOURCE, - push_options=options, - timestamp_field=self.timestamp_field, description=self.description, tags=self.tags, owner=self.owner, diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py index 20fcd410c2..8347bed6da 100644 --- a/sdk/python/feast/feature_server.py +++ b/sdk/python/feast/feature_server.py @@ -94,9 +94,7 @@ def push(body=Depends(get_body)): @app.post("/write-to-online-store") def write_to_online_store(body=Depends(get_body)): warnings.warn( - 
"write_to_online_store is an experimental feature. " - "This API is unstable and it could be changed in the future. " - "We do not guarantee that future changes will maintain backward compatibility.", + "write_to_online_store is deprecated. Please consider using /push instead", RuntimeWarning, ) try: diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 33d297f3ca..2311f78e9b 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -92,7 +92,6 @@ warnings.simplefilter("once", DeprecationWarning) - if TYPE_CHECKING: from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer @@ -1186,16 +1185,25 @@ def tqdm_builder(length): ) @log_exceptions_and_usage - def push(self, push_source_name: str, df: pd.DataFrame): + def push( + self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True + ): """ Push features to a push source. This updates all the feature views that have the push source as stream source. Args: push_source_name: The name of the push source we want to push data to. df: the data being pushed. + allow_registry_cache: whether to allow cached versions of the registry. """ + warnings.warn( + "Push source is an experimental feature. " + "This API is unstable and it could and might change in the future. 
" + "We do not guarantee that future changes will maintain backward compatibility.", + RuntimeWarning, + ) from feast.data_source import PushSource - all_fvs = self.list_feature_views(allow_cache=True) + all_fvs = self.list_feature_views(allow_cache=allow_registry_cache) fvs_with_push_sources = { fv @@ -1208,7 +1216,9 @@ def push(self, push_source_name: str, df: pd.DataFrame): } for fv in fvs_with_push_sources: - self.write_to_online_store(fv.name, df, allow_registry_cache=True) + self.write_to_online_store( + fv.name, df, allow_registry_cache=allow_registry_cache + ) @log_exceptions_and_usage def write_to_online_store( diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 9d15a6a25f..2ff252b00c 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -2,7 +2,7 @@ from typing import List from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource -from feast.data_source import DataSource, RequestSource +from feast.data_source import DataSource, PushSource, RequestSource from feast.errors import RegistryInferenceFailure from feast.feature_view import FeatureView from feast.field import Field, from_value_type @@ -74,6 +74,8 @@ def update_data_sources_with_inferred_event_timestamp_col( for data_source in data_sources: if isinstance(data_source, RequestSource): continue + if isinstance(data_source, PushSource): + data_source = data_source.batch_source if data_source.timestamp_field is None or data_source.timestamp_field == "": # prepare right match pattern for data source ts_column_type_regex_pattern = "" diff --git a/sdk/python/feast/infra/online_stores/sqlite.py b/sdk/python/feast/infra/online_stores/sqlite.py index 710f4c386a..5657fbe372 100644 --- a/sdk/python/feast/infra/online_stores/sqlite.py +++ b/sdk/python/feast/infra/online_stores/sqlite.py @@ -230,7 +230,9 @@ def teardown( def _initialize_conn(db_path: str): Path(db_path).parent.mkdir(exist_ok=True) return sqlite3.connect( - 
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + db_path, + detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES, + check_same_thread=False, ) diff --git a/sdk/python/tests/example_repos/example_feature_repo_1.py b/sdk/python/tests/example_repos/example_feature_repo_1.py index 76b42b2241..bd07100af8 100644 --- a/sdk/python/tests/example_repos/example_feature_repo_1.py +++ b/sdk/python/tests/example_repos/example_feature_repo_1.py @@ -40,14 +40,7 @@ ) driver_locations_push_source = PushSource( - name="driver_locations_push", - schema=[ - Field(name="driver_id", dtype=String), - Field(name="driver_lat", dtype=Float32), - Field(name="driver_long", dtype=String), - ], - batch_source=driver_locations_source, - timestamp_field="event_timestamp", + name="driver_locations_push", batch_source=driver_locations_source, ) driver = Entity( diff --git a/sdk/python/tests/integration/e2e/test_python_feature_server.py b/sdk/python/tests/integration/e2e/test_python_feature_server.py new file mode 100644 index 0000000000..a3048300a3 --- /dev/null +++ b/sdk/python/tests/integration/e2e/test_python_feature_server.py @@ -0,0 +1,121 @@ +import contextlib +import json +from datetime import datetime +from typing import List + +import pytest +from fastapi.testclient import TestClient + +from feast.feast_object import FeastObject +from feast.feature_server import get_app +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, + construct_universal_feature_views, + construct_universal_test_data, +) +from tests.integration.feature_repos.universal.entities import ( + customer, + driver, + location, +) + + +@pytest.mark.integration +@pytest.mark.universal +def test_get_online_features(): + with setup_python_fs_client() as client: + request_data_dict = { + "features": [ + "driver_stats:conv_rate", + 
"driver_stats:acc_rate", + "driver_stats:avg_daily_trips", + ], + "entities": {"driver_id": [5001, 5002]}, + } + response = client.post( + "/get-online-features", data=json.dumps(request_data_dict) + ) + + # Check entities and features are present + parsed_response = json.loads(response.text) + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"] + response_feature_names = metadata["feature_names"] + assert len(response_feature_names) == len(expected_features) + for expected_feature in expected_features: + assert expected_feature in response_feature_names + assert "results" in parsed_response + results = parsed_response["results"] + for result in results: + # Same order as in metadata + assert len(result["statuses"]) == 2 # Requested two entities + for status in result["statuses"]: + assert status == "PRESENT" + results_driver_id_index = response_feature_names.index("driver_id") + assert ( + results[results_driver_id_index]["values"] + == request_data_dict["entities"]["driver_id"] + ) + + +@pytest.mark.integration +@pytest.mark.universal +def test_push(): + with setup_python_fs_client() as client: + initial_temp = get_temperatures(client, location_ids=[1])[0] + json_data = json.dumps( + { + "push_source_name": "location_stats_push_source", + "df": { + "location_id": [1], + "temperature": [initial_temp * 100], + "event_timestamp": [str(datetime.utcnow())], + "created": [str(datetime.utcnow())], + }, + } + ) + response = client.post("/push", data=json_data,) + + # Check new pushed temperature is fetched + assert response.status_code == 200 + assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100] + + +def get_temperatures(client, location_ids: List[int]): + get_request_data = { + "features": ["pushable_location_stats:temperature"], + "entities": {"location_id": location_ids}, + } + response = client.post("/get-online-features", 
data=json.dumps(get_request_data)) + parsed_response = json.loads(response.text) + assert "metadata" in parsed_response + metadata = parsed_response["metadata"] + response_feature_names = metadata["feature_names"] + assert "results" in parsed_response + results = parsed_response["results"] + results_temperature_index = response_feature_names.index("temperature") + return results[results_temperature_index]["values"] + + +@contextlib.contextmanager +def setup_python_fs_client(): + config = IntegrationTestRepoConfig() + environment = construct_test_environment(config) + fs = environment.feature_store + try: + entities, datasets, data_sources = construct_universal_test_data(environment) + feature_views = construct_universal_feature_views(data_sources) + feast_objects: List[FeastObject] = [] + feast_objects.extend(feature_views.values()) + feast_objects.extend([driver(), customer(), location()]) + fs.apply(feast_objects) + fs.materialize(environment.start_date, environment.end_date) + client = TestClient(get_app(fs)) + yield client + finally: + fs.teardown() + environment.data_source_creator.teardown() diff --git a/sdk/python/tests/integration/feature_repos/universal/feature_views.py b/sdk/python/tests/integration/feature_repos/universal/feature_views.py index 5918e36753..a6786528e1 100644 --- a/sdk/python/tests/integration/feature_repos/universal/feature_views.py +++ b/sdk/python/tests/integration/feature_repos/universal/feature_views.py @@ -13,7 +13,7 @@ ValueType, ) from feast.data_source import DataSource, RequestSource -from feast.types import Array, FeastType, Float32, Float64, Int32, Int64 +from feast.types import Array, FeastType, Float32, Float64, Int32 from tests.integration.feature_repos.universal.entities import location @@ -65,19 +65,19 @@ def conv_rate_plus_100(features_df: pd.DataFrame) -> pd.DataFrame: def conv_rate_plus_100_feature_view( sources: Dict[str, Union[RequestSource, FeatureView]], infer_features: bool = False, - features: 
Optional[List[Feature]] = None, + features: Optional[List[Field]] = None, ) -> OnDemandFeatureView: # Test that positional arguments and Features still work for ODFVs. _features = features or [ - Feature(name="conv_rate_plus_100", dtype=ValueType.DOUBLE), - Feature(name="conv_rate_plus_val_to_add", dtype=ValueType.DOUBLE), - Feature(name="conv_rate_plus_100_rounded", dtype=ValueType.INT32), + Field(name="conv_rate_plus_100", dtype=Float64), + Field(name="conv_rate_plus_val_to_add", dtype=Float64), + Field(name="conv_rate_plus_100_rounded", dtype=Int32), ] return OnDemandFeatureView( - conv_rate_plus_100.__name__, - [] if infer_features else _features, - sources, - conv_rate_plus_100, + name=conv_rate_plus_100.__name__, + schema=[] if infer_features else _features, + sources=sources, + udf=conv_rate_plus_100, ) @@ -237,13 +237,7 @@ def create_field_mapping_feature_view(source): def create_pushable_feature_view(batch_source: DataSource): push_source = PushSource( - name="location_stats_push_source", - schema=[ - Field(name="location_id", dtype=Int64), - Field(name="temperature", dtype=Int32), - ], - timestamp_field="timestamp", - batch_source=batch_source, + name="location_stats_push_source", batch_source=batch_source, ) return FeatureView( name="pushable_location_stats", diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index ceb9ff4ce6..a0de42e1e2 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -9,13 +9,7 @@ def test_push_with_batch(): push_source = PushSource( - name="test", - schema=[ - Field(name="f1", dtype=PrimitiveFeastType.FLOAT32), - Field(name="f2", dtype=PrimitiveFeastType.BOOL), - ], - timestamp_field="event_timestamp", - batch_source=BigQuerySource(table="test.test"), + name="test", batch_source=BigQuerySource(table="test.test"), ) push_source_proto = push_source.to_proto() assert push_source_proto.HasField("batch_source") @@ -25,8 +19,6 @@ def 
test_push_with_batch(): push_source_unproto = PushSource.from_proto(push_source_proto) assert push_source.name == push_source_unproto.name - assert push_source.schema == push_source_unproto.schema - assert push_source.timestamp_field == push_source_unproto.timestamp_field assert push_source.batch_source.name == push_source_unproto.batch_source.name
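The trimmed test above keeps the proto round-trip pattern: serialize with `to_proto()`, rebuild with `from_proto()`, then compare the fields that are meant to survive the trip. A minimal sketch of the same invariant with a plain dict standing in for the protobuf message — `ToyPushSource` and its fields are hypothetical, not part of Feast:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ToyPushSource:
    # Hypothetical stand-in for PushSource: just a name and a batch table ref.
    name: str
    batch_table: str

    def to_dict(self) -> dict:
        # Analogue of to_proto(): serialize every persisted field.
        return {"name": self.name, "batch_table": self.batch_table}

    @staticmethod
    def from_dict(d: dict) -> "ToyPushSource":
        # Analogue of from_proto(): rebuild the object from the wire form.
        return ToyPushSource(name=d["name"], batch_table=d["batch_table"])


source = ToyPushSource(name="test", batch_table="test.test")
roundtripped = ToyPushSource.from_dict(source.to_dict())
assert roundtripped == source  # dropping a field from to_dict() would fail here
```

This is why the assertions on `schema` and `timestamp_field` were deleted along with the fields themselves: once a field no longer exists on the object, the round-trip comparison for it has nothing to check.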