From 35f06d2869ac4f0a669ddfe5a13d96a00690f984 Mon Sep 17 00:00:00 2001 From: dlpzx Date: Mon, 8 Apr 2024 09:19:07 +0200 Subject: [PATCH 1/4] Add share_reapply task + container ECS task definition --- .../tasks/share_reapplier_task.py | 38 +++++++++++++++++++ deploy/stacks/container.py | 26 +++++++++++++ 2 files changed, 64 insertions(+) create mode 100644 backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py diff --git a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py new file mode 100644 index 000000000..42890b257 --- /dev/null +++ b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py @@ -0,0 +1,38 @@ +import logging +import os +import sys +from dataall.modules.dataset_sharing.api.types import ShareObject +from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository +from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemStatus +from dataall.modules.dataset_sharing.services.data_sharing_service import DataSharingService +from dataall.base.db import get_engine + +root = logging.getLogger() +root.setLevel(logging.INFO) +if not root.hasHandlers(): + root.addHandler(logging.StreamHandler(sys.stdout)) +log = logging.getLogger(__name__) + + +def verify_shares(engine): + """ + A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares + """ + with engine.scoped_session() as session: + processed_share_objects = [] + all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session) + log.info(f'Found {len(all_share_objects)} share objects ') + share_object: ShareObject + for share_object in all_share_objects: + log.info( + f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' + ) + processed_share_objects.append(share_object.shareUri) + DataSharingService.reapply_share(engine, share_uri=share_object.shareUri) + return processed_share_objects + + +if __name__ == '__main__': + ENVNAME = os.environ.get('envname', 'local') + ENGINE = get_engine(envname=ENVNAME) + verify_shares(engine=ENGINE) diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index bceb26a5e..b51af7265 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -258,6 +258,32 @@ def add_share_verifier_task(self): ) self.ecs_task_definitions_families.append(verify_shares_task.task_definition.family) + @run_if(['modules.datasets.active']) + def add_share_reapplier_task(self): + share_reapplier_task_definition = ecs.FargateTaskDefinition( + self, + f'{self._resource_prefix}-{self._envname}-share-reapplier', + cpu=1024, + memory_limit_mib=2048, + task_role=self.task_role, + execution_role=self.task_role, + family=f'{self._resource_prefix}-{self._envname}-share-reapplier', + ) + + share_reapplier_container = share_reapplier_task_definition.add_container( + f'ShareReapplierTaskContainer{self._envname}', + container_name='container', + image=ecs.ContainerImage.from_ecr_repository(repository=self._ecr_repository, tag=self._cdkproxy_image_tag), + environment=self._create_env('INFO'), + command=['python3.9', '-m', 'dataall.modules.dataset_sharing.tasks.share_reapplier_task'], + logging=ecs.LogDriver.aws_logs( + stream_prefix='task', + log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='share-reapplier'), + ), + readonly_root_filesystem=True, + ) + self.ecs_task_definitions_families.append(share_reapplier_task_definition.family) + @run_if(['modules.datasets.active']) def add_subscription_task(self): subscriptions_task, subscription_task_def = self.set_scheduled_task( From b100d1f9a9f95029cb17e9b149c341f98bb2d06f Mon Sep 17 00:00:00 2001 From: dlpzx Date: Mon, 8 Apr 2024 11:21:51 +0200 Subject: [PATCH 2/4] Add share_reapply call --- deploy/stacks/container.py | 1 + 1 file changed, 1 insertion(+) diff --git a/deploy/stacks/container.py b/deploy/stacks/container.py index b51af7265..7ded368c0 100644 --- a/deploy/stacks/container.py +++ b/deploy/stacks/container.py @@ -176,6 +176,7 @@ def __init__( self.add_subscription_task() self.add_share_management_task() self.add_share_verifier_task() + self.add_share_reapplier_task() @run_if(['modules.datasets.active', 'modules.dashboards.active']) def add_catalog_indexer_task(self): From 1858bf0a711aa192c4b056d525f408895c4b05c1 Mon Sep 17 00:00:00 2001 From: dlpzx Date: Mon, 8 Apr 2024 15:13:05 +0200 Subject: [PATCH 3/4] Update unhealthy to pendingReapply before reapplying all shares --- .../db/share_object_repositories.py | 18 ++++++++++++++++++ .../tasks/share_reapplier_task.py | 8 +++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py index 757662c3e..3b725e9e4 100644 --- a/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py +++ b/backend/dataall/modules/dataset_sharing/db/share_object_repositories.py @@ -729,6 +729,24 @@ def delete_share_item_status_batch( .delete() ) + @staticmethod + def update_share_item_health_status_batch( + session, + share_uri: str, + old_status: str, + new_status: str, + ) -> bool: + ( + session.query(ShareObjectItem) + .filter(and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.healthStatus == old_status)) + .update( + { + ShareObjectItem.healthStatus: new_status, + } + ) + ) + return True + @staticmethod def update_share_item_status_batch( session, diff --git a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py index 42890b257..5cf905bc1 100644 --- a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py +++ b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py @@ -3,7 +3,7 @@ import sys from dataall.modules.dataset_sharing.api.types import ShareObject from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository -from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemStatus +from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemHealthStatus from dataall.modules.dataset_sharing.services.data_sharing_service import DataSharingService from dataall.base.db import get_engine @@ -28,6 +28,12 @@ def verify_shares(engine): f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}' ) processed_share_objects.append(share_object.shareUri) + ShareObjectRepository.update_share_item_health_status_batch( + session=session, + share_uri=share_object.shareUri, + old_status=ShareItemHealthStatus.Unhealthy.value, + new_status=ShareItemHealthStatus.PendingReApply.value, + ) DataSharingService.reapply_share(engine, share_uri=share_object.shareUri) return processed_share_objects From b6af137d5da678ada67d76e7708902af35dd2fe5 Mon Sep 17 00:00:00 2001 From: dlpzx Date: Mon, 8 Apr 2024 15:56:06 +0200 Subject: [PATCH 4/4] Fix name in function reapply_shares --- .../modules/dataset_sharing/tasks/share_reapplier_task.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py index 5cf905bc1..388e43ea4 100644 --- a/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py +++ b/backend/dataall/modules/dataset_sharing/tasks/share_reapplier_task.py @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) -def verify_shares(engine): +def reapply_shares(engine): """ A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares """ @@ -41,4 +41,4 @@ def verify_shares(engine): if __name__ == '__main__': ENVNAME = os.environ.get('envname', 'local') ENGINE = get_engine(envname=ENVNAME) - verify_shares(engine=ENGINE) + reapply_shares(engine=ENGINE)