Skip to content

Commit

Permalink
Merge pull request #176 from awslabs/fix/issue-142
Browse files: browse the repository at this point in the history
Better sharing management
  • Loading branch information
dlpzx authored Nov 3, 2022
2 parents d53edc6 + 724d2df commit 1beb991
Show file tree
Hide file tree
Showing 23 changed files with 3,128 additions and 1,471 deletions.
6 changes: 3 additions & 3 deletions backend/dataall/aws/handlers/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ... import db
from ...db import models
from ...utils import Parameter
from ...tasks.share_manager import ShareManager
from ...tasks.data_sharing.data_sharing_service import DataSharingService

log = logging.getLogger('aws:ecs')

Expand All @@ -23,7 +23,7 @@ def __init__(self):
def approve_share(engine, task: models.Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return ShareManager.approve_share(engine, task.targetUri)
return DataSharingService.approve_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'approve_share'
Expand All @@ -34,7 +34,7 @@ def approve_share(engine, task: models.Task):
def reject_share(engine, task: models.Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return ShareManager.reject_share(engine, task.targetUri)
return DataSharingService.reject_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'reject_share'
Expand Down
129 changes: 106 additions & 23 deletions backend/dataall/aws/handlers/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self):
pass

@staticmethod
def _create_database(accountid, database, region, location):
def create_database(accountid, database, region, location):
try:
existing_database = Glue.database_exists(
accountid=accountid, database=database, region=region
Expand All @@ -27,7 +27,9 @@ def _create_database(accountid, database, region, location):
glue_database_created = True
return glue_database_created
except ClientError as e:
log.debug(f'Failed to create database {database}', e)
log.error(
f'Failed to create database {database} on account {accountid} due to {e}'
)
raise e

@staticmethod
Expand Down Expand Up @@ -64,15 +66,11 @@ def database_exists(**data):
session = SessionHelper.remote_session(accountid)
try:
glue_client = session.client('glue', region_name=region)
response = glue_client.get_database(
CatalogId=data['accountid'], Name=database
)
if response.get('Database'):
return response
else:
return None
except ClientError as e:
log.debug(f'Database already exists in Glue{database}', e)
glue_client.get_database(CatalogId=data['accountid'], Name=database)
return True
except ClientError:
log.info(f'Database {database} does not exist on account {accountid}...')
return False

@staticmethod
@Worker.handler(path='glue.dataset.database.tables')
Expand Down Expand Up @@ -140,12 +138,13 @@ def table_exists(**data):
log.info(f'Glue table not found: {data}')
return None

@staticmethod
def _create_table(**data):
accountid = data['accountid']
session = SessionHelper.remote_session(accountid=accountid)
region = data.get('region', 'eu-west-1')
database = data.get('database', 'UnknownDatabaseName')

session = SessionHelper.remote_session(accountid=accountid)
glue = session.client('glue', region_name=region)
log.info(
'Creating table {} in database {}'.format(
Expand All @@ -155,7 +154,7 @@ def _create_table(**data):
if not Glue.database_exists(
database=database, region=region, accountid=accountid
):
Glue._create_database(accountid, database, region, None)
Glue.create_database(accountid, database, region, None)
if 'table_input' not in data:
table_input = {
'Name': data['tablename'],
Expand Down Expand Up @@ -222,6 +221,47 @@ def _create_table(**data):
)
return response

@staticmethod
def create_resource_link(**data):
    """Create a Glue resource link table unless one already exists.

    Reads the following keys from ``data``: ``accountid``, ``region``,
    ``database``, ``resource_link_name`` and ``resource_link_input``
    (the latter is passed unchanged as ``TableInput`` to ``create_table``).

    Returns:
        The value returned by ``Glue.table_exists`` when the link is
        already present, otherwise the ``create_table`` response.

    Raises:
        ClientError: logged and re-raised when any Glue call fails.
    """
    accountid, region, database = (
        data['accountid'],
        data['region'],
        data['database'],
    )
    resource_link_name = data['resource_link_name']
    resource_link_input = data['resource_link_input']
    log.info(
        f'Creating ResourceLink {resource_link_name} in database {accountid}://{database}'
    )
    try:
        glue_client = SessionHelper.remote_session(accountid=accountid).client(
            'glue', region_name=region
        )
        existing_link = Glue.table_exists(
            accountid=accountid,
            region=region,
            database=database,
            tablename=resource_link_name,
        )
        # Guard clause: nothing to create when the link is already there.
        if existing_link:
            log.info(
                f'ResourceLink {resource_link_name} already exists in database {accountid}://{database}'
            )
            return existing_link

        created_link = glue_client.create_table(
            CatalogId=accountid,
            DatabaseName=database,
            TableInput=resource_link_input,
        )
        log.info(
            f'Successfully created ResourceLink {resource_link_name} in database {accountid}://{database}'
        )
        return created_link
    except ClientError as err:
        log.error(
            f'Could not create ResourceLink {resource_link_name} '
            f'in database {accountid}://{database} '
            f'due to: {err}'
        )
        raise err

@staticmethod
def is_resource_link(table_input: dict):
"""
Expand Down Expand Up @@ -268,21 +308,64 @@ def delete_table_and_create_resourcelink(glue, database, accountid, table_input)
)
raise e

@staticmethod
def delete_database(**data):
    """Delete a Glue database from an account's catalog when it exists.

    Reads ``accountid``, ``region`` and ``database`` from ``data``.

    Returns:
        True when no ClientError was raised, including the case where
        ``Glue.database_exists`` reported no database and nothing was
        deleted.

    Raises:
        ClientError: logged and re-raised when a Glue call fails.
    """
    accountid, region, database = (
        data['accountid'],
        data['region'],
        data['database'],
    )
    log.info(f'Deleting database {accountid}://{database} ...')
    try:
        glue_client = SessionHelper.remote_session(accountid=accountid).client(
            'glue', region_name=region
        )
        database_found = Glue.database_exists(
            accountid=accountid,
            region=region,
            database=database,
        )
        if database_found:
            glue_client.delete_database(CatalogId=accountid, Name=database)
    except ClientError as err:
        log.error(
            f'Could not delete database {database} '
            f'in account {accountid} '
            f'due to: {err}'
        )
        raise err
    else:
        return True

@staticmethod
def batch_delete_tables(**data):
    """Batch delete Glue tables from a database, if the database exists.

    Reads ``accountid``, ``region``, ``database`` and ``tables`` from
    ``data``. ``tables`` is passed as ``TablesToDelete`` to Glue.

    The span previously held two interleaved implementations: an
    unguarded delete that returned the raw response, followed by an
    unreachable guarded version. Only the guarded version is kept, so
    the session and client are created once and the empty-list and
    missing-database cases are actually handled.

    Returns:
        None when ``tables`` is empty (nothing is attempted); otherwise
        True once the call sequence finished without a ClientError, also
        when the database was not found and no delete was issued.

    Raises:
        ClientError: logged and re-raised when a Glue call fails.
    """
    accountid = data['accountid']
    region = data['region']
    database = data['database']
    tables = data['tables']

    # Skip the remote session entirely when there is nothing to delete.
    if not tables:
        log.info('No tables to delete exiting method...')
        return

    log.info(f'Batch deleting tables: {tables}')
    try:
        session = SessionHelper.remote_session(accountid=accountid)
        glue = session.client('glue', region_name=region)
        # Only issue the delete when the database is present in the catalog.
        if Glue.database_exists(
            accountid=accountid,
            region=region,
            database=database,
        ):
            glue.batch_delete_table(
                CatalogId=accountid, DatabaseName=database, TablesToDelete=tables
            )
            log.debug(
                f'Batch deleted tables {len(tables)} from database {database} successfully'
            )
        return True
    except ClientError as e:
        log.error(
            f'Could not batch delete tables {tables} '
            f'in database {accountid}://{database} '
            f'due to: {e}'
        )
        raise e

@staticmethod
@Worker.handler(path='glue.dataset.crawler.create')
Expand Down
Loading

0 comments on commit 1beb991

Please sign in to comment.