Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AFTER 2.4] Refactor: uncouple datasets and dataset_sharing modules #1179

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions backend/dataall/modules/dataset_sharing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from dataall.core.environment.services.environment_resource_manager import EnvironmentResourceManager
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareEnvironmentResource
from dataall.modules.datasets_base import DatasetBaseModuleInterface
from dataall.base.loader import ModuleInterface, ImportMode


Expand All @@ -18,14 +17,18 @@ def is_supported(modes: Set[ImportMode]) -> bool:
@staticmethod
def depends_on() -> List[Type['ModuleInterface']]:
from dataall.modules.notifications import NotificationsModuleInterface
from dataall.modules.datasets import DatasetApiModuleInterface

return [DatasetBaseModuleInterface, NotificationsModuleInterface]
return [DatasetApiModuleInterface, NotificationsModuleInterface]

def __init__(self):
from dataall.modules.dataset_sharing import api
from dataall.modules.dataset_sharing.services.managed_share_policy_service import SharePolicyService
from dataall.modules.datasets.services.dataset_service import DatasetService
from dataall.modules.dataset_sharing.services.dataset_sharing_service import DatasetSharingService

EnvironmentResourceManager.register(ShareEnvironmentResource())
DatasetService.register(DatasetSharingService())
log.info('API of dataset sharing has been imported')


Expand All @@ -39,8 +42,9 @@ def is_supported(modes: List[ImportMode]):
@staticmethod
def depends_on() -> List[Type['ModuleInterface']]:
from dataall.modules.notifications import NotificationsModuleInterface
from dataall.modules.datasets import DatasetAsyncHandlersModuleInterface

return [DatasetBaseModuleInterface, NotificationsModuleInterface]
return [DatasetAsyncHandlersModuleInterface, NotificationsModuleInterface]

def __init__(self):
import dataall.modules.dataset_sharing.handlers
Expand Down
8 changes: 8 additions & 0 deletions backend/dataall/modules/dataset_sharing/api/mutations.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
update_share_reject_purpose,
update_share_request_purpose,
verify_items_share_object,
verify_dataset_share_objects,
)

createShareObject = gql.MutationField(
Expand Down Expand Up @@ -117,3 +118,10 @@
type=gql.Boolean,
resolver=update_share_request_purpose,
)

# Mutation: re-verify the health of selected share objects of a dataset.
verifyDatasetShareObjects = gql.MutationField(
    name='verifyDatasetShareObjects',
    type=gql.Boolean,
    resolver=verify_dataset_share_objects,
    args=[gql.Argument(name='input', type=gql.Ref('ShareObjectSelectorInput'))],
)
32 changes: 32 additions & 0 deletions backend/dataall/modules/dataset_sharing/api/queries.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from dataall.base.api import gql
from dataall.modules.dataset_sharing.api.resolvers import (
get_dataset_shared_assume_role_url,
get_share_object,
list_dataset_share_objects,
list_shared_with_environment_data_items,
list_shares_in_my_inbox,
list_shares_in_my_outbox,
list_shared_tables_by_env_dataset,
)

getShareObject = gql.QueryField(
Expand Down Expand Up @@ -38,3 +41,32 @@
type=gql.Ref('EnvironmentPublishedItemSearchResults'),
test_scope='Dataset',
)

# Query: presigned AWS console assume-role URL for a shared dataset.
getDatasetSharedAssumeRoleUrl = gql.QueryField(
    name='getDatasetSharedAssumeRoleUrl',
    type=gql.String,
    resolver=get_dataset_shared_assume_role_url,
    args=[gql.Argument(name='datasetUri', type=gql.String)],
    test_scope='Dataset',
)

# Query: paginated share objects attached to a dataset, optionally
# narrowed to one environment.
# NOTE(review): the module attribute is named `listShareObjects` while the
# GraphQL field is 'listDatasetShareObjects' — confirm whether the variable
# should be renamed to match (left as-is to avoid breaking importers).
listShareObjects = gql.QueryField(
    name='listDatasetShareObjects',
    type=gql.Ref('ShareSearchResult'),
    resolver=list_dataset_share_objects,
    args=[
        gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String)),
        gql.Argument(name='environmentUri', type=gql.String),
        gql.Argument(name='page', type=gql.Integer),
    ],
)

# Query: tables of a dataset that are shared with a given environment.
getSharedDatasetTables = gql.QueryField(
    name='getSharedDatasetTables',
    type=gql.ArrayType(gql.Ref('SharedDatasetTableItem')),
    resolver=list_shared_tables_by_env_dataset,
    args=[
        gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String)),
        gql.Argument(name='envUri', type=gql.NonNullableType(gql.String)),
    ],
)
39 changes: 37 additions & 2 deletions backend/dataall/modules/dataset_sharing/api/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.organizations.db.organization_repositories import OrganizationRepository
from dataall.base.db.exceptions import RequiredParameter
from dataall.base.feature_toggle_checker import is_feature_enabled
from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareObjectPermission
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.dataset_sharing.services.share_item_service import ShareItemService
from dataall.modules.dataset_sharing.services.share_object_service import ShareObjectService
from dataall.modules.dataset_sharing.services.dataset_sharing_service import DatasetSharingService
from dataall.modules.dataset_sharing.aws.glue_client import GlueClient
from dataall.modules.datasets_base.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset
from dataall.modules.datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset

log = logging.getLogger(__name__)

Expand All @@ -37,6 +39,15 @@ def validate_item_selector_input(data):
if not data.get('itemUris'):
raise RequiredParameter('itemUris')

@staticmethod
def validate_dataset_share_selector_input(data):
    """Validate the mutation input for verifying a dataset's share objects.

    Raises:
        RequiredParameter: when the payload itself or one of its required
            keys ('datasetUri', 'shareUris') is missing or empty.
    """
    if not data:
        # Fix: name the missing parameter instead of passing the falsy
        # payload itself (which would render as 'None' in the error
        # message shown to the API caller).
        raise RequiredParameter('input')
    if not data.get('datasetUri'):
        raise RequiredParameter('datasetUri')
    if not data.get('shareUris'):
        raise RequiredParameter('shareUris')


def create_share_object(
context: Context,
Expand Down Expand Up @@ -303,3 +314,27 @@ def update_share_reject_purpose(context: Context, source, shareUri: str = None,
uri=shareUri,
reject_purpose=rejectPurpose,
)


def verify_dataset_share_objects(context: Context, source, input):
    """Resolver: re-verify the selected share objects of a dataset.

    Validates the selector payload, then delegates to the sharing service.
    """
    RequestValidator.validate_dataset_share_selector_input(input)
    return DatasetSharingService.verify_dataset_share_objects(
        uri=input.get('datasetUri'),
        share_uris=input.get('shareUris'),
    )


@is_feature_enabled('modules.datasets.features.aws_actions')
def get_dataset_shared_assume_role_url(context: Context, source, datasetUri: str = None):
    """Return an assume-role URL for the shared dataset.

    Gated on the 'modules.datasets.features.aws_actions' feature flag via
    the decorator; delegates entirely to DatasetSharingService.
    """
    return DatasetSharingService.get_dataset_shared_assume_role_url(uri=datasetUri)


def list_dataset_share_objects(context, source, filter: dict = None):
    """Resolver: paginated share objects of the dataset in ``source``.

    Returns None when there is no parent dataset object to resolve from.
    """
    if not source:
        return None
    # Default to the first page of five items when no filter is supplied.
    effective_filter = filter if filter else {'page': 1, 'pageSize': 5}
    return DatasetSharingService.list_dataset_share_objects(source, effective_filter)


def list_shared_tables_by_env_dataset(context: Context, source, datasetUri: str, envUri: str):
    """Return the dataset's tables shared with the given environment (pure delegation)."""
    return DatasetSharingService.list_shared_tables_by_env_dataset(datasetUri, envUri)
19 changes: 0 additions & 19 deletions backend/dataall/modules/dataset_sharing/aws/kms_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,3 @@ def get_key_id(self, key_alias: str):
return None
else:
return response['KeyMetadata']['KeyId']

def check_key_exists(self, key_alias: str):
    """Check whether ``key_alias`` exists among the account's KMS aliases.

    Returns True/False on a successful listing; returns None when listing
    fails for any reason other than missing permissions. Raises when the
    pivot role lacks kms:ListAliases.
    """
    try:
        paginator = self._client.get_paginator('list_aliases')
        for page in paginator.paginate():
            # Stop at the first page containing the alias.
            if any(alias['AliasName'] == key_alias for alias in page['Aliases']):
                return True
        return False
    except ClientError as e:
        if e.response['Error']['Code'] == 'AccessDenied':
            raise Exception(
                f'Data.all Environment Pivot Role does not have kms:ListAliases Permission in account {self._account_id}: {e}'
            )
        log.error(f'Failed to list KMS key aliases in account {self._account_id}: {e}')
        return None
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
PrincipalType,
)
from dataall.modules.dataset_sharing.db.share_object_models import ShareObjectItem, ShareObject
from dataall.modules.datasets_base.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets_base.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset, DatasetBucket
from dataall.modules.datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, Dataset, DatasetBucket

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -368,11 +368,11 @@ def get_share_by_uri(session, uri):
return share

@staticmethod
def get_share_by_dataset_attributes(session, dataset_uri, dataset_owner, groups=None):
    """Return one ShareObject on the dataset that is either owned by
    ``dataset_owner`` or granted to any principal in ``groups``.

    Fix: the default for ``groups`` was a mutable list literal (``[]``),
    which is a shared-state hazard; use a ``None`` sentinel instead —
    callers passing nothing get identical behavior (empty ``IN`` clause).
    """
    principal_ids = groups if groups is not None else []
    share: ShareObject = (
        session.query(ShareObject)
        .filter(ShareObject.datasetUri == dataset_uri)
        .filter(or_(ShareObject.owner == dataset_owner, ShareObject.principalId.in_(principal_ids)))
        .first()
    )
    return share
Expand Down Expand Up @@ -971,7 +971,7 @@ def delete_shares_with_no_shared_items(session, dataset_uri):
session.delete(share_obj)

@staticmethod
def _query_user_datasets(session, username, groups, filter) -> Query:
def query_user_shared_datasets(session, username, groups) -> Query:
share_item_shared_states = ShareItemSM.get_share_item_shared_states()
query = (
session.query(Dataset)
Expand All @@ -982,9 +982,6 @@ def _query_user_datasets(session, username, groups, filter) -> Query:
.outerjoin(ShareObjectItem, ShareObjectItem.shareUri == ShareObject.shareUri)
.filter(
or_(
Dataset.owner == username,
Dataset.SamlAdminGroupName.in_(groups),
Dataset.stewards.in_(groups),
and_(
ShareObject.principalId.in_(groups),
ShareObjectItem.status.in_(share_item_shared_states),
Expand All @@ -996,23 +993,8 @@ def _query_user_datasets(session, username, groups, filter) -> Query:
)
)
)
if filter and filter.get('term'):
query = query.filter(
or_(
Dataset.description.ilike(filter.get('term') + '%%'),
Dataset.label.ilike(filter.get('term') + '%%'),
)
)
return query.distinct(Dataset.datasetUri)

@staticmethod
def paginated_user_datasets(session, username, groups, data=None) -> dict:
    """Return a paginated dict of datasets visible to the user.

    Fix: ``data`` defaults to None but was dereferenced unconditionally
    with ``data.get(...)``, raising AttributeError when the caller omitted
    it — normalize to an empty dict first.
    """
    data = data or {}
    return paginate(
        query=ShareObjectRepository._query_user_datasets(session, username, groups, data),
        page=data.get('page', 1),
        page_size=data.get('pageSize', 10),
    ).to_dict()

@staticmethod
def find_dataset_shares(session, dataset_uri):
    """Return all ShareObject rows attached to the given dataset (any status)."""
    return session.query(ShareObject).filter(ShareObject.datasetUri == dataset_uri).all()
Expand Down Expand Up @@ -1291,3 +1273,40 @@ def count_role_principal_shares(session, principal_id: str, principal_type: Prin
)
.count()
)

@staticmethod
def query_dataset_tables_shared_with_env(
    session, environment_uri: str, dataset_uri: str, username: str, groups: list[str]
):
    """For a given dataset, returns the list of Tables shared with the environment
    This means looking at approved ShareObject items
    for the share object associating the dataset and environment
    """
    # States of a share item that count as "currently shared".
    share_item_shared_states = ShareItemSM.get_share_item_shared_states()
    env_tables_shared = (
        session.query(DatasetTable)  # all tables
        .join(
            ShareObjectItem,  # found in ShareObjectItem
            ShareObjectItem.itemUri == DatasetTable.tableUri,
        )
        .join(
            ShareObject,  # jump to share object
            ShareObject.shareUri == ShareObjectItem.shareUri,
        )
        .filter(
            and_(
                ShareObject.datasetUri == dataset_uri,  # for this dataset
                ShareObject.environmentUri == environment_uri,  # for this environment
                ShareObjectItem.status.in_(share_item_shared_states),
                ShareObject.principalType
                != PrincipalType.ConsumptionRole.value,  # Exclude Consumption roles shares
                # Restrict to shares the requesting user owns or that were
                # granted to one of the user's groups.
                or_(
                    ShareObject.owner == username,
                    ShareObject.principalId.in_(groups),
                ),
            )
        )
        .all()
    )

    return env_tables_shared
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
ShareItemStatus,
ShareableType,
)
from dataall.modules.datasets_base.db.dataset_models import DatasetLock
from dataall.modules.datasets.db.dataset_models import DatasetLock

log = logging.getLogger(__name__)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@

from dataall.core.environment.db.environment_models import Environment
from dataall.modules.dataset_sharing.db.share_object_models import ShareObject
from dataall.modules.datasets_base.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation, DatasetBucket
from dataall.modules.datasets.db.dataset_models import DatasetTable, Dataset, DatasetStorageLocation, DatasetBucket
from dataall.base.utils.alarm_service import AlarmService

log = logging.getLogger(__name__)


class DatasetAlarmService(AlarmService):
class DatasetSharingAlarmService(AlarmService):
"""Contains set of alarms for datasets"""

def trigger_table_sharing_failure_alarm(
Expand Down Expand Up @@ -72,24 +72,6 @@ def trigger_revoke_table_sharing_failure_alarm(
"""
return self.publish_message_to_alarms_topic(subject, message)

def trigger_dataset_sync_failure_alarm(self, dataset: Dataset, error: str):
    """Publish an alarm message when syncing the dataset's Glue tables to the
    search catalog fails.

    The subject is truncated to 100 characters (SNS subject limit —
    TODO confirm against publish_message_to_alarms_topic's contract).
    Returns whatever publish_message_to_alarms_topic returns.
    """
    log.info(f'Triggering dataset {dataset.name} tables sync failure alarm...')
    subject = f'Data.all Dataset Tables Sync Failure for {dataset.name}'[:100]
    message = f"""
You are receiving this email because your Data.all {self.envname} environment in the {self.region} region has entered the ALARM state, because it failed to synchronize Dataset {dataset.name} tables from AWS Glue to the Search Catalog.

Alarm Details:
- State Change: OK -> ALARM
- Reason for State Change: {error}
- Timestamp: {datetime.now()}
Dataset
- Dataset URI: {dataset.datasetUri}
- AWS Account: {dataset.AwsAccountId}
- Region: {dataset.region}
- Glue Database: {dataset.GlueDatabaseName}
"""
    return self.publish_message_to_alarms_topic(subject, message)

def trigger_folder_sharing_failure_alarm(
self,
folder: DatasetStorageLocation,
Expand Down
Loading
Loading