Skip to content

Commit

Permalink
[Gh 1301] Enhancement Feature - Bulk share reapply on dataset (#1363)
Browse files Browse the repository at this point in the history
### Feature or Bugfix
- Feature


### Detail

- Adds feature to reapply shares in bulk for a dataset. 
- Also contains bugfix for AWS worker lambda errors 

### Relates
- #1301
- #1364

### Security
Please answer the questions below briefly where applicable, or write
`N/A`. Based on
[OWASP 10](https://owasp.org/Top10/en/).

- Does this PR introduce or modify any input fields or queries - this
includes
fetching data from storage outside the application (e.g. a database, an
S3 bucket)? N/A
  - Is the input sanitized?
- What precautions are you taking before deserializing the data you
consume?
  - Is injection prevented by parametrizing queries?
  - Have you ensured no `eval` or similar functions are used?
- Does this PR introduce any functionality or component that requires
authorization? N/A
  - How have you ensured it respects the existing AuthN/AuthZ mechanisms?
  - Are you logging failed auth attempts?
- Are you using or adding any cryptographic features? N/A
  - Do you use standard, proven implementations?
  - Are the used keys controlled by the customer? Where are they stored?
- Are you introducing any new policies/roles/users? N/A
  - Have you used the least-privilege principle? How?


By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.

---------

Co-authored-by: trajopadhye <tejas.rajopadhye@yahooinc.com>
  • Loading branch information
TejasRGitHub and trajopadhye authored Jun 27, 2024
1 parent 27f1ad7 commit ee71d7b
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 33 deletions.
2 changes: 1 addition & 1 deletion backend/dataall/base/utils/json_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def to_json(record):
elif isinstance(record, type({'a': 'dict'})):
return json.loads(json.dumps(record, default=json_decoder))
elif type(record) in [str, 'unicode']:
return record
return json.dumps(record)
elif type(record) in [int, float]:
return json.dumps(record)
elif isinstance(record, bool):
Expand Down
8 changes: 8 additions & 0 deletions backend/dataall/modules/s3_datasets_shares/api/mutations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from dataall.base.api import gql
from dataall.modules.s3_datasets_shares.api.resolvers import (
verify_dataset_share_objects,
reapply_share_items_share_object_for_dataset,
)


Expand All @@ -10,3 +11,10 @@
type=gql.Boolean,
resolver=verify_dataset_share_objects,
)

# GraphQL mutation: queue a bulk re-apply of share items for all active
# shares on the given dataset. Returns a plain Boolean acknowledgement.
reApplyShareObjectItemsOnDataset = gql.MutationField(
    name='reApplyShareObjectItemsOnDataset',
    type=gql.Boolean,
    resolver=reapply_share_items_share_object_for_dataset,
    args=[gql.Argument(name='datasetUri', type=gql.NonNullableType(gql.String))],
)
4 changes: 4 additions & 0 deletions backend/dataall/modules/s3_datasets_shares/api/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ def verify_dataset_share_objects(context: Context, source, input):
return S3ShareService.verify_dataset_share_objects(uri=dataset_uri, share_uris=verify_share_uris)


def reapply_share_items_share_object_for_dataset(context: Context, source, datasetUri: str):
    """GraphQL resolver: trigger a bulk re-apply of share items for a dataset.

    Delegates to the service layer, which enforces tenant/resource permissions.
    """
    dataset_uri = datasetUri
    return S3ShareService.reapply_share_items_for_dataset(uri=dataset_uri)


def get_s3_consumption_data(context: Context, source, shareUri: str):
return S3ShareService.get_s3_consumption_data(uri=shareUri)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.permissions.services.tenant_policy_service import TenantPolicyService
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.tasks.db.task_models import Task
from dataall.core.tasks.service_handlers import Worker
from dataall.modules.shares_base.db.share_object_repositories import ShareObjectRepository
from dataall.modules.shares_base.db.share_state_machines_repositories import ShareStatusRepository
from dataall.modules.shares_base.services.share_item_service import ShareItemService
Expand Down Expand Up @@ -150,6 +152,17 @@ def verify_dataset_share_objects(uri: str, share_uris: list):
ShareItemService.verify_items_share_object(uri=share_uri, item_uris=item_uris)
return True

@staticmethod
@TenantPolicyService.has_tenant_permission(MANAGE_DATASETS)
@ResourcePolicyService.has_resource_permission(UPDATE_DATASET)
def reapply_share_items_for_dataset(uri: str):
    """Queue an async worker task that re-applies share items in bulk for a dataset.

    :param uri: datasetUri of the dataset whose active shares should be re-applied
    :return: True once the task has been queued (fire-and-forget)
    """
    context = get_context()
    with context.db_engine.scoped_session() as session:
        reapply_share_items_task: Task = Task(action='ecs.dataset.share.reapply', targetUri=uri)
        session.add(reapply_share_items_task)
    # Queue only after the session context exits (scoped_session commits on exit),
    # so the async worker cannot pick up a Task row that is not yet committed.
    Worker.queue(engine=context.db_engine, task_ids=[reapply_share_items_task.taskUri])
    return True

@staticmethod
def list_shared_tables_by_env_dataset(dataset_uri: str, env_uri: str):
context = get_context()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,15 @@ def paginated_list_shareable_items(session, subqueries: List[Query], data: dict
query.order_by(shareable_objects.c.itemName).distinct(), data.get('page', 1), data.get('pageSize', 10)
).to_dict()

@staticmethod
def list_active_share_object_for_dataset(session, dataset_uri):
    """Return every non-deleted ShareObject attached to the given dataset."""
    active_shares_query = session.query(ShareObject).filter(
        # multiple criteria passed to filter() are implicitly ANDed
        ShareObject.datasetUri == dataset_uri,
        ShareObject.deleted.is_(None),
    )
    return active_shares_query.all()

@staticmethod
def fetch_submitted_shares_with_notifications(session):
"""
Expand Down
42 changes: 33 additions & 9 deletions backend/dataall/modules/shares_base/handlers/ecs_share_handler.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import json
import logging
import os

from dataall.core.tasks.service_handlers import Worker
from dataall.core.stacks.aws.ecs import Ecs
from dataall.core.tasks.db.task_models import Task
from dataall.modules.shares_base.services.sharing_service import SharingService
from dataall.modules.shares_base.tasks.share_reapplier_task import EcsBulkShareRepplyService

log = logging.getLogger(__name__)

Expand All @@ -30,21 +32,43 @@ def verify_share(engine, task: Task):
def reapply_share(engine, task: Task):
    # Thin wrapper: re-apply a single share via the shared dispatch helper,
    # which runs locally in dev environments or launches an ECS task otherwise.
    return EcsShareHandler._manage_share(engine, task, SharingService.reapply_share, 'reapply_share')

@staticmethod
@Worker.handler(path='ecs.dataset.share.reapply')
def reapply_shares_of_dataset(engine, task: Task):
    """Worker handler for the bulk re-apply of all shares on one dataset.

    In local/docker-compose environments the re-apply runs inline;
    otherwise it is dispatched to the dedicated share-reapplier ECS task.
    """
    envname = os.environ.get('envname', 'local')
    if envname in ['local', 'dkrcompose']:
        # Return the result for parity with the ECS branch below
        # (the original discarded it, returning None in local mode).
        return EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, task.targetUri)
    context = [
        {'name': 'datasetUri', 'value': task.targetUri},
    ]
    return EcsShareHandler._run_share_management_ecs_task(
        task_definition_param_str='ecs/task_def_arn/share_reapplier',
        container_name_param_str='ecs/container/share_reapplier',
        context=context,
    )

@staticmethod
def _manage_share(engine, task: Task, local_handler, ecs_handler: str):
    """Dispatch a single-share operation: inline in dev, ECS task otherwise.

    :param local_handler: callable invoked as local_handler(engine, share_uri) in dev
    :param ecs_handler: handler name passed to the ECS container via its context
    """
    # Guard clause: dev environments execute the handler in-process.
    if os.environ.get('envname', 'local') in ['local', 'dkrcompose']:
        return local_handler(engine, task.targetUri)
    return EcsShareHandler._run_share_management_ecs_task(
        task_definition_param_str='ecs/task_def_arn/share_management',
        container_name_param_str='ecs/container/share_management',
        context=[
            {'name': 'shareUri', 'value': task.targetUri},
            {'name': 'handler', 'value': ecs_handler},
        ],
    )

@staticmethod
def _run_share_management_ecs_task(task_definition_param_str, container_name_param_str, context):
    """Start an ECS task and return its ARN.

    :param task_definition_param_str: SSM parameter path holding the task definition ARN
    :param container_name_param_str: SSM parameter path holding the container name
    :param context: list of {'name': ..., 'value': ...} environment entries for the container
    :return: dict with the started task's ARN under 'task_arn'
    """
    ecs_task_arn = Ecs.run_ecs_task(
        task_definition_param=task_definition_param_str,
        container_name_param=container_name_param_str,
        context=context,
    )
    return {'task_arn': ecs_task_arn}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
from dataall.core.permissions.services.resource_policy_service import ResourcePolicyService
from dataall.core.tasks.service_handlers import Worker
from dataall.base.context import get_context
from dataall.core.environment.services.environment_service import EnvironmentService
from dataall.core.tasks.db.task_models import Task
from dataall.base.db.exceptions import ObjectNotFound, UnauthorizedOperation
from dataall.modules.shares_base.services.shares_enums import (
ShareObjectActions,
ShareableType,
ShareItemStatus,
ShareItemActions,
ShareItemHealthStatus,
Expand Down
76 changes: 56 additions & 20 deletions backend/dataall/modules/shares_base/tasks/share_reapplier_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,32 +18,68 @@
log = logging.getLogger(__name__)


class EcsBulkShareRepplyService:
    # NOTE(review): "Repply" looks like a typo for "Reapply", but the name is
    # referenced by other modules, so it is kept for interface compatibility.

    @classmethod
    def process_reapply_shares_for_dataset(cls, engine, dataset_uri):
        """Re-apply share items for every active share object on one dataset.

        :return: list of processed share URIs
        """
        with engine.scoped_session() as session:
            processed_share_objects = []
            share_objects_for_dataset = ShareObjectRepository.list_active_share_object_for_dataset(
                session=session, dataset_uri=dataset_uri
            )
            log.info(f'Found {len(share_objects_for_dataset)} active share objects on dataset with uri: {dataset_uri}')
            share_object: ShareObject
            for share_object in share_objects_for_dataset:
                log.info(
                    f'Re-applying Share Items for Share Object (Share URI: {share_object.shareUri} ) with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
                )
                processed_share_objects.append(share_object.shareUri)
                cls._mark_pending_and_reapply(engine, session, share_object)
            return processed_share_objects

    @classmethod
    def process_reapply_shares(cls, engine):
        """Re-apply share items for all active share objects in data.all.

        :return: list of processed share URIs
        """
        with engine.scoped_session() as session:
            processed_share_objects = []
            all_share_objects: [ShareObject] = ShareObjectRepository.list_all_active_share_objects(session)
            log.info(f'Found {len(all_share_objects)} share objects ')
            share_object: ShareObject
            for share_object in all_share_objects:
                log.info(
                    f'Re-applying Share Items for Share Object with Requestor: {share_object.principalId} on Target Dataset: {share_object.datasetUri}'
                )
                processed_share_objects.append(share_object.shareUri)
                cls._mark_pending_and_reapply(engine, session, share_object)
            return processed_share_objects

    @classmethod
    def _mark_pending_and_reapply(cls, engine, session, share_object):
        # Shared per-share step (deduplicated from both public methods):
        # flip Unhealthy items to PendingReApply, then trigger the re-apply.
        ShareStatusRepository.update_share_item_health_status_batch(
            session=session,
            share_uri=share_object.shareUri,
            old_status=ShareItemHealthStatus.Unhealthy.value,
            new_status=ShareItemHealthStatus.PendingReApply.value,
        )
        SharingService.reapply_share(engine, share_uri=share_object.shareUri)


def reapply_shares(engine, dataset_uri):
    """
    A method used by the scheduled ECS Task to re-apply_share() on all data.all active shares
    If dataset_uri is provided this ECS will reapply on all unhealthy shares belonging to a dataset
    else it will reapply on all data.all active unhealthy shares.

    :return: list of processed share URIs
    """
    if dataset_uri:
        return EcsBulkShareRepplyService.process_reapply_shares_for_dataset(engine, dataset_uri)
    return EcsBulkShareRepplyService.process_reapply_shares(engine)


if __name__ == '__main__':
    # Entry point for the ECS share-reapplier task. An optional datasetUri
    # environment variable scopes the run to a single dataset.
    load_modules(modes={ImportMode.SHARES_TASK})
    ENVNAME = os.environ.get('envname', 'local')
    ENGINE = get_engine(envname=ENVNAME)
    dataset_uri = os.environ.get('datasetUri', '')
    processed_shares = reapply_shares(engine=ENGINE, dataset_uri=dataset_uri)
    log.info(f'Finished processing {len(processed_shares)} shares')
15 changes: 15 additions & 0 deletions deploy/stacks/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,21 @@ def add_share_reapplier_task(self):
),
readonly_root_filesystem=True,
)

# Publish the reapplier task-definition ARN to SSM so the backend worker
# handler ('ecs.dataset.share.reapply') can launch this ECS task on demand.
ssm.StringParameter(
    self,
    f'ShareReapplierTaskARNSSM{self._envname}',
    parameter_name=f'/dataall/{self._envname}/ecs/task_def_arn/share_reapplier',
    string_value=share_reapplier_task_definition.task_definition_arn,
)

# Matching SSM parameter for the container name, read alongside the ARN above.
ssm.StringParameter(
    self,
    f'ShareReapplierTaskContainerSSM{self._envname}',
    parameter_name=f'/dataall/{self._envname}/ecs/container/share_reapplier',
    string_value=share_reapplier_container.container_name,
)

self.ecs_task_definitions_families.append(share_reapplier_task_definition.family)

@run_if(['modules.dataset_base.features.share_notifications.email.persistent_reminders'])
Expand Down
52 changes: 51 additions & 1 deletion frontend/src/modules/Shares/components/ShareBoxList.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import CircularProgress from '@mui/material/CircularProgress';
import CheckBoxOutlineBlankIcon from '@mui/icons-material/CheckBoxOutlineBlank';
import CheckBoxIcon from '@mui/icons-material/CheckBox';
import PropTypes from 'prop-types';
import { useCallback, useEffect, useState } from 'react';
import React, { useCallback, useEffect, useState } from 'react';
import { Helmet } from 'react-helmet-async';
import { Defaults, Pager, ShareStatus, useSettings } from 'design';
import { SET_ERROR, useDispatch } from 'globalErrors';
Expand All @@ -29,6 +29,9 @@ import { ShareBoxListItem } from './ShareBoxListItem';
import { ShareObjectSelectorModal } from './ShareObjectSelectorModal';
import { NavigateShareViewModal } from './NavigateShareViewModal';
import { ShareStatusList } from '../constants';
import { RefreshRounded } from '@mui/icons-material';
import { reApplyShareObjectItemsOnDataset } from '../services/reApplyShareObjectItemsOnDataset';
import { useSnackbar } from 'notistack';

const icon = <CheckBoxOutlineBlankIcon fontSize="small" />;
const checkedIcon = <CheckBoxIcon fontSize="small" />;
Expand All @@ -53,7 +56,10 @@ export const ShareBoxList = (props) => {
useState(false);
const [isNavigateShareViewModalOpen, setIsNavigateShareViewModalOpen] =
useState(false);
const [reApplyButtonLoadingState, setreApplyButtonLoadingState] =
useState(false);
const statusOptions = ShareStatusList;
const { enqueueSnackbar } = useSnackbar();

const handleVerifyObjectItemsModalOpen = () => {
setIsVerifyObjectItemsModalOpen(true);
Expand Down Expand Up @@ -256,6 +262,33 @@ export const ShareBoxList = (props) => {
.finally(() => setLoading(false));
}, [client, dispatch]);

// Trigger the backend bulk re-apply for all unhealthy shares on a dataset
// and surface the result as a snackbar (success) or global error.
const reapplyShares = async (datasetUri) => {
  try {
    setreApplyButtonLoadingState(true);
    const response = await client.mutate(
      reApplyShareObjectItemsOnDataset({ datasetUri: datasetUri })
    );
    if (response && !response.errors) {
      enqueueSnackbar(
        `Reapplying process for all unhealthy shares on dataset with uri: ${datasetUri} has started. Please check each individual share for share item health status`,
        {
          anchorOrigin: {
            horizontal: 'right',
            vertical: 'top'
          },
          variant: 'success'
        }
      );
    } else {
      dispatch({ type: SET_ERROR, error: response.errors[0].message });
    }
  } catch (error) {
    dispatch({ type: SET_ERROR, error: error?.message });
  } finally {
    // Always clear the button spinner: the original only reset it on the
    // success and exception paths, leaving it stuck when the mutation
    // returned GraphQL errors.
    setreApplyButtonLoadingState(false);
  }
};

useEffect(() => {
setLoading(true);
setFilter({ page: 1, pageSize: 10, term: '' });
Expand Down Expand Up @@ -337,6 +370,23 @@ export const ShareBoxList = (props) => {
</LoadingButton>
)}

{/* Bulk re-apply button: only rendered on a dataset's share list;
    disabled-by-spinner while the reapply mutation is in flight. */}
{dataset && (
  <LoadingButton
    loading={reApplyButtonLoadingState}
    color="info"
    align="right"
    startIcon={<RefreshRounded fontSize="small" />}
    sx={{ m: 1 }}
    onClick={(event) => {
      reapplyShares(dataset.datasetUri);
    }}
    type="button"
    variant="outlined"
  >
    Re-apply Share Item(s) for Dataset
  </LoadingButton>
)}

<Container maxWidth={settings.compact ? 'xl' : false}>
<Box
sx={{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { gql } from 'apollo-boost';

/**
 * Build the Apollo mutation payload that starts a bulk share re-apply
 * for every unhealthy share on the given dataset.
 */
export const reApplyShareObjectItemsOnDataset = ({ datasetUri }) => ({
  mutation: gql`
    mutation reApplyShareObjectItemsOnDataset($datasetUri: String!) {
      reApplyShareObjectItemsOnDataset(datasetUri: $datasetUri)
    }
  `,
  variables: { datasetUri }
});

0 comments on commit ee71d7b

Please sign in to comment.