Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic shares_base module and specific s3_datasets_shares module - part 10 (other s3 references in shares_base) #1357

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from dataall.modules.s3_datasets_shares.tasks.subscriptions import poll_queues
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.services.share_notification_service import DataSharingNotificationType

root = logging.getLogger()
root.setLevel(logging.INFO)
Expand Down Expand Up @@ -130,15 +133,25 @@ def publish_sns_message(self, session, message, dataset, share_items, prefix, ta
response = sns_client.publish_dataset_message(message)
log.info(f'SNS update publish response {response}')

notifications = ShareNotificationService(
session=session, dataset=dataset, share=share_object
).notify_new_data_available_from_owners(s3_prefix=prefix)
notifications = self.notify_new_data_available_from_owners(
session=session, dataset=dataset, share=share_object, s3_prefix=prefix
)

log.info(f'Notifications for share owners {notifications}')

except ClientError as e:
log.error(f'Failed to deliver message {message} due to: {e}')

@staticmethod
def notify_new_data_available_from_owners(session, dataset: DatasetBase, share: ShareObject, s3_prefix: str):
msg = (
f'New data (at {s3_prefix}) is available from dataset {dataset.datasetUri} shared by owner {dataset.owner}'
)
notifications = ShareNotificationService(session=session, dataset=dataset, share=share).register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications


if __name__ == '__main__':
ENVNAME = os.environ.get('envname', 'local')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_share_item_by_uri(session, uri):
return share_item

@staticmethod
def get_share_item_details(session, share_type_model, item_uri): # TODO CHECK THAT IT WORKS
def get_share_item_details(session, share_type_model, item_uri):
return session.query(share_type_model).get(item_uri)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def update_share_item_status_batch(
and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.status == old_status)
)
if share_item_type:
query = query.filter(ShareObjectItem.shareableType == share_item_type.value)
query = query.filter(ShareObjectItem.itemType == share_item_type.value)

query.update(
{
Expand Down
27 changes: 2 additions & 25 deletions backend/dataall/modules/shares_base/services/share_item_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,16 @@ def add_shared_item(uri: str, data: dict = None):
item_type = data.get('itemType')
item_uri = data.get('itemUri')
share = ShareObjectRepository.get_share_by_uri(session, uri)
target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value)
share_sm.update_state(session, share, new_share_state)

processor = ShareProcessorManager.get_processor_by_item_type(item_type)
item = ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item_uri)
if not item:
raise ObjectNotFound('ShareObjectItem', item_uri)

if (
item_type == ShareableType.Table.value and item.region != target_environment.region
): # TODO Part10: remove from here (we might be able to remove get_share_item_details entirely
raise UnauthorizedOperation(
action=ADD_ITEM,
message=f'Lake Formation cross region sharing is not supported. '
f'Table {item.itemUri} is in {item.region} and target environment '
f'{target_environment.name} is in {target_environment.region} ',
)

share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri)

if not share_item:
Expand All @@ -162,17 +152,6 @@ def add_shared_item(uri: str, data: dict = None):
def remove_shared_item(uri: str):
with get_context().db_engine.scoped_session() as session:
share_item = ShareObjectRepository.get_share_item_by_uri(session, uri)
if (
share_item.itemType == ShareableType.Table.value # TODO Part10 - REMOVE
and share_item.status == ShareItemStatus.Share_Failed.value
):
share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri)
ResourcePolicyService.delete_resource_policy(
session=session,
group=share.groupUri,
resource_uri=share_item.itemUri,
)

item_sm = ShareItemSM(share_item.status)
item_sm.run_transition(ShareItemActions.RemoveItem.value)
ShareObjectRepository.remove_share_object_item(session, share_item)
Expand All @@ -183,9 +162,7 @@ def remove_shared_item(uri: str):
def resolve_shared_item(uri, item: ShareObjectItem):
with get_context().db_engine.scoped_session() as session:
processor = ShareProcessorManager.get_processor_by_item_type(item.itemType)
return ShareObjectRepository.get_share_item_details(
session, processor.shareable_type, item.itemUri
) # TODO - check it works
return ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item.itemUri)

@staticmethod
def check_existing_shared_items(share):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def notify_share_object_submission(self, email_id: str):
subject = f'Data.all | Share Request Submitted for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg
)

Expand All @@ -64,7 +64,7 @@ def notify_share_object_approval(self, email_id: str):
subject = f'Data.all | Share Request Approved for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_APPROVED.value, msg=msg
)

Expand All @@ -86,21 +86,13 @@ def notify_share_object_rejection(self, email_id: str):
subject = f'Data.all | Share Request Rejected / Revoked for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_REJECTED.value, msg=msg
)

self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_new_data_available_from_owners(self, s3_prefix): # TODO part10: remove, this is specific for S3
msg = f'New data (at {s3_prefix}) is available from dataset {self.dataset.datasetUri} shared by owner {self.dataset.owner}'

notifications = self._register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications

def _get_share_object_targeted_users(self):
targeted_users = list()
targeted_users.append(self.dataset.SamlAdminGroupName)
Expand All @@ -109,7 +101,7 @@ def _get_share_object_targeted_users(self):
targeted_users.append(self.share.groupUri)
return targeted_users

def _register_notifications(self, notification_type, msg):
def register_notifications(self, notification_type, msg):
"""
Notifications sent to:
- dataset.SamlAdminGroupName
Expand Down
Loading