From 2e573769a24e2429233afe34424af0433b2dc7ec Mon Sep 17 00:00:00 2001 From: Danny Chiao Date: Wed, 14 Dec 2022 23:01:20 -0500 Subject: [PATCH] fix: Enable registry caching in SQL Registry (#3395) * fix: Enable registry caching in SQL Registry Signed-off-by: Danny Chiao * docs Signed-off-by: Danny Chiao * fix regular file registry docs too Signed-off-by: Danny Chiao * fix new file lint Signed-off-by: Danny Chiao * fix new file lint Signed-off-by: Danny Chiao Signed-off-by: Danny Chiao --- docs/getting-started/concepts/registry.md | 20 +- docs/tutorials/using-scalable-registry.md | 1 + .../infra/registry/proto_registry_utils.py | 208 ++++++++++++++++++ sdk/python/feast/infra/registry/registry.py | 169 ++++---------- sdk/python/feast/infra/registry/sql.py | 130 ++++++++++- 5 files changed, 400 insertions(+), 128 deletions(-) create mode 100644 sdk/python/feast/infra/registry/proto_registry_utils.py diff --git a/docs/getting-started/concepts/registry.md b/docs/getting-started/concepts/registry.md index b228050944..f7d4a5b3e1 100644 --- a/docs/getting-started/concepts/registry.md +++ b/docs/getting-started/concepts/registry.md @@ -16,7 +16,9 @@ a remote file registry, you need to create a GCS / S3 bucket that Feast can unde ```yaml project: feast_demo_aws provider: aws -registry: s3://[YOUR BUCKET YOU CREATED]/registry.pb +registry: + path: s3://[YOUR BUCKET YOU CREATED]/registry.pb + cache_ttl_seconds: 60 online_store: null offline_store: type: file @@ -27,7 +29,9 @@ offline_store: ```yaml project: feast_demo_gcp provider: gcp -registry: gs://[YOUR BUCKET YOU CREATED]/registry.pb +registry: + path: gs://[YOUR BUCKET YOU CREATED]/registry.pb + cache_ttl_seconds: 60 online_store: null offline_store: type: file @@ -43,6 +47,18 @@ multiple feature views or time ranges concurrently). #### SQL Registry Alternatively, a [SQL Registry](../../tutorials/using-scalable-registry.md) can be used for a more scalable registry. 
+The configuration roughly looks like: +```yaml +project: <your project name> +provider: <provider name> +online_store: redis +offline_store: file +registry: + registry_type: sql + path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast + cache_ttl_seconds: 60 +``` + This supports any SQLAlchemy compatible database as a backend. The exact schema can be seen in [sql.py](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/registry/sql.py) ### Updating the registry diff --git a/docs/tutorials/using-scalable-registry.md b/docs/tutorials/using-scalable-registry.md index 0ee02674b1..11d8df47b4 100644 --- a/docs/tutorials/using-scalable-registry.md +++ b/docs/tutorials/using-scalable-registry.md @@ -28,6 +28,7 @@ offline_store: file registry: registry_type: sql path: postgresql://postgres:mysecretpassword@127.0.0.1:55001/feast + cache_ttl_seconds: 60 ``` Specifically, the registry_type needs to be set to sql in the registry config block. On doing so, the path should refer to the [Database URL](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls) for the database to be used, as expected by SQLAlchemy. No other additional commands are currently needed to configure this registry. 
diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py new file mode 100644 index 0000000000..e26b1e10e7 --- /dev/null +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -0,0 +1,208 @@ +from typing import List + +from feast.data_source import DataSource +from feast.entity import Entity +from feast.errors import ( + DataSourceObjectNotFoundException, + EntityNotFoundException, + FeatureServiceNotFoundException, + FeatureViewNotFoundException, + OnDemandFeatureViewNotFoundException, + SavedDatasetNotFound, + ValidationReferenceNotFound, +) +from feast.feature_service import FeatureService +from feast.feature_view import FeatureView +from feast.on_demand_feature_view import OnDemandFeatureView +from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto +from feast.request_feature_view import RequestFeatureView +from feast.saved_dataset import SavedDataset, ValidationReference +from feast.stream_feature_view import StreamFeatureView + + +def get_feature_service( + registry_proto: RegistryProto, name: str, project: str +) -> FeatureService: + for feature_service_proto in registry_proto.feature_services: + if ( + feature_service_proto.spec.project == project + and feature_service_proto.spec.name == name + ): + return FeatureService.from_proto(feature_service_proto) + raise FeatureServiceNotFoundException(name, project=project) + + +def get_feature_view( + registry_proto: RegistryProto, name: str, project: str +) -> FeatureView: + for feature_view_proto in registry_proto.feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return FeatureView.from_proto(feature_view_proto) + raise FeatureViewNotFoundException(name, project) + + +def get_stream_feature_view( + registry_proto: RegistryProto, name: str, project: str +) -> StreamFeatureView: + for 
feature_view_proto in registry_proto.stream_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return StreamFeatureView.from_proto(feature_view_proto) + raise FeatureViewNotFoundException(name, project) + + +def get_request_feature_view(registry_proto: RegistryProto, name: str, project: str): + for feature_view_proto in registry_proto.request_feature_views: + if ( + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project + ): + return RequestFeatureView.from_proto(feature_view_proto) + raise FeatureViewNotFoundException(name, project) + + +def get_on_demand_feature_view( + registry_proto: RegistryProto, name: str, project: str +) -> OnDemandFeatureView: + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if ( + on_demand_feature_view.spec.project == project + and on_demand_feature_view.spec.name == name + ): + return OnDemandFeatureView.from_proto(on_demand_feature_view) + raise OnDemandFeatureViewNotFoundException(name, project=project) + + +def get_data_source( + registry_proto: RegistryProto, name: str, project: str +) -> DataSource: + for data_source in registry_proto.data_sources: + if data_source.project == project and data_source.name == name: + return DataSource.from_proto(data_source) + raise DataSourceObjectNotFoundException(name, project=project) + + +def get_entity(registry_proto: RegistryProto, name: str, project: str) -> Entity: + for entity_proto in registry_proto.entities: + if entity_proto.spec.name == name and entity_proto.spec.project == project: + return Entity.from_proto(entity_proto) + raise EntityNotFoundException(name, project=project) + + +def get_saved_dataset( + registry_proto: RegistryProto, name: str, project: str +) -> SavedDataset: + for saved_dataset in registry_proto.saved_datasets: + if saved_dataset.spec.name == name and saved_dataset.spec.project == project: + return SavedDataset.from_proto(saved_dataset) + raise 
SavedDatasetNotFound(name, project=project) + + +def get_validation_reference( + registry_proto: RegistryProto, name: str, project: str +) -> ValidationReference: + for validation_reference in registry_proto.validation_references: + if ( + validation_reference.name == name + and validation_reference.project == project + ): + return ValidationReference.from_proto(validation_reference) + raise ValidationReferenceNotFound(name, project=project) + + +def list_feature_services( + registry_proto: RegistryProto, project: str, allow_cache: bool = False +) -> List[FeatureService]: + feature_services = [] + for feature_service_proto in registry_proto.feature_services: + if feature_service_proto.spec.project == project: + feature_services.append(FeatureService.from_proto(feature_service_proto)) + return feature_services + + +def list_feature_views( + registry_proto: RegistryProto, project: str +) -> List[FeatureView]: + feature_views: List[FeatureView] = [] + for feature_view_proto in registry_proto.feature_views: + if feature_view_proto.spec.project == project: + feature_views.append(FeatureView.from_proto(feature_view_proto)) + return feature_views + + +def list_request_feature_views( + registry_proto: RegistryProto, project: str +) -> List[RequestFeatureView]: + feature_views: List[RequestFeatureView] = [] + for request_feature_view_proto in registry_proto.request_feature_views: + if request_feature_view_proto.spec.project == project: + feature_views.append( + RequestFeatureView.from_proto(request_feature_view_proto) + ) + return feature_views + + +def list_stream_feature_views( + registry_proto: RegistryProto, project: str +) -> List[StreamFeatureView]: + stream_feature_views = [] + for stream_feature_view in registry_proto.stream_feature_views: + if stream_feature_view.spec.project == project: + stream_feature_views.append( + StreamFeatureView.from_proto(stream_feature_view) + ) + return stream_feature_views + + +def list_on_demand_feature_views( + registry_proto: 
RegistryProto, project: str +) -> List[OnDemandFeatureView]: + on_demand_feature_views = [] + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if on_demand_feature_view.spec.project == project: + on_demand_feature_views.append( + OnDemandFeatureView.from_proto(on_demand_feature_view) + ) + return on_demand_feature_views + + +def list_entities(registry_proto: RegistryProto, project: str) -> List[Entity]: + entities = [] + for entity_proto in registry_proto.entities: + if entity_proto.spec.project == project: + entities.append(Entity.from_proto(entity_proto)) + return entities + + +def list_data_sources(registry_proto: RegistryProto, project: str) -> List[DataSource]: + data_sources = [] + for data_source_proto in registry_proto.data_sources: + if data_source_proto.project == project: + data_sources.append(DataSource.from_proto(data_source_proto)) + return data_sources + + +def list_saved_datasets( + registry_proto: RegistryProto, project: str, allow_cache: bool = False +) -> List[SavedDataset]: + return [ + SavedDataset.from_proto(saved_dataset) + for saved_dataset in registry_proto.saved_datasets + if saved_dataset.spec.project == project + ] + + +def list_project_metadata( + registry_proto: RegistryProto, project: str +) -> List[ProjectMetadata]: + return [ + ProjectMetadata.from_proto(project_metadata) + for project_metadata in registry_proto.project_metadata + if project_metadata.project == project + ] diff --git a/sdk/python/feast/infra/registry/registry.py b/sdk/python/feast/infra/registry/registry.py index 09d22ee376..4870c1d45d 100644 --- a/sdk/python/feast/infra/registry/registry.py +++ b/sdk/python/feast/infra/registry/registry.py @@ -30,18 +30,16 @@ from feast.errors import ( ConflictingFeatureViewNames, DataSourceNotFoundException, - DataSourceObjectNotFoundException, EntityNotFoundException, FeatureServiceNotFoundException, FeatureViewNotFoundException, - OnDemandFeatureViewNotFoundException, - SavedDatasetNotFound, 
ValidationReferenceNotFound, ) from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.importer import import_class from feast.infra.infra_object import Infra +from feast.infra.registry import proto_registry_utils from feast.infra.registry.base_registry import BaseRegistry from feast.infra.registry.registry_store import NoopRegistryStore from feast.on_demand_feature_view import OnDemandFeatureView @@ -293,11 +291,7 @@ def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity] registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - entities = [] - for entity_proto in registry_proto.entities: - if entity_proto.spec.project == project: - entities.append(Entity.from_proto(entity_proto)) - return entities + return proto_registry_utils.list_entities(registry_proto, project) def list_data_sources( self, project: str, allow_cache: bool = False @@ -305,11 +299,7 @@ def list_data_sources( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - data_sources = [] - for data_source_proto in registry_proto.data_sources: - if data_source_proto.project == project: - data_sources.append(DataSource.from_proto(data_source_proto)) - return data_sources + return proto_registry_utils.list_data_sources(registry_proto, project) def apply_data_source( self, data_source: DataSource, project: str, commit: bool = True @@ -371,36 +361,24 @@ def apply_feature_service( def list_feature_services( self, project: str, allow_cache: bool = False ) -> List[FeatureService]: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - feature_services = [] - for feature_service_proto in registry.feature_services: - if feature_service_proto.spec.project == project: - feature_services.append( - FeatureService.from_proto(feature_service_proto) - ) - return feature_services + registry_proto = self._get_registry_proto( + project=project, 
allow_cache=allow_cache + ) + return proto_registry_utils.list_feature_services(registry_proto, project) def get_feature_service( self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - - for feature_service_proto in registry.feature_services: - if ( - feature_service_proto.spec.project == project - and feature_service_proto.spec.name == name - ): - return FeatureService.from_proto(feature_service_proto) - raise FeatureServiceNotFoundException(name, project=project) + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.get_feature_service(registry_proto, name, project) def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - for entity_proto in registry_proto.entities: - if entity_proto.spec.name == name and entity_proto.spec.project == project: - return Entity.from_proto(entity_proto) - raise EntityNotFoundException(name, project=project) + return proto_registry_utils.get_entity(registry_proto, name, project) def apply_feature_view( self, feature_view: BaseFeatureView, project: str, commit: bool = True @@ -461,49 +439,38 @@ def apply_feature_view( def list_stream_feature_views( self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - stream_feature_views = [] - for stream_feature_view in registry.stream_feature_views: - if stream_feature_view.spec.project == project: - stream_feature_views.append( - StreamFeatureView.from_proto(stream_feature_view) - ) - return stream_feature_views + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.list_stream_feature_views(registry_proto, project) def 
list_on_demand_feature_views( self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - on_demand_feature_views = [] - for on_demand_feature_view in registry.on_demand_feature_views: - if on_demand_feature_view.spec.project == project: - on_demand_feature_views.append( - OnDemandFeatureView.from_proto(on_demand_feature_view) - ) - return on_demand_feature_views + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.list_on_demand_feature_views( + registry_proto, project + ) def get_on_demand_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - - for on_demand_feature_view in registry.on_demand_feature_views: - if ( - on_demand_feature_view.spec.project == project - and on_demand_feature_view.spec.name == name - ): - return OnDemandFeatureView.from_proto(on_demand_feature_view) - raise OnDemandFeatureViewNotFoundException(name, project=project) + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.get_on_demand_feature_view( + registry_proto, name, project + ) def get_data_source( self, name: str, project: str, allow_cache: bool = False ) -> DataSource: - registry = self._get_registry_proto(project=project, allow_cache=allow_cache) - - for data_source in registry.data_sources: - if data_source.project == project and data_source.name == name: - return DataSource.from_proto(data_source) - raise DataSourceObjectNotFoundException(name, project=project) + registry_proto = self._get_registry_proto( + project=project, allow_cache=allow_cache + ) + return proto_registry_utils.get_data_source(registry_proto, name, project) def apply_materialization( self, @@ -570,21 +537,13 @@ def list_feature_views( registry_proto = 
self._get_registry_proto( project=project, allow_cache=allow_cache ) - feature_views: List[FeatureView] = [] - for feature_view_proto in registry_proto.feature_views: - if feature_view_proto.spec.project == project: - feature_views.append(FeatureView.from_proto(feature_view_proto)) - return feature_views + return proto_registry_utils.list_feature_views(registry_proto, project) def get_request_feature_view(self, name: str, project: str): registry_proto = self._get_registry_proto(project=project, allow_cache=False) - for feature_view_proto in registry_proto.feature_views: - if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project - ): - return RequestFeatureView.from_proto(feature_view_proto) - raise FeatureViewNotFoundException(name, project) + return proto_registry_utils.get_request_feature_view( + registry_proto, name, project + ) def list_request_feature_views( self, project: str, allow_cache: bool = False @@ -592,13 +551,7 @@ def list_request_feature_views( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - feature_views: List[RequestFeatureView] = [] - for request_feature_view_proto in registry_proto.request_feature_views: - if request_feature_view_proto.spec.project == project: - feature_views.append( - RequestFeatureView.from_proto(request_feature_view_proto) - ) - return feature_views + return proto_registry_utils.list_request_feature_views(registry_proto, project) def get_feature_view( self, name: str, project: str, allow_cache: bool = False @@ -606,13 +559,7 @@ def get_feature_view( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - for feature_view_proto in registry_proto.feature_views: - if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project - ): - return FeatureView.from_proto(feature_view_proto) - raise FeatureViewNotFoundException(name, project) + return 
proto_registry_utils.get_feature_view(registry_proto, name, project) def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False @@ -620,13 +567,9 @@ def get_stream_feature_view( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - for feature_view_proto in registry_proto.stream_feature_views: - if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project - ): - return StreamFeatureView.from_proto(feature_view_proto) - raise FeatureViewNotFoundException(name, project) + return proto_registry_utils.get_stream_feature_view( + registry_proto, name, project + ) def delete_feature_service(self, name: str, project: str, commit: bool = True): self._prepare_registry_for_changes(project) @@ -753,13 +696,7 @@ def get_saved_dataset( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - for saved_dataset in registry_proto.saved_datasets: - if ( - saved_dataset.spec.name == name - and saved_dataset.spec.project == project - ): - return SavedDataset.from_proto(saved_dataset) - raise SavedDatasetNotFound(name, project=project) + return proto_registry_utils.get_saved_dataset(registry_proto, name, project) def list_saved_datasets( self, project: str, allow_cache: bool = False @@ -767,11 +704,7 @@ def list_saved_datasets( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - return [ - SavedDataset.from_proto(saved_dataset) - for saved_dataset in registry_proto.saved_datasets - if saved_dataset.spec.project == project - ] + return proto_registry_utils.list_saved_datasets(registry_proto, project) def apply_validation_reference( self, @@ -803,13 +736,9 @@ def get_validation_reference( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - for validation_reference in registry_proto.validation_references: - if ( - validation_reference.name == name - and validation_reference.project == project - 
): - return ValidationReference.from_proto(validation_reference) - raise ValidationReferenceNotFound(name, project=project) + return proto_registry_utils.get_validation_reference( + registry_proto, name, project + ) def delete_validation_reference(self, name: str, project: str, commit: bool = True): registry_proto = self._prepare_registry_for_changes(project) @@ -832,11 +761,7 @@ def list_project_metadata( registry_proto = self._get_registry_proto( project=project, allow_cache=allow_cache ) - return [ - ProjectMetadata.from_proto(project_metadata) - for project_metadata in registry_proto.project_metadata - if project_metadata.project == project - ] + return proto_registry_utils.list_project_metadata(registry_proto, project) def commit(self): """Commits the state of the registry cache to the remote registry store.""" diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 5e980e90b3..f078251544 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1,7 +1,8 @@ import uuid -from datetime import datetime +from datetime import datetime, timedelta from enum import Enum from pathlib import Path +from threading import Lock from typing import Any, Callable, List, Optional, Set, Union from sqlalchemy import ( # type: ignore @@ -34,6 +35,7 @@ from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra +from feast.infra.registry import proto_registry_utils from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView from feast.project_metadata import ProjectMetadata @@ -183,6 +185,14 @@ def __init__( assert registry_config is not None, "SqlRegistry needs a valid registry_config" self.engine: Engine = create_engine(registry_config.path, echo=False) metadata.create_all(self.engine) + self.cached_registry_proto = self.proto() + 
self.cached_registry_proto_created = datetime.utcnow() + self._refresh_lock = Lock() + self.cached_registry_proto_ttl = timedelta( + seconds=registry_config.cache_ttl_seconds + if registry_config.cache_ttl_seconds is not None + else 0 + ) def teardown(self): for t in { @@ -200,12 +210,37 @@ def teardown(self): conn.execute(stmt) def refresh(self, project: Optional[str]): - # This method is a no-op since we're always reading the latest values from the db. - pass + self.cached_registry_proto = self.proto() + self.cached_registry_proto_created = datetime.utcnow() + + def _refresh_cached_registry_if_necessary(self): + with self._refresh_lock: + expired = ( + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() + > 0 # 0 ttl means infinity + and ( + datetime.utcnow() + > ( + self.cached_registry_proto_created + + self.cached_registry_proto_ttl + ) + ) + ) + + if expired: + self.refresh(project=None) def get_stream_feature_view( self, name: str, project: str, allow_cache: bool = False ): + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_stream_feature_view( + self.cached_registry_proto, name, project + ) return self._get_object( table=stream_feature_views, name=name, @@ -220,6 +255,11 @@ def get_stream_feature_view( def list_stream_feature_views( self, project: str, allow_cache: bool = False ) -> List[StreamFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_stream_feature_views( + self.cached_registry_proto, project + ) return self._list_objects( stream_feature_views, project, @@ -238,6 +278,11 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True): ) def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Entity: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_entity( + 
self.cached_registry_proto, name, project + ) return self._get_object( table=entities, name=name, @@ -252,6 +297,11 @@ def get_entity(self, name: str, project: str, allow_cache: bool = False) -> Enti def get_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> FeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_feature_view( + self.cached_registry_proto, name, project + ) return self._get_object( table=feature_views, name=name, @@ -266,6 +316,11 @@ def get_feature_view( def get_on_demand_feature_view( self, name: str, project: str, allow_cache: bool = False ) -> OnDemandFeatureView: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_on_demand_feature_view( + self.cached_registry_proto, name, project + ) return self._get_object( table=on_demand_feature_views, name=name, @@ -277,7 +332,14 @@ def get_on_demand_feature_view( not_found_exception=FeatureViewNotFoundException, ) - def get_request_feature_view(self, name: str, project: str): + def get_request_feature_view( + self, name: str, project: str, allow_cache: bool = False + ): + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_request_feature_view( + self.cached_registry_proto, name, project + ) return self._get_object( table=request_feature_views, name=name, @@ -292,6 +354,11 @@ def get_request_feature_view(self, name: str, project: str): def get_feature_service( self, name: str, project: str, allow_cache: bool = False ) -> FeatureService: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_feature_service( + self.cached_registry_proto, name, project + ) return self._get_object( table=feature_services, name=name, @@ -306,6 +373,11 @@ def get_feature_service( def get_saved_dataset( self, name: str, project: str, allow_cache: bool = False ) -> SavedDataset: + if allow_cache: + 
self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_saved_dataset( + self.cached_registry_proto, name, project + ) return self._get_object( table=saved_datasets, name=name, @@ -320,6 +392,11 @@ def get_saved_dataset( def get_validation_reference( self, name: str, project: str, allow_cache: bool = False ) -> ValidationReference: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_validation_reference( + self.cached_registry_proto, name, project + ) return self._get_object( table=validation_references, name=name, @@ -332,6 +409,11 @@ def get_validation_reference( ) def list_entities(self, project: str, allow_cache: bool = False) -> List[Entity]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_entities( + self.cached_registry_proto, project + ) return self._list_objects( entities, project, EntityProto, Entity, "entity_proto" ) @@ -367,6 +449,11 @@ def delete_feature_service(self, name: str, project: str, commit: bool = True): def get_data_source( self, name: str, project: str, allow_cache: bool = False ) -> DataSource: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.get_data_source( + self.cached_registry_proto, name, project + ) return self._get_object( table=data_sources, name=name, @@ -381,6 +468,11 @@ def get_data_source( def list_data_sources( self, project: str, allow_cache: bool = False ) -> List[DataSource]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_data_sources( + self.cached_registry_proto, project + ) return self._list_objects( data_sources, project, DataSourceProto, DataSource, "data_source_proto" ) @@ -425,6 +517,11 @@ def delete_data_source(self, name: str, project: str, commit: bool = True): def list_feature_services( self, project: str, allow_cache: bool = False ) -> List[FeatureService]: + if allow_cache: + 
self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_services( + self.cached_registry_proto, project + ) return self._list_objects( feature_services, project, @@ -436,6 +533,11 @@ def list_feature_services( def list_feature_views( self, project: str, allow_cache: bool = False ) -> List[FeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_views( + self.cached_registry_proto, project + ) return self._list_objects( feature_views, project, FeatureViewProto, FeatureView, "feature_view_proto" ) @@ -443,6 +545,11 @@ def list_feature_views( def list_saved_datasets( self, project: str, allow_cache: bool = False ) -> List[SavedDataset]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_saved_datasets( + self.cached_registry_proto, project + ) return self._list_objects( saved_datasets, project, @@ -454,6 +561,11 @@ def list_saved_datasets( def list_request_feature_views( self, project: str, allow_cache: bool = False ) -> List[RequestFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_request_feature_views( + self.cached_registry_proto, project + ) return self._list_objects( request_feature_views, project, @@ -465,6 +577,11 @@ def list_request_feature_views( def list_on_demand_feature_views( self, project: str, allow_cache: bool = False ) -> List[OnDemandFeatureView]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_on_demand_feature_views( + self.cached_registry_proto, project + ) return self._list_objects( on_demand_feature_views, project, @@ -476,6 +593,11 @@ def list_on_demand_feature_views( def list_project_metadata( self, project: str, allow_cache: bool = False ) -> List[ProjectMetadata]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_project_metadata( + 
self.cached_registry_proto, project + ) with self.engine.connect() as conn: stmt = select(feast_metadata).where( feast_metadata.c.project_id == project,