Skip to content

Commit

Permalink
Generic shares_base module and specific s3_datasets_shares module - p…
Browse files Browse the repository at this point in the history
…art 10 (other s3 references in shares_base) (#1357)

### Feature or Bugfix
- Refactoring

### Detail
As explained in the design for #1123 and #1283 we are trying to
implement generic `datasets_base` and `shares_base` modules that can be
used by any type of datasets and by any type of shareable object in a
generic way.

This PR:
- Remove the delete_resource_policy conditional for Tables in
`backend/dataall/modules/shares_base/services/share_item_service.py` -->
Permissions to the Table in data.all are granted once the share has
succeeded, the conditional that checks for share_failed tables should
not exist.
- Remove unnecessary check in share_item_service: in add_share_item we
check if it is a table whether it is a cross-region share. This check is
completely unnecessary because when we create a share request object we
are already checking if it is cross-region
- Use `get_share_item_details` in add_share_item - we want to check if
the table, folder, bucket exist so we need to query those tables.
- Move s3_prefix notifications to subscription task
- Fix error in query in
`backend/dataall/modules/shares_base/db/share_state_machines_repositories.py`


### Relates
- #1283 
- #1123 
- #955 

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)?
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization?
- How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features?
  - Do you use a standard proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users?
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
  • Loading branch information
dlpzx authored Jun 25, 2024
1 parent ac53b54 commit dc1d5de
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from dataall.modules.s3_datasets_shares.tasks.subscriptions import poll_queues
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.services.share_notification_service import DataSharingNotificationType

root = logging.getLogger()
root.setLevel(logging.INFO)
Expand Down Expand Up @@ -130,15 +133,25 @@ def publish_sns_message(self, session, message, dataset, share_items, prefix, ta
response = sns_client.publish_dataset_message(message)
log.info(f'SNS update publish response {response}')

notifications = ShareNotificationService(
session=session, dataset=dataset, share=share_object
).notify_new_data_available_from_owners(s3_prefix=prefix)
notifications = self.notify_new_data_available_from_owners(
session=session, dataset=dataset, share=share_object, s3_prefix=prefix
)

log.info(f'Notifications for share owners {notifications}')

except ClientError as e:
log.error(f'Failed to deliver message {message} due to: {e}')

@staticmethod
def notify_new_data_available_from_owners(session, dataset: DatasetBase, share: ShareObject, s3_prefix: str):
msg = (
f'New data (at {s3_prefix}) is available from dataset {dataset.datasetUri} shared by owner {dataset.owner}'
)
notifications = ShareNotificationService(session=session, dataset=dataset, share=share).register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications


if __name__ == '__main__':
ENVNAME = os.environ.get('envname', 'local')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_share_item_by_uri(session, uri):
return share_item

@staticmethod
def get_share_item_details(session, share_type_model, item_uri): # TODO CHECK THAT IT WORKS
def get_share_item_details(session, share_type_model, item_uri):
return session.query(share_type_model).get(item_uri)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def update_share_item_status_batch(
and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.status == old_status)
)
if share_item_type:
query = query.filter(ShareObjectItem.shareableType == share_item_type.value)
query = query.filter(ShareObjectItem.itemType == share_item_type.value)

query.update(
{
Expand Down
27 changes: 2 additions & 25 deletions backend/dataall/modules/shares_base/services/share_item_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,16 @@ def add_shared_item(uri: str, data: dict = None):
item_type = data.get('itemType')
item_uri = data.get('itemUri')
share = ShareObjectRepository.get_share_by_uri(session, uri)
target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value)
share_sm.update_state(session, share, new_share_state)

processor = ShareProcessorManager.get_processor_by_item_type(item_type)
item = ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item_uri)
if not item:
raise ObjectNotFound('ShareObjectItem', item_uri)

if (
item_type == ShareableType.Table.value and item.region != target_environment.region
): # TODO Part10: remove from here (we might be able to remove get_share_item_details entirely
raise UnauthorizedOperation(
action=ADD_ITEM,
message=f'Lake Formation cross region sharing is not supported. '
f'Table {item.itemUri} is in {item.region} and target environment '
f'{target_environment.name} is in {target_environment.region} ',
)

share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri)

if not share_item:
Expand All @@ -162,17 +152,6 @@ def add_shared_item(uri: str, data: dict = None):
def remove_shared_item(uri: str):
with get_context().db_engine.scoped_session() as session:
share_item = ShareObjectRepository.get_share_item_by_uri(session, uri)
if (
share_item.itemType == ShareableType.Table.value # TODO Part10 - REMOVE
and share_item.status == ShareItemStatus.Share_Failed.value
):
share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri)
ResourcePolicyService.delete_resource_policy(
session=session,
group=share.groupUri,
resource_uri=share_item.itemUri,
)

item_sm = ShareItemSM(share_item.status)
item_sm.run_transition(ShareItemActions.RemoveItem.value)
ShareObjectRepository.remove_share_object_item(session, share_item)
Expand All @@ -183,9 +162,7 @@ def remove_shared_item(uri: str):
def resolve_shared_item(uri, item: ShareObjectItem):
with get_context().db_engine.scoped_session() as session:
processor = ShareProcessorManager.get_processor_by_item_type(item.itemType)
return ShareObjectRepository.get_share_item_details(
session, processor.shareable_type, item.itemUri
) # TODO - check it works
return ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item.itemUri)

@staticmethod
def check_existing_shared_items(share):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def notify_share_object_submission(self, email_id: str):
subject = f'Data.all | Share Request Submitted for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg
)

Expand All @@ -64,7 +64,7 @@ def notify_share_object_approval(self, email_id: str):
subject = f'Data.all | Share Request Approved for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_APPROVED.value, msg=msg
)

Expand All @@ -86,21 +86,13 @@ def notify_share_object_rejection(self, email_id: str):
subject = f'Data.all | Share Request Rejected / Revoked for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_REJECTED.value, msg=msg
)

self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_new_data_available_from_owners(self, s3_prefix): # TODO part10: remove, this is specific for S3
msg = f'New data (at {s3_prefix}) is available from dataset {self.dataset.datasetUri} shared by owner {self.dataset.owner}'

notifications = self._register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications

def _get_share_object_targeted_users(self):
targeted_users = list()
targeted_users.append(self.dataset.SamlAdminGroupName)
Expand All @@ -109,7 +101,7 @@ def _get_share_object_targeted_users(self):
targeted_users.append(self.share.groupUri)
return targeted_users

def _register_notifications(self, notification_type, msg):
def register_notifications(self, notification_type, msg):
"""
Notifications sent to:
- dataset.SamlAdminGroupName
Expand Down

0 comments on commit dc1d5de

Please sign in to comment.