Generic shares_base module and specific s3_datasets_shares module - part 11 (renaming and cleaning up s3_shares) #1359

Merged: 3 commits merged on Jun 27, 2024
20 changes: 10 additions & 10 deletions backend/dataall/modules/s3_datasets_shares/__init__.py
@@ -23,21 +23,21 @@ def depends_on() -> List[Type['ModuleInterface']]:
    def __init__(self):
        from dataall.core.environment.services.environment_resource_manager import EnvironmentResourceManager
        from dataall.modules.s3_datasets_shares import api
-        from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
+        from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService
        from dataall.modules.s3_datasets.services.dataset_service import DatasetService
        from dataall.modules.datasets_base.services.dataset_list_service import DatasetListService
-        from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
-        from dataall.modules.s3_datasets_shares.db.share_object_repositories import ShareEnvironmentResource
+        from dataall.modules.s3_datasets_shares.services.s3_share_dataset_service import S3ShareDatasetService
+        from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareEnvironmentResource
        from dataall.modules.shares_base.services.share_processor_manager import (
            ShareProcessorManager,
            ShareProcessorDefinition,
        )
        from dataall.modules.shares_base.services.shares_enums import ShareableType
        from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, DatasetBucket, DatasetStorageLocation

-        EnvironmentResourceManager.register(ShareEnvironmentResource())
-        DatasetService.register(DatasetSharingService())
-        DatasetListService.register(DatasetSharingService())
+        EnvironmentResourceManager.register(S3ShareEnvironmentResource())
+        DatasetService.register(S3ShareDatasetService())
+        DatasetListService.register(S3ShareDatasetService())

        ShareProcessorManager.register_processor(
            ShareProcessorDefinition(ShareableType.Table, None, DatasetTable, DatasetTable.tableUri)
@@ -77,7 +77,7 @@ def depends_on() -> List[Type['ModuleInterface']]:
        ]

    def __init__(self):
-        log.info('S3 Sharing handlers have been imported')
+        log.info('s3_datasets_shares handlers have been imported')


class S3DatasetsSharesCdkModuleInterface(ModuleInterface):
@@ -89,9 +89,9 @@ def is_supported(modes):

    def __init__(self):
        import dataall.modules.s3_datasets_shares.cdk
-        from dataall.modules.s3_datasets_shares.services.managed_share_policy_service import SharePolicyService
+        from dataall.modules.s3_datasets_shares.services.s3_share_managed_policy_service import S3SharePolicyService

-        log.info('CDK module data_sharing has been imported')
+        log.info('CDK module s3_datasets_shares has been imported')


class S3DatasetsSharesECSShareModuleInterface(ModuleInterface):
@@ -144,4 +144,4 @@ def __init__(self):
            )
        )

-        log.info('ECS Share module s3_data_sharing has been imported')
+        log.info('ECS Share module s3_datasets_shares has been imported')
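The renames above leave the module plugin pattern untouched: each ModuleInterface subclass declares when it should load (is_supported), which modules must load first (depends_on), and wires its services inside __init__. For orientation, a condensed sketch of the base contract these classes implement, with assumed signatures, since dataall.base's actual ModuleInterface definition is not part of this diff:

from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import List, Type


class ImportMode(Enum):
    # assumed values for illustration only
    API = auto()
    CDK = auto()
    HANDLERS = auto()


class ModuleInterface(ABC):
    @staticmethod
    @abstractmethod
    def is_supported(modes: List[ImportMode]) -> bool:
        # decide whether this interface loads for the given deployment modes
        ...

    @staticmethod
    def depends_on() -> List[Type['ModuleInterface']]:
        # interfaces that must be initialized before this one
        return []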
16 changes: 7 additions & 9 deletions backend/dataall/modules/s3_datasets_shares/api/resolvers.py
@@ -3,7 +3,7 @@
from dataall.base.api.context import Context
from dataall.base.db.exceptions import RequiredParameter
from dataall.base.feature_toggle_checker import is_feature_enabled
-from dataall.modules.s3_datasets_shares.services.dataset_sharing_service import DatasetSharingService
+from dataall.modules.s3_datasets_shares.services.s3_share_service import S3ShareService


log = logging.getLogger(__name__)
@@ -41,32 +41,30 @@ def validate_dataset_share_selector_input(data):


def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str):
-    return DatasetSharingService.list_shared_tables_by_env_dataset(datasetUri, envUri)
+    return S3ShareService.list_shared_tables_by_env_dataset(datasetUri, envUri)


@is_feature_enabled('modules.s3_datasets.features.aws_actions')
def get_dataset_shared_assume_role_url(context: Context, source, datasetUri: str = None):
-    return DatasetSharingService.get_dataset_shared_assume_role_url(uri=datasetUri)
+    return S3ShareService.get_dataset_shared_assume_role_url(uri=datasetUri)


def verify_dataset_share_objects(context: Context, source, input):
    RequestValidator.validate_dataset_share_selector_input(input)
    dataset_uri = input.get('datasetUri')
    verify_share_uris = input.get('shareUris')
-    return DatasetSharingService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)
+    return S3ShareService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)


def get_s3_consumption_data(context: Context, source, shareUri: str):
-    return DatasetSharingService.get_s3_consumption_data(uri=shareUri)
+    return S3ShareService.get_s3_consumption_data(uri=shareUri)


def list_shared_databases_tables_with_env_group(context: Context, source, environmentUri: str, groupUri: str):
-    return DatasetSharingService.list_shared_databases_tables_with_env_group(
-        environmentUri=environmentUri, groupUri=groupUri
-    )
+    return S3ShareService.list_shared_databases_tables_with_env_group(environmentUri=environmentUri, groupUri=groupUri)


def resolve_shared_db_name(context: Context, source, **kwargs):
-    return DatasetSharingService.resolve_shared_db_name(
+    return S3ShareService.resolve_shared_db_name(
        source.GlueDatabaseName, source.shareUri, source.targetEnvAwsAccountId, source.targetEnvRegion
    )
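Every resolver above is a thin pass-through: validation happens in RequestValidator and all sharing logic sits behind static methods on S3ShareService, so the rename only touches these call sites. A hedged sketch of how one such resolver is typically bound to a GraphQL field (the gql.QueryField usage mirrors data.all's other api modules; the field and return type names here are illustrative assumptions, not taken from this diff):

from dataall.base.api import gql
from dataall.modules.s3_datasets_shares.api.resolvers import get_s3_consumption_data

# illustrative wiring; names and types are assumptions
getS3ConsumptionData = gql.QueryField(
    name='getS3ConsumptionData',
    args=[gql.Argument(name='shareUri', type=gql.NonNullableType(gql.String))],
    type=gql.Ref('S3ConsumptionData'),
    resolver=get_s3_consumption_data,
)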
12 changes: 12 additions & 0 deletions backend/dataall/modules/s3_datasets_shares/aws/glue_client.py
@@ -175,6 +175,18 @@ def get_source_catalog(self):
            raise e
        return None

+    def get_glue_database_from_catalog(self):
+        # Check if a catalog account exists and return database accordingly
+        try:
+            catalog_dict = self.get_source_catalog()
+
+            if catalog_dict is not None:
+                return catalog_dict.get('database_name')
+            else:
+                return self._database
+        except Exception as e:
+            raise e
Comment on lines +187 to +188, @petrkalos (Contributor), Jun 27, 2024:

    nit: all it does here is catch and rethrow, so the try/except block isn't required
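A minimal sketch of the simplification the reviewer suggests: since the except clause only re-raises, dropping the try/except leaves behavior unchanged (get_source_catalog and self._database as in the addition above).

    def get_glue_database_from_catalog(self):
        # Check if a catalog account exists and return the database accordingly;
        # exceptions from get_source_catalog propagate to the caller unchanged.
        catalog_dict = self.get_source_catalog()
        if catalog_dict is not None:
            return catalog_dict.get('database_name')
        return self._database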


    def get_database_tags(self):
        # Get tags from the glue database
        account_id = self._account_id
backend/dataall/modules/s3_datasets_shares/db/s3_share_object_repositories.py
@@ -2,30 +2,27 @@
from warnings import warn
from typing import List

-from sqlalchemy import and_, or_, func, case
+from sqlalchemy import and_, or_
from sqlalchemy.orm import Query

-from dataall.core.environment.db.environment_models import Environment, EnvironmentGroup
+from dataall.core.environment.db.environment_models import Environment
from dataall.core.environment.services.environment_resource_manager import EnvironmentResource
from dataall.base.db import exceptions, paginate
from dataall.modules.shares_base.services.shares_enums import (
    ShareItemHealthStatus,
    ShareObjectStatus,
    ShareItemStatus,
    ShareableType,
    PrincipalType,
)
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
from dataall.modules.shares_base.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
-from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset, DatasetBucket
+from dataall.modules.s3_datasets.db.dataset_models import DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase

logger = logging.getLogger(__name__)


-class ShareEnvironmentResource(EnvironmentResource):
+class S3ShareEnvironmentResource(EnvironmentResource):
    @staticmethod
    def count_resources(session, environment, group_uri) -> int:
        return S3ShareObjectRepository.count_S3_principal_shares(
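S3ShareEnvironmentResource (registered in __init__.py above) is how the sharing module reports its footprint to environment views owned by dataall.core. A hedged sketch of the consuming side, assuming a plain registry, since EnvironmentResourceManager's real implementation is not shown in this diff:

class EnvironmentResourceManager:
    _resources = []

    @classmethod
    def register(cls, resource: EnvironmentResource) -> None:
        cls._resources.append(resource)

    @classmethod
    def count_total_resources(cls, session, environment, group_uri) -> int:
        # sum each registered module's contribution for this environment and group
        return sum(r.count_resources(session, environment, group_uri) for r in cls._resources)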
@@ -395,124 +392,6 @@ def list_s3_dataset_shares_with_existing_shared_items(
            query = query.filter(ShareObjectItem.itemType == item_type)
        return query.all()

-    @staticmethod # TODO!!!
-    def list_shareable_items(session, share, states, data):  # TODO
-        # All tables from dataset with a column isShared
-        # marking the table as part of the shareObject
-        tables = (
-            session.query(
-                DatasetTable.tableUri.label('itemUri'),
-                func.coalesce('DatasetTable').label('itemType'),
-                DatasetTable.GlueTableName.label('itemName'),
-                DatasetTable.description.label('description'),
-                ShareObjectItem.shareItemUri.label('shareItemUri'),
-                ShareObjectItem.status.label('status'),
-                ShareObjectItem.healthStatus.label('healthStatus'),
-                ShareObjectItem.healthMessage.label('healthMessage'),
-                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
-                case(
-                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
-                    else_=False,
-                ).label('isShared'),
-            )
-            .outerjoin(
-                ShareObjectItem,
-                and_(
-                    ShareObjectItem.shareUri == share.shareUri,
-                    DatasetTable.tableUri == ShareObjectItem.itemUri,
-                ),
-            )
-            .filter(DatasetTable.datasetUri == share.datasetUri)
-        )
-        if states:
-            tables = tables.filter(ShareObjectItem.status.in_(states))
-
-        # All folders from the dataset with a column isShared
-        # marking the folder as part of the shareObject
-        locations = (
-            session.query(
-                DatasetStorageLocation.locationUri.label('itemUri'),
-                func.coalesce('DatasetStorageLocation').label('itemType'),
-                DatasetStorageLocation.S3Prefix.label('itemName'),
-                DatasetStorageLocation.description.label('description'),
-                ShareObjectItem.shareItemUri.label('shareItemUri'),
-                ShareObjectItem.status.label('status'),
-                ShareObjectItem.healthStatus.label('healthStatus'),
-                ShareObjectItem.healthMessage.label('healthMessage'),
-                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
-                case(
-                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
-                    else_=False,
-                ).label('isShared'),
-            )
-            .outerjoin(
-                ShareObjectItem,
-                and_(
-                    ShareObjectItem.shareUri == share.shareUri,
-                    DatasetStorageLocation.locationUri == ShareObjectItem.itemUri,
-                ),
-            )
-            .filter(DatasetStorageLocation.datasetUri == share.datasetUri)
-        )
-        if states:
-            locations = locations.filter(ShareObjectItem.status.in_(states))
-
-        s3_buckets = (
-            session.query(
-                DatasetBucket.bucketUri.label('itemUri'),
-                func.coalesce('S3Bucket').label('itemType'),
-                DatasetBucket.S3BucketName.label('itemName'),
-                DatasetBucket.description.label('description'),
-                ShareObjectItem.shareItemUri.label('shareItemUri'),
-                ShareObjectItem.status.label('status'),
-                ShareObjectItem.healthStatus.label('healthStatus'),
-                ShareObjectItem.healthMessage.label('healthMessage'),
-                ShareObjectItem.lastVerificationTime.label('lastVerificationTime'),
-                case(
-                    [(ShareObjectItem.shareItemUri.isnot(None), True)],
-                    else_=False,
-                ).label('isShared'),
-            )
-            .outerjoin(
-                ShareObjectItem,
-                and_(
-                    ShareObjectItem.shareUri == share.shareUri,
-                    DatasetBucket.bucketUri == ShareObjectItem.itemUri,
-                ),
-            )
-            .filter(DatasetBucket.datasetUri == share.datasetUri)
-        )
-        if states:
-            s3_buckets = s3_buckets.filter(ShareObjectItem.status.in_(states))
-
-        shareable_objects = tables.union(locations, s3_buckets).subquery('shareable_objects')
-        query = session.query(shareable_objects)
-
-        if data:
-            if data.get('term'):
-                term = data.get('term')
-                query = query.filter(
-                    or_(
-                        shareable_objects.c.itemName.ilike(term + '%'),
-                        shareable_objects.c.description.ilike(term + '%'),
-                    )
-                )
-            if 'isShared' in data:
-                is_shared = data.get('isShared')
-                query = query.filter(shareable_objects.c.isShared == is_shared)
-
-            if 'isHealthy' in data:
-                # healthy_status = ShareItemHealthStatus.Healthy.value
-                query = (
-                    query.filter(shareable_objects.c.healthStatus == ShareItemHealthStatus.Healthy.value)
-                    if data.get('isHealthy')
-                    else query.filter(shareable_objects.c.healthStatus != ShareItemHealthStatus.Healthy.value)
-                )
-
-        return paginate(
-            query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
-        ).to_dict()

    # the next 2 methods are used in subscription task
    @staticmethod
    def find_share_items_by_item_uri(session, item_uri):
backend/dataall/modules/s3_datasets_shares/services/s3_share_alarm_service.py
@@ -9,7 +9,7 @@
log = logging.getLogger(__name__)


-class DatasetSharingAlarmService(AlarmService):
+class S3ShareAlarmService(AlarmService):
    """Contains set of alarms for datasets"""

    def trigger_table_sharing_failure_alarm(
105 changes: 105 additions & 0 deletions backend/dataall/modules/s3_datasets_shares/services/s3_share_dataset_service.py
@@ -0,0 +1,105 @@
+from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
+from dataall.base.db import exceptions
+from dataall.modules.shares_base.db.share_object_models import ShareObject
+from dataall.modules.s3_datasets_shares.db.s3_share_object_repositories import S3ShareObjectRepository
+from dataall.modules.shares_base.services.share_permissions import SHARE_OBJECT_APPROVER
+from dataall.modules.s3_datasets.services.dataset_permissions import (
+    DELETE_DATASET,
+    DELETE_DATASET_TABLE,
+    DELETE_DATASET_FOLDER,
+)
+from dataall.modules.datasets_base.services.datasets_enums import DatasetRole, DatasetTypes
+from dataall.modules.datasets_base.services.dataset_service_interface import DatasetServiceInterface
+
+
+import logging
+
+log = logging.getLogger(__name__)
+
+
+class S3ShareDatasetService(DatasetServiceInterface):
+    @property
+    def dataset_type(self):
+        return DatasetTypes.S3
+
+    @staticmethod
+    def resolve_additional_dataset_user_role(session, uri, username, groups):
+        """Implemented as part of the DatasetServiceInterface"""
+        share = S3ShareObjectRepository.get_share_by_dataset_attributes(session, uri, username, groups)
+        if share is not None:
+            return DatasetRole.Shared.value
+        return None
+
+    @staticmethod
+    def check_before_delete(session, uri, **kwargs):
+        """Implemented as part of the DatasetServiceInterface"""
+        action = kwargs.get('action')
+        if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
+            existing_s3_shared_items = S3ShareObjectRepository.check_existing_s3_shared_items(session, uri)
+            if existing_s3_shared_items:
+                raise exceptions.ResourceShared(
+                    action=action,
+                    message='Revoke all shares for this item before deletion',
+                )
+        elif action in [DELETE_DATASET]:
+            shares = S3ShareObjectRepository.list_s3_dataset_shares_with_existing_shared_items(
+                session=session, dataset_uri=uri
+            )
+            if shares:
+                raise exceptions.ResourceShared(
+                    action=DELETE_DATASET,
+                    message='Revoke all dataset shares before deletion.',
+                )
+        else:
+            raise exceptions.RequiredParameter('Delete action')
+        return True
+
+    @staticmethod
+    def execute_on_delete(session, uri, **kwargs):
+        """Implemented as part of the DatasetServiceInterface"""
+        action = kwargs.get('action')
+        if action in [DELETE_DATASET_FOLDER, DELETE_DATASET_TABLE]:
+            S3ShareObjectRepository.delete_s3_share_item(session, uri)
+        elif action in [DELETE_DATASET]:
+            S3ShareObjectRepository.delete_s3_shares_with_no_shared_items(session, uri)
+        else:
+            raise exceptions.RequiredParameter('Delete action')
+        return True
+
+    @staticmethod
+    def append_to_list_user_datasets(session, username, groups):
+        """Implemented as part of the DatasetServiceInterface"""
+        return S3ShareObjectRepository.list_user_s3_shared_datasets(session, username, groups)
+
+    @staticmethod
+    def extend_attach_steward_permissions(session, dataset, new_stewards, **kwargs):
+        """Implemented as part of the DatasetServiceInterface"""
+        dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
+        if dataset_shares:
+            for share in dataset_shares:
+                ResourcePolicyService.attach_resource_policy(
+                    session=session,
+                    group=new_stewards,
+                    permissions=SHARE_OBJECT_APPROVER,
+                    resource_uri=share.shareUri,
+                    resource_type=ShareObject.__name__,
+                )
+                if dataset.stewards != dataset.SamlAdminGroupName:
+                    ResourcePolicyService.delete_resource_policy(
+                        session=session,
+                        group=dataset.stewards,
+                        resource_uri=share.shareUri,
+                    )
+
+    @staticmethod
+    def extend_delete_steward_permissions(session, dataset, **kwargs):
+        """Implemented as part of the DatasetServiceInterface"""
+        dataset_shares = S3ShareObjectRepository.find_s3_dataset_shares(session, dataset.datasetUri)
+        if dataset_shares:
+            for share in dataset_shares:
+                if dataset.stewards != dataset.SamlAdminGroupName:
+                    ResourcePolicyService.delete_resource_policy(
+                        session=session,
+                        group=dataset.stewards,
+                        resource_uri=share.shareUri,
+                    )
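S3ShareDatasetService implements DatasetServiceInterface so that dataset lifecycle operations in s3_datasets can consult the sharing module without importing it directly; DatasetService.register (see the __init__.py diff above) is the other half of that contract. A hedged sketch of the dispatch side, assuming a plain registry loop rather than data.all's actual implementation:

from typing import List


class DatasetService:
    _interfaces: List[DatasetServiceInterface] = []

    @classmethod
    def register(cls, interface: DatasetServiceInterface) -> None:
        cls._interfaces.append(interface)

    @classmethod
    def check_before_delete_hooks(cls, session, uri, action) -> bool:
        # any registered module can veto the delete by raising ResourceShared
        return all(i.check_before_delete(session, uri, action=action) for i in cls._interfaces)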