Skip to content

Commit

Permalink
Added permission assert check for registry server, offline server, on…
Browse files Browse the repository at this point in the history
…line server functions

Signed-off-by: Abdul Hameed <ahameed@redhat.com>
  • Loading branch information
redhatHameed committed Jul 3, 2024
1 parent 8a57b38 commit 285856b
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 100 deletions.
84 changes: 77 additions & 7 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from feast import proto_json, utils
from feast.constants import DEFAULT_FEATURE_SERVER_REGISTRY_TTL
from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.errors import FeatureViewNotFoundException, PushSourceNotFoundException
from feast.permissions.action import WRITE, AuthzedAction
from feast.permissions.security_manager import assert_permissions


# TODO: deprecate this in favor of push features
Expand Down Expand Up @@ -86,19 +88,40 @@ async def get_body(request: Request):
def get_online_features(body=Depends(get_body)):
try:
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
# Initialize parameters for FeatureStore.get_online_features(...) call
if "feature_service" in body:
features = store.get_feature_service(
feature_service = store.get_feature_service(
body["feature_service"], allow_cache=True
)
assert_permissions(
resource=feature_service, actions=[AuthzedAction.QUERY_ONLINE]
)
features = feature_service
else:
features = body["features"]

full_feature_names = body.get("full_feature_names", False)
all_feature_views, all_on_demand_feature_views = (
utils._get_feature_views_to_use(
store.registry,
store.project,
features,
allow_cache=True,
hide_dummy_entity=False,
)
)
for feature_view in all_feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.QUERY_ONLINE]
)
for od_feature_view in all_on_demand_feature_views:
assert_permissions(
resource=od_feature_view, actions=[AuthzedAction.QUERY_ONLINE]
)

response_proto = store.get_online_features(
features=features,
entity_rows=body["entities"],
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto

Expand All @@ -117,16 +140,41 @@ def push(body=Depends(get_body)):
try:
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
actions = []
if request.to == "offline":
to = PushMode.OFFLINE
actions = [AuthzedAction.WRITE_OFFLINE]
elif request.to == "online":
to = PushMode.ONLINE
actions = [AuthzedAction.WRITE_ONLINE]
elif request.to == "online_and_offline":
to = PushMode.ONLINE_AND_OFFLINE
actions = WRITE
else:
raise ValueError(
f"{request.to} is not a supported push format. Please specify one of these ['online', 'offline', 'online_and_offline']."
)

from feast.data_source import PushSource

all_fvs = store.list_feature_views(
allow_cache=request.allow_registry_cache
) + store.list_stream_feature_views(
allow_cache=request.allow_registry_cache
)
fvs_with_push_sources = {
fv
for fv in all_fvs
if (
fv.stream_source is not None
and isinstance(fv.stream_source, PushSource)
and fv.stream_source.name == request.push_source_name
)
}

for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)

store.push(
push_source_name=request.push_source_name,
df=df,
Expand All @@ -149,10 +197,24 @@ def write_to_online_store(body=Depends(get_body)):
try:
request = WriteToFeatureStoreRequest(**json.loads(body))
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
try:
feature_view = store.get_stream_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)
except FeatureViewNotFoundException:
feature_view = store.get_feature_view(
feature_view_name, allow_registry_cache=allow_registry_cache
)

assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.write_to_online_store(
feature_view_name=request.feature_view_name,
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
allow_registry_cache=allow_registry_cache,
)
except Exception as e:
# Print the original exception on the server side
Expand All @@ -168,6 +230,10 @@ def health():
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
Expand All @@ -183,6 +249,10 @@ def materialize(body=Depends(get_body)):
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
for feature_view in request.feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.WRITE_ONLINE]
)
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)
Expand Down
36 changes: 31 additions & 5 deletions sdk/python/feast/offline_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import traceback
from datetime import datetime
from typing import Any, Dict, List
from typing import Any, Dict, List, cast

import pyarrow as pa
import pyarrow.flight as fl
Expand All @@ -12,6 +12,8 @@
from feast.feature_logging import FeatureServiceLoggingSource
from feast.feature_view import DUMMY_ENTITY_NAME
from feast.infra.offline_stores.offline_utils import get_offline_store_from_config
from feast.permissions.action import AuthzedAction
from feast.permissions.security_manager import assert_permissions
from feast.saved_dataset import SavedDatasetStorage

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -217,7 +219,15 @@ def offline_write_batch(self, command: dict, key: str):
assert len(feature_views) == 1, "incorrect feature view"
table = self.flights[key]
self.offline_store.offline_write_batch(
self.store.config, feature_views[0], table, command["progress"]
self.store.config,
cast(
FeatureView,
assert_permissions(
feature_views[0], actions=[AuthzedAction.WRITE_OFFLINE]
),
),
table,
command["progress"],
)

def _validate_write_logged_features_parameters(self, command: dict):
Expand All @@ -234,6 +244,10 @@ def write_logged_features(self, command: dict, key: str):
feature_service.logging_config is not None
), "feature service must have logging_config set"

assert_permissions(
resource=feature_service,
actions=[AuthzedAction.WRITE_OFFLINE],
)
self.offline_store.write_logged_features(
config=self.store.config,
data=table,
Expand All @@ -260,10 +274,12 @@ def _validate_pull_all_from_table_or_query_parameters(self, command: dict):

def pull_all_from_table_or_query(self, command: dict):
self._validate_pull_all_from_table_or_query_parameters(command)
data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(data_source, actions=[AuthzedAction.QUERY_OFFLINE])

return self.offline_store.pull_all_from_table_or_query(
self.store.config,
self.store.get_data_source(command["data_source_name"]),
data_source,
command["join_key_columns"],
command["feature_name_columns"],
command["timestamp_field"],
Expand All @@ -287,10 +303,11 @@ def _validate_pull_latest_from_table_or_query_parameters(self, command: dict):

def pull_latest_from_table_or_query(self, command: dict):
self._validate_pull_latest_from_table_or_query_parameters(command)

data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(resource=data_source, actions=[AuthzedAction.QUERY_OFFLINE])
return self.offline_store.pull_latest_from_table_or_query(
self.store.config,
self.store.get_data_source(command["data_source_name"]),
data_source,
command["join_key_columns"],
command["feature_name_columns"],
command["timestamp_field"],
Expand Down Expand Up @@ -343,6 +360,11 @@ def get_historical_features(self, command: dict, key: str):
project=project,
)

for feature_view in feature_views:
assert_permissions(
resource=feature_view, actions=[AuthzedAction.QUERY_OFFLINE]
)

retJob = self.offline_store.get_historical_features(
config=self.store.config,
feature_views=feature_views,
Expand Down Expand Up @@ -377,6 +399,10 @@ def persist(self, command: dict, key: str):
raise NotImplementedError

data_source = self.store.get_data_source(command["data_source_name"])
assert_permissions(
resource=data_source,
actions=[AuthzedAction.WRITE_OFFLINE],
)
storage = SavedDatasetStorage.from_data_source(data_source)
ret_job.persist(storage, command["allow_overwrite"], command["timeout"])
except Exception as e:
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/permissions/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,12 @@ class AuthzedAction(enum.Enum):
AuthzedAction.WRITE_OFFLINE,
AuthzedAction.WRITE_ONLINE,
]


# Alias for CRUD actions
CRUD = [
AuthzedAction.CREATE,
AuthzedAction.READ,
AuthzedAction.UPDATE,
AuthzedAction.DELETE,
]
Loading

0 comments on commit 285856b

Please sign in to comment.