From 25d9c1ab946349bffebb6fea668dfabeb57f1227 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 1 Apr 2021 19:19:36 -0700 Subject: [PATCH 1/8] Fail on missing registry and add cache test Signed-off-by: Willem Pienaar --- sdk/python/feast/registry.py | 97 +++++++++++++---------- sdk/python/feast/repo_config.py | 4 + sdk/python/tests/test_online_retrieval.py | 30 ++++++- 3 files changed, 86 insertions(+), 45 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 84ec9becb3..2115c084a8 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -69,8 +69,8 @@ def apply_entity(self, entity: Entity, project: str): def updater(registry_proto: RegistryProto): for idx, existing_entity_proto in enumerate(registry_proto.entities): if ( - existing_entity_proto.spec.name == entity_proto.spec.name - and existing_entity_proto.spec.project == project + existing_entity_proto.spec.name == entity_proto.spec.name + and existing_entity_proto.spec.project == project ): del registry_proto.entities[idx] registry_proto.entities.append(entity_proto) @@ -78,7 +78,7 @@ def updater(registry_proto: RegistryProto): registry_proto.entities.append(entity_proto) return registry_proto - self._registry_store.update_registry(updater) + self._registry_store.update_registry_proto(updater) return def list_entities(self, project: str) -> List[Entity]: @@ -91,7 +91,7 @@ def list_entities(self, project: str) -> List[Entity]: Returns: List of entities """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() entities = [] for entity_proto in registry_proto.entities: if entity_proto.spec.project == project: @@ -110,7 +110,7 @@ def get_entity(self, name: str, project: str) -> Entity: Returns either the specified entity, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() for entity_proto in registry_proto.entities: if entity_proto.spec.name == name and entity_proto.spec.project == project: return Entity.from_proto(entity_proto) @@ -130,12 +130,12 @@ def apply_feature_table(self, feature_table: FeatureTable, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_table_proto in enumerate( - registry_proto.feature_tables + registry_proto.feature_tables ): if ( - existing_feature_table_proto.spec.name - == feature_table_proto.spec.name - and existing_feature_table_proto.spec.project == project + existing_feature_table_proto.spec.name + == feature_table_proto.spec.name + and existing_feature_table_proto.spec.project == project ): del registry_proto.feature_tables[idx] registry_proto.feature_tables.append(feature_table_proto) @@ -143,7 +143,7 @@ def updater(registry_proto: RegistryProto): registry_proto.feature_tables.append(feature_table_proto) return registry_proto - self._registry_store.update_registry(updater) + self._registry_store.update_registry_proto(updater) return def apply_feature_view(self, feature_view: FeatureView, project: str): @@ -160,12 +160,12 @@ def apply_feature_view(self, feature_view: FeatureView, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_view_proto in enumerate( - registry_proto.feature_views + registry_proto.feature_views ): if ( - existing_feature_view_proto.spec.name - == feature_view_proto.spec.name - and existing_feature_view_proto.spec.project == project + existing_feature_view_proto.spec.name + == feature_view_proto.spec.name + and existing_feature_view_proto.spec.project == project ): del registry_proto.feature_views[idx] registry_proto.feature_views.append(feature_view_proto) @@ -173,7 +173,7 @@ def updater(registry_proto: RegistryProto): registry_proto.feature_views.append(feature_view_proto) return registry_proto - self._registry_store.update_registry(updater) + self._registry_store.update_registry_proto(updater) def list_feature_tables(self, project: str) -> List[FeatureTable]: """ @@ -185,7 +185,7 @@ def list_feature_tables(self, project: str) -> List[FeatureTable]: Returns: List of feature tables """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() feature_tables = [] for feature_table_proto in registry_proto.feature_tables: if feature_table_proto.spec.project == project: @@ -202,7 +202,7 @@ def list_feature_views(self, project: str) -> List[FeatureView]: Returns: List of feature views """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() feature_views = [] for feature_view_proto in registry_proto.feature_views: if feature_view_proto.spec.project == project: @@ -221,11 +221,11 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable: Returns either the specified feature table, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() for feature_table_proto in registry_proto.feature_tables: if ( - feature_table_proto.spec.name == name - and feature_table_proto.spec.project == project + feature_table_proto.spec.name == name + and feature_table_proto.spec.project == project ): return FeatureTable.from_proto(feature_table_proto) raise Exception(f"Feature table {name} does not exist in project {project}") @@ -242,11 +242,11 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: Returns either the specified feature view, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry() + registry_proto = self._registry_store.get_registry_proto() for feature_view_proto in registry_proto.feature_views: if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project ): return FeatureView.from_proto(feature_view_proto) raise Exception(f"Feature view {name} does not exist in project {project}") @@ -262,17 +262,17 @@ def delete_feature_table(self, name: str, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_table_proto in enumerate( - registry_proto.feature_tables + registry_proto.feature_tables ): if ( - existing_feature_table_proto.spec.name == name - and existing_feature_table_proto.spec.project == project + existing_feature_table_proto.spec.name == name + and existing_feature_table_proto.spec.project == project ): del registry_proto.feature_tables[idx] return registry_proto raise Exception(f"Feature table {name} does not exist in project {project}") - self._registry_store.update_registry(updater) + self._registry_store.update_registry_proto(updater) return def delete_feature_view(self, name: str, project: str): @@ -286,17 +286,17 @@ def delete_feature_view(self, name: str, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_view_proto in enumerate( - registry_proto.feature_views + registry_proto.feature_views ): if ( - existing_feature_view_proto.spec.name == name - and existing_feature_view_proto.spec.project == project + existing_feature_view_proto.spec.name == name + and existing_feature_view_proto.spec.project == project ): del registry_proto.feature_views[idx] return registry_proto raise Exception(f"Feature view {name} does not exist in project {project}") - self._registry_store.update_registry(updater) + self._registry_store.update_registry_proto(updater) class RegistryStore(ABC): @@ -306,7 +306,7 @@ class RegistryStore(ABC): """ @abstractmethod - def get_registry(self): + def get_registry_proto(self): """ Retrieves the registry proto from the registry path. If there is no file at that path, returns an empty registry proto. @@ -317,7 +317,7 @@ def get_registry(self): pass @abstractmethod - def update_registry(self, updater: Callable[[RegistryProto], RegistryProto]): + def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): """ Updates the registry using the function passed in. If the registry proto has not been created yet this method will create it. This method writes to the registry path. @@ -333,16 +333,21 @@ def __init__(self, filepath: str): self._filepath = Path(filepath) return - def get_registry(self): + def get_registry_proto(self): registry_proto = RegistryProto() if self._filepath.exists(): registry_proto.ParseFromString(self._filepath.read_bytes()) - else: + return registry_proto + raise FileNotFoundError( + f"Registry not found at path \"{self._filepath}\". Have you run \"feast apply\"?") + + def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): + try: + registry_proto = self.get_registry_proto() + except FileNotFoundError: + registry_proto = RegistryProto() registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION - return registry_proto - def update_registry(self, updater: Callable[[RegistryProto], RegistryProto]): - registry_proto = self.get_registry() registry_proto = updater(registry_proto) self._write_registry(registry_proto) return @@ -373,7 +378,7 @@ def __init__(self, uri: str): self._blob = self._uri.path.lstrip("/") return - def get_registry(self): + def get_registry_proto(self): from google.cloud import storage from google.cloud.exceptions import NotFound @@ -389,12 +394,16 @@ def get_registry(self): self.gcs_client.download_blob_to_file(self._uri.geturl(), file_obj) file_obj.seek(0) registry_proto.ParseFromString(file_obj.read()) - else: - registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION - return registry_proto + return registry_proto + raise FileNotFoundError( + f"Registry not found at path \"{self._uri.geturl()}\". Have you run \"feast apply\"?") - def update_registry(self, updater: Callable[[RegistryProto], RegistryProto]): - registry_proto = self.get_registry() + def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): + try: + registry_proto = self.get_registry_proto() + except FileNotFoundError: + registry_proto = RegistryProto() + registry_proto.registry_schema_version = REGISTRY_SCHEMA_VERSION registry_proto = updater(registry_proto) self._write_registry(registry_proto) return diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index f270ed5d9b..7f72499485 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -53,6 +53,10 @@ class RepoConfig(FeastBaseModel): online_store: Optional[OnlineStoreConfig] = None """ OnlineStoreConfig: Online store configuration (optional depending on provider) """ + # TODO: Nest in `metadata_store_config` object + auto_refresh_registry: bool = True + auto_refresh_registry_ttl_seconds: int = 600 + # This is the JSON Schema for config validation. We use this to have nice detailed error messages # for config validation, something that bindr unfortunately doesn't provide out of the box. diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index 18762c603f..4456233cee 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -1,7 +1,9 @@ +import os from datetime import datetime import pytest +from feast import FeatureStore, RepoConfig from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto from tests.cli_utils import CliRunner, get_example_repo @@ -13,7 +15,6 @@ def test_online() -> None: """ runner = CliRunner() with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store: - # Write some data to two tables registry = store._get_registry() table = registry.get_feature_view( @@ -77,3 +78,30 @@ def test_online() -> None: store.get_online_features( feature_refs=["driver_locations_bad:lon"], entity_rows=[{"driver": 1}], ) + + # Create new FeatureStore object with auto_refresh_registry disabled + fs_no_autoload = FeatureStore(config=RepoConfig( + metadata_store=store.config.metadata_store, + online_store=store.config.online_store, + project=store.config.project, + provider=store.config.provider, + auto_refresh_registry=False, + auto_refresh_registry_ttl_seconds=2 + )) + + # Should download the registry and cache it permanently (or until manually refreshed) + result = fs_no_autoload.get_online_features( + feature_refs=["driver_locations:lon", "driver_locations_2:lon"], + entity_rows=[{"driver": 1}, {"driver": 123}], + ) + assert result.to_dict()["driver_locations:lon"] == ["1.0", None] + + # Rename the metadata.db so that it cant be used for refreshes + os.rename(store.config.metadata_store, store.config.metadata_store + "_fake") + + # Should use cached registry + result = fs_no_autoload.get_online_features( + feature_refs=["driver_locations:lon", "driver_locations_2:lon"], + entity_rows=[{"driver": 1}, {"driver": 123}], + ) + assert result.to_dict()["driver_locations:lon"] == ["1.0", None] \ No newline at end of file From e641232387eac3176d773f699bdde11ee8130c1c Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 1 Apr 2021 20:58:00 -0700 Subject: [PATCH 2/8] Add basic support for caching to Registry Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 81 +++++++++++----------- sdk/python/feast/registry.py | 44 +++++++++--- sdk/python/feast/repo_config.py | 3 +- sdk/python/feast/repo_operations.py | 4 +- sdk/python/tests/online_read_write_test.py | 3 +- sdk/python/tests/test_online_retrieval.py | 12 ++-- 6 files changed, 86 insertions(+), 61 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index ecd40fa9bf..a662767a2d 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. from collections import defaultdict -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union @@ -51,9 +51,10 @@ class FeatureStore: config: RepoConfig repo_path: Optional[str] + _registry: Registry def __init__( - self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): self.repo_path = repo_path if repo_path is not None and config is not None: @@ -71,6 +72,8 @@ def __init__( local=LocalOnlineStoreConfig(path="online_store.db") ), ) + self._registry = Registry(self.config.metadata_store, + cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds)) @property def project(self) -> str: @@ -79,8 +82,9 @@ def project(self) -> str: def _get_provider(self) -> Provider: return get_provider(self.config) - def _get_registry(self) -> Registry: - return Registry(self.config.metadata_store) + def refresh_registry(self): + self._registry = Registry(self.config.metadata_store) + self._registry.refresh() def list_entities(self) -> List[Entity]: """ @@ -89,7 +93,7 @@ def list_entities(self) -> List[Entity]: Returns: List of entities """ - return self._get_registry().list_entities(self.project) + return self._registry.list_entities(self.project) def list_feature_views(self) -> List[FeatureView]: """ @@ -98,7 +102,7 @@ def list_feature_views(self) -> List[FeatureView]: Returns: List of feature views """ - return self._get_registry().list_feature_views(self.project) + return self._registry.list_feature_views(self.project) def get_entity(self, name: str) -> Entity: """ @@ -111,7 +115,7 @@ def get_entity(self, name: str) -> Entity: Returns either the specified entity, or raises an exception if none is found """ - return self._get_registry().get_entity(name, self.project) + return self._registry.get_entity(name, self.project) def get_feature_view(self, name: str) -> FeatureView: """ @@ -124,7 +128,7 @@ def get_feature_view(self, name: str) -> FeatureView: Returns either the specified feature view, or raises an exception if none is found """ - return self._get_registry().get_feature_view(name, self.project) + return self._registry.get_feature_view(name, self.project) def delete_feature_view(self, name: str): """ @@ -133,7 +137,7 @@ def delete_feature_view(self, name: str): Args: name: Name of feature view """ - return self._get_registry().delete_feature_view(name, self.project) + return self._registry.delete_feature_view(name, self.project) def apply(self, objects: List[Union[FeatureView, Entity]]): """Register objects to metadata store and update related infrastructure. @@ -166,15 +170,14 @@ def apply(self, objects: List[Union[FeatureView, Entity]]): # TODO: Add locking # TODO: Optimize by only making a single call (read/write) - registry = self._get_registry() views_to_update = [] for ob in objects: if isinstance(ob, FeatureView): - registry.apply_feature_view(ob, project=self.config.project) + self._registry.apply_feature_view(ob, project=self.config.project) views_to_update.append(ob) elif isinstance(ob, Entity): - registry.apply_entity(ob, project=self.config.project) + self._registry.apply_entity(ob, project=self.config.project) else: raise ValueError( f"Unknown object type ({type(ob)}) provided as part of apply() call" @@ -187,7 +190,7 @@ def apply(self, objects: List[Union[FeatureView, Entity]]): ) def get_historical_features( - self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], + self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], ) -> RetrievalJob: """Enrich an entity dataframe with historical feature values for either training or batch scoring. @@ -226,8 +229,7 @@ def get_historical_features( >>> model.fit(feature_data) # insert your modeling framework here. """ - registry = self._get_registry() - all_feature_views = registry.list_feature_views(project=self.config.project) + all_feature_views = self._registry.list_feature_views(project=self.config.project) feature_views = _get_requested_feature_views(feature_refs, all_feature_views) offline_store = get_offline_store_for_retrieval(feature_views) job = offline_store.get_historical_features( @@ -236,7 +238,7 @@ def get_historical_features( return job def materialize_incremental( - self, feature_views: Optional[List[str]], end_date: datetime, + self, feature_views: Optional[List[str]], end_date: datetime, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -263,14 +265,13 @@ def materialize_incremental( >>> ) """ feature_views_to_materialize = [] - registry = self._get_registry() if feature_views is None: - feature_views_to_materialize = registry.list_feature_views( + feature_views_to_materialize = self._registry.list_feature_views( self.config.project ) else: for name in feature_views: - feature_view = registry.get_feature_view(name, self.config.project) + feature_view = self._registry.get_feature_view(name, self.config.project) feature_views_to_materialize.append(feature_view) # TODO paging large loads @@ -285,10 +286,10 @@ def materialize_incremental( self._materialize_single_feature_view(feature_view, start_date, end_date) def materialize( - self, - feature_views: Optional[List[str]], - start_date: datetime, - end_date: datetime, + self, + feature_views: Optional[List[str]], + start_date: datetime, + end_date: datetime, ) -> None: """ Materialize data from the offline store into the online store. @@ -316,14 +317,13 @@ def materialize( >>> ) """ feature_views_to_materialize = [] - registry = self._get_registry() if feature_views is None: - feature_views_to_materialize = registry.list_feature_views( + feature_views_to_materialize = self._registry.list_feature_views( self.config.project ) else: for name in feature_views: - feature_view = registry.get_feature_view(name, self.config.project) + feature_view = self._registry.get_feature_view(name, self.config.project) feature_views_to_materialize.append(feature_view) # TODO paging large loads @@ -331,7 +331,7 @@ def materialize( self._materialize_single_feature_view(feature_view, start_date, end_date) def _materialize_single_feature_view( - self, feature_view: FeatureView, start_date: datetime, end_date: datetime + self, feature_view: FeatureView, start_date: datetime, end_date: datetime ) -> None: ( entity_names, @@ -365,7 +365,7 @@ def _materialize_single_feature_view( self.apply([feature_view]) def get_online_features( - self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], + self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], ) -> OnlineResponse: """ Retrieves the latest online feature data. @@ -401,10 +401,10 @@ def get_online_features( return OnlineResponse(response) def _get_online_features( - self, - entity_rows: List[GetOnlineFeaturesRequestV2.EntityRow], - feature_refs: List[str], - project: str, + self, + entity_rows: List[GetOnlineFeaturesRequestV2.EntityRow], + feature_refs: List[str], + project: str, ) -> GetOnlineFeaturesResponse: provider = self._get_provider() @@ -416,8 +416,7 @@ def _get_online_features( entity_keys.append(_entity_row_to_key(row)) result_rows.append(_entity_row_to_field_values(row)) - registry = self._get_registry() - all_feature_views = registry.list_feature_views(project=self.config.project) + all_feature_views = self._registry.list_feature_views(project=self.config.project, allow_cache=True) grouped_refs = _group_refs(feature_refs, all_feature_views) for table, requested_features in grouped_refs: @@ -454,7 +453,7 @@ def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyPr def _entity_row_to_field_values( - row: GetOnlineFeaturesRequestV2.EntityRow, + row: GetOnlineFeaturesRequestV2.EntityRow, ) -> GetOnlineFeaturesResponse.FieldValues: result = GetOnlineFeaturesResponse.FieldValues() for k in row.fields: @@ -465,7 +464,7 @@ def _entity_row_to_field_values( def _group_refs( - feature_refs: List[str], all_feature_views: List[FeatureView] + feature_refs: List[str], all_feature_views: List[FeatureView] ) -> List[Tuple[FeatureView, List[str]]]: """ Get list of feature views and corresponding feature names based on feature references""" @@ -488,7 +487,7 @@ def _group_refs( def _run_reverse_field_mapping( - feature_view: FeatureView, + feature_view: FeatureView, ) -> Tuple[List[str], List[str], str, Optional[str]]: """ If a field mapping exists, run it in reverse on the entity names, @@ -521,7 +520,7 @@ def _run_reverse_field_mapping( created_timestamp_column = ( reverse_field_mapping[created_timestamp_column] if created_timestamp_column - and created_timestamp_column in reverse_field_mapping.keys() + and created_timestamp_column in reverse_field_mapping.keys() else created_timestamp_column ) entity_names = [ @@ -541,7 +540,7 @@ def _run_reverse_field_mapping( def _run_forward_field_mapping( - table: pyarrow.Table, field_mapping: Dict[str, str], + table: pyarrow.Table, field_mapping: Dict[str, str], ) -> pyarrow.Table: # run field mapping in the forward direction cols = table.column_names @@ -553,7 +552,7 @@ def _run_forward_field_mapping( def _convert_arrow_to_proto( - table: pyarrow.Table, feature_view: FeatureView + table: pyarrow.Table, feature_view: FeatureView ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: rows_to_write = [] @@ -605,7 +604,7 @@ def _coerce_datetime(ts): def _get_requested_feature_views( - feature_refs: List[str], all_feature_views: List[FeatureView] + feature_refs: List[str], all_feature_views: List[FeatureView] ) -> List[FeatureView]: """Get list of feature views based on feature references""" return list(view for view, _ in _group_refs(feature_refs, all_feature_views)) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 2115c084a8..138471ef25 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -14,7 +14,7 @@ import uuid from abc import ABC, abstractmethod -from datetime import datetime +from datetime import datetime, timedelta from pathlib import Path from tempfile import TemporaryFile from typing import Callable, List @@ -34,8 +34,11 @@ class Registry: """ Registry: A registry allows for the management and persistence of feature definitions and related metadata. """ + cached_registry_proto: RegistryProto = None + cached_registry_proto_created: datetime = None + cached_registry_proto_ttl: timedelta - def __init__(self, registry_path: str): + def __init__(self, registry_path: str, cache_ttl: timedelta): """ Create the Registry object. @@ -52,6 +55,7 @@ def __init__(self, registry_path: str): raise Exception( f"Registry path {registry_path} has unsupported scheme {uri.scheme}. Supported schemes are file and gs." ) + self.cached_registry_proto_ttl = cache_ttl return def apply_entity(self, entity: Entity, project: str): @@ -91,7 +95,7 @@ def list_entities(self, project: str) -> List[Entity]: Returns: List of entities """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto() entities = [] for entity_proto in registry_proto.entities: if entity_proto.spec.project == project: @@ -110,12 +114,22 @@ def get_entity(self, name: str, project: str) -> Entity: Returns either the specified entity, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto() for entity_proto in registry_proto.entities: if entity_proto.spec.name == name and entity_proto.spec.project == project: return Entity.from_proto(entity_proto) raise Exception(f"Entity {name} does not exist in project {project}") + def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: + first_run = self.cached_registry_proto is None or self.cached_registry_proto_created is None + if first_run or not allow_cache or \ + (datetime.now() > self.cached_registry_proto_created + self.cached_registry_proto_ttl): + registry_proto = self._registry_store.get_registry_proto() + self.cached_registry_proto = registry_proto + self.cached_registry_proto_created = datetime.now() + return self.cached_registry_proto + + def apply_feature_table(self, feature_table: FeatureTable, project: str): """ Registers a single feature table with Feast @@ -146,6 +160,7 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) return + def apply_feature_view(self, feature_view: FeatureView, project: str): """ Registers a single feature view with Feast @@ -175,6 +190,7 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) + def list_feature_tables(self, project: str) -> List[FeatureTable]: """ Retrieve a list of feature tables from the registry @@ -185,30 +201,33 @@ def list_feature_tables(self, project: str) -> List[FeatureTable]: Returns: List of feature tables """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto() feature_tables = [] for feature_table_proto in registry_proto.feature_tables: if feature_table_proto.spec.project == project: feature_tables.append(FeatureTable.from_proto(feature_table_proto)) return feature_tables - def list_feature_views(self, project: str) -> List[FeatureView]: + + def list_feature_views(self, project: str, allow_cache: bool = False) -> List[FeatureView]: """ Retrieve a list of feature views from the registry Args: + allow_cache: Allow returning feature views from the cached registry project: Filter feature tables based on project name Returns: List of feature views """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto(allow_cache=allow_cache) feature_views = [] for feature_view_proto in registry_proto.feature_views: if feature_view_proto.spec.project == project: feature_views.append(FeatureView.from_proto(feature_view_proto)) return feature_views + def get_feature_table(self, name: str, project: str) -> FeatureTable: """ Retrieves a feature table. @@ -221,7 +240,7 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable: Returns either the specified feature table, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto() for feature_table_proto in registry_proto.feature_tables: if ( feature_table_proto.spec.name == name @@ -230,6 +249,7 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable: return FeatureTable.from_proto(feature_table_proto) raise Exception(f"Feature table {name} does not exist in project {project}") + def get_feature_view(self, name: str, project: str) -> FeatureView: """ Retrieves a feature view. @@ -242,7 +262,7 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: Returns either the specified feature view, or raises an exception if none is found """ - registry_proto = self._registry_store.get_registry_proto() + registry_proto = self._get_registry_proto() for feature_view_proto in registry_proto.feature_views: if ( feature_view_proto.spec.name == name @@ -251,6 +271,7 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: return FeatureView.from_proto(feature_view_proto) raise Exception(f"Feature view {name} does not exist in project {project}") + def delete_feature_table(self, name: str, project: str): """ Deletes a feature table or raises an exception if not found. @@ -275,6 +296,7 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) return + def delete_feature_view(self, name: str, project: str): """ Deletes a feature view or raises an exception if not found. @@ -299,6 +321,10 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) + def refresh(self): + pass + + class RegistryStore(ABC): """ RegistryStore: abstract base class implemented by specific backends (local file system, GCS) diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 7f72499485..653a8b0640 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -54,8 +54,7 @@ class RepoConfig(FeastBaseModel): """ OnlineStoreConfig: Online store configuration (optional depending on provider) """ # TODO: Nest in `metadata_store_config` object - auto_refresh_registry: bool = True - auto_refresh_registry_ttl_seconds: int = 600 + registry_cache_ttl_seconds: int = 600 # This is the JSON Schema for config validation. We use this to have nice detailed error messages diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 5a13b489f4..6780c9b4e5 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -60,7 +60,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): sys.path.append("") project = repo_config.project - registry = Registry(repo_config.metadata_store) + registry = Registry(repo_config.metadata_store, timedelta(seconds=repo_config.registry_cache_ttl_seconds)) repo = parse_repo(repo_path) for entity in repo.entities: @@ -118,7 +118,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): def teardown(repo_config: RepoConfig, repo_path: Path): - registry = Registry(repo_config.metadata_store) + registry = Registry(repo_config.metadata_store, cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds)) project = repo_config.project registry_tables: List[Union[FeatureTable, FeatureView]] = [] registry_tables.extend(registry.list_feature_tables(project=project)) diff --git a/sdk/python/tests/online_read_write_test.py b/sdk/python/tests/online_read_write_test.py index ae76f8a76b..ad22ee876a 100644 --- a/sdk/python/tests/online_read_write_test.py +++ b/sdk/python/tests/online_read_write_test.py @@ -10,8 +10,7 @@ def basic_rw_test(store: FeatureStore, view_name: str) -> None: This is a provider-independent test suite for reading and writing from the online store, to be used by provider-specific tests. """ - registry = store._get_registry() - table = registry.get_feature_view(project=store.project, name=view_name) + table = store.get_feature_view(name=view_name) provider = store._get_provider() diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index 4456233cee..e501ed91db 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -16,7 +16,7 @@ def test_online() -> None: runner = CliRunner() with runner.local_repo(get_example_repo("example_feature_repo_1.py")) as store: # Write some data to two tables - registry = store._get_registry() + registry = store._registry table = registry.get_feature_view( project=store.config.project, name="driver_locations" ) @@ -79,14 +79,13 @@ def test_online() -> None: feature_refs=["driver_locations_bad:lon"], entity_rows=[{"driver": 1}], ) - # Create new FeatureStore object with auto_refresh_registry disabled + # Create new FeatureStore object with fast cache invalidation fs_no_autoload = FeatureStore(config=RepoConfig( metadata_store=store.config.metadata_store, online_store=store.config.online_store, project=store.config.project, provider=store.config.provider, - auto_refresh_registry=False, - auto_refresh_registry_ttl_seconds=2 + registry_cache_ttl_seconds=1 )) # Should download the registry and cache it permanently (or until manually refreshed) @@ -104,4 +103,7 @@ def test_online() -> None: feature_refs=["driver_locations:lon", "driver_locations_2:lon"], entity_rows=[{"driver": 1}, {"driver": 123}], ) - assert result.to_dict()["driver_locations:lon"] == ["1.0", None] \ No newline at end of file + assert result.to_dict()["driver_locations:lon"] == ["1.0", None] + + # Restore metadata.db so that we can tear down the infra + os.rename(store.config.metadata_store + "_fake", store.config.metadata_store) \ No newline at end of file From 2a3a7f3b152703c4565f05eee9884cb754e4a91a Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 1 Apr 2021 22:54:44 -0700 Subject: [PATCH 3/8] Add additional tests to registry caching Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 67 ++++++++++-------- sdk/python/feast/registry.py | 84 ++++++++++++----------- sdk/python/feast/repo_operations.py | 10 ++- sdk/python/tests/test_online_retrieval.py | 76 ++++++++++++++++---- 4 files changed, 157 insertions(+), 80 deletions(-) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index a662767a2d..df4afe9cdf 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -54,7 +54,7 @@ class FeatureStore: _registry: Registry def __init__( - self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, + self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): self.repo_path = repo_path if repo_path is not None and config is not None: @@ -72,8 +72,10 @@ def __init__( local=LocalOnlineStoreConfig(path="online_store.db") ), ) - self._registry = Registry(self.config.metadata_store, - cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds)) + self._registry = Registry( + self.config.metadata_store, + cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds), + ) @property def project(self) -> str: @@ -83,7 +85,10 @@ def _get_provider(self) -> Provider: return get_provider(self.config) def refresh_registry(self): - self._registry = Registry(self.config.metadata_store) + self._registry = Registry( + self.config.metadata_store, + cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds), + ) self._registry.refresh() def list_entities(self) -> List[Entity]: @@ -190,7 +195,7 @@ def apply(self, objects: List[Union[FeatureView, Entity]]): ) def get_historical_features( - self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], + self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], ) -> RetrievalJob: """Enrich an entity dataframe with historical feature values for either training or batch scoring. @@ -229,7 +234,9 @@ def get_historical_features( >>> model.fit(feature_data) # insert your modeling framework here. """ - all_feature_views = self._registry.list_feature_views(project=self.config.project) + all_feature_views = self._registry.list_feature_views( + project=self.config.project + ) feature_views = _get_requested_feature_views(feature_refs, all_feature_views) offline_store = get_offline_store_for_retrieval(feature_views) job = offline_store.get_historical_features( @@ -238,7 +245,7 @@ def get_historical_features( return job def materialize_incremental( - self, feature_views: Optional[List[str]], end_date: datetime, + self, feature_views: Optional[List[str]], end_date: datetime, ) -> None: """ Materialize incremental new data from the offline store into the online store. @@ -271,7 +278,9 @@ def materialize_incremental( ) else: for name in feature_views: - feature_view = self._registry.get_feature_view(name, self.config.project) + feature_view = self._registry.get_feature_view( + name, self.config.project + ) feature_views_to_materialize.append(feature_view) # TODO paging large loads @@ -286,10 +295,10 @@ def materialize_incremental( self._materialize_single_feature_view(feature_view, start_date, end_date) def materialize( - self, - feature_views: Optional[List[str]], - start_date: datetime, - end_date: datetime, + self, + feature_views: Optional[List[str]], + start_date: datetime, + end_date: datetime, ) -> None: """ Materialize data from the offline store into the online store. @@ -323,7 +332,9 @@ def materialize( ) else: for name in feature_views: - feature_view = self._registry.get_feature_view(name, self.config.project) + feature_view = self._registry.get_feature_view( + name, self.config.project + ) feature_views_to_materialize.append(feature_view) # TODO paging large loads @@ -331,7 +342,7 @@ def materialize( self._materialize_single_feature_view(feature_view, start_date, end_date) def _materialize_single_feature_view( - self, feature_view: FeatureView, start_date: datetime, end_date: datetime + self, feature_view: FeatureView, start_date: datetime, end_date: datetime ) -> None: ( entity_names, @@ -365,7 +376,7 @@ def _materialize_single_feature_view( self.apply([feature_view]) def get_online_features( - self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], + self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], ) -> OnlineResponse: """ Retrieves the latest online feature data. @@ -401,10 +412,10 @@ def get_online_features( return OnlineResponse(response) def _get_online_features( - self, - entity_rows: List[GetOnlineFeaturesRequestV2.EntityRow], - feature_refs: List[str], - project: str, + self, + entity_rows: List[GetOnlineFeaturesRequestV2.EntityRow], + feature_refs: List[str], + project: str, ) -> GetOnlineFeaturesResponse: provider = self._get_provider() @@ -416,7 +427,9 @@ def _get_online_features( entity_keys.append(_entity_row_to_key(row)) result_rows.append(_entity_row_to_field_values(row)) - all_feature_views = self._registry.list_feature_views(project=self.config.project, allow_cache=True) + all_feature_views = self._registry.list_feature_views( + project=self.config.project, allow_cache=True + ) grouped_refs = _group_refs(feature_refs, all_feature_views) for table, requested_features in grouped_refs: @@ -453,7 +466,7 @@ def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyPr def _entity_row_to_field_values( - row: GetOnlineFeaturesRequestV2.EntityRow, + row: GetOnlineFeaturesRequestV2.EntityRow, ) -> GetOnlineFeaturesResponse.FieldValues: result = GetOnlineFeaturesResponse.FieldValues() for k in row.fields: @@ -464,7 +477,7 @@ def _entity_row_to_field_values( def _group_refs( - feature_refs: List[str], all_feature_views: List[FeatureView] + feature_refs: List[str], all_feature_views: List[FeatureView] ) -> List[Tuple[FeatureView, List[str]]]: """ Get list of feature views and corresponding feature names based on feature references""" @@ -487,7 +500,7 @@ def _group_refs( def _run_reverse_field_mapping( - feature_view: FeatureView, + feature_view: FeatureView, ) -> Tuple[List[str], List[str], str, Optional[str]]: """ If a field mapping exists, run it in reverse on the entity names, @@ -520,7 +533,7 @@ def _run_reverse_field_mapping( created_timestamp_column = ( reverse_field_mapping[created_timestamp_column] if created_timestamp_column - and created_timestamp_column in reverse_field_mapping.keys() + and created_timestamp_column in reverse_field_mapping.keys() else created_timestamp_column ) entity_names = [ @@ -540,7 +553,7 @@ def _run_reverse_field_mapping( def _run_forward_field_mapping( - table: pyarrow.Table, field_mapping: Dict[str, str], + table: pyarrow.Table, field_mapping: Dict[str, str], ) -> pyarrow.Table: # run field mapping in the forward direction cols = table.column_names @@ -552,7 +565,7 @@ def _run_forward_field_mapping( def _convert_arrow_to_proto( - table: pyarrow.Table, feature_view: FeatureView + table: pyarrow.Table, feature_view: FeatureView ) -> List[Tuple[EntityKeyProto, Dict[str, ValueProto], datetime, Optional[datetime]]]: rows_to_write = [] @@ -604,7 +617,7 @@ def _coerce_datetime(ts): def _get_requested_feature_views( - feature_refs: List[str], all_feature_views: List[FeatureView] + feature_refs: List[str], all_feature_views: List[FeatureView] ) -> List[FeatureView]: """Get list of feature views based on feature references""" return list(view for view, _ in _group_refs(feature_refs, all_feature_views)) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 138471ef25..5d69aa22ee 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -34,6 +34,7 @@ class Registry: """ Registry: A registry allows for the management and persistence of feature definitions and related metadata. """ + cached_registry_proto: RegistryProto = None cached_registry_proto_created: datetime = None cached_registry_proto_ttl: timedelta @@ -73,8 +74,8 @@ def apply_entity(self, entity: Entity, project: str): def updater(registry_proto: RegistryProto): for idx, existing_entity_proto in enumerate(registry_proto.entities): if ( - existing_entity_proto.spec.name == entity_proto.spec.name - and existing_entity_proto.spec.project == project + existing_entity_proto.spec.name == entity_proto.spec.name + and existing_entity_proto.spec.project == project ): del registry_proto.entities[idx] registry_proto.entities.append(entity_proto) @@ -121,14 +122,23 @@ def get_entity(self, name: str, project: str) -> Entity: raise Exception(f"Entity {name} does not exist in project {project}") def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: - first_run = self.cached_registry_proto is None or self.cached_registry_proto_created is None - if first_run or not allow_cache or \ - (datetime.now() > self.cached_registry_proto_created + self.cached_registry_proto_ttl): - registry_proto = self._registry_store.get_registry_proto() - self.cached_registry_proto = registry_proto - self.cached_registry_proto_created = datetime.now() - return self.cached_registry_proto + expired = ( + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity + and ( + datetime.now() + > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) + ) + ) + if allow_cache and not expired: + return self.cached_registry_proto + registry_proto = self._registry_store.get_registry_proto() + self.cached_registry_proto = registry_proto + self.cached_registry_proto_created = datetime.now() + return registry_proto def apply_feature_table(self, feature_table: FeatureTable, project: str): """ @@ -144,12 +154,12 @@ def apply_feature_table(self, feature_table: FeatureTable, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_table_proto in enumerate( - registry_proto.feature_tables + registry_proto.feature_tables ): if ( - existing_feature_table_proto.spec.name - == feature_table_proto.spec.name - and existing_feature_table_proto.spec.project == project + existing_feature_table_proto.spec.name + == feature_table_proto.spec.name + and existing_feature_table_proto.spec.project == project ): del registry_proto.feature_tables[idx] registry_proto.feature_tables.append(feature_table_proto) @@ -160,7 +170,6 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) return - def apply_feature_view(self, feature_view: FeatureView, project: str): """ Registers a single feature view with Feast @@ -175,12 +184,12 @@ def apply_feature_view(self, feature_view: FeatureView, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_view_proto in enumerate( - registry_proto.feature_views + registry_proto.feature_views ): if ( - existing_feature_view_proto.spec.name - == feature_view_proto.spec.name - and existing_feature_view_proto.spec.project == project + existing_feature_view_proto.spec.name + == feature_view_proto.spec.name + and existing_feature_view_proto.spec.project == project ): del registry_proto.feature_views[idx] registry_proto.feature_views.append(feature_view_proto) @@ -190,7 +199,6 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) - def list_feature_tables(self, project: str) -> List[FeatureTable]: """ Retrieve a list of feature tables from the registry @@ -208,8 +216,9 @@ def list_feature_tables(self, project: str) -> List[FeatureTable]: feature_tables.append(FeatureTable.from_proto(feature_table_proto)) return feature_tables - - def list_feature_views(self, project: str, allow_cache: bool = False) -> List[FeatureView]: + def list_feature_views( + self, project: str, allow_cache: bool = False + ) -> List[FeatureView]: """ Retrieve a list of feature views from the registry @@ -227,7 +236,6 @@ def list_feature_views(self, project: str, allow_cache: bool = False) -> List[Fe feature_views.append(FeatureView.from_proto(feature_view_proto)) return feature_views - def get_feature_table(self, name: str, project: str) -> FeatureTable: """ Retrieves a feature table. @@ -243,13 +251,12 @@ def get_feature_table(self, name: str, project: str) -> FeatureTable: registry_proto = self._get_registry_proto() for feature_table_proto in registry_proto.feature_tables: if ( - feature_table_proto.spec.name == name - and feature_table_proto.spec.project == project + feature_table_proto.spec.name == name + and feature_table_proto.spec.project == project ): return FeatureTable.from_proto(feature_table_proto) raise Exception(f"Feature table {name} does not exist in project {project}") - def get_feature_view(self, name: str, project: str) -> FeatureView: """ Retrieves a feature view. @@ -265,13 +272,12 @@ def get_feature_view(self, name: str, project: str) -> FeatureView: registry_proto = self._get_registry_proto() for feature_view_proto in registry_proto.feature_views: if ( - feature_view_proto.spec.name == name - and feature_view_proto.spec.project == project + feature_view_proto.spec.name == name + and feature_view_proto.spec.project == project ): return FeatureView.from_proto(feature_view_proto) raise Exception(f"Feature view {name} does not exist in project {project}") - def delete_feature_table(self, name: str, project: str): """ Deletes a feature table or raises an exception if not found. @@ -283,11 +289,11 @@ def delete_feature_table(self, name: str, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_table_proto in enumerate( - registry_proto.feature_tables + registry_proto.feature_tables ): if ( - existing_feature_table_proto.spec.name == name - and existing_feature_table_proto.spec.project == project + existing_feature_table_proto.spec.name == name + and existing_feature_table_proto.spec.project == project ): del registry_proto.feature_tables[idx] return registry_proto @@ -296,7 +302,6 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) return - def delete_feature_view(self, name: str, project: str): """ Deletes a feature view or raises an exception if not found. @@ -308,11 +313,11 @@ def delete_feature_view(self, name: str, project: str): def updater(registry_proto: RegistryProto): for idx, existing_feature_view_proto in enumerate( - registry_proto.feature_views + registry_proto.feature_views ): if ( - existing_feature_view_proto.spec.name == name - and existing_feature_view_proto.spec.project == project + existing_feature_view_proto.spec.name == name + and existing_feature_view_proto.spec.project == project ): del registry_proto.feature_views[idx] return registry_proto @@ -320,9 +325,8 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) - def refresh(self): - pass + self._get_registry_proto() class RegistryStore(ABC): @@ -365,7 +369,8 @@ def get_registry_proto(self): registry_proto.ParseFromString(self._filepath.read_bytes()) return registry_proto raise FileNotFoundError( - f"Registry not found at path \"{self._filepath}\". Have you run \"feast apply\"?") + f'Registry not found at path "{self._filepath}". Have you run "feast apply"?' + ) def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): try: @@ -422,7 +427,8 @@ def get_registry_proto(self): registry_proto.ParseFromString(file_obj.read()) return registry_proto raise FileNotFoundError( - f"Registry not found at path \"{self._uri.geturl()}\". Have you run \"feast apply\"?") + f'Registry not found at path "{self._uri.geturl()}". Have you run "feast apply"?' + ) def update_registry_proto(self, updater: Callable[[RegistryProto], RegistryProto]): try: diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 6780c9b4e5..dd9248ced5 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -60,7 +60,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): sys.path.append("") project = repo_config.project - registry = Registry(repo_config.metadata_store, timedelta(seconds=repo_config.registry_cache_ttl_seconds)) + registry = Registry( + repo_config.metadata_store, + timedelta(seconds=repo_config.registry_cache_ttl_seconds), + ) repo = parse_repo(repo_path) for entity in repo.entities: @@ -118,7 +121,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): def teardown(repo_config: RepoConfig, repo_path: Path): - registry = Registry(repo_config.metadata_store, cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds)) + registry = Registry( + repo_config.metadata_store, + cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds), + ) project = repo_config.project registry_tables: List[Union[FeatureTable, FeatureView]] = [] registry_tables.extend(registry.list_feature_tables(project=project)) diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index e501ed91db..14fb071a8f 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -1,4 +1,5 @@ import os +import time from datetime import datetime import pytest @@ -80,16 +81,19 @@ def test_online() -> None: ) # Create new FeatureStore object with fast cache invalidation - fs_no_autoload = FeatureStore(config=RepoConfig( - metadata_store=store.config.metadata_store, - online_store=store.config.online_store, - project=store.config.project, - provider=store.config.provider, - registry_cache_ttl_seconds=1 - )) + cache_ttl = 1 + fs_fast_ttl = FeatureStore( + config=RepoConfig( + metadata_store=store.config.metadata_store, + online_store=store.config.online_store, + project=store.config.project, + provider=store.config.provider, + registry_cache_ttl_seconds=cache_ttl, + ) + ) # Should download the registry and cache it permanently (or until manually refreshed) - result = fs_no_autoload.get_online_features( + result = fs_fast_ttl.get_online_features( feature_refs=["driver_locations:lon", "driver_locations_2:lon"], entity_rows=[{"driver": 1}, {"driver": 123}], ) @@ -98,12 +102,60 @@ def test_online() -> None: # Rename the metadata.db so that it cant be used for refreshes os.rename(store.config.metadata_store, store.config.metadata_store + "_fake") - # Should use cached registry - result = fs_no_autoload.get_online_features( + # Wait for registry to expire + time.sleep(cache_ttl) + + # Will try to reload registry because it has expired (it will fail because we deleted the actual registry file) + with pytest.raises(FileNotFoundError): + fs_fast_ttl.get_online_features( + feature_refs=["driver_locations:lon", "driver_locations_2:lon"], + entity_rows=[{"driver": 1}, {"driver": 123}], + ) + + # Restore metadata.db so that we can see if it actually reloads registry + os.rename(store.config.metadata_store + "_fake", store.config.metadata_store) + + # Test if registry is actually reloaded and whether results return + result = fs_fast_ttl.get_online_features( feature_refs=["driver_locations:lon", "driver_locations_2:lon"], entity_rows=[{"driver": 1}, {"driver": 123}], ) assert result.to_dict()["driver_locations:lon"] == ["1.0", None] - # Restore metadata.db so that we can tear down the infra - os.rename(store.config.metadata_store + "_fake", store.config.metadata_store) \ No newline at end of file + # Create a registry with infinite cache (for users that want to manually refresh the registry) + fs_infinite_ttl = FeatureStore( + config=RepoConfig( + metadata_store=store.config.metadata_store, + online_store=store.config.online_store, + project=store.config.project, + provider=store.config.provider, + registry_cache_ttl_seconds=0, + ) + ) + + # Should return results (and fill the registry cache) + result = fs_infinite_ttl.get_online_features( + feature_refs=["driver_locations:lon", "driver_locations_2:lon"], + entity_rows=[{"driver": 1}, {"driver": 123}], + ) + assert result.to_dict()["driver_locations:lon"] == ["1.0", None] + + # Wait a bit so that an arbitrary TTL would take effect + time.sleep(2) + + # Rename the metadata.db so that it cant be used for refreshes + os.rename(store.config.metadata_store, store.config.metadata_store + "_fake") + + # TTL is infinite so this method should use registry cache + result = fs_infinite_ttl.get_online_features( + feature_refs=["driver_locations:lon", "driver_locations_2:lon"], + entity_rows=[{"driver": 1}, {"driver": 123}], + ) + assert result.to_dict()["driver_locations:lon"] == ["1.0", None] + + # Force registry reload (should fail because file is missing) + with pytest.raises(FileNotFoundError): + fs_infinite_ttl.refresh_registry() + + # Restore metadata.db so that teardown works + os.rename(store.config.metadata_store + "_fake", store.config.metadata_store) From 09f515ca24aef9b6922218626339e9c4982fee21 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Thu, 1 Apr 2021 23:04:32 -0700 Subject: [PATCH 4/8] Fix linting errors Signed-off-by: Willem Pienaar --- sdk/python/feast/registry.py | 7 ++++--- sdk/python/feast/repo_operations.py | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 5d69aa22ee..1dc4db9309 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -17,7 +17,7 @@ from datetime import datetime, timedelta from pathlib import Path from tempfile import TemporaryFile -from typing import Callable, List +from typing import Callable, List, Optional from urllib.parse import urlparse from google.auth.exceptions import DefaultCredentialsError @@ -35,8 +35,8 @@ class Registry: Registry: A registry allows for the management and persistence of feature definitions and related metadata. """ - cached_registry_proto: RegistryProto = None - cached_registry_proto_created: datetime = None + cached_registry_proto: Optional[RegistryProto] = None + cached_registry_proto_created: Optional[datetime] = None cached_registry_proto_ttl: timedelta def __init__(self, registry_path: str, cache_ttl: timedelta): @@ -133,6 +133,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: ) ) if allow_cache and not expired: + assert isinstance(self.cached_registry_proto, RegistryProto) return self.cached_registry_proto registry_proto = self._registry_store.get_registry_proto() diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index dd9248ced5..1735f45423 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -137,7 +137,10 @@ def registry_dump(repo_config: RepoConfig): """ For debugging only: output contents of the metadata registry """ project = repo_config.project - registry = Registry(repo_config.metadata_store) + registry = Registry( + repo_config.metadata_store, + cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds), + ) for entity in registry.list_entities(project=project): print(entity) @@ -156,7 +159,6 @@ def cli_check_repo(repo_path: Path): def init_repo(repo_path: Path, minimal: bool): - repo_config = repo_path / "feature_store.yaml" if repo_config.exists(): From 4cb295bf50d64fa60ce1351463a30878ce65be5f Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 2 Apr 2021 15:34:39 -0700 Subject: [PATCH 5/8] Add incremental configuration to metadata store config Signed-off-by: Willem Pienaar --- metadata.db | Bin 0 -> 7978 bytes sdk/python/feast/feature_store.py | 11 +++++---- sdk/python/feast/repo_config.py | 28 +++++++++++++++++----- sdk/python/feast/repo_operations.py | 17 ++++++------- sdk/python/tests/test_online_retrieval.py | 11 +++++---- 5 files changed, 45 insertions(+), 22 deletions(-) create mode 100644 metadata.db diff --git a/metadata.db b/metadata.db new file mode 100644 index 0000000000000000000000000000000000000000..b2b2475c3dfee5b3b7e051260af2bbbeea5ee767 GIT binary patch literal 7978 zcmb1QG*nVCOEgS2Pct&oHBB}*&^1XkNYu4RvoO_7GBmR^H8f2!OEWgr;^sKAeO@!0 zz>bX#+(rilxsEe(NtUD*m&7L(#+M`}<)p?Z=NA>FCYR);78eVt;FO9tG&3|eF)%W> zFqPs=DatHMEsD=fQR3ucElbQPO%-4>V&+)GWR=drB*f3fl$j!gP$a~~#ZponZz#l6 zQY;0;T3pFRsfi`2De)!6NoHIo*=f3IsfooUdSJ^l^U{IN%goQy!*CwL4TcVkLSOi~ z7^DQa*f>61J=?@41f(C$X=WQ084ZmANC`4rJRCE?8STT&15cQQcsOPwc%x}@G))db znj9?~N9%>bStsHzEkUwKr6;f~or@?-jn?XLT}Nue6qcqIRbp?$U`WB*4;F?-_}Vb# zz%~q0lLezagVBa@6LJZ5^>K9$QP5Iw3-b3>NWj&k!R;giiv(bs=7QE}n`X2>G^EZO#-u8mWizmEY HxRD6}QF|hN literal 0 HcmV?d00001 diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index df4afe9cdf..c0529c5850 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -72,9 +72,11 @@ def __init__( local=LocalOnlineStoreConfig(path="online_store.db") ), ) + + metadata_store_config = self.config.get_metadata_store_config() self._registry = Registry( - self.config.metadata_store, - cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds), + registry_path=metadata_store_config.path, + cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds), ) @property @@ -85,9 +87,10 @@ def _get_provider(self) -> Provider: return get_provider(self.config) def refresh_registry(self): + metadata_store_config = self.config.get_metadata_store_config() self._registry = Registry( - self.config.metadata_store, - cache_ttl=timedelta(seconds=self.config.registry_cache_ttl_seconds), + registry_path=metadata_store_config.path, + cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds), ) self._registry.refresh() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 653a8b0640..018067f816 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -1,8 +1,8 @@ from pathlib import Path -from typing import Optional +from typing import Optional, Union import yaml -from pydantic import BaseModel, StrictStr, ValidationError +from pydantic import BaseModel, StrictInt, StrictStr, ValidationError class FeastBaseModel(BaseModel): @@ -29,16 +29,29 @@ class DatastoreOnlineStoreConfig(FeastBaseModel): class OnlineStoreConfig(FeastBaseModel): datastore: Optional[DatastoreOnlineStoreConfig] = None - """ DatastoreOnlineStoreConfig: Optional DatastoreConfig """ + """ DatastoreOnlineStoreConfig: Optional Google Cloud Datastore config """ local: Optional[LocalOnlineStoreConfig] = None """ LocalOnlineStoreConfig: Optional local online store config """ +class MetadataStoreConfig(FeastBaseModel): + """ Metadata Store Configuration. Configuration that relates to reading from and writing to the Feast registry.""" + + path: StrictStr + """ str: Path to metadata store. Can be a local path, or remote object storage path, e.g. gcs://foo/bar """ + + cache_ttl_seconds: StrictInt = 600 + """int: The cache TTL is the amount of time registry state will be cached in memory. If this TTL is exceeded then + the registry will be refreshed when any feature store method asks for access to registry state. The TTL can be + set to infinity by setting TTL to 0 seconds, which means the cache will only be loaded once and will never + expire. Users can manually refresh the cache by calling feature_store.refresh_registry() """ + + class RepoConfig(FeastBaseModel): """ Repo config. Typically loaded from `feature_store.yaml` """ - metadata_store: StrictStr + metadata_store: Union[StrictStr, MetadataStoreConfig] """ str: Path to metadata store. Can be a local path, or remote object storage path, e.g. gcs://foo/bar """ project: StrictStr @@ -53,8 +66,11 @@ class RepoConfig(FeastBaseModel): online_store: Optional[OnlineStoreConfig] = None """ OnlineStoreConfig: Online store configuration (optional depending on provider) """ - # TODO: Nest in `metadata_store_config` object - registry_cache_ttl_seconds: int = 600 + def get_metadata_store_config(self): + if isinstance(self.metadata_store, str): + return MetadataStoreConfig(path=self.metadata_store) + else: + return self.metadata_store # This is the JSON Schema for config validation. We use this to have nice detailed error messages diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 1735f45423..c00de38026 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -58,11 +58,11 @@ def parse_repo(repo_root: Path) -> ParsedRepo: def apply_total(repo_config: RepoConfig, repo_path: Path): os.chdir(repo_path) sys.path.append("") - + metadata_store_config = repo_config.get_metadata_store_config() project = repo_config.project registry = Registry( - repo_config.metadata_store, - timedelta(seconds=repo_config.registry_cache_ttl_seconds), + registry_path=metadata_store_config.path, + cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds), ) repo = parse_repo(repo_path) @@ -121,9 +121,10 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): def teardown(repo_config: RepoConfig, repo_path: Path): + metadata_store_config = repo_config.get_metadata_store_config() registry = Registry( - repo_config.metadata_store, - cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds), + registry_path=metadata_store_config.path, + cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds), ) project = repo_config.project registry_tables: List[Union[FeatureTable, FeatureView]] = [] @@ -135,11 +136,11 @@ def teardown(repo_config: RepoConfig, repo_path: Path): def registry_dump(repo_config: RepoConfig): """ For debugging only: output contents of the metadata registry """ - + metadata_store_config = repo_config.get_metadata_store_config() project = repo_config.project registry = Registry( - repo_config.metadata_store, - cache_ttl=timedelta(seconds=repo_config.registry_cache_ttl_seconds), + registry_path=metadata_store_config.path, + cache_ttl=timedelta(seconds=metadata_store_config.cache_ttl_seconds), ) for entity in registry.list_entities(project=project): diff --git a/sdk/python/tests/test_online_retrieval.py b/sdk/python/tests/test_online_retrieval.py index 14fb071a8f..fa2c51e58d 100644 --- a/sdk/python/tests/test_online_retrieval.py +++ b/sdk/python/tests/test_online_retrieval.py @@ -7,6 +7,7 @@ from feast import FeatureStore, RepoConfig from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.protos.feast.types.Value_pb2 import Value as ValueProto +from feast.repo_config import MetadataStoreConfig from tests.cli_utils import CliRunner, get_example_repo @@ -84,11 +85,12 @@ def test_online() -> None: cache_ttl = 1 fs_fast_ttl = FeatureStore( config=RepoConfig( - metadata_store=store.config.metadata_store, + metadata_store=MetadataStoreConfig( + path=store.config.metadata_store, cache_ttl_seconds=cache_ttl + ), online_store=store.config.online_store, project=store.config.project, provider=store.config.provider, - registry_cache_ttl_seconds=cache_ttl, ) ) @@ -125,11 +127,12 @@ def test_online() -> None: # Create a registry with infinite cache (for users that want to manually refresh the registry) fs_infinite_ttl = FeatureStore( config=RepoConfig( - metadata_store=store.config.metadata_store, + metadata_store=MetadataStoreConfig( + path=store.config.metadata_store, cache_ttl_seconds=0 + ), online_store=store.config.online_store, project=store.config.project, provider=store.config.provider, - registry_cache_ttl_seconds=0, ) ) From d97fb368b00ae003263d9fe3fb3972852b25308c Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 2 Apr 2021 15:42:55 -0700 Subject: [PATCH 6/8] Add comment to refresh_registry() Signed-off-by: Willem Pienaar --- sdk/python/feast/feature_store.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c0529c5850..d850529b36 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -87,6 +87,11 @@ def _get_provider(self) -> Provider: return get_provider(self.config) def refresh_registry(self): + """Fetches and caches a copy of the feature registry in memory. + + Explicitly calling this method makes it possible for methods like get_online_features to use cached registry + state instead of retrieving the registry state at request-time, thus reducing latency.""" + metadata_store_config = self.config.get_metadata_store_config() self._registry = Registry( registry_path=metadata_store_config.path, From 35f06b15755201c8bb964996c4018e436ba482d7 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 2 Apr 2021 15:49:05 -0700 Subject: [PATCH 7/8] Add comments Signed-off-by: Willem Pienaar --- sdk/python/feast/registry.py | 50 +++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 21 deletions(-) diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index 1dc4db9309..ad9fc329d3 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -121,26 +121,6 @@ def get_entity(self, name: str, project: str) -> Entity: return Entity.from_proto(entity_proto) raise Exception(f"Entity {name} does not exist in project {project}") - def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: - expired = ( - self.cached_registry_proto is None - or self.cached_registry_proto_created is None - ) or ( - self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity - and ( - datetime.now() - > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) - ) - ) - if allow_cache and not expired: - assert isinstance(self.cached_registry_proto, RegistryProto) - return self.cached_registry_proto - - registry_proto = self._registry_store.get_registry_proto() - self.cached_registry_proto = registry_proto - self.cached_registry_proto_created = datetime.now() - return registry_proto - def apply_feature_table(self, feature_table: FeatureTable, project: str): """ Registers a single feature table with Feast @@ -327,7 +307,35 @@ def updater(registry_proto: RegistryProto): self._registry_store.update_registry_proto(updater) def refresh(self): - self._get_registry_proto() + """Refreshes the state of the registry cache by fetching the registry state from the remote registry store.""" + self._get_registry_proto(allow_cache=False) + + def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: + """Returns the cached or remote registry state + + Args: + allow_cache: Whether to allow the use of the registry cache when fetching the RegistryProto + + Returns: Returns a RegistryProto object which represents the state of the registry + """ + expired = ( + self.cached_registry_proto is None + or self.cached_registry_proto_created is None + ) or ( + self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity + and ( + datetime.now() + > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) + ) + ) + if allow_cache and not expired: + assert isinstance(self.cached_registry_proto, RegistryProto) + return self.cached_registry_proto + + registry_proto = self._registry_store.get_registry_proto() + self.cached_registry_proto = registry_proto + self.cached_registry_proto_created = datetime.now() + return registry_proto class RegistryStore(ABC): From d70b3a0d864eb5b83ad9bbcf41ad5361cc240245 Mon Sep 17 00:00:00 2001 From: Willem Pienaar Date: Fri, 2 Apr 2021 18:59:27 -0700 Subject: [PATCH 8/8] Don't allow simultanous cache loads Signed-off-by: Willem Pienaar --- metadata.db | Bin 7978 -> 0 bytes sdk/python/feast/feature_store.py | 22 ++++++++++++++++++++-- sdk/python/feast/registry.py | 19 ++++++++++++++----- 3 files changed, 34 insertions(+), 7 deletions(-) delete mode 100644 metadata.db diff --git a/metadata.db b/metadata.db deleted file mode 100644 index b2b2475c3dfee5b3b7e051260af2bbbeea5ee767..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 7978 zcmb1QG*nVCOEgS2Pct&oHBB}*&^1XkNYu4RvoO_7GBmR^H8f2!OEWgr;^sKAeO@!0 zz>bX#+(rilxsEe(NtUD*m&7L(#+M`}<)p?Z=NA>FCYR);78eVt;FO9tG&3|eF)%W> zFqPs=DatHMEsD=fQR3ucElbQPO%-4>V&+)GWR=drB*f3fl$j!gP$a~~#ZponZz#l6 zQY;0;T3pFRsfi`2De)!6NoHIo*=f3IsfooUdSJ^l^U{IN%goQy!*CwL4TcVkLSOi~ z7^DQa*f>61J=?@41f(C$X=WQ084ZmANC`4rJRCE?8STT&15cQQcsOPwc%x}@G))db znj9?~N9%>bStsHzEkUwKr6;f~or@?-jn?XLT}Nue6qcqIRbp?$U`WB*4;F?-_}Vb# zz%~q0lLezagVBa@6LJZ5^>K9$QP5Iw3-b3>NWj&k!R;giiv(bs=7QE}n`X2>G^EZO#-u8mWizmEY HxRD6}QF|hN diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d850529b36..2f15d1d462 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -89,8 +89,17 @@ def _get_provider(self) -> Provider: def refresh_registry(self): """Fetches and caches a copy of the feature registry in memory. - Explicitly calling this method makes it possible for methods like get_online_features to use cached registry - state instead of retrieving the registry state at request-time, thus reducing latency.""" + Explicitly calling this method allows for direct control of the state of the registry cache. Every time this + method is called the complete registry state will be retrieved from the remote registry store backend + (e.g., GCS, S3), and the cache timer will be reset. If refresh_registry() is run before get_online_features() + is called, then get_online_feature() will use the cached registry instead of retrieving (and caching) the + registry itself. + + Additionally, the TTL for the registry cache can be set to infinity (by setting it to 0), which means that + refresh_registry() will become the only way to update the cached registry. If the TTL is set to a value + greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be + downloaded synchronously, which may increase latencies if the triggering method is get_online_features() + """ metadata_store_config = self.config.get_metadata_store_config() self._registry = Registry( @@ -388,6 +397,15 @@ def get_online_features( ) -> OnlineResponse: """ Retrieves the latest online feature data. + + Note: This method will download the full feature registry the first time it is run. If you are using a + remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL + duration (which can be set to infinitey). If the cached registry is stale (more time than the TTL has + passed), then a new registry will be downloaded synchronously by this method. This download may + introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call + refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to + infinity (cache forever). + Args: feature_refs: List of feature references that will be returned for each entity. Each feature reference should have the following format: diff --git a/sdk/python/feast/registry.py b/sdk/python/feast/registry.py index ad9fc329d3..8ef2901150 100644 --- a/sdk/python/feast/registry.py +++ b/sdk/python/feast/registry.py @@ -38,6 +38,7 @@ class Registry: cached_registry_proto: Optional[RegistryProto] = None cached_registry_proto_created: Optional[datetime] = None cached_registry_proto_ttl: timedelta + cache_being_updated: bool = False def __init__(self, registry_path: str, cache_ttl: timedelta): """ @@ -328,13 +329,19 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto: > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) ) ) - if allow_cache and not expired: + if allow_cache and (not expired or self.cache_being_updated): assert isinstance(self.cached_registry_proto, RegistryProto) return self.cached_registry_proto - registry_proto = self._registry_store.get_registry_proto() - self.cached_registry_proto = registry_proto - self.cached_registry_proto_created = datetime.now() + try: + self.cache_being_updated = True + registry_proto = self._registry_store.get_registry_proto() + self.cached_registry_proto = registry_proto + self.cached_registry_proto_created = datetime.now() + except Exception as e: + raise e + finally: + self.cache_being_updated = False return registry_proto @@ -431,7 +438,9 @@ def get_registry_proto(self): f"No bucket named {self._bucket} exists; please create it first." ) if storage.Blob(bucket=bucket, name=self._blob).exists(self.gcs_client): - self.gcs_client.download_blob_to_file(self._uri.geturl(), file_obj) + self.gcs_client.download_blob_to_file( + self._uri.geturl(), file_obj, timeout=30 + ) file_obj.seek(0) registry_proto.ParseFromString(file_obj.read()) return registry_proto