Skip to content

Commit

Permalink
disable ssl verification for http registry
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhargav Dodla committed Jul 26, 2023
1 parent 91fe9bf commit 618f86e
Showing 1 changed file with 68 additions and 23 deletions.
91 changes: 68 additions & 23 deletions sdk/python/feast/infra/registry/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ def get_entity( # type: ignore[return]
) -> Entity:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.get_entity(self.cached_registry_proto, name, project)
return proto_registry_utils.get_entity(
self.cached_registry_proto, name, project
)
try:
url = f"{self.base_url}/projects/{project}/entities/{name}"
params = {"allow_cache": False}
Expand All @@ -155,17 +157,23 @@ def list_entities( # type: ignore[return]
) -> List[Entity]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_entities(self.cached_registry_proto, project)
return proto_registry_utils.list_entities(
self.cached_registry_proto, project
)
try:
url = f"{self.base_url}/projects/{project}/entities"
params = {"allow_cache": False}
response_data = self._send_request("GET", url, params=params)
response_list = response_data if isinstance(response_data, list) else []
return [EntityModel.parse_obj(entity).to_entity() for entity in response_list]
return [
EntityModel.parse_obj(entity).to_entity() for entity in response_list
]
except Exception as exception:
self._handle_exception(exception)

def apply_data_source(self, data_source: DataSource, project: str, commit: bool = True):
def apply_data_source(
self, data_source: DataSource, project: str, commit: bool = True
):
try:
url = f"{self.base_url}/projects/{project}/data_sources"
params = {"commit": commit}
Expand Down Expand Up @@ -206,7 +214,9 @@ def get_data_source( # type: ignore[return]
) -> DataSource:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.get_data_source(self.cached_registry_proto, name, project)
return proto_registry_utils.get_data_source(
self.cached_registry_proto, name, project
)
try:
url = f"{self.base_url}/projects/{project}/data_sources/{name}"
params = {"allow_cache": False}
Expand Down Expand Up @@ -234,7 +244,9 @@ def list_data_sources( # type: ignore[return]
) -> List[DataSource]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_data_sources(self.cached_registry_proto, project)
return proto_registry_utils.list_data_sources(
self.cached_registry_proto, project
)
try:
url = f"{self.base_url}/projects/{project}/data_sources"
params = {"allow_cache": False}
Expand Down Expand Up @@ -300,7 +312,9 @@ def get_feature_service( # type: ignore[return]
response_data = self._send_request("GET", url, params=params)
return FeatureServiceModel.parse_obj(response_data).to_feature_service()
except FeatureServiceNotFoundException as exception:
logger.error(f"FeatureService {name} requested does not exist: %s", str(exception))
logger.error(
f"FeatureService {name} requested does not exist: %s", str(exception)
)
raise httpx.HTTPError(message=f"FeatureService: {name} not found")
except Exception as exception:
self._handle_exception(exception)
Expand All @@ -310,7 +324,9 @@ def list_feature_services( # type: ignore[return]
) -> List[FeatureService]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_services(self.cached_registry_proto, project)
return proto_registry_utils.list_feature_services(
self.cached_registry_proto, project
)
try:
url = f"{self.base_url}/projects/{project}/feature_services"
params = {"allow_cache": False}
Expand All @@ -323,7 +339,9 @@ def list_feature_services( # type: ignore[return]
except Exception as exception:
self._handle_exception(exception)

def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit: bool = True):
def apply_feature_view(
self, feature_view: BaseFeatureView, project: str, commit: bool = True
):
try:
params = {"commit": commit}
if isinstance(feature_view, FeatureView):
Expand All @@ -335,7 +353,9 @@ def apply_feature_view(self, feature_view: BaseFeatureView, project: str, commit
url = f"{self.base_url}/projects/{project}/on_demand_feature_views"
data = OnDemandFeatureViewModel.from_feature_view(feature_view).json()
response_data = self._send_request("PUT", url, params=params, data=data)
return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view()
return OnDemandFeatureViewModel.parse_obj(
response_data
).to_feature_view()
else:
raise TypeError(
"Unsupported FeatureView type. Please use either FeatureView or OnDemandFeatureView only"
Expand Down Expand Up @@ -366,14 +386,18 @@ def get_feature_view( # type: ignore[return]
) -> FeatureView:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.get_feature_view(self.cached_registry_proto, name, project)
return proto_registry_utils.get_feature_view(
self.cached_registry_proto, name, project
)
try:
url = f"{self.base_url}/projects/{project}/feature_views/{name}"
params = {"allow_cache": False}
response_data = self._send_request("GET", url, params=params)
return FeatureViewModel.parse_obj(response_data).to_feature_view()
except FeatureViewNotFoundException as exception:
logger.error(f"FeatureView {name} requested does not exist: %s", str(exception))
logger.error(
f"FeatureView {name} requested does not exist: %s", str(exception)
)
raise httpx.HTTPError(message=f"FeatureView: {name} not found")
except Exception as exception:
self._handle_exception(exception)
Expand All @@ -383,7 +407,9 @@ def list_feature_views( # type: ignore[return]
) -> List[FeatureView]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_views(self.cached_registry_proto, project)
return proto_registry_utils.list_feature_views(
self.cached_registry_proto, project
)
try:
url = f"{self.base_url}/projects/{project}/feature_views"
params = {"allow_cache": False}
Expand All @@ -410,7 +436,9 @@ def get_on_demand_feature_view( # type: ignore[return]
response_data = self._send_request("GET", url, params=params)
return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view()
except FeatureViewNotFoundException as exception:
logger.error(f"FeatureView {name} requested does not exist: %s", str(exception))
logger.error(
f"FeatureView {name} requested does not exist: %s", str(exception)
)
raise httpx.HTTPError(message=f"FeatureView: {name} not found")
except Exception as exception:
self._handle_exception(exception)
Expand Down Expand Up @@ -489,7 +517,9 @@ def apply_materialization(
except Exception as exception:
self._handle_exception(exception)

def apply_saved_dataset(self, saved_dataset: SavedDataset, project: str, commit: bool = True):
def apply_saved_dataset(
self, saved_dataset: SavedDataset, project: str, commit: bool = True
):
raise NotImplementedError("Method not implemented")

def get_saved_dataset(
Expand Down Expand Up @@ -545,7 +575,9 @@ def apply_user_metadata(
):
raise NotImplementedError("Method not implemented")

def get_user_metadata(self, project: str, feature_view: BaseFeatureView) -> Optional[bytes]:
def get_user_metadata(
self, project: str, feature_view: BaseFeatureView
) -> Optional[bytes]:
raise NotImplementedError("Method not implemented")

def list_validation_references(
Expand Down Expand Up @@ -604,19 +636,26 @@ def refresh(self, project: Optional[str] = None):
if project_metadata:
usage.set_current_project_uuid(project_metadata.project_uuid)
else:
proto_registry_utils.init_project_metadata(self.cached_registry_proto, project)
proto_registry_utils.init_project_metadata(
self.cached_registry_proto, project
)
self.cached_registry_proto = self.proto()
self.cached_registry_proto_created = datetime.utcnow()

def _refresh_cached_registry_if_necessary(self):
with self._refresh_lock:
expired = (
self.cached_registry_proto is None or self.cached_registry_proto_created is None
self.cached_registry_proto is None
or self.cached_registry_proto_created is None
) or (
self.cached_registry_proto_ttl.total_seconds() > 0 # 0 ttl means infinity
self.cached_registry_proto_ttl.total_seconds()
> 0 # 0 ttl means infinity
and (
datetime.utcnow()
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
> (
self.cached_registry_proto_created
+ self.cached_registry_proto_ttl
)
)
)

Expand All @@ -636,7 +675,9 @@ def _get_last_updated_metadata(self, project: str):
try:
url = f"{self.base_url}/projects/{project}"
response_data = self._send_request("GET", url)
return datetime.strptime(response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S")
return datetime.strptime(
response_data["last_updated_timestamp"], "%Y-%m-%dT%H:%M:%S"
)
except Exception as exception:
self._handle_exception(exception)

Expand All @@ -645,13 +686,17 @@ def list_project_metadata( # type: ignore[return]
) -> List[ProjectMetadata]:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_project_metadata(self.cached_registry_proto, project)
return proto_registry_utils.list_project_metadata(
self.cached_registry_proto, project
)
try:
url = f"{self.base_url}/projects/{project}"
response_data = self._send_request("GET", url)
return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()]
except ProjectMetadataNotFoundException as exception:
logger.error(f"Project {project} requested does not exist: {str(exception)}")
logger.error(
f"Project {project} requested does not exist: {str(exception)}"
)
raise httpx.HTTPError(message=f"ProjectMetadata: {project} not found")
except Exception as exception:
self._handle_exception(exception)

0 comments on commit 618f86e

Please sign in to comment.