diff --git a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py
index 757662c3e..3b725e9e4 100644
--- a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py
+++ b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py
@@ -729,6 +729,24 @@ def delete_share_item_status_batch(
             .delete()
         )
 
+    @staticmethod
+    def update_share_item_health_status_batch(
+        session,
+        share_uri: str,
+        old_status: str,
+        new_status: str,
+    ) -> bool:
+        (
+            session.query(ShareObjectItem)
+            .filter(and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.healthStatus == old_status))
+            .update(
+                {
+                    ShareObjectItem.healthStatus: new_status,
+                }
+            )
+        )
+        return True
+
     @staticmethod
     def update_share_item_status_batch(
         session,
diff --git a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py
new file mode 100644
index 000000000..388e43ea4
--- /dev/null
+++ b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py
@@ -0,0 +1,44 @@
+import logging
+import os
+import sys
+from dataall.modules.dataset_sharing.api.types import ShareObject
+from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository
+from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemHealthStatus
+from dataall.modules.dataset_sharing.services.data_sharing_service import DataSharingService
+from dataall.base.db import get_engine
+
+root = logging.getLogger()
+root.setLevel(logging.INFO)
+if not root.hasHandlers():
+    root.addHandler(logging.StreamHandler(sys.stdout))
+log = logging.getLogger(__name__)
+
+
+def reapply_shares(engine):
+    """
+    A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares
+    """
+    with engine.scoped_session() as session:
+        processed_share_objects = []
+        all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session)
+        log.info(f'Found {len(all_share_objects)} share objects ')
+        share_object: ShareObject
+        for share_object in all_share_objects:
+            log.info(
+                f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
+            )
+            processed_share_objects.append(share_object.shareUri)
+            ShareObjectRepository.update_share_item_health_status_batch(
+                session=session,
+                share_uri=share_object.shareUri,
+                old_status=ShareItemHealthStatus.Unhealthy.value,
+                new_status=ShareItemHealthStatus.PendingReApply.value,
+            )
+            DataSharingService.reapply_share(engine, share_uri=share_object.shareUri)
+        return processed_share_objects
+
+
+if __name__ == '__main__':
+    ENVNAME = os.environ.get('envname', 'local')
+    ENGINE = get_engine(envname=ENVNAME)
+    reapply_shares(engine=ENGINE)
diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py
index bceb26a5e..7ded368c0 100644
--- a/deploy/stacks/container.py
+++ b/deploy/stacks/container.py
@@ -176,6 +176,7 @@ def __init__(
         self.add_subscription_task()
         self.add_share_management_task()
         self.add_share_verifier_task()
+        self.add_share_reapplier_task()
 
     @run_if(['modules.datasets.active', 'modules.dashboards.active'])
     def add_catalog_indexer_task(self):
@@ -258,6 +259,32 @@ def add_share_verifier_task(self):
         )
         self.ecs_task_definitions_families.append(verify_shares_task.task_definition.family)
 
+    @run_if(['modules.datasets.active'])
+    def add_share_reapplier_task(self):
+        share_reapplier_task_definition = ecs.FargateTaskDefinition(
+            self,
+            f'{self._resource_prefix}-{self._envname}-share-reapplier',
+            cpu=1024,
+            memory_limit_mib=2048,
+            task_role=self.task_role,
+            execution_role=self.task_role,
+            family=f'{self._resource_prefix}-{self._envname}-share-reapplier',
+        )
+
+        share_reapplier_container = share_reapplier_task_definition.add_container(
+            f'ShareReapplierTaskContainer{self._envname}',
+            container_name='container',
+            image=ecs.ContainerImage.from_ecr_repository(repository=self._ecr_repository, tag=self._cdkproxy_image_tag),
+            environment=self._create_env('INFO'),
+            command=['python3.9', '-m', 'dataall.modules.dataset_sharing.tasks.share_reapplier_task'],
+            logging=ecs.LogDriver.aws_logs(
+                stream_prefix='task',
+                log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='share-reapplier'),
+            ),
+            readonly_root_filesystem=True,
+        )
+        self.ecs_task_definitions_families.append(share_reapplier_task_definition.family)
+
     @run_if(['modules.datasets.active'])
     def add_subscription_task(self):
         subscriptions_task, subscription_task_def = self.set_scheduled_task(
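
Note: for local verification, the new task module can also be driven directly outside ECS. A minimal sketch, assuming a reachable 'local' data.all database and the usual 'envname' environment variable; it mirrors the module's __main__ block and uses only the functions introduced in this patch:

    import os

    from dataall.base.db import get_engine
    from dataall.modules.dataset_sharing.tasks.share_reapplier_task import reapply_shares

    # 'envname' defaults to 'local', matching the task's __main__ entry point
    # (assumption: a local database configured for that environment is reachable).
    os.environ.setdefault('envname', 'local')
    engine = get_engine(envname=os.environ['envname'])

    # reapply_shares() flips Unhealthy items to PendingReApply via
    # update_share_item_health_status_batch(), then calls
    # DataSharingService.reapply_share() for each active share object.
    processed = reapply_shares(engine=engine)
    print(f'Re-applied {len(processed)} share object(s): {processed}')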