Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better sharing management #176

Merged
merged 25 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions backend/dataall/aws/handlers/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from ... import db
from ...db import models
from ...utils import Parameter
from ...tasks.share_manager import ShareManager
from ...tasks.data_sharing.data_sharing_service import DataSharingService

log = logging.getLogger('aws:ecs')

Expand All @@ -23,7 +23,7 @@ def __init__(self):
def approve_share(engine, task: models.Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return ShareManager.approve_share(engine, task.targetUri)
return DataSharingService.approve_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'approve_share'
Expand All @@ -34,7 +34,7 @@ def approve_share(engine, task: models.Task):
def reject_share(engine, task: models.Task):
envname = os.environ.get('envname', 'local')
if envname in ['local', 'dkrcompose']:
return ShareManager.reject_share(engine, task.targetUri)
return DataSharingService.reject_share(engine, task.targetUri)
else:
return Ecs.run_share_management_ecs_task(
envname, task.targetUri, 'reject_share'
Expand Down
129 changes: 106 additions & 23 deletions backend/dataall/aws/handlers/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def __init__(self):
pass

@staticmethod
def _create_database(accountid, database, region, location):
def create_database(accountid, database, region, location):
try:
existing_database = Glue.database_exists(
accountid=accountid, database=database, region=region
Expand All @@ -27,7 +27,9 @@ def _create_database(accountid, database, region, location):
glue_database_created = True
return glue_database_created
except ClientError as e:
log.debug(f'Failed to create database {database}', e)
log.error(
f'Failed to create database {database} on account {accountid} due to {e}'
)
raise e

@staticmethod
Expand Down Expand Up @@ -64,15 +66,11 @@ def database_exists(**data):
session = SessionHelper.remote_session(accountid)
try:
glue_client = session.client('glue', region_name=region)
response = glue_client.get_database(
CatalogId=data['accountid'], Name=database
)
if response.get('Database'):
return response
else:
return None
except ClientError as e:
log.debug(f'Database already exists in Glue{database}', e)
glue_client.get_database(CatalogId=data['accountid'], Name=database)
return True
except ClientError:
log.info(f'Database {database} does not exist on account {accountid}...')
return False

@staticmethod
@Worker.handler(path='glue.dataset.database.tables')
Expand Down Expand Up @@ -140,12 +138,13 @@ def table_exists(**data):
log.info(f'Glue table not found: {data}')
return None

@staticmethod
def _create_table(**data):
accountid = data['accountid']
session = SessionHelper.remote_session(accountid=accountid)
region = data.get('region', 'eu-west-1')
database = data.get('database', 'UnknownDatabaseName')

session = SessionHelper.remote_session(accountid=accountid)
glue = session.client('glue', region_name=region)
log.info(
'Creating table {} in database {}'.format(
Expand All @@ -155,7 +154,7 @@ def _create_table(**data):
if not Glue.database_exists(
database=database, region=region, accountid=accountid
):
Glue._create_database(accountid, database, region, None)
Glue.create_database(accountid, database, region, None)
if 'table_input' not in data:
table_input = {
'Name': data['tablename'],
Expand Down Expand Up @@ -222,6 +221,47 @@ def _create_table(**data):
)
return response

@staticmethod
def create_resource_link(**data):
    """Create a Glue resource link table unless one with that name exists.

    Expected keyword arguments (all required; a missing one raises
    ``KeyError``):
        accountid: Used for the remote session and as the Glue ``CatalogId``.
        region: Region name passed to the Glue client.
        database: Glue database that should hold the resource link.
        resource_link_name: Table name checked via ``Glue.table_exists``.
        resource_link_input: Passed as ``TableInput`` to ``create_table``.

    Returns:
        The truthy result of ``Glue.table_exists`` when the link is already
        present, otherwise the response of ``glue.create_table``.

    Raises:
        ClientError: Logged at error level and re-raised.
    """
    accountid = data['accountid']
    region = data['region']
    database = data['database']
    resource_link_name = data['resource_link_name']
    resource_link_input = data['resource_link_input']
    log.info(
        f'Creating ResourceLink {resource_link_name} in database {accountid}://{database}'
    )
    try:
        remote_session = SessionHelper.remote_session(accountid=accountid)
        glue_client = remote_session.client('glue', region_name=region)

        existing_link = Glue.table_exists(
            accountid=accountid,
            region=region,
            database=database,
            tablename=resource_link_name,
        )
        # Guard clause: nothing to create when the link is already there.
        if existing_link:
            log.info(
                f'ResourceLink {resource_link_name} already exists in database {accountid}://{database}'
            )
            return existing_link

        created_link = glue_client.create_table(
            CatalogId=accountid,
            DatabaseName=database,
            TableInput=resource_link_input,
        )
        log.info(
            f'Successfully created ResourceLink {resource_link_name} in database {accountid}://{database}'
        )
        return created_link
    except ClientError as e:
        log.error(
            f'Could not create ResourceLink {resource_link_name} '
            f'in database {accountid}://{database} '
            f'due to: {e}'
        )
        raise e

@staticmethod
def is_resource_link(table_input: dict):
"""
Expand Down Expand Up @@ -268,21 +308,64 @@ def delete_table_and_create_resourcelink(glue, database, accountid, table_input)
)
raise e

@staticmethod
def delete_database(**data):
    """Delete a Glue database if ``Glue.database_exists`` reports it present.

    Expected keyword arguments (all required; a missing one raises
    ``KeyError``):
        accountid: Used for the remote session and as the Glue ``CatalogId``.
        region: Region name passed to the Glue client.
        database: Name of the Glue database to delete.

    Returns:
        ``True`` whenever no ``ClientError`` occurred, whether or not a
        delete call was actually issued.

    Raises:
        ClientError: Logged at error level and re-raised.
    """
    accountid = data['accountid']
    region = data['region']
    database = data['database']
    log.info(f'Deleting database {accountid}://{database} ...')
    try:
        glue_client = SessionHelper.remote_session(accountid=accountid).client(
            'glue', region_name=region
        )
        database_found = Glue.database_exists(
            accountid=accountid,
            region=region,
            database=database,
        )
        # Only issue the delete when the existence check succeeds.
        if database_found:
            glue_client.delete_database(CatalogId=accountid, Name=database)
    except ClientError as e:
        log.error(
            f'Could not delete database {database} '
            f'in account {accountid} '
            f'due to: {e}'
        )
        raise e
    return True

@staticmethod
def batch_delete_tables(**data):
accountid = data['accountid']
session = SessionHelper.remote_session(accountid=accountid)
glue = session.client('glue', region_name=data.get('region', 'eu-west-1'))
region = data['region']
database = data['database']
tables = data['tables']
log.debug(f'Batch deleting tables: {tables}')
response = glue.batch_delete_table(
CatalogId=accountid, DatabaseName=database, TablesToDelete=tables
)
log.debug(
f'Batch deleted tables {len(tables)} from database {database} successfully'
)
return response

if not tables:
log.info('No tables to delete exiting method...')
return

log.info(f'Batch deleting tables: {tables}')
try:
session = SessionHelper.remote_session(accountid=accountid)
glue = session.client('glue', region_name=region)
if Glue.database_exists(
accountid=accountid,
region=region,
database=database,
):
glue.batch_delete_table(
CatalogId=accountid, DatabaseName=database, TablesToDelete=tables
)
log.debug(
f'Batch deleted tables {len(tables)} from database {database} successfully'
)
return True
except ClientError as e:
log.error(
f'Could not batch delete tables {tables} '
f'in database {accountid}://{database} '
f'due to: {e}'
)
raise e

@staticmethod
@Worker.handler(path='glue.dataset.crawler.create')
Expand Down
Loading