Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Added Online Store REST client errors handler #4488

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion sdk/python/feast/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def from_error_detail(detail: str) -> Optional["FeastError"]:
module = importlib.import_module(module_name)
class_reference = getattr(module, class_name)

instance = class_reference(message)
instance = class_reference.__new__(class_reference)
setattr(instance, "__overridden_message__", message)
return instance
except Exception as e:
Expand Down Expand Up @@ -451,6 +451,9 @@ class PushSourceNotFoundException(FeastError):
def __init__(self, push_source_name: str):
super().__init__(f"Unable to find push source '{push_source_name}'.")

def http_status_code(self) -> int:
return HttpStatusCode.HTTP_422_UNPROCESSABLE_ENTITY


class ReadOnlyRegistryException(FeastError):
def __init__(self):
Expand Down
278 changes: 129 additions & 149 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
import pandas as pd
import psutil
from dateutil import parser
from fastapi import Depends, FastAPI, HTTPException, Request, Response, status
from fastapi import Depends, FastAPI, Request, Response, status
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from google.protobuf.json_format import MessageToDict
from prometheus_client import Gauge, start_http_server
from pydantic import BaseModel
Expand All @@ -19,7 +20,10 @@
from feast import proto_json, utils
from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL
from feast.data_source import PushMode
from feast.errors import FeatureViewNotFoundException, PushSourceNotFoundException
from feast.errors import (
FeastError,
FeatureViewNotFoundException,
)
from feast.permissions.action import WRITE, AuthzedAction
from feast.permissions.security_manager import assert_permissions
from feast.permissions.server.rest import inject_user_details
Expand Down Expand Up @@ -101,187 +105,163 @@ async def lifespan(app: FastAPI):
async def get_body(request: Request):
return await request.body()

# TODO RBAC: complete the dependencies for the other endpoints
@app.post(
"/get-online-features",
dependencies=[Depends(inject_user_details)],
)
def get_online_features(body=Depends(get_body)):
try:
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
)
assert_permissions(
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
)
features = feature_service
else:
features = body["features"]
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
)
for feature_view in all_feature_views:
assert_permissions(
resource=feature_service, actions=[AuthzedAction.READ_ONLINE]
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
)
features = feature_service
else:
features = body["features"]
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
)
for feature_view in all_feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.READ_ONLINE]
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
)

response_proto = store.get_online_features(
features=features,
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(
response_proto, preserving_proto_field_name=True, float_precision=18
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

response_proto = store.get_online_features(
features=features,
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(
response_proto, preserving_proto_field_name=True, float_precision=18
)

@app.post("/push", dependencies=[Depends(inject_user_details)])
def push(body=Depends(get_body)):
try:
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

from feast.data_source import PushSource
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(
allow_cache=request.allow_registry_cache
from feast.data_source import PushSource

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(allow_cache=request.allow_registry_cache)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
}
}

for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)
for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)

store.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)
except PushSourceNotFoundException as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=422, detail=str(e))
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))
store.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)

@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
def write_to_online_store(body=Depends(get_body)):
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)

assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
store.write_to_online_store(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

assert_permissions(resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE])
store.write_to_online_store(
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
)

@app.get("/health")
def health():
return Response(status_code=status.HTTP_200_OK)

@app.post("/materialize", dependencies=[Depends(inject_user_details)])
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
)

@app.post("/materialize-incremental", dependencies=[Depends(inject_user_details)])
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)

@app.exception_handler(Exception)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice, I didn't realize FastAPI had a global exception handler.

https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers

async def rest_exception_handler(request: Request, exc: Exception):
# Print the original exception on the server side
logger.exception(traceback.format_exc())

if isinstance(exc, FeastError):
return JSONResponse(
status_code=exc.http_status_code(),
content=exc.to_error_detail(),
)
else:
return JSONResponse(
status_code=500,
content=str(exc),
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app

Expand Down
18 changes: 12 additions & 6 deletions sdk/python/feast/infra/online_stores/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@
from datetime import datetime
from typing import Any, Callable, Dict, List, Literal, Optional, Sequence, Tuple

import requests
from pydantic import StrictStr

from feast import Entity, FeatureView, RepoConfig
from feast.infra.online_stores.online_store import OnlineStore
from feast.permissions.client.http_auth_requests_wrapper import (
get_http_auth_requests_session,
)
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
from feast.protos.feast.types.Value_pb2 import Value as ValueProto
from feast.repo_config import FeastConfigBaseModel
from feast.rest_error_handler import rest_error_handling_decorator
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType

Expand Down Expand Up @@ -72,9 +71,7 @@ def online_read(
req_body = self._construct_online_read_api_json_request(
entity_keys, table, requested_features
)
response = get_http_auth_requests_session(config.auth_config).post(
f"{config.online_store.path}/get-online-features", data=req_body
)
response = get_remote_online_features(config=config, req_body=req_body)
if response.status_code == 200:
logger.debug("Able to retrieve the online features from feature server.")
response_json = json.loads(response.text)
Expand Down Expand Up @@ -167,3 +164,12 @@ def teardown(
entities: Sequence[Entity],
):
pass


@rest_error_handling_decorator
def get_remote_online_features(
session: requests.Session, config: RepoConfig, req_body: str
) -> requests.Response:
return session.post(
f"{config.online_store.path}/get-online-features", data=req_body
)
Loading
Loading