Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Gh 904] Central Catalog Support #1021

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@

class LFShareManager:
def __init__(
self,
session,
dataset: Dataset,
share: ShareObject,
shared_tables: [DatasetTable],
revoked_tables: [DatasetTable],
source_environment: Environment,
target_environment: Environment,
env_group: EnvironmentGroup,
self,
session,
dataset: Dataset,
share: ShareObject,
shared_tables: [DatasetTable],
revoked_tables: [DatasetTable],
source_environment: Environment,
target_environment: Environment,
env_group: EnvironmentGroup,
):
self.session = session
self.env_group = env_group
Expand All @@ -41,30 +41,16 @@ def __init__(
self.revoked_tables = revoked_tables
self.source_environment = source_environment
self.target_environment = target_environment
self.source_account_id = source_environment.AwsAccountId
self.source_account_region = source_environment.region
self.source_database_name = dataset.GlueDatabaseName
# Set the source account details by checking if a catalog account exists
self.source_account_id, self.source_account_region, self.source_database_name = self.init_source_account_details()
self.shared_db_name, self.is_new_share = self.build_shared_db_name()
self.principals = self.get_share_principals()
self.cross_account = self.target_environment.AwsAccountId != self.source_account_id
self.lf_client_in_target = LakeFormationClient(
account_id=self.target_environment.AwsAccountId,
region=self.target_environment.region
)
self.lf_client_in_source = LakeFormationClient(
account_id=self.source_account_id,
region=self.source_account_region
)
self.glue_client_in_target = GlueClient(
account_id=self.target_environment.AwsAccountId,
region=self.target_environment.region,
database=self.shared_db_name,
)
self.glue_client_in_source = GlueClient(
account_id=self.source_account_id,
region=self.source_account_region,
database=self.source_database_name,
)
# Below Clients initialized by the initialize_clients()
self.glue_client_in_source = None
self.glue_client_in_target = None
self.lf_client_in_source = None
self.lf_client_in_target = None

@abc.abstractmethod
def process_approved_shares(self) -> [str]:
Expand All @@ -74,6 +60,19 @@ def process_approved_shares(self) -> [str]:
def process_revoked_shares(self) -> [str]:
return NotImplementedError

def init_source_account_details(self):
"""
Check if the catalog account is present and update the source account, source database and source region accordingly
"""
catalog_account_present = self.check_catalog_account_exists_and_verify()
if catalog_account_present is not False:
if catalog_account_present is not None:
return self.get_catalog_account_details()
else:
return None, None, None
else:
return self.source_environment.AwsAccountId, self.source_environment.region, self.dataset.GlueDatabaseName

def get_share_principals(self) -> [str]:
"""
Builds list of principals of the share request
Expand All @@ -84,7 +83,8 @@ def get_share_principals(self) -> [str]:
role_name=self.share.principalIAMRoleName
)
principals = [principal_iam_role_arn]
dashboard_enabled = EnvironmentService.get_boolean_env_param(self.session, self.target_environment, "dashboardsEnabled")
dashboard_enabled = EnvironmentService.get_boolean_env_param(self.session, self.target_environment,
"dashboardsEnabled")

if dashboard_enabled:
group = QuicksightClient.create_quicksight_group(
Expand All @@ -103,11 +103,14 @@ def build_shared_db_name(self) -> tuple:
For shares after 2.3.0 the suffix returned is "_shared"
:return: Shared database name, boolean indicating if it is a new share
"""
if self.source_database_name is None:
return '', True
dlpzx marked this conversation as resolved.
Show resolved Hide resolved
old_shared_db_name = (self.source_database_name + '_shared_' + self.share.shareUri)[:254]
warn('old_shared_db_name will be deprecated in v2.6.0', DeprecationWarning, stacklevel=2)
logger.info(
f'Checking shared db {old_shared_db_name} exists in {self.target_environment.AwsAccountId}...'
)

database = GlueClient(
account_id=self.target_environment.AwsAccountId,
database=old_shared_db_name,
Expand All @@ -119,7 +122,7 @@ def build_shared_db_name(self) -> tuple:
return self.source_database_name + '_shared', True

def check_table_exists_in_source_database(
self, share_item: ShareObjectItem, table: DatasetTable
self, share_item: ShareObjectItem, table: DatasetTable
) -> True:
"""
Checks if the table to be shared exists on the Glue catalog in the source account
Expand All @@ -138,7 +141,7 @@ def check_table_exists_in_source_database(
return True

def check_resource_link_table_exists_in_target_database(
self, table: DatasetTable
self, table: DatasetTable
) -> bool:
"""
Checks if the table to be shared exists on the Glue catalog in the target account as resource link
Expand Down Expand Up @@ -320,7 +323,8 @@ def revoke_principals_permissions_to_table_in_target(self, table: DatasetTable,
:param other_table_shares_in_env: Boolean. Other table shares in this environment for this table
:return: True if it is successful
"""
principals = self.principals if not other_table_shares_in_env else [p for p in self.principals if "arn:aws:quicksight" not in p]
principals = self.principals if not other_table_shares_in_env else [p for p in self.principals if
"arn:aws:quicksight" not in p]

self.lf_client_in_target.revoke_permissions_from_table_with_columns(
principals=principals,
Expand Down Expand Up @@ -390,9 +394,9 @@ def revoke_external_account_access_on_source_account(self, table: DatasetTable)
return True

def handle_share_failure(
self,
table: DatasetTable,
error: Exception,
self,
table: DatasetTable,
error: Exception,
) -> True:
"""
Handles share failure by raising an alarm to alarmsTopic
Expand Down Expand Up @@ -435,6 +439,13 @@ def handle_revoke_failure(
return True

def handle_share_failure_for_all_tables(self, tables, error, share_item_status):
"""
Handle table share failure for all tables
:param tables - List[DatasetTable]
:param error - share error
:param share_item_status : Status of approved/ revoked share
returns : Returns True is handling is successful
"""
for table in tables:
share_item = ShareObjectRepository.find_sharable_item(
self.session, self.share.shareUri, table.tableUri
Expand All @@ -445,72 +456,110 @@ def handle_share_failure_for_all_tables(self, tables, error, share_item_status):
new_state = share_item_sm.run_transition(ShareItemActions.Failure.value)
share_item_sm.update_state_single_item(self.session, share_item, new_state)

self.handle_revoke_failure(table=table, error=error)
if share_item_status == ShareItemStatus.Share_Approved.value:
self.handle_share_failure(table=table, error=error)
if share_item_status == ShareItemStatus.Revoke_Approved.value:
self.handle_revoke_failure(table=table, error=error)

def verify_catalog_ownership(self, catalog_dict):
if catalog_dict.get('account_id') != self.source_environment.AwsAccountId:
return True

def _verify_catalog_ownership(self, catalog_account_id, catalog_region, catalog_database):
"""
Verifies the catalog ownership by checking
1. if the pivot role is assumable in the catalog account
2. if "owner_account_id" tag is present in the catalog account, which contains AWS account of source account / producer account - where the data is present in S3 bucket
Returns : Raises exception only in case there is an issue with any of above
"""
if catalog_account_id != self.source_environment.AwsAccountId:
logger.info(f'Database {self.dataset.GlueDatabaseName} is a resource link and '
f'the source database {catalog_dict.get("database_name")} belongs to a catalog account {catalog_dict.get("account_id")}')
if SessionHelper.is_assumable_pivot_role(catalog_dict.get('account_id')):
self.validate_catalog_ownership_tag(catalog_dict)
f'the source database {catalog_database} belongs to a catalog account {catalog_account_id}')
if SessionHelper.is_assumable_pivot_role(catalog_account_id):
self._validate_catalog_ownership_tag(catalog_account_id, catalog_region, catalog_database)
else:
raise Exception(f'Pivot role is not assumable, catalog account {catalog_dict.get("account_id")} is not onboarded')
raise Exception(f'Pivot role is not assumable, catalog account {catalog_account_id} is not onboarded')

def validate_catalog_ownership_tag(self, catalog_dict):
glue_client = GlueClient(account_id=catalog_dict.get('account_id'),
database=catalog_dict.get('database_name'),
region=catalog_dict.get('region'))
def _validate_catalog_ownership_tag(self, catalog_account_id, catalog_region, catalog_database):
glue_client = GlueClient(account_id=catalog_account_id,
database=catalog_database,
region=catalog_region)

tags = glue_client.get_database_tags()
if tags.get('owner_account_id', '') == self.source_environment.AwsAccountId:
logger.info(f'owner_account_id tag exists and matches the source account id {self.source_environment.AwsAccountId}')
logger.info(
f'owner_account_id tag exists and matches the source account id {self.source_environment.AwsAccountId}')
else:
raise Exception(f'owner_account_id tag does not exist or does not matches the source account id {self.source_environment.AwsAccountId}')
raise Exception(
f'owner_account_id tag does not exist or does not matches the source account id {self.source_environment.AwsAccountId}')

dlpzx marked this conversation as resolved.
Show resolved Hide resolved
def check_catalog_account_exists_and_update_processor(self):
def check_catalog_account_exists_and_verify(self):
"""
Checks if the source account has a catalog associated with it. This is checked by getting source catalog information and checking if there exists a target database for the source db
Return -
True - if a catalog account is present and it is verified
False - if no source catalog account is present
None - if catalog account exists but there is an issue with verifing the conditions needed for source account. Check _verify_catalog_ownership for more details
"""
try:
catalog_dict = self.glue_client_in_source.get_source_catalog()
catalog_dict = GlueClient(
account_id=self.source_environment.AwsAccountId,
region=self.source_environment.region,
database=self.dataset.GlueDatabaseName,
).get_source_catalog()
if catalog_dict is not None:
# Found a catalog account
logger.info("Updating source aws account id, source account region, source database with catalog details")
logger.debug(f"Catalog Account id - {catalog_dict.get('account_id')}, Catalog Region - { catalog_dict.get('region')}, Catalog DB - {catalog_dict.get('database_name')}")
self.source_account_id = catalog_dict.get('account_id')
self.source_account_region = catalog_dict.get('region')
self.source_database_name = catalog_dict.get('database_name')
# Build the shared db name again as the shared db name on the producer account cannot be used ( as that is the resource link ).
# Instead update the shared db name with the new name.
self.shared_db_name, self.is_new_share = self.build_shared_db_name()
# Again Update the Cross Account Instance Variable
self.cross_account = self.target_environment.AwsAccountId != self.source_account_id
# Update / Reinitialize Glue and Lake formation Clients as the source account , region, db are changed
# Also reinitialize target glue client as the shared DB name changes
self.lf_client_in_source = LakeFormationClient(
account_id=self.source_account_id,
region=self.source_account_region
)
self.glue_client_in_source = GlueClient(
account_id=self.source_account_id,
region=self.source_account_region,
database=self.source_database_name,
)

self.glue_client_in_target = GlueClient(
account_id=self.target_environment.AwsAccountId,
region=self.target_environment.region,
database=self.shared_db_name,
)

# Verify the ownership of dataset
self.verify_catalog_ownership(catalog_dict)
# Verify the ownership of dataset by checking if pivot role is assumable and ownership tag is present
self._verify_catalog_ownership(catalog_dict.get('account_id'), catalog_dict.get('region'),
noah-paige marked this conversation as resolved.
Show resolved Hide resolved
catalog_dict.get('database_name'))
else:
logger.info(
f'No Catalog information found for dataset - {self.dataset.name} containing database - {self.dataset.GlueDatabaseName}')

return True

return False
except Exception as e:
logger.error(
f'Failed to initialise catalog account details for share - {self.share.shareUri} '
f'due to: {e}'
)
raise e
return None
return True

def get_catalog_account_details(self):
dlpzx marked this conversation as resolved.
Show resolved Hide resolved
"""
Fetched the catalog details and returns a dict containing information about the catalog account
Returns :
'account_id' - AWS account id of catalog account
'region' - AWS region in which the catalog account is present
'database_name' - DB present in the catalog account
"""
try:
catalog_dict = GlueClient(
account_id=self.source_environment.AwsAccountId,
region=self.source_environment.region,
database=self.dataset.GlueDatabaseName,
).get_source_catalog()
return catalog_dict.get('account_id'), catalog_dict.get('region'), catalog_dict.get('database_name')
except Exception as e:
logger.error(
f'Failed to fetch catalog account details for share - {self.share.shareUri} '
f'due to: {e}'
)
return None, None, None

def initialize_clients(self):

self.lf_client_in_target = LakeFormationClient(
account_id=self.target_environment.AwsAccountId,
region=self.target_environment.region
)
self.lf_client_in_source = LakeFormationClient(
account_id=self.source_account_id,
region=self.source_account_region
)
self.glue_client_in_target = GlueClient(
account_id=self.target_environment.AwsAccountId,
region=self.target_environment.region,
database=self.shared_db_name,
)
self.glue_client_in_source = GlueClient(
account_id=self.source_account_id,
region=self.source_account_region,
database=self.source_database_name,
)
Loading
Loading