diff --git a/.github/workflows/lint_pr.yml b/.github/workflows/lint_pr.yml index 12f7182ce8..d1aa7d16a3 100644 --- a/.github/workflows/lint_pr.yml +++ b/.github/workflows/lint_pr.yml @@ -7,12 +7,13 @@ on: - edited - synchronize +permissions: + # read-only perms specified due to use of pull_request_target in lieu of security label check + pull-requests: read + jobs: validate-title: - # when using pull_request_target, all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. if: - ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || - (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && github.repository == 'feast-dev/feast' name: Validate PR title runs-on: ubuntu-latest diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index ba4169c292..2b0e6a1056 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -86,7 +86,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.8", "3.10" ] + python-version: [ "3.10" ] os: [ ubuntu-latest ] env: OS: ${{ matrix.os }} @@ -167,4 +167,4 @@ jobs: SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} - run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread \ No newline at end of file + run: pytest -n 8 --cov=./ --cov-report=xml --color=yes sdk/python/tests --integration --durations=5 --timeout=1200 --timeout_method=thread diff --git a/.github/workflows/pr_local_integration_tests.yml b/.github/workflows/pr_local_integration_tests.yml index 668bcb5e50..17ff54b1f8 100644 --- a/.github/workflows/pr_local_integration_tests.yml +++ b/.github/workflows/pr_local_integration_tests.yml @@ -19,7 +19,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.8", "3.10" ] + python-version: [ "3.10" ] os: [ ubuntu-latest ] env: OS: ${{ matrix.os }} diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index 31e6d08c74..7e2e3b577a 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -7,7 +7,7 @@ jobs: strategy: fail-fast: false matrix: - python-version: [ "3.8", "3.9", "3.10" ] + python-version: [ "3.9", "3.10" ] os: [ ubuntu-latest, macOS-latest ] exclude: - os: macOS-latest diff --git a/docs/reference/alpha-web-ui.md b/docs/reference/alpha-web-ui.md index 7d21a3d45d..398c8de0ae 100644 --- a/docs/reference/alpha-web-ui.md +++ b/docs/reference/alpha-web-ui.md @@ -85,6 +85,8 @@ When you start the React app, it will look for `project-list.json` to find a lis } ``` +* **Note** - `registryPath` only supports a file location or a url. + Then start the React App ```bash diff --git a/docs/reference/offline-stores/spark.md b/docs/reference/offline-stores/spark.md index ae5ea78071..3cca2aab1a 100644 --- a/docs/reference/offline-stores/spark.md +++ b/docs/reference/offline-stores/spark.md @@ -4,7 +4,7 @@ The Spark offline store provides support for reading [SparkSources](../data-sources/spark.md). -* Entity dataframes can be provided as a SQL query or can be provided as a Pandas dataframe. A Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view. +* Entity dataframes can be provided as a SQL query, Pandas dataframe or can be provided as a Pyspark dataframe. A Pandas dataframes will be converted to a Spark dataframe and processed as a temporary view. ## Disclaimer diff --git a/java/serving/src/test/resources/docker-compose/feast10/entrypoint.sh b/java/serving/src/test/resources/docker-compose/feast10/entrypoint.sh index d7dcd03c5f..0690b734c3 100755 --- a/java/serving/src/test/resources/docker-compose/feast10/entrypoint.sh +++ b/java/serving/src/test/resources/docker-compose/feast10/entrypoint.sh @@ -4,8 +4,8 @@ set -e # feast root directory is expected to be mounted (eg, by docker compose) cd /mnt/feast -pip install -e '.[redis]' +pip install -e '.[grpcio,redis]' cd /app python materialize.py -feast serve_transformations --port 8080 \ No newline at end of file +feast serve_transformations --port 8080 diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 9097e40c94..b7151ff0c8 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -415,3 +415,8 @@ def __init__(self): class PushSourceNotFoundException(Exception): def __init__(self, push_source_name: str): super().__init__(f"Unable to find push source '{push_source_name}'.") + + +class ReadOnlyRegistryException(Exception): + def __init__(self): + super().__init__("Registry implementation is read-only.") diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 4a53672b2e..e38120c33d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -164,6 +164,10 @@ def __init__( self._registry = SnowflakeRegistry( registry_config, self.config.project, None ) + elif registry_config and registry_config.registry_type == "remote": + from feast.infra.registry.remote import RemoteRegistry + + self._registry = RemoteRegistry(registry_config, self.config.project, None) else: r = Registry(self.config.project, registry_config, repo_path=self.repo_path) r._initialize_registry(self.config.project) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py index c9591b7c3f..b1b1c04c7d 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py @@ -125,7 +125,7 @@ def get_historical_features( config: RepoConfig, feature_views: List[FeatureView], feature_refs: List[str], - entity_df: Union[pandas.DataFrame, str], + entity_df: Union[pandas.DataFrame, str, pyspark.sql.DataFrame], registry: Registry, project: str, full_feature_names: bool = False, @@ -473,15 +473,16 @@ def _get_entity_df_event_timestamp_range( entity_df_event_timestamp.min().to_pydatetime(), entity_df_event_timestamp.max().to_pydatetime(), ) - elif isinstance(entity_df, str): + elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame): # If the entity_df is a string (SQL query), determine range # from table - df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) - - # Checks if executing entity sql resulted in any data - if df.rdd.isEmpty(): - raise EntitySQLEmptyResults(entity_df) - + if isinstance(entity_df, str): + df = spark_session.sql(entity_df).select(entity_df_event_timestamp_col) + # Checks if executing entity sql resulted in any data + if df.rdd.isEmpty(): + raise EntitySQLEmptyResults(entity_df) + else: + df = entity_df # TODO(kzhang132): need utc conversion here. entity_df_event_timestamp_range = ( @@ -499,8 +500,11 @@ def _get_entity_schema( ) -> Dict[str, np.dtype]: if isinstance(entity_df, pd.DataFrame): return dict(zip(entity_df.columns, entity_df.dtypes)) - elif isinstance(entity_df, str): - entity_spark_df = spark_session.sql(entity_df) + elif isinstance(entity_df, str) or isinstance(entity_df, pyspark.sql.DataFrame): + if isinstance(entity_df, str): + entity_spark_df = spark_session.sql(entity_df) + else: + entity_spark_df = entity_df return dict( zip( entity_spark_df.columns, @@ -526,6 +530,9 @@ def _upload_entity_df( elif isinstance(entity_df, str): spark_session.sql(entity_df).createOrReplaceTempView(table_name) return + elif isinstance(entity_df, pyspark.sql.DataFrame): + entity_df.createOrReplaceTempView(table_name) + return else: raise InvalidEntityType(type(entity_df)) diff --git a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py index a27065fb5e..1ff7e6de58 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py @@ -39,7 +39,6 @@ def __init__( query: Optional[str] = None, path: Optional[str] = None, file_format: Optional[str] = None, - event_timestamp_column: Optional[str] = None, created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, description: Optional[str] = "", diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py index fcc0c8d0fa..bd3f9def8f 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/data_source.py @@ -62,10 +62,11 @@ def __init__( "must be include into pytest plugins" ) self.exposed_port = self.container.get_exposed_port("8080") + self.container_host = self.container.get_container_host_ip() self.client = Trino( user="user", catalog="memory", - host="localhost", + host=self.container_host, port=self.exposed_port, source="trino-python-client", http_scheme="http", @@ -123,7 +124,7 @@ def get_prefixed_table_name(self, suffix: str) -> str: def create_offline_store_config(self) -> FeastConfigBaseModel: return TrinoOfflineStoreConfig( - host="localhost", + host=self.container_host, port=self.exposed_port, catalog="memory", dataset=self.project_name, diff --git a/sdk/python/feast/infra/registry/base_registry.py b/sdk/python/feast/infra/registry/base_registry.py index d0ab74812e..9ee3bbbabc 100644 --- a/sdk/python/feast/infra/registry/base_registry.py +++ b/sdk/python/feast/infra/registry/base_registry.py @@ -246,7 +246,7 @@ def delete_feature_view(self, name: str, project: str, commit: bool = True): @abstractmethod def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False - ): + ) -> StreamFeatureView: """ Retrieves a stream feature view. diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index fc7be75e0d..a9d6c44f38 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -178,6 +178,10 @@ def __new__( from feast.infra.registry.snowflake import SnowflakeRegistry return SnowflakeRegistry(registry_config, project, repo_path) + elif registry_config and registry_config.registry_type == "remote": + from feast.infra.registry.remote import RemoteRegistry + + return RemoteRegistry(registry_config, project, repo_path) else: return super(Registry, cls).__new__(cls) diff --git a/sdk/python/feast/infra/registry/remote.py b/sdk/python/feast/infra/registry/remote.py new file mode 100644 index 0000000000..67d61ffec7 --- /dev/null +++ b/sdk/python/feast/infra/registry/remote.py @@ -0,0 +1,370 @@ +from datetime import datetime +from pathlib import Path +from typing import List, Optional, Union + +import grpc +from google.protobuf.empty_pb2 import Empty +from pydantic import StrictStr + +from feast.base_feature_view import BaseFeatureView +from feast.data_source import DataSource +from feast.entity import Entity +from feast.errors import ReadOnlyRegistryException +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.infra.infra_object import Infra +from feast.infra.registry.base_registry import BaseRegistry +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc +from feast.repo_config import RegistryConfig +from feast.request_feature_view import RequestFeatureView +from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView + + +class RemoteRegistryConfig(RegistryConfig): + registry_type: StrictStr = "remote" + """ str: Provider name or a class name that implements Registry.""" + + path: StrictStr = "" + """ str: Path to metadata store. + If registry_type is 'remote', then this is a URL for registry server """ + + +class RemoteRegistry(BaseRegistry): + def __init__( + self, + registry_config: Union[RegistryConfig, RemoteRegistryConfig], + project: str, + repo_path: Optional[Path], + ): + self.channel = grpc.insecure_channel(registry_config.path) + self.stub = RegistryServer_pb2_grpc.RegistryServerStub(self.channel) + + def apply_entity(self, entity: Entity, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def delete_entity(self, name: str, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + request = RegistryServer_pb2.GetEntityRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetEntity(request) + + return Entity.from_proto(response) + + def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + request = RegistryServer_pb2.ListEntitiesRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListEntities(request) + + return [Entity.from_proto(entity) for entity in response.entities] + + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): + raise ReadOnlyRegistryException() + + def delete_data_source(self, name: str, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_data_source( + self, name: str, project: str, allow_cache: bool = False + ) -> DataSource: + request = RegistryServer_pb2.GetDataSourceRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetDataSource(request) + + return DataSource.from_proto(response) + + def list_data_sources( + self, project: str, allow_cache: bool = False + ) -> List[DataSource]: + request = RegistryServer_pb2.ListDataSourcesRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListDataSources(request) + + return [ + DataSource.from_proto(data_source) for data_source in response.data_sources + ] + + def apply_feature_service( + self, feature_service: FeatureService, project: str, commit: bool = True + ): + raise ReadOnlyRegistryException() + + def delete_feature_service(self, name: str, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_feature_service( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureService: + request = RegistryServer_pb2.GetFeatureServiceRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetFeatureService(request) + + return FeatureService.from_proto(response) + + def list_feature_services( + self, project: str, allow_cache: bool = False + ) -> List[FeatureService]: + request = RegistryServer_pb2.ListFeatureServicesRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListFeatureServices(request) + + return [ + FeatureService.from_proto(feature_service) + for feature_service in response.feature_services + ] + + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): + raise ReadOnlyRegistryException() + + def delete_feature_view(self, name: str, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_stream_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> StreamFeatureView: + request = RegistryServer_pb2.GetStreamFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetStreamFeatureView(request) + + return StreamFeatureView.from_proto(response) + + def list_stream_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[StreamFeatureView]: + request = RegistryServer_pb2.ListStreamFeatureViewsRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListStreamFeatureViews(request) + + return [ + StreamFeatureView.from_proto(stream_feature_view) + for stream_feature_view in response.stream_feature_views + ] + + def get_on_demand_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> OnDemandFeatureView: + request = RegistryServer_pb2.GetOnDemandFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetOnDemandFeatureView(request) + + return OnDemandFeatureView.from_proto(response) + + def list_on_demand_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[OnDemandFeatureView]: + request = RegistryServer_pb2.ListOnDemandFeatureViewsRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListOnDemandFeatureViews(request) + + return [ + OnDemandFeatureView.from_proto(on_demand_feature_view) + for on_demand_feature_view in response.on_demand_feature_views + ] + + def get_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> FeatureView: + request = RegistryServer_pb2.GetFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetFeatureView(request) + + return FeatureView.from_proto(response) + + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: + request = RegistryServer_pb2.ListFeatureViewsRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListFeatureViews(request) + + return [ + FeatureView.from_proto(feature_view) + for feature_view in response.feature_views + ] + + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ) -> RequestFeatureView: + request = RegistryServer_pb2.GetRequestFeatureViewRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetRequestFeatureView(request) + + return RequestFeatureView.from_proto(response) + + def list_request_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[RequestFeatureView]: + request = RegistryServer_pb2.ListRequestFeatureViewsRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListRequestFeatureViews(request) + + return [ + RequestFeatureView.from_proto(request_feature_view) + for request_feature_view in response.request_feature_views + ] + + def apply_materialization( + self, + feature_view: FeatureView, + project: str, + start_date: datetime, + end_date: datetime, + commit: bool = True, + ): + raise ReadOnlyRegistryException() + + def apply_saved_dataset( + self, + saved_dataset: SavedDataset, + project: str, + commit: bool = True, + ): + raise ReadOnlyRegistryException() + + def delete_saved_dataset(self, name: str, project: str, allow_cache: bool = False): + raise ReadOnlyRegistryException() + + def get_saved_dataset( + self, name: str, project: str, allow_cache: bool = False + ) -> SavedDataset: + request = RegistryServer_pb2.GetSavedDatasetRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetSavedDataset(request) + + return SavedDataset.from_proto(response) + + def list_saved_datasets( + self, project: str, allow_cache: bool = False + ) -> List[SavedDataset]: + request = RegistryServer_pb2.ListSavedDatasetsRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListSavedDatasets(request) + + return [ + SavedDataset.from_proto(saved_dataset) + for saved_dataset in response.saved_datasets + ] + + def apply_validation_reference( + self, + validation_reference: ValidationReference, + project: str, + commit: bool = True, + ): + raise ReadOnlyRegistryException() + + def delete_validation_reference(self, name: str, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_validation_reference( + self, name: str, project: str, allow_cache: bool = False + ) -> ValidationReference: + request = RegistryServer_pb2.GetValidationReferenceRequest( + name=name, project=project, allow_cache=allow_cache + ) + + response = self.stub.GetValidationReference(request) + + return ValidationReference.from_proto(response) + + def list_validation_references( + self, project: str, allow_cache: bool = False + ) -> List[ValidationReference]: + request = RegistryServer_pb2.ListValidationReferencesRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListValidationReferences(request) + + return [ + ValidationReference.from_proto(validation_reference) + for validation_reference in response.validation_references + ] + + def list_project_metadata( + self, project: str, allow_cache: bool = False + ) -> List[ProjectMetadata]: + request = RegistryServer_pb2.ListProjectMetadataRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.ListProjectMetadata(request) + + return [ProjectMetadata.from_proto(pm) for pm in response.project_metadata] + + def update_infra(self, infra: Infra, project: str, commit: bool = True): + raise ReadOnlyRegistryException() + + def get_infra(self, project: str, allow_cache: bool = False) -> Infra: + request = RegistryServer_pb2.GetInfraRequest( + project=project, allow_cache=allow_cache + ) + + response = self.stub.GetInfra(request) + + return Infra.from_proto(response) + + def apply_user_metadata( + self, + project: str, + feature_view: BaseFeatureView, + metadata_bytes: Optional[bytes], + ): + pass + + def get_user_metadata( + self, project: str, feature_view: BaseFeatureView + ) -> Optional[bytes]: + pass + + def proto(self) -> RegistryProto: + return self.stub.Proto(Empty()) + + def commit(self): + raise ReadOnlyRegistryException() + + def refresh(self, project: Optional[str] = None): + request = RegistryServer_pb2.RefreshRequest(project=str(project)) + + self.stub.Refresh(request) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index d500059c6b..263ba81e39 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -39,6 +39,7 @@ "file": "feast.infra.registry.registry.Registry", "sql": "feast.infra.registry.sql.SqlRegistry", "snowflake.registry": "feast.infra.registry.snowflake.SnowflakeRegistry", + "remote": "feast.infra.registry.remote.RemoteRegistry", } BATCH_ENGINE_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/types.py b/sdk/python/feast/types.py index 0ba1725f17..4b07c58d19 100644 --- a/sdk/python/feast/types.py +++ b/sdk/python/feast/types.py @@ -50,7 +50,10 @@ def __hash__(self): return hash(self.to_value_type().value) def __eq__(self, other): - return self.to_value_type() == other.to_value_type() + if isinstance(other, ComplexFeastType): + return self.to_value_type() == other.to_value_type() + else: + return False class PrimitiveFeastType(Enum): diff --git a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py index 11d62d9d30..8e18f7fb17 100644 --- a/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py +++ b/sdk/python/tests/integration/feature_repos/universal/online_store/redis.py @@ -20,7 +20,11 @@ def create_online_store(self) -> Dict[str, str]: container=self.container, predicate=log_string_to_wait_for, timeout=10 ) exposed_port = self.container.get_exposed_port("6379") - return {"type": "redis", "connection_string": f"localhost:{exposed_port},db=0"} + container_host = self.container.get_container_host_ip() + return { + "type": "redis", + "connection_string": f"{container_host}:{exposed_port},db=0", + } def teardown(self): self.container.stop() diff --git a/sdk/python/tests/unit/infra/registry/test_remote.py b/sdk/python/tests/unit/infra/registry/test_remote.py new file mode 100644 index 0000000000..16c6f0abfb --- /dev/null +++ b/sdk/python/tests/unit/infra/registry/test_remote.py @@ -0,0 +1,69 @@ +import assertpy +import grpc_testing +import pytest + +from feast import Entity, FeatureStore +from feast.infra.registry.remote import RemoteRegistry, RemoteRegistryConfig +from feast.protos.feast.registry import RegistryServer_pb2, RegistryServer_pb2_grpc +from feast.registry_server import RegistryServer + + +class GrpcMockChannel: + def __init__(self, service, servicer): + self.service = service + self.test_server = grpc_testing.server_from_dictionary( + {service: servicer}, + grpc_testing.strict_real_time(), + ) + + def unary_unary( + self, method: str, request_serializer=None, response_deserializer=None + ): + method_name = method.split("/")[-1] + method_descriptor = self.service.methods_by_name[method_name] + + def handler(request): + rpc = self.test_server.invoke_unary_unary( + method_descriptor, (), request, None + ) + + response, trailing_metadata, code, details = rpc.termination() + return response + + return handler + + +@pytest.fixture +def mock_remote_registry(environment): + store: FeatureStore = environment.feature_store + registry = RemoteRegistry( + registry_config=RemoteRegistryConfig(path=""), project=None, repo_path=None + ) + mock_channel = GrpcMockChannel( + RegistryServer_pb2.DESCRIPTOR.services_by_name["RegistryServer"], + RegistryServer(store=store), + ) + registry.stub = RegistryServer_pb2_grpc.RegistryServerStub(mock_channel) + return registry + + +def test_registry_server_get_entity(environment, mock_remote_registry): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.get_entity(entity.name) + response_entity = mock_remote_registry.get_entity(entity.name, store.project) + + assertpy.assert_that(response_entity).is_equal_to(expected) + + +def test_registry_server_proto(environment, mock_remote_registry): + store: FeatureStore = environment.feature_store + entity = Entity(name="driver", join_keys=["driver_id"]) + store.apply(entity) + + expected = store.registry.proto() + response = mock_remote_registry.proto() + + assertpy.assert_that(response).is_equal_to(expected) diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index b96dc6fe77..722e318b0c 100644 --- a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -66,10 +66,11 @@ def pg_registry(): ) logger.info("Waited for %s seconds until postgres container was up", waited) container_port = container.get_exposed_port(5432) + container_host = container.get_container_host_ip() registry_config = RegistryConfig( registry_type="sql", - path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", + path=f"postgresql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}", ) yield SqlRegistry(registry_config, "project", None) @@ -100,10 +101,11 @@ def mysql_registry(): ) logger.info("Waited for %s seconds until mysql container was up", waited) container_port = container.get_exposed_port(3306) + container_host = container.get_container_host_ip() registry_config = RegistryConfig( registry_type="sql", - path=f"mysql+pymysql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", + path=f"mysql+pymysql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}", ) yield SqlRegistry(registry_config, "project", None) diff --git a/setup.py b/setup.py index 6d59fa0aa5..b601c90146 100644 --- a/setup.py +++ b/setup.py @@ -44,10 +44,6 @@ "click>=7.0.0,<9.0.0", "colorama>=0.3.9,<1", "dill~=0.3.0", - "grpcio>=1.56.2,<2", - "grpcio-tools>=1.56.2,<2", - "grpcio-reflection>=1.56.2,<2", - "grpcio-health-checking>=1.56.2,<2", "mypy-protobuf==3.1", "Jinja2>=2,<4", "jsonschema", @@ -143,7 +139,14 @@ IBIS_REQUIRED = [ "ibis-framework", - "ibis-substrait" + "ibis-substrait", +] + +GRPCIO_REQUIRED = [ + "grpcio>=1.56.2,<2", + "grpcio-tools>=1.56.2,<2", + "grpcio-reflection>=1.56.2,<2", + "grpcio-health-checking>=1.56.2,<2", ] DUCKDB_REQUIRED = [ @@ -209,6 +212,7 @@ + ROCKSET_REQUIRED + HAZELCAST_REQUIRED + IBIS_REQUIRED + + GRPCIO_REQUIRED ) @@ -375,6 +379,7 @@ def run(self): "docs": DOCS_REQUIRED, "cassandra": CASSANDRA_REQUIRED, "hazelcast": HAZELCAST_REQUIRED, + "grpcio": GRPCIO_REQUIRED, "rockset": ROCKSET_REQUIRED, "ibis": IBIS_REQUIRED, "duckdb": DUCKDB_REQUIRED diff --git a/ui/README.md b/ui/README.md index a9ce5d3ec7..12aacd329e 100644 --- a/ui/README.md +++ b/ui/README.md @@ -61,6 +61,8 @@ When you start the React app, it will look for `projects-list.json` to find a li } ``` +* **Note** - `registryPath` only supports a file location or a url. + ``` // Start the React App yarn start diff --git a/ui/yarn.lock b/ui/yarn.lock index becb6bbd7b..5c9e5c17ac 100644 --- a/ui/yarn.lock +++ b/ui/yarn.lock @@ -6461,9 +6461,9 @@ invariant@^2.2.4: loose-envify "^1.0.0" ip@^1.1.0: - version "1.1.5" - resolved "https://registry.yarnpkg.com/ip/-/ip-1.1.5.tgz#bdded70114290828c0a039e72ef25f5aaec4354a" - integrity sha1-vd7XARQpCCjAoDnnLvJfWq7ENUo= + version "1.1.9" + resolved "https://registry.yarnpkg.com/ip/-/ip-1.1.9.tgz#8dfbcc99a754d07f425310b86a99546b1151e396" + integrity sha512-cyRxvOEpNHNtchU3Ln9KC/auJgup87llfQpQ+t5ghoC/UhL16SWzbueiCsdTnWmqAWl7LadfuwhlqmtOaqMHdQ== ipaddr.js@1.9.1: version "1.9.1"