Skip to content

Commit

Permalink
Merge branch 'main' into data-dot-allgh-1301-bulk-share-reapply
Browse files Browse the repository at this point in the history
Merging with O.S. data.all
  • Loading branch information
trajopadhye committed Jun 25, 2024
2 parents e8779c7 + 194770c commit e2811fc
Show file tree
Hide file tree
Showing 30 changed files with 1,252 additions and 880 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from dataall.modules.s3_datasets_shares.tasks.subscriptions import poll_queues
from dataall.modules.s3_datasets.db.dataset_repositories import DatasetRepository
from dataall.modules.s3_datasets.db.dataset_models import DatasetStorageLocation, DatasetTable, S3Dataset
from dataall.modules.datasets_base.db.dataset_models import DatasetBase
from dataall.modules.shares_base.db.share_object_models import ShareObject
from dataall.modules.shares_base.services.share_notification_service import DataSharingNotificationType

root = logging.getLogger()
root.setLevel(logging.INFO)
Expand Down Expand Up @@ -130,15 +133,25 @@ def publish_sns_message(self, session, message, dataset, share_items, prefix, ta
response = sns_client.publish_dataset_message(message)
log.info(f'SNS update publish response {response}')

notifications = ShareNotificationService(
session=session, dataset=dataset, share=share_object
).notify_new_data_available_from_owners(s3_prefix=prefix)
notifications = self.notify_new_data_available_from_owners(
session=session, dataset=dataset, share=share_object, s3_prefix=prefix
)

log.info(f'Notifications for share owners {notifications}')

except ClientError as e:
log.error(f'Failed to deliver message {message} due to: {e}')

@staticmethod
def notify_new_data_available_from_owners(session, dataset: DatasetBase, share: ShareObject, s3_prefix: str):
msg = (
f'New data (at {s3_prefix}) is available from dataset {dataset.datasetUri} shared by owner {dataset.owner}'
)
notifications = ShareNotificationService(session=session, dataset=dataset, share=share).register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications


if __name__ == '__main__':
ENVNAME = os.environ.get('envname', 'local')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def get_share_item_by_uri(session, uri):
return share_item

@staticmethod
def get_share_item_details(session, share_type_model, item_uri): # TODO CHECK THAT IT WORKS
def get_share_item_details(session, share_type_model, item_uri):
return session.query(share_type_model).get(item_uri)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@ def get_share_items_states(session, share_uri, item_uris=None):
query = query.filter(ShareObjectItem.shareItemUri.in_(item_uris))
return [item.status for item in query.distinct(ShareObjectItem.status)]

@staticmethod
def get_share_items_health_states(session, share_uri, item_uris=None):
query = session.query(ShareObjectItem).filter(
and_(
ShareObjectItem.shareUri == share_uri,
)
)
if item_uris:
query = query.filter(ShareObjectItem.shareItemUri.in_(item_uris))
return [item.healthStatus for item in query.distinct(ShareObjectItem.healthStatus)]

@staticmethod
def update_share_object_status(session, share_uri: str, status: str) -> ShareObject:
share = ShareObjectRepository.get_share_by_uri(session, share_uri)
Expand Down Expand Up @@ -78,7 +89,7 @@ def update_share_item_status_batch(
and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.status == old_status)
)
if share_item_type:
query = query.filter(ShareObjectItem.shareableType == share_item_type.value)
query = query.filter(ShareObjectItem.itemType == share_item_type.value)

query.update(
{
Expand Down
36 changes: 11 additions & 25 deletions backend/dataall/modules/shares_base/services/share_item_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def revoke_items_share_object(uri, revoked_uris):
share = ShareObjectRepository.get_share_by_uri(session, uri)
dataset = DatasetBaseRepository.get_dataset_by_uri(session, share.datasetUri)
revoked_items_states = ShareStatusRepository.get_share_items_states(session, uri, revoked_uris)
revoked_items_health_states = ShareStatusRepository.get_share_items_health_states(
session, uri, revoked_uris
)
revoked_items = [ShareObjectRepository.get_share_item_by_uri(session, uri) for uri in revoked_uris]

if not revoked_items_states:
Expand All @@ -89,6 +92,12 @@ def revoke_items_share_object(uri, revoked_uris):
message='Nothing to be revoked.',
)

if ShareItemHealthStatus.PendingReApply.value in revoked_items_health_states:
raise UnauthorizedOperation(
action='Revoke Items from Share Object',
message='Cannot revoke while reapply pending for one or more items.',
)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareObjectActions.RevokeItems.value)

Expand Down Expand Up @@ -124,26 +133,16 @@ def add_shared_item(uri: str, data: dict = None):
item_type = data.get('itemType')
item_uri = data.get('itemUri')
share = ShareObjectRepository.get_share_by_uri(session, uri)
target_environment = EnvironmentService.get_environment_by_uri(session, share.environmentUri)

share_sm = ShareObjectSM(share.status)
new_share_state = share_sm.run_transition(ShareItemActions.AddItem.value)
share_sm.update_state(session, share, new_share_state)

processor = ShareProcessorManager.get_processor_by_item_type(item_type)
item = ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item_uri)
if not item:
raise ObjectNotFound('ShareObjectItem', item_uri)

if (
item_type == ShareableType.Table.value and item.region != target_environment.region
): # TODO Part10: remove from here (we might be able to remove get_share_item_details entirely
raise UnauthorizedOperation(
action=ADD_ITEM,
message=f'Lake Formation cross region sharing is not supported. '
f'Table {item.itemUri} is in {item.region} and target environment '
f'{target_environment.name} is in {target_environment.region} ',
)

share_item: ShareObjectItem = ShareObjectRepository.find_sharable_item(session, uri, item_uri)

if not share_item:
Expand All @@ -163,17 +162,6 @@ def add_shared_item(uri: str, data: dict = None):
def remove_shared_item(uri: str):
with get_context().db_engine.scoped_session() as session:
share_item = ShareObjectRepository.get_share_item_by_uri(session, uri)
if (
share_item.itemType == ShareableType.Table.value # TODO Part10 - REMOVE
and share_item.status == ShareItemStatus.Share_Failed.value
):
share = ShareObjectRepository.get_share_by_uri(session, share_item.shareUri)
ResourcePolicyService.delete_resource_policy(
session=session,
group=share.groupUri,
resource_uri=share_item.itemUri,
)

item_sm = ShareItemSM(share_item.status)
item_sm.run_transition(ShareItemActions.RemoveItem.value)
ShareObjectRepository.remove_share_object_item(session, share_item)
Expand All @@ -184,9 +172,7 @@ def remove_shared_item(uri: str):
def resolve_shared_item(uri, item: ShareObjectItem):
with get_context().db_engine.scoped_session() as session:
processor = ShareProcessorManager.get_processor_by_item_type(item.itemType)
return ShareObjectRepository.get_share_item_details(
session, processor.shareable_type, item.itemUri
) # TODO - check it works
return ShareObjectRepository.get_share_item_details(session, processor.shareable_type, item.itemUri)

@staticmethod
def check_existing_shared_items(share):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def notify_share_object_submission(self, email_id: str):
subject = f'Data.all | Share Request Submitted for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_SUBMITTED.value, msg=msg
)

Expand All @@ -64,7 +64,7 @@ def notify_share_object_approval(self, email_id: str):
subject = f'Data.all | Share Request Approved for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_APPROVED.value, msg=msg
)

Expand All @@ -86,21 +86,13 @@ def notify_share_object_rejection(self, email_id: str):
subject = f'Data.all | Share Request Rejected / Revoked for {self.dataset.label}'
email_notification_msg = msg + share_link_text

notifications = self._register_notifications(
notifications = self.register_notifications(
notification_type=DataSharingNotificationType.SHARE_OBJECT_REJECTED.value, msg=msg
)

self._create_notification_task(subject=subject, msg=email_notification_msg)
return notifications

def notify_new_data_available_from_owners(self, s3_prefix): # TODO part10: remove, this is specific for S3
msg = f'New data (at {s3_prefix}) is available from dataset {self.dataset.datasetUri} shared by owner {self.dataset.owner}'

notifications = self._register_notifications(
notification_type=DataSharingNotificationType.DATASET_VERSION.value, msg=msg
)
return notifications

def _get_share_object_targeted_users(self):
targeted_users = list()
targeted_users.append(self.dataset.SamlAdminGroupName)
Expand All @@ -109,7 +101,7 @@ def _get_share_object_targeted_users(self):
targeted_users.append(self.share.groupUri)
return targeted_users

def _register_notifications(self, notification_type, msg):
def register_notifications(self, notification_type, msg):
"""
Notifications sent to:
- dataset.SamlAdminGroupName
Expand Down
2 changes: 1 addition & 1 deletion deploy/custom_resources/custom_authorizer/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ python-jose==3.3.0
requests==2.32.2
rsa==4.9
six==1.16.0
urllib3==1.26.18
urllib3==1.26.19
72 changes: 72 additions & 0 deletions frontend/src/design/components/ShareHealthStatus.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import VerifiedUserOutlinedIcon from '@mui/icons-material/VerifiedUserOutlined';
import GppBadOutlinedIcon from '@mui/icons-material/GppBadOutlined';
import PendingOutlinedIcon from '@mui/icons-material/PendingOutlined';
import DangerousOutlinedIcon from '@mui/icons-material/DangerousOutlined';
import * as PropTypes from 'prop-types';
import { Typography } from '@mui/material';
import { Label } from './Label';

export const ShareHealthStatus = (props) => {
const { status, healthStatus, lastVerificationTime } = props;

const isShared = ['Revoke_Failed', 'Share_Succeeded'].includes(status);
const isHealthPending = ['PendingReApply', 'PendingVerify', null].includes(
healthStatus
);
const setStatus = () => {
if (!healthStatus) return 'Undefined';
return healthStatus;
};

const setColor = () => {
if (!healthStatus) return 'info';
if (['Healthy'].includes(healthStatus)) return 'success';
if (['Unhealthy'].includes(healthStatus)) return 'error';
if (isHealthPending) return 'warning';
return 'info';
};

const setIcon = () => {
if (!healthStatus) return <DangerousOutlinedIcon color={setColor()} />;
if (['Healthy'].includes(healthStatus))
return <VerifiedUserOutlinedIcon color={setColor()} />;
if (['Unhealthy'].includes(healthStatus))
return <GppBadOutlinedIcon color={setColor()} />;
if (['PendingReApply', 'PendingVerify'].includes(healthStatus))
return <PendingOutlinedIcon color={setColor()} />;
return <DangerousOutlinedIcon color={setColor()} />;
};

if (!isShared) {
return (
<Typography color="textSecondary" variant="subtitle2">
{'Item is not Shared'}
</Typography>
);
}

return (
<div style={{ display: 'flex', alignItems: 'left' }}>
{setIcon()}
<Label color={setColor()}>{setStatus().toUpperCase()} </Label>
{!isHealthPending && (
<Typography color="textSecondary" variant="subtitle2" noWrap>
{(lastVerificationTime &&
'(' +
lastVerificationTime.substring(
0,
lastVerificationTime.indexOf('.')
) +
')') ||
''}
</Typography>
)}
</div>
);
};

ShareHealthStatus.propTypes = {
status: PropTypes.string.isRequired,
healthStatus: PropTypes.string,
lastVerificationTime: PropTypes.string
};
1 change: 1 addition & 0 deletions frontend/src/design/components/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export * from './Scrollbar';
export * from './SearchInput';
export * from './SettingsDrawer';
export * from './ShareStatus';
export * from './ShareHealthStatus';
export * from './SplashScreen';
export * from './StackStatus';
export * from './TagsInput';
Expand Down
Loading

0 comments on commit e2811fc

Please sign in to comment.