Add share_reapply task - ON DEMAND for data.all admins #1151

Merged · 4 commits · Apr 9, 2024
@@ -729,6 +729,24 @@ def delete_share_item_status_batch(
            .delete()
        )

    @staticmethod
    def update_share_item_health_status_batch(
        session,
        share_uri: str,
        old_status: str,
        new_status: str,
    ) -> bool:
        (
            session.query(ShareObjectItem)
            .filter(and_(ShareObjectItem.shareUri == share_uri, ShareObjectItem.healthStatus == old_status))
            .update(
                {
                    ShareObjectItem.healthStatus: new_status,
                }
            )
        )
        return True

    @staticmethod
    def update_share_item_status_batch(
        session,
@@ -0,0 +1,44 @@
import logging
import os
import sys
from dataall.modules.dataset_sharing.api.types import ShareObject
from dataall.modules.dataset_sharing.db.share_object_repositories import ShareObjectRepository
from dataall.modules.dataset_sharing.services.dataset_sharing_enums import ShareItemHealthStatus
from dataall.modules.dataset_sharing.services.data_sharing_service import DataSharingService
from dataall.base.db import get_engine

root = logging.getLogger()
root.setLevel(logging.INFO)
if not root.hasHandlers():
    root.addHandler(logging.StreamHandler(sys.stdout))
log = logging.getLogger(__name__)


def reapply_shares(engine):
    """
    A method used by the on-demand ECS task to run reapply_share() on all active data.all shares
    """
    with engine.scoped_session() as session:
        processed_share_objects = []
        all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session)
        log.info(f'Found {len(all_share_objects)} share objects')
        share_object: ShareObject
        for share_object in all_share_objects:
            log.info(
                f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
            )
            processed_share_objects.append(share_object.shareUri)
            ShareObjectRepository.update_share_item_health_status_batch(
                session=session,
                share_uri=share_object.shareUri,
                old_status=ShareItemHealthStatus.Unhealthy.value,
                new_status=ShareItemHealthStatus.PendingReApply.value,
            )
            DataSharingService.reapply_share(engine, share_uri=share_object.shareUri)
        return processed_share_objects


if __name__ == '__main__':
    ENVNAME = os.environ.get('envname', 'local')
    ENGINE = get_engine(envname=ENVNAME)
    reapply_shares(engine=ENGINE)
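
The entrypoint above can also be exercised directly for local testing. A minimal sketch, assuming a local database is reachable under the default 'local' environment name; the module path is taken from the ECS command registered in deploy/stacks/container.py below:

# Sketch only: run the re-apply loop against a local engine (mirrors the __main__ block above).
from dataall.base.db import get_engine
from dataall.modules.dataset_sharing.tasks.share_reapplier_task import reapply_shares

engine = get_engine(envname='local')  # 'local' is the default envname read by the task
processed = reapply_shares(engine=engine)
print(f'Re-applied share items for {len(processed)} share objects')
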
deploy/stacks/container.py (27 changes: 27 additions & 0 deletions)
@@ -176,6 +176,7 @@ def __init__(
        self.add_subscription_task()
        self.add_share_management_task()
        self.add_share_verifier_task()
        self.add_share_reapplier_task()

    @run_if(['modules.datasets.active', 'modules.dashboards.active'])
    def add_catalog_indexer_task(self):
@@ -258,6 +259,32 @@ def add_share_verifier_task(self):
        )
        self.ecs_task_definitions_families.append(verify_shares_task.task_definition.family)

    @run_if(['modules.datasets.active'])
    def add_share_reapplier_task(self):
        share_reapplier_task_definition = ecs.FargateTaskDefinition(
            self,
            f'{self._resource_prefix}-{self._envname}-share-reapplier',
            cpu=1024,
            memory_limit_mib=2048,
            task_role=self.task_role,
            execution_role=self.task_role,
            family=f'{self._resource_prefix}-{self._envname}-share-reapplier',
        )

        share_reapplier_container = share_reapplier_task_definition.add_container(
            f'ShareReapplierTaskContainer{self._envname}',
            container_name='container',
            image=ecs.ContainerImage.from_ecr_repository(repository=self._ecr_repository, tag=self._cdkproxy_image_tag),
            environment=self._create_env('INFO'),
            command=['python3.9', '-m', 'dataall.modules.dataset_sharing.tasks.share_reapplier_task'],
            logging=ecs.LogDriver.aws_logs(
                stream_prefix='task',
                log_group=self.create_log_group(self._envname, self._resource_prefix, log_group_name='share-reapplier'),
            ),
            readonly_root_filesystem=True,
        )
        self.ecs_task_definitions_families.append(share_reapplier_task_definition.family)

    @run_if(['modules.datasets.active'])
    def add_subscription_task(self):
        subscriptions_task, subscription_task_def = self.set_scheduled_task(
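
Note that the reapplier task definition is registered here without an attached schedule, so, consistent with the "ON DEMAND" wording in the PR title, it appears intended to be launched manually by a data.all administrator. A minimal sketch of such an on-demand launch with boto3; the cluster name, subnets, and security group are hypothetical placeholders, and the task definition family follows the {resource_prefix}-{envname}-share-reapplier pattern from the diff:

# Sketch only: launch the share-reapplier ECS task on demand.
# All network values and the cluster name are placeholders, not values from this PR.
import boto3

ecs_client = boto3.client('ecs')
ecs_client.run_task(
    cluster='dataall-cluster',                      # hypothetical ECS cluster name
    taskDefinition='dataall-prod-share-reapplier',  # family from add_share_reapplier_task (prefix/envname assumed)
    launchType='FARGATE',
    count=1,
    networkConfiguration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-0123456789abcdef0'],     # hypothetical private subnet
            'securityGroups': ['sg-0123456789abcdef0'],  # hypothetical security group
            'assignPublicIp': 'DISABLED',
        }
    },
)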