diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py index ab74c8d5c2..6d9f0350ba 100644 --- a/sdk/python/feast/infra/registry/http.py +++ b/sdk/python/feast/infra/registry/http.py @@ -134,7 +134,9 @@ def get_entity( # type: ignore[return] ) -> Entity: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.get_entity(self.cached_registry_proto, name, project) + return proto_registry_utils.get_entity( + self.cached_registry_proto, name, project + ) try: url = f"{self.base_url}/projects/{project}/entities/{name}" params = {"allow_cache": False} @@ -155,17 +157,23 @@ def list_entities( # type: ignore[return] ) -> List[Entity]: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_entities(self.cached_registry_proto, project) + return proto_registry_utils.list_entities( + self.cached_registry_proto, project + ) try: url = f"{self.base_url}/projects/{project}/entities" params = {"allow_cache": False} response_data = self._send_request("GET", url, params=params) response_list = response_data if isinstance(response_data, list) else [] - return [EntityModel.parse_obj(entity).to_entity() for entity in response_list] + return [ + EntityModel.parse_obj(entity).to_entity() for entity in response_list + ] except Exception as exception: self._handle_exception(exception) - def apply_data_source(self, data_source: DataSource, project: str, commit: bool = True): + def apply_data_source( + self, data_source: DataSource, project: str, commit: bool = True + ): try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"commit": commit} @@ -206,7 +214,9 @@ def get_data_source( # type: ignore[return] ) -> DataSource: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.get_data_source(self.cached_registry_proto, name, project) + return proto_registry_utils.get_data_source( + self.cached_registry_proto, name, project + ) try: url = f"{self.base_url}/projects/{project}/data_sources/{name}" params = {"allow_cache": False} @@ -234,7 +244,9 @@ def list_data_sources( # type: ignore[return] ) -> List[DataSource]: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_data_sources(self.cached_registry_proto, project) + return proto_registry_utils.list_data_sources( + self.cached_registry_proto, project + ) try: url = f"{self.base_url}/projects/{project}/data_sources" params = {"allow_cache": False} @@ -300,7 +312,9 @@ def get_feature_service( # type: ignore[return] response_data = self._send_request("GET", url, params=params) return FeatureServiceModel.parse_obj(response_data).to_feature_service() except FeatureServiceNotFoundException as exception: - logger.error(f"FeatureService {name} requested does not exist: %s", str(exception)) + logger.error( + f"FeatureService {name} requested does not exist: %s", str(exception) + ) raise httpx.HTTPError(message=f"FeatureService: {name} not found") except Exception as exception: self._handle_exception(exception) @@ -310,7 +324,9 @@ def list_feature_services( # type: ignore[return] ) -> List[FeatureService]: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_feature_services(self.cached_registry_proto, project) + return proto_registry_utils.list_feature_services( + self.cached_registry_proto, project + ) try: url = f"{self.base_url}/projects/{project}/feature_services" params = {"allow_cache": False} @@ -323,7 +339,9 @@ def list_feature_services( # type: ignore[return] except Exception as exception: self._handle_exception(exception) - def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit: bool = True): + def apply_feature_view( + self, feature_view: BaseFeatureView, project: str, commit: bool = True + ): try: params = {"commit": commit} if isinstance(feature_view, FeatureView): @@ -335,7 +353,9 @@ def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit url = f"{self.base_url}/projects/{project}/on_demand_feature_views" data = OnDemandFeatureViewModel.from_feature_view(feature_view).json() response_data = self._send_request("PUT", url, params=params, data=data) - return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() + return OnDemandFeatureViewModel.parse_obj( + response_data + ).to_feature_view() else: raise TypeError( "Unsupported FeatureView type. Please use either FeatureView or OnDemandFeatureView only" @@ -366,14 +386,18 @@ def get_feature_view( # type: ignore[return] ) -> FeatureView: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.get_feature_view(self.cached_registry_proto, name, project) + return proto_registry_utils.get_feature_view( + self.cached_registry_proto, name, project + ) try: url = f"{self.base_url}/projects/{project}/feature_views/{name}" params = {"allow_cache": False} response_data = self._send_request("GET", url, params=params) return FeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: - logger.error(f"FeatureView {name} requested does not exist: %s", str(exception)) + logger.error( + f"FeatureView {name} requested does not exist: %s", str(exception) + ) raise httpx.HTTPError(message=f"FeatureView: {name} not found") except Exception as exception: self._handle_exception(exception) @@ -383,7 +407,9 @@ def list_feature_views( # type: ignore[return] ) -> List[FeatureView]: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_feature_views(self.cached_registry_proto, project) + return proto_registry_utils.list_feature_views( + self.cached_registry_proto, project + ) try: url = f"{self.base_url}/projects/{project}/feature_views" params = {"allow_cache": False} @@ -410,7 +436,9 @@ def get_on_demand_feature_view( # type: ignore[return] response_data = self._send_request("GET", url, params=params) return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view() except FeatureViewNotFoundException as exception: - logger.error(f"FeatureView {name} requested does not exist: %s", str(exception)) + logger.error( + f"FeatureView {name} requested does not exist: %s", str(exception) + ) raise httpx.HTTPError(message=f"FeatureView: {name} not found") except Exception as exception: self._handle_exception(exception) @@ -489,7 +517,9 @@ def apply_materialization( except Exception as exception: self._handle_exception(exception) - def apply_saved_dataset(self, saved_dataset: SavedDataset, project: str, commit: bool = True): + def apply_saved_dataset( + self, saved_dataset: SavedDataset, project: str, commit: bool = True + ): raise NotImplementedError("Method not implemented") def get_saved_dataset( @@ -545,7 +575,9 @@ def apply_user_metadata( ): raise NotImplementedError("Method not implemented") - def get_user_metadata(self, project: str, feature_view: BaseFeatureView) -> Optional[bytes]: + def get_user_metadata( + self, project: str, feature_view: BaseFeatureView + ) -> Optional[bytes]: raise NotImplementedError("Method not implemented") def list_validation_references( @@ -604,19 +636,26 @@ def refresh(self, project: Optional[str] = None): if project_metadata: usage.set_current_project_uuid(project_metadata.project_uuid) else: - proto_registry_utils.init_project_metadata(self.cached_registry_proto, project) + proto_registry_utils.init_project_metadata( + self.cached_registry_proto, project + ) self.cached_registry_proto = self.proto() self.cached_registry_proto_created = datetime.utcnow() def _refresh_cached_registry_if_necessary(self): with self._refresh_lock: expired = ( - self.cached_registry_proto is None or self.cached_registry_proto_created is None + self.cached_registry_proto is None + or self.cached_registry_proto_created is None ) or ( - self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity + self.cached_registry_proto_ttl.total_seconds() + > 0 # 0 ttl means infinity and ( datetime.utcnow() - > (self.cached_registry_proto_created + self.cached_registry_proto_ttl) + > ( + self.cached_registry_proto_created + + self.cached_registry_proto_ttl + ) ) ) @@ -636,7 +675,9 @@ def _get_last_updated_metadata(self, project: str): try: url = f"{self.base_url}/projects/{project}" response_data = self._send_request("GET", url) - return datetime.strptime(response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S") + return datetime.strptime( + response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S" + ) except Exception as exception: self._handle_exception(exception) @@ -645,13 +686,17 @@ def list_project_metadata( # type: ignore[return] ) -> List[ProjectMetadata]: if allow_cache: self._refresh_cached_registry_if_necessary() - return proto_registry_utils.list_project_metadata(self.cached_registry_proto, project) + return proto_registry_utils.list_project_metadata( + self.cached_registry_proto, project + ) try: url = f"{self.base_url}/projects/{project}" response_data = self._send_request("GET", url) return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()] except ProjectMetadataNotFoundException as exception: - logger.error(f"Project {project} requested does not exist: {str(exception)}") + logger.error( + f"Project {project} requested does not exist: {str(exception)}" + ) raise httpx.HTTPError(message=f"ProjectMetadata: {project} not found") except Exception as exception: self._handle_exception(exception)