diff --git a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py index ac37d633d..e382b05ef 100644 --- a/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py +++ b/backend/dataall/modules/s3_datasets_shares/tasks/dataset_subscription_task.py @@ -18,6 +18,9 @@ from dataall.modules.s3_datasets_shares.tasks.subscriptions import poll_queues from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset +from dataall.modules.datasets_base.db.dataset_models import DatasetBase +from dataall.modules.shares_base.db.share_object_models import ShareObject +from dataall.modules.shares_base.services.share_notification_service import DataSharingNotificationType root = logging.getLogger() root.setLevel(logging.INFO) @@ -130,15 +133,25 @@ def publish_sns_message(self, session, message, dataset, share_items, prefix, ta response = sns_client.publish_dataset_message(message) log.info(f'SNS update publish response {response}') - notifications = ShareNotificationService( - session=session, dataset=dataset, share=share_object - ).notify_new_data_available_from_owners(s3_prefix=prefix) + notifications = self.notify_new_data_available_from_owners( + session=session, dataset=dataset, share=share_object, s3_prefix=prefix + ) log.info(f'Notifications for share owners {notifications}') except ClientError as e: log.error(f'Failed to deliver message {message} due to: {e}') + @staticmethod + def notify_new_data_available_from_owners(session, dataset: DatasetBase, share: ShareObject, s3_prefix: str): + msg = ( + f'New data (at {s3_prefix}) is available from dataset {dataset.datasetUri} shared by owner {dataset.owner}' + ) + notifications = ShareNotificationService(session=session, dataset=dataset, share=share).register_notifications( + notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg + ) + return notifications + if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') diff --git a/backend/dataall/modules/shares_base/db/share_object_repositories.py b/backend/dataall/modules/shares_base/db/share_object_repositories.py index bea8d3876..0a9e2eafb 100644 --- a/backend/dataall/modules/shares_base/db/share_object_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_object_repositories.py @@ -67,7 +67,7 @@ def get_share_item_by_uri(session, uri): return share_item @staticmethod - def get_share_item_details(session, share_type_model, item_uri): # TODO CHECK THAT IT WORKS + def get_share_item_details(session, share_type_model, item_uri): return session.query(share_type_model).get(item_uri) @staticmethod diff --git a/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py b/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py index ddb5d9ae9..2da805cef 100644 --- a/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py +++ b/backend/dataall/modules/shares_base/db/share_state_machines_repositories.py @@ -78,7 +78,7 @@ def update_share_item_status_batch( and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.status == old_status) ) if share_item_type: - query = query.filter(ShareObjectItem.shareableType == share_item_type.value) + query = query.filter(ShareObjectItem.itemType == share_item_type.value) query.update( { diff --git a/backend/dataall/modules/shares_base/services/share_item_service.py b/backend/dataall/modules/shares_base/services/share_item_service.py index 426fa8b35..058820c23 100644 --- a/backend/dataall/modules/shares_base/services/share_item_service.py +++ b/backend/dataall/modules/shares_base/services/share_item_service.py @@ -123,26 +123,16 @@ def add_shared_item(uri: str, data: dict = None): item_type = data.get('itemType') item_uri = data.get('itemUri') share = ShareObjectRepository.get_share_by_uri(session, uri) - target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri) share_sm = ShareObjectSM(share.status) new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value) share_sm.update_state(session, share, new_share_state) + processor = ShareProcessorManager.get_processor_by_item_type(item_type) item = ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item_uri) if not item: raise ObjectNotFound('ShareObjectItem', item_uri) - if ( - item_type == ShareableType.Table.value and item.region != target_environment.region - ): # TODO Part10: remove from here (we might be able to remove get_share_item_details entirely - raise UnauthorizedOperation( - action=ADD_ITEM, - message=f'Lake Formation cross region sharing is not supported. ' - f'Table {item.itemUri} is in {item.region} and target environment ' - f'{target_environment.name} is in {target_environment.region} ', - ) - share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri) if not share_item: @@ -162,17 +152,6 @@ def add_shared_item(uri: str, data: dict = None): def remove_shared_item(uri: str): with get_context().db_engine.scoped_session() as session: share_item = ShareObjectRepository.get_share_item_by_uri(session, uri) - if ( - share_item.itemType == ShareableType.Table.value # TODO Part10 - REMOVE - and share_item.status == ShareItemStatus.Share_Failed.value - ): - share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri) - ResourcePolicyService.delete_resource_policy( - session=session, - group=share.groupUri, - resource_uri=share_item.itemUri, - ) - item_sm = ShareItemSM(share_item.status) item_sm.run_transition(ShareItemActions.RemoveItem.value) ShareObjectRepository.remove_share_object_item(session, share_item) @@ -183,9 +162,7 @@ def remove_shared_item(uri: str): def resolve_shared_item(uri, item: ShareObjectItem): with get_context().db_engine.scoped_session() as session: processor = ShareProcessorManager.get_processor_by_item_type(item.itemType) - return ShareObjectRepository.get_share_item_details( - session, processor.shareable_type, item.itemUri - ) # TODO - check it works + return ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item.itemUri) @staticmethod def check_existing_shared_items(share): diff --git a/backend/dataall/modules/shares_base/services/share_notification_service.py b/backend/dataall/modules/shares_base/services/share_notification_service.py index 197b706f4..765138af9 100644 --- a/backend/dataall/modules/shares_base/services/share_notification_service.py +++ b/backend/dataall/modules/shares_base/services/share_notification_service.py @@ -49,7 +49,7 @@ def notify_share_object_submission(self, email_id: str): subject = f'Data.all | Share Request Submitted for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg ) @@ -64,7 +64,7 @@ def notify_share_object_approval(self, email_id: str): subject = f'Data.all | Share Request Approved for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_APPROVED.value, msg=msg ) @@ -86,21 +86,13 @@ def notify_share_object_rejection(self, email_id: str): subject = f'Data.all | Share Request Rejected / Revoked for {self.dataset.label}' email_notification_msg = msg + share_link_text - notifications = self._register_notifications( + notifications = self.register_notifications( notification_type=DataSharingNotificationType.SHARE_OBJECT_REJECTED.value, msg=msg ) self._create_notification_task(subject=subject, msg=email_notification_msg) return notifications - def notify_new_data_available_from_owners(self, s3_prefix): # TODO part10: remove, this is specific for S3 - msg = f'New data (at {s3_prefix}) is available from dataset {self.dataset.datasetUri} shared by owner {self.dataset.owner}' - - notifications = self._register_notifications( - notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg - ) - return notifications - def _get_share_object_targeted_users(self): targeted_users = list() targeted_users.append(self.dataset.SamlAdminGroupName) @@ -109,7 +101,7 @@ def _get_share_object_targeted_users(self): targeted_users.append(self.share.groupUri) return targeted_users - def _register_notifications(self, notification_type, msg): + def register_notifications(self, notification_type, msg): """ Notifications sent to: - dataset.SamlAdminGroupName