Skip to content

Commit

Permalink
Refactor: uncouple datasets and dataset_sharing modules - part 2-3 (#…
Browse files Browse the repository at this point in the history
…1213)

### Feature or Bugfix
- Refactoring
⚠️ MERGE AFTER #1187

### Detail
This is needed as explained in full PR [AFTER 2.4] Refactor: uncouple
datasets and dataset_sharing modules #1179

- [X] Creates an interface to execute checks and clean-ups of data
sharing objects when dataset objects are deleted (initially it was going
to be a db interface, but I think it is better in the service)
- [X] Move listDatasetShares query to dataset_sharing module in
#1185

### Relates
-  #1179

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
dlpzx authored May 2, 2024
1 parent 750a5ec commit 2ea24cb
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 57 deletions.
3 changes: 3 additions & 0 deletions backend/dataall/modules/dataset_sharing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ def depends_on() -> List[Type['ModuleInterface']]:
def __init__(self):
    """Load the dataset_sharing API and register its hooks with dependent modules."""
    # Function-scope imports: presumably used to avoid import cycles between the
    # dataset_sharing and datasets modules at module-load time — TODO confirm.
    from dataall.modules.dataset_sharing import api
    from dataall.modules.dataset_sharing.services.managed_share_policy_service import SharePolicyService
    from dataall.modules.datasets.services.dataset_service import DatasetService
    from dataall.modules.dataset_sharing.services.dataset_sharing_service import DatasetSharingService

    # Register sharing as an environment resource and plug sharing-specific
    # checks/clean-ups into the dataset lifecycle (DatasetServiceInterface hook).
    EnvironmentResourceManager.register(ShareEnvironmentResource())
    DatasetService.register(DatasetSharingService())
    log.info('API of dataset sharing has been imported')


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ def delete_shares_with_no_shared_items(session, dataset_uri):
session.delete(share_obj)

@staticmethod
def _query_user_datasets(session, username, groups, filter) -> Query:
def query_user_shared_datasets(session, username, groups) -> Query:
share_item_shared_states = ShareItemSM.get_share_item_shared_states()
query = (
session.query(Dataset)
Expand All @@ -982,9 +982,6 @@ def _query_user_datasets(session, username, groups, filter) -> Query:
.outerjoin(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
.filter(
or_(
Dataset.owner == username,
Dataset.SamlAdminGroupName.in_(groups),
Dataset.stewards.in_(groups),
and_(
ShareObject.principalId.in_(groups),
ShareObjectItem.status.in_(share_item_shared_states),
Expand All @@ -996,23 +993,8 @@ def _query_user_datasets(session, username, groups, filter) -> Query:
)
)
)
if filter and filter.get('term'):
query = query.filter(
or_(
Dataset.description.ilike(filter.get('term') + '%%'),
Dataset.label.ilike(filter.get('term') + '%%'),
)
)
return query.distinct(Dataset.datasetUri)

@staticmethod
def paginated_user_datasets(session, username, groups, data=None) -> dict:
return paginate(
query=ShareObjectRepository._query_user_datasets(session, username, groups, data),
page=data.get('page', 1),
page_size=data.get('pageSize', 10),
).to_dict()

@staticmethod
def find_dataset_shares(session, dataset_uri):
return session.query(ShareObject).filter(ShareObject.datasetUri == dataset_uri).all()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.base.context import get_context
from dataall.base.db import exceptions
from dataall.base.aws.sts import SessionHelper
from dataall.modules.dataset_sharing.db.share_object_repositories import (
ShareObjectRepository,
Expand All @@ -12,17 +13,63 @@
from dataall.modules.datasets.services.dataset_permissions import (
MANAGE_DATASETS,
UPDATE_DATASET,
DELETE_DATASET,
DELETE_DATASET_TABLE,
DELETE_DATASET_FOLDER,
CREDENTIALS_DATASET,
)

from dataall.modules.datasets_base.db.dataset_models import Dataset
from dataall.modules.datasets.services.dataset_service import DatasetServiceInterface


import logging

log = logging.getLogger(__name__)


class DatasetSharingService:
class DatasetSharingService(DatasetServiceInterface):
@staticmethod
def check_before_delete(session, uri, **kwargs):
    """DatasetServiceInterface implementation: veto deletion of a dataset object
    while active shares exist for it.

    ``kwargs['action']`` selects the check: table/folder deletions are blocked by
    any shared item on the object; dataset deletions are blocked by any share
    that still has shared items. Raises ResourceShared when deletion must be
    refused, RequiredParameter when no known action is given.
    """
    action = kwargs.get('action')
    if action in (DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE):
        if ShareObjectRepository.has_shared_items(session, uri):
            raise exceptions.ResourceShared(
                action=action,
                message='Revoke all shares for this item before deletion',
            )
        return True
    if action in (DELETE_DATASET,):
        existing_shares = ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
            session=session, dataset_uri=uri
        )
        if existing_shares:
            raise exceptions.ResourceShared(
                action=DELETE_DATASET,
                message='Revoke all dataset shares before deletion.',
            )
        return True
    raise exceptions.RequiredParameter('Delete action')

@staticmethod
def execute_on_delete(session, uri, **kwargs):
    """DatasetServiceInterface implementation: clean up share objects when a
    dataset object (table, folder, or whole dataset) is deleted.

    Raises RequiredParameter when ``kwargs['action']`` is missing or unknown.
    """
    action = kwargs.get('action')
    # Dispatch the clean-up routine by deletion action.
    cleanup_by_action = {
        DELETE_DATASET_FOLDER: ShareObjectRepository.delete_shares,
        DELETE_DATASET_TABLE: ShareObjectRepository.delete_shares,
        DELETE_DATASET: ShareObjectRepository.delete_shares_with_no_shared_items,
    }
    if action not in cleanup_by_action:
        raise exceptions.RequiredParameter('Delete action')
    cleanup_by_action[action](session, uri)
    return True

@staticmethod
def append_to_list_user_datasets(session, username, groups):
    """DatasetServiceInterface implementation: contribute the query of datasets
    shared with the user or their groups to the global dataset listing."""
    shared_datasets_query = ShareObjectRepository.query_user_shared_datasets(session, username, groups)
    return shared_datasets_query

@staticmethod
@TenantPolicyService.has_tenant_permission(MANAGE_DATASETS)
@ResourcePolicyService.has_resource_permission(UPDATE_DATASET)
Expand Down
4 changes: 2 additions & 2 deletions backend/dataall/modules/datasets/api/dataset/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataall.modules.datasets.api.dataset.input_types import DatasetFilter
from dataall.modules.datasets.api.dataset.resolvers import (
get_dataset,
list_owned_shared_datasets,
list_all_user_datasets,
list_owned_datasets,
get_dataset_assume_role_url,
get_file_upload_presigned_url,
Expand All @@ -24,7 +24,7 @@
name='listDatasets',
args=[gql.Argument('filter', DatasetFilter)],
type=DatasetSearchResult,
resolver=list_owned_shared_datasets,
resolver=list_all_user_datasets,
test_scope='Dataset',
)

Expand Down
4 changes: 2 additions & 2 deletions backend/dataall/modules/datasets/api/dataset/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ def get_file_upload_presigned_url(context, source, datasetUri: str = None, input
return DatasetService.get_file_upload_presigned_url(uri=datasetUri, data=input)


def list_owned_shared_datasets(context: Context, source, filter: dict = None):
def list_all_user_datasets(context: Context, source, filter: dict = None):
if not filter:
filter = {'page': 1, 'pageSize': 5}
return DatasetService.list_owned_shared_datasets(filter)
return DatasetService.list_all_user_datasets(filter)


def list_owned_datasets(context: Context, source, filter: dict = None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.modules.catalog.db.glossary_repositories import GlossaryRepository
from dataall.base.db.exceptions import ResourceShared, ResourceAlreadyExists
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository
from dataall.modules.datasets.services.dataset_service import DatasetService
from dataall.modules.datasets.aws.s3_location_client import S3LocationClient
from dataall.modules.datasets.db.dataset_location_repositories import DatasetLocationRepository
from dataall.modules.datasets.indexers.location_indexer import DatasetLocationIndexer
Expand Down Expand Up @@ -86,15 +86,9 @@ def update_storage_location(uri: str, data: dict):
def remove_storage_location(uri: str = None):
with get_context().db_engine.scoped_session() as session:
location = DatasetLocationRepository.get_location_by_uri(session, uri)
DatasetService.check_before_delete(session, location.locationUri, action=DELETE_DATASET_FOLDER)
DatasetService.execute_on_delete(session, location.locationUri, action=DELETE_DATASET_FOLDER)
dataset = DatasetRepository.get_dataset_by_uri(session, location.datasetUri)
has_shares = ShareObjectRepository.has_shared_items(session, location.locationUri)
if has_shares:
raise ResourceShared(
action=DELETE_DATASET_FOLDER,
message='Revoke all folder shares before deletion',
)

ShareObjectRepository.delete_shares(session, location.locationUri)
DatasetLocationService._delete_dataset_folder_read_permission(session, dataset, location.locationUri)
DatasetLocationRepository.delete(session, location)
GlossaryRepository.delete_glossary_terms_links(
Expand Down
73 changes: 59 additions & 14 deletions backend/dataall/modules/datasets/services/dataset_service.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import os
import json
import logging

from typing import List
from abc import ABC, abstractmethod
from dataall.base.aws.quicksight import QuicksightClient
from dataall.base.db import exceptions
from dataall.base.utils.naming_convention import NamingConventionPattern
Expand Down Expand Up @@ -50,7 +51,56 @@
log = logging.getLogger(__name__)


class DatasetServiceInterface(ABC):
    """Extension point for modules that depend on datasets.

    A dependent module (e.g. dataset_sharing) implements this interface and
    registers it with ``DatasetService.register`` to hook into dataset
    deletion checks, deletion clean-ups, and the user dataset listing.
    """

    @staticmethod
    @abstractmethod
    def check_before_delete(session, uri, **kwargs) -> bool:
        """Run module-specific checks before a dataset object is deleted."""
        ...

    @staticmethod
    @abstractmethod
    def execute_on_delete(session, uri, **kwargs) -> bool:
        """Run module-specific clean-up actions when a dataset object is deleted."""
        ...

    @staticmethod
    @abstractmethod
    def append_to_list_user_datasets(session, username, groups):
        """Contribute a query of additional datasets the user has access to."""
        ...


class DatasetService:
_interfaces: List[DatasetServiceInterface] = []

@classmethod
def register(cls, interface: DatasetServiceInterface):
    """Register a dependent module's DatasetServiceInterface implementation."""
    cls._interfaces.append(interface)

@classmethod
def check_before_delete(cls, session, uri, **kwargs) -> bool:
    """Run every registered module's pre-deletion check; True only if all pass.

    All checks are executed eagerly (no short-circuit), so each registered
    module gets the chance to raise its own descriptive exception.
    """
    results = []
    for module_interface in cls._interfaces:
        results.append(module_interface.check_before_delete(session, uri, **kwargs))
    return all(results)

@classmethod
def execute_on_delete(cls, session, uri, **kwargs) -> bool:
    """Run every registered module's clean-up action during deletion."""
    for module_interface in cls._interfaces:
        module_interface.execute_on_delete(session, uri, **kwargs)
    return True

@classmethod
def _list_all_user_interface_datasets(cls, session, username, groups) -> List:
    """Collect the dataset queries contributed by registered modules.

    A module's query is included only when it matches at least one row
    (checked via ``first()``), so empty contributions are dropped.
    """
    contributed = []
    for module_interface in cls._interfaces:
        module_query = module_interface.append_to_list_user_datasets(session, username, groups)
        if module_query.first() is not None:
            contributed.append(module_query)
    return contributed

@staticmethod
def check_dataset_account(session, environment):
dashboards_enabled = EnvironmentService.get_boolean_env_param(session, environment, 'dashboardsEnabled')
Expand Down Expand Up @@ -187,10 +237,13 @@ def get_file_upload_presigned_url(uri: str, data: dict):
return S3DatasetClient(dataset).get_file_upload_presigned_url(data)

@staticmethod
def list_all_user_datasets(data: dict):
    """Paginated list of every dataset visible to the requester: owned/stewarded
    plus any contributed by registered modules (e.g. datasets shared with them)."""
    context = get_context()
    username, groups = context.username, context.groups
    with context.db_engine.scoped_session() as session:
        module_queries = DatasetService._list_all_user_interface_datasets(session, username, groups)
        return DatasetRepository.paginated_all_user_datasets(
            session, username, groups, module_queries, data=data
        )

@staticmethod
def list_owned_datasets(data: dict):
Expand Down Expand Up @@ -291,7 +344,7 @@ def get_dataset_assume_role_url(uri):
else:
raise exceptions.UnauthorizedOperation(
action=CREDENTIALS_DATASET,
message=f'User: {context.username} is not a member of the group {dataset.SamlAdminGroupName}',
message=f'{context.username=} is not a member of the group {dataset.SamlAdminGroupName}',
)
pivot_session = SessionHelper.remote_session(account_id, region)
aws_session = SessionHelper.get_session(base_session=pivot_session, role_arn=role_arn)
Expand Down Expand Up @@ -357,15 +410,7 @@ def delete_dataset(uri: str, delete_from_aws: bool = False):
with context.db_engine.scoped_session() as session:
dataset: Dataset = DatasetRepository.get_dataset_by_uri(session, uri)
env = EnvironmentService.get_environment_by_uri(session, dataset.environmentUri)
shares = ShareObjectRepository.list_dataset_shares_with_existing_shared_items(
session=session, dataset_uri=uri
)
if shares:
raise exceptions.UnauthorizedOperation(
action=DELETE_DATASET,
message=f'Dataset {dataset.name} is shared with other teams. '
'Revoke all dataset shares before deletion.',
)
DatasetService.check_before_delete(session, uri, action=DELETE_DATASET)

tables = [t.tableUri for t in DatasetRepository.get_dataset_tables(session, uri)]
for tableUri in tables:
Expand All @@ -377,7 +422,7 @@ def delete_dataset(uri: str, delete_from_aws: bool = False):

DatasetIndexer.delete_doc(doc_id=uri)

ShareObjectRepository.delete_shares_with_no_shared_items(session, uri)
DatasetService.execute_on_delete(session, uri, action=DELETE_DATASET)
DatasetService.delete_dataset_term_links(session, uri)
DatasetTableRepository.delete_dataset_tables(session, dataset.datasetUri)
DatasetLocationRepository.delete_dataset_locations(session, dataset.datasetUri)
Expand Down
13 changes: 3 additions & 10 deletions backend/dataall/modules/datasets/services/dataset_table_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.modules.catalog.db.glossary_repositories import GlossaryRepository
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.base.db.exceptions import ResourceShared
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository
from dataall.modules.datasets.aws.athena_table_client import AthenaTableClient
from dataall.modules.datasets.aws.glue_dataset_client import DatasetCrawler
from dataall.modules.datasets.db.dataset_table_repositories import DatasetTableRepository
Expand All @@ -25,6 +23,7 @@
DATASET_TABLE_READ,
GET_DATASET_TABLE,
)
from dataall.modules.datasets.services.dataset_service import DatasetService
from dataall.base.utils import json_utils

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -67,14 +66,8 @@ def update_table(uri: str, table_data: dict = None):
def delete_table(uri: str):
with get_context().db_engine.scoped_session() as session:
table = DatasetTableRepository.get_dataset_table_by_uri(session, uri)
has_share = ShareObjectRepository.has_shared_items(session, table.tableUri)
if has_share:
raise ResourceShared(
action=DELETE_DATASET_TABLE,
message='Revoke all table shares before deletion',
)

ShareObjectRepository.delete_shares(session, table.tableUri)
DatasetService.check_before_delete(session, table.tableUri, action=DELETE_DATASET_TABLE)
DatasetService.execute_on_delete(session, table.tableUri, action=DELETE_DATASET_TABLE)
DatasetTableRepository.delete(session, table)

GlossaryRepository.delete_glossary_terms_links(
Expand Down
33 changes: 33 additions & 0 deletions backend/dataall/modules/datasets_base/db/dataset_repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,39 @@ def delete_dataset_lock(session, dataset: Dataset):
session.delete(dataset_lock)
session.commit()

@staticmethod
def paginated_all_user_datasets(session, username, groups, all_subqueries, data=None) -> dict:
    """Paginate the combined 'all datasets visible to the user' query.

    ``data`` carries optional pagination parameters ('page', 'pageSize') and
    the search filter forwarded to ``_query_all_user_datasets``.
    """
    combined_query = DatasetRepository._query_all_user_datasets(session, username, groups, all_subqueries, data)
    page_number = data.get('page', 1)
    page_size = data.get('pageSize', 10)
    return paginate(query=combined_query, page=page_number, page_size=page_size).to_dict()

@staticmethod
def _query_all_user_datasets(session, username, groups, all_subqueries, filter) -> Query:
    """Build the query for all datasets visible to the user.

    Unions the datasets the user owns, administers, or stewards with any
    subqueries contributed by dependent modules (e.g. datasets shared with
    the user), then applies the optional search term.

    :param all_subqueries: list of Dataset queries contributed by other modules
    :param filter: optional dict; a 'term' key is matched (prefix, case-insensitive)
        against Dataset.label and Dataset.description
    :return: query distinct on Dataset.datasetUri
    """
    query = session.query(Dataset).filter(
        or_(
            Dataset.owner == username,
            Dataset.SamlAdminGroupName.in_(groups),
            Dataset.stewards.in_(groups),
        )
    )
    # Only include the ownership query when it matches something, mirroring the
    # non-empty check applied to the module-contributed subqueries.
    if query.first() is not None:
        all_subqueries.append(query)
    if len(all_subqueries) == 1:
        query = all_subqueries[0]
    elif len(all_subqueries) > 1:
        query = all_subqueries[0].union(*all_subqueries[1:])

    if filter and filter.get('term'):
        # Bug fix: the filtered query was previously assigned to an unused local
        # ('union_query'), so the search term was silently ignored. Assign it
        # back to `query` so the term actually narrows the result.
        query = query.filter(
            or_(
                Dataset.description.ilike(filter.get('term') + '%%'),
                Dataset.label.ilike(filter.get('term') + '%%'),
            )
        )
    return query.distinct(Dataset.datasetUri)

@staticmethod
def paginated_dataset_tables(session, uri, data=None) -> dict:
query = (
Expand Down

0 comments on commit 2ea24cb

Please sign in to comment.