diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index af6362da3e..06c5edcc8b 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -96,6 +96,7 @@ * [Datastore](reference/online-stores/datastore.md) * [DynamoDB](reference/online-stores/dynamodb.md) * [Bigtable](reference/online-stores/bigtable.md) + * [Remote](reference/online-stores/remote.md) * [PostgreSQL (contrib)](reference/online-stores/postgres.md) * [Cassandra + Astra DB (contrib)](reference/online-stores/cassandra.md) * [MySQL (contrib)](reference/online-stores/mysql.md) diff --git a/docs/reference/online-stores/README.md b/docs/reference/online-stores/README.md index 686e820f4e..b5f4eb8de8 100644 --- a/docs/reference/online-stores/README.md +++ b/docs/reference/online-stores/README.md @@ -61,3 +61,7 @@ Please see [Online Store](../../getting-started/architecture-and-components/onli {% content-ref url="scylladb.md" %} [scylladb.md](scylladb.md) {% endcontent-ref %} + +{% content-ref url="remote.md" %} +[remote.md](remote.md) +{% endcontent-ref %} diff --git a/docs/reference/online-stores/remote.md b/docs/reference/online-stores/remote.md new file mode 100644 index 0000000000..c560fa6f22 --- /dev/null +++ b/docs/reference/online-stores/remote.md @@ -0,0 +1,21 @@ +# Remote online store + +## Description + +This remote online store will let you interact with remote feature server. At this moment this only supports the read operation. You can use this online store and able retrieve online features `store.get_online_features` from remote feature server. + +## Examples + +The registry is pointing to registry of remote feature store. If it is not accessible then should be configured to use remote registry. + +{% code title="feature_store.yaml" %} +```yaml +project: my-local-project + registry: /remote/data/registry.db + provider: local + online_store: + path: http://localhost:6566 + type: remote + entity_key_serialization_version: 2 +``` +{% endcode %} \ No newline at end of file diff --git a/sdk/python/feast/infra/online_stores/remote.py b/sdk/python/feast/infra/online_stores/remote.py new file mode 100644 index 0000000000..19e1b7d515 --- /dev/null +++ b/sdk/python/feast/infra/online_stores/remote.py @@ -0,0 +1,167 @@ +# Copyright 2021 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +from datetime import datetime +from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple + +import requests +from pydantic import StrictStr + +from feast import Entity, FeatureView, RepoConfig +from feast.infra.online_stores.online_store import OnlineStore +from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto +from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import FeastConfigBaseModel +from feast.type_map import python_values_to_proto_values +from feast.value_type import ValueType + +logger = logging.getLogger(__name__) + + +class RemoteOnlineStoreConfig(FeastConfigBaseModel): + """Remote Online store config for remote online store""" + + type: Literal["remote"] = "remote" + """Online store type selector""" + + path: StrictStr = "http://localhost:6566" + """ str: Path to metadata store. + If type is 'remote', then this is a URL for registry server """ + + +class RemoteOnlineStore(OnlineStore): + """ + remote online store implementation wrapper to communicate with feast online server. + """ + + def online_write_batch( + self, + config: RepoConfig, + table: FeatureView, + data: List[ + Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]] + ], + progress: Optional[Callable[[int], Any]], + ) -> None: + raise NotImplementedError + + def online_read( + self, + config: RepoConfig, + table: FeatureView, + entity_keys: List[EntityKeyProto], + requested_features: Optional[List[str]] = None, + ) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]: + assert isinstance(config.online_store, RemoteOnlineStoreConfig) + config.online_store.__class__ = RemoteOnlineStoreConfig + + req_body = self._construct_online_read_api_json_request( + entity_keys, table, requested_features + ) + response = requests.post( + f"{config.online_store.path}/get-online-features", data=req_body + ) + if response.status_code == 200: + logger.debug("Able to retrieve the online features from feature server.") + response_json = json.loads(response.text) + event_ts = self._get_event_ts(response_json) + # Iterating over results and converting the API results in column format to row format. + result_tuples: List[ + Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]] + ] = [] + for feature_value_index in range(len(entity_keys)): + feature_values_dict: Dict[str, ValueProto] = dict() + for index, feature_name in enumerate( + response_json["metadata"]["feature_names"] + ): + if ( + requested_features is not None + and feature_name in requested_features + ): + if ( + response_json["results"][index]["statuses"][ + feature_value_index + ] + == "PRESENT" + ): + message = python_values_to_proto_values( + [ + response_json["results"][index]["values"][ + feature_value_index + ] + ], + ValueType.UNKNOWN, + ) + feature_values_dict[feature_name] = message[0] + else: + feature_values_dict[feature_name] = ValueProto() + result_tuples.append((event_ts, feature_values_dict)) + return result_tuples + else: + error_msg = f"Unable to retrieve the online store data using feature server API. Error_code={response.status_code}, error_message={response.reason}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + def _construct_online_read_api_json_request( + self, + entity_keys: List[EntityKeyProto], + table: FeatureView, + requested_features: Optional[List[str]] = None, + ) -> str: + api_requested_features = [] + if requested_features is not None: + for requested_feature in requested_features: + api_requested_features.append(f"{table.name}:{requested_feature}") + + entity_values = [] + entity_key = "" + for row in entity_keys: + entity_key = row.join_keys[0] + entity_values.append( + getattr(row.entity_values[0], row.entity_values[0].WhichOneof("val")) + ) + + req_body = json.dumps( + { + "features": api_requested_features, + "entities": {entity_key: entity_values}, + } + ) + return req_body + + def _get_event_ts(self, response_json) -> datetime: + event_ts = "" + if len(response_json["results"]) > 1: + event_ts = response_json["results"][1]["event_timestamps"][0] + return datetime.fromisoformat(event_ts.replace("Z", "+00:00")) + + def update( + self, + config: RepoConfig, + tables_to_delete: Sequence[FeatureView], + tables_to_keep: Sequence[FeatureView], + entities_to_delete: Sequence[Entity], + entities_to_keep: Sequence[Entity], + partial: bool, + ): + pass + + def teardown( + self, + config: RepoConfig, + tables: Sequence[FeatureView], + entities: Sequence[Entity], + ): + pass diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index b7c7b0a9d0..d5b3160b56 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -64,6 +64,7 @@ "hazelcast": "feast.infra.online_stores.contrib.hazelcast_online_store.hazelcast_online_store.HazelcastOnlineStore", "ikv": "feast.infra.online_stores.contrib.ikv_online_store.ikv.IKVOnlineStore", "elasticsearch": "feast.infra.online_stores.contrib.elasticsearch.ElasticSearchOnlineStore", + "remote": "feast.infra.online_stores.remote.RemoteOnlineStore", } OFFLINE_STORE_CLASS_FOR_TYPE = { diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 775db8c388..48f482f542 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -32,8 +32,8 @@ create_basic_driver_dataset, create_document_dataset, ) -from tests.integration.feature_repos.integration_test_repo_config import ( # noqa: E402 - IntegrationTestRepoConfig, +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, # noqa: E402 ) from tests.integration.feature_repos.repo_configuration import ( # noqa: E402 AVAILABLE_OFFLINE_STORES, @@ -45,8 +45,8 @@ construct_universal_feature_views, construct_universal_test_data, ) -from tests.integration.feature_repos.universal.data_sources.file import ( # noqa: E402 - FileDataSourceCreator, +from tests.integration.feature_repos.universal.data_sources.file import ( + FileDataSourceCreator, # noqa: E402 ) from tests.integration.feature_repos.universal.entities import ( # noqa: E402 customer, @@ -173,7 +173,7 @@ def simple_dataset_2() -> pd.DataFrame: def start_test_local_server(repo_path: str, port: int): fs = FeatureStore(repo_path) - fs.serve("localhost", port, no_access_log=True) + fs.serve(host="localhost", port=port) @pytest.fixture diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index be01a1e1ac..7123bd0fc1 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -136,9 +136,7 @@ AVAILABLE_ONLINE_STORES: Dict[ str, Tuple[Union[str, Dict[Any, Any]], Optional[Type[OnlineStoreCreator]]] -] = { - "sqlite": ({"type": "sqlite"}, None), -} +] = {"sqlite": ({"type": "sqlite"}, None)} # Only configure Cloud DWH if running full integration tests if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True": @@ -155,7 +153,6 @@ AVAILABLE_ONLINE_STORES["datastore"] = ("datastore", None) AVAILABLE_ONLINE_STORES["snowflake"] = (SNOWFLAKE_CONFIG, None) AVAILABLE_ONLINE_STORES["bigtable"] = (BIGTABLE_CONFIG, None) - # Uncomment to test using private Rockset account. Currently not enabled as # there is no dedicated Rockset instance for CI testing and there is no # containerized version of Rockset. @@ -489,7 +486,6 @@ def construct_test_environment( "arn:aws:iam::402087665549:role/lambda_execution_role", ), ) - else: feature_server = LocalFeatureServerConfig( feature_logging=FeatureLoggingConfig(enabled=True) diff --git a/sdk/python/tests/integration/online_store/test_remote_online_store.py b/sdk/python/tests/integration/online_store/test_remote_online_store.py new file mode 100644 index 0000000000..759a9c7a87 --- /dev/null +++ b/sdk/python/tests/integration/online_store/test_remote_online_store.py @@ -0,0 +1,233 @@ +import os +import subprocess +import tempfile +from datetime import datetime +from textwrap import dedent + +import pytest + +from feast.feature_store import FeatureStore +from feast.wait import wait_retry_backoff +from tests.utils.cli_repo_creator import CliRunner +from tests.utils.http_server import check_port_open, free_port + + +@pytest.mark.integration +def test_remote_online_store_read(): + with tempfile.TemporaryDirectory() as remote_server_tmp_dir, tempfile.TemporaryDirectory() as remote_client_tmp_dir: + server_store, server_url, registry_path = ( + _create_server_store_spin_feature_server(temp_dir=remote_server_tmp_dir) + ) + assert None not in (server_store, server_url, registry_path) + client_store = _create_remote_client_feature_store( + temp_dir=remote_client_tmp_dir, + server_registry_path=str(registry_path), + feature_server_url=server_url, + ) + assert client_store is not None + _assert_non_existing_entity_feature_views_entity( + client_store=client_store, server_store=server_store + ) + _assert_existing_feature_views_entity( + client_store=client_store, server_store=server_store + ) + _assert_non_existing_feature_views( + client_store=client_store, server_store=server_store + ) + + +def _assert_non_existing_entity_feature_views_entity( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1234}] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + +def _assert_non_existing_feature_views( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats1:conv_rate", + "driver_hourly_stats1:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1001}, {"driver_id": 1002}] + + with pytest.raises( + Exception, match="Feature view driver_hourly_stats1 does not exist" + ): + client_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + with pytest.raises( + Exception, match="Feature view driver_hourly_stats1 does not exist" + ): + server_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + +def _assert_existing_feature_views_entity( + client_store: FeatureStore, server_store: FeatureStore +): + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + ] + + entity_rows = [{"driver_id": 1001}, {"driver_id": 1002}] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + features = ["driver_hourly_stats:conv_rate"] + _assert_client_server_online_stores_are_matching( + client_store=client_store, + server_store=server_store, + features=features, + entity_rows=entity_rows, + ) + + +def _assert_client_server_online_stores_are_matching( + client_store: FeatureStore, + server_store: FeatureStore, + features: list[str], + entity_rows: list, +): + online_features_from_client = client_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + assert online_features_from_client is not None + + online_features_from_server = server_store.get_online_features( + features=features, entity_rows=entity_rows + ).to_dict() + + assert online_features_from_server is not None + assert online_features_from_client is not None + assert online_features_from_client == online_features_from_server + + +def _create_server_store_spin_feature_server(temp_dir): + feast_server_port = free_port() + store = _default_store(str(temp_dir), "REMOTE_ONLINE_SERVER_PROJECT") + server_url = next( + _start_feature_server( + repo_path=str(store.repo_path), server_port=feast_server_port + ) + ) + print(f"Server started successfully, {server_url}") + return store, server_url, os.path.join(store.repo_path, "data", "registry.db") + + +def _default_store(temp_dir, project_name) -> FeatureStore: + runner = CliRunner() + result = runner.run(["init", project_name], cwd=temp_dir) + repo_path = os.path.join(temp_dir, project_name, "feature_repo") + assert result.returncode == 0 + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=repo_path) + fs.materialize_incremental( + end_date=datetime.utcnow(), feature_views=["driver_hourly_stats"] + ) + return fs + + +def _create_remote_client_feature_store( + temp_dir, server_registry_path: str, feature_server_url: str +) -> FeatureStore: + project_name = "REMOTE_ONLINE_CLIENT_PROJECT" + runner = CliRunner() + result = runner.run(["init", project_name], cwd=temp_dir) + assert result.returncode == 0 + repo_path = os.path.join(temp_dir, project_name, "feature_repo") + _overwrite_remote_client_feature_store_yaml( + repo_path=str(repo_path), + registry_path=server_registry_path, + feature_server_url=feature_server_url, + ) + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + return FeatureStore(repo_path=repo_path) + + +def _overwrite_remote_client_feature_store_yaml( + repo_path: str, registry_path: str, feature_server_url: str +): + repo_config = os.path.join(repo_path, "feature_store.yaml") + with open(repo_config, "w") as repo_config: + repo_config.write( + dedent( + f""" + project: REMOTE_ONLINE_CLIENT_PROJECT + registry: {registry_path} + provider: local + online_store: + path: {feature_server_url} + type: remote + entity_key_serialization_version: 2 + """ + ) + ) + + +def _start_feature_server(repo_path: str, server_port: int): + host = "0.0.0.0" + cmd = [ + "feast", + "-c" + repo_path, + "serve", + "--host", + host, + "--port", + str(server_port), + ] + feast_server_process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL + ) + _time_out_sec: int = 60 + # Wait for server to start + wait_retry_backoff( + lambda: (None, check_port_open(host, server_port)), + timeout_secs=_time_out_sec, + timeout_msg=f"Unable to start the feast server in {_time_out_sec} seconds for remote online store type, port={server_port}", + ) + + yield f"http://localhost:{server_port}" + + if feast_server_process is not None: + feast_server_process.kill() + + # wait server to free the port + wait_retry_backoff( + lambda: ( + None, + not check_port_open("localhost", server_port), + ), + timeout_msg=f"Unable to stop the feast server in {_time_out_sec} seconds for remote online store type, port={server_port}", + timeout_secs=_time_out_sec, + ) diff --git a/sdk/python/tests/unit/online_store/__init__.py b/sdk/python/tests/unit/online_store/__init__.py new file mode 100644 index 0000000000..e69de29bb2