From a39fd43959b6e588a98056f29382580a31c5718c Mon Sep 17 00:00:00 2001
From: Noah Paige <69586985+noah-paige@users.noreply.github.com>
Date: Thu, 10 Aug 2023 10:33:18 -0400
Subject: [PATCH] Resolve Dataset Profiling Glue Job (#649)

### Feature or Bugfix
- Bugfix

### Detail
- Set `SPARK_VERSION` as an environment variable before importing `pydeequ`, which reads it at import time (see the sketch after the glue_script.py diff below)
- Add IAM permissions to the dataset IAM role to allow Glue job logging in CloudWatch
- Add Lake Formation permissions to resolve the insufficient-permissions error thrown when looking up the `default` database (see the sketch at the end of this patch)

### Relates

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
---
 .../gluedatabasecustomresource/index.py       | 23 ++++++++++++++++++-
 .../assets/glueprofilingjob/glue_script.py    |  7 +++++-
 backend/dataall/cdkproxy/stacks/dataset.py    | 10 +++++---
 documentation/userguide/docs/tables.md        |  3 +++
 4 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/backend/dataall/cdkproxy/assets/gluedatabasecustomresource/index.py b/backend/dataall/cdkproxy/assets/gluedatabasecustomresource/index.py
index e548dcbf6..ce01c0f9a 100644
--- a/backend/dataall/cdkproxy/assets/gluedatabasecustomresource/index.py
+++ b/backend/dataall/cdkproxy/assets/gluedatabasecustomresource/index.py
@@ -49,6 +49,13 @@ def on_create(event):
     except ClientError as e:
         pass
 
+    default_db_exists = False
+    try:
+        glue_client.get_database(Name="default")
+        default_db_exists = True
+    except ClientError as e:
+        pass
+
     if not exists:
         try:
             db_input = props.get('DatabaseInput').copy()
@@ -63,7 +70,7 @@ def on_create(event):
             raise Exception(f"Could not create Glue Database {props['DatabaseInput']['Name']} in aws://{AWS_ACCOUNT}/{AWS_REGION}, received {str(e)}")
 
     Entries = []
-    for i, role_arn in enumerate(props.get('DatabaseAdministrators')):
+    for i, role_arn in enumerate(props.get('DatabaseAdministrators', [])):
         Entries.append(
             {
                 'Id': str(uuid.uuid4()),
@@ -103,6 +110,20 @@ def on_create(event):
                 'PermissionsWithGrantOption': ['SELECT', 'ALTER', 'DESCRIBE'],
             }
         )
+        if default_db_exists:
+            Entries.append(
+                {
+                    'Id': str(uuid.uuid4()),
+                    'Principal': {'DataLakePrincipalIdentifier': role_arn},
+                    'Resource': {
+                        'Database': {
+                            'Name': 'default'
+                        }
+                    },
+                    'Permissions': ['Describe'.upper()],
+                }
+            )
+
     lf_client.batch_grant_permissions(CatalogId=props['CatalogId'], Entries=Entries)
 
     physical_id = props['DatabaseInput']['Imported'] + props['DatabaseInput']['Name']
diff --git a/backend/dataall/cdkproxy/assets/glueprofilingjob/glue_script.py b/backend/dataall/cdkproxy/assets/glueprofilingjob/glue_script.py
index 8279bc11c..e974c6bf9 100644
--- a/backend/dataall/cdkproxy/assets/glueprofilingjob/glue_script.py
+++ b/backend/dataall/cdkproxy/assets/glueprofilingjob/glue_script.py
@@ -1,4 +1,5 @@
 import json
+import os
 import logging
 import pprint
 import sys
@@ -8,7 +9,6 @@
 from awsglue.transforms import *
 from awsglue.utils import getResolvedOptions
 from pyspark.context import SparkContext
-from pydeequ.profiles import *
 
 sc = SparkContext.getOrCreate()
 sc._jsc.hadoopConfiguration().set('fs.s3.canned.acl', 'BucketOwnerFullControl')
@@ -32,6 +32,7 @@
     'environmentBucket',
     'dataallRegion',
     'table',
+    "SPARK_VERSION"
 ]
 try:
     args = getResolvedOptions(sys.argv, list_args)
@@ -43,6 +44,10 @@
     list_args.remove('table')
     args = getResolvedOptions(sys.argv, list_args)
 
+os.environ["SPARK_VERSION"] = args.get("SPARK_VERSION", "3.1")
+
+from pydeequ.profiles import *
+
 logger.info('Parsed Retrieved parameters')
 logger.info('Parsed Args = %s', pprint.pformat(args))
 
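A note on the import-order fix above: `pydeequ` resolves its Deequ configuration from the `SPARK_VERSION` environment variable at import time, so the variable must be set before `from pydeequ.profiles import *` runs. A minimal sketch of the pattern, assuming the Glue 3.0 / Spark 3.1 runtime (`ColumnProfilerRunner` is just one example of what `pydeequ.profiles` exposes):

```python
import os

# pydeequ reads SPARK_VERSION when the module is imported; importing it
# before the variable is set fails, which is why the import was moved
# below the argument parsing in glue_script.py.
os.environ["SPARK_VERSION"] = "3.1"  # assumes the Glue 3.0 / Spark 3.1 runtime

from pydeequ.profiles import ColumnProfilerRunner  # safe only after the line above
```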
diff --git a/backend/dataall/cdkproxy/stacks/dataset.py b/backend/dataall/cdkproxy/stacks/dataset.py
index 854ac6e3f..3e75633d3 100644
--- a/backend/dataall/cdkproxy/stacks/dataset.py
+++ b/backend/dataall/cdkproxy/stacks/dataset.py
@@ -295,7 +295,7 @@ def __init__(self, scope, id, target_uri: str = None, **kwargs):
                         ]
                     ),
                     iam.PolicyStatement(
-                        sid="CreateLoggingGlueCrawler",
+                        sid="CreateLoggingGlue",
                         actions=[
                             'logs:CreateLogGroup',
                             'logs:CreateLogStream',
@@ -303,16 +303,18 @@
                         effect=iam.Effect.ALLOW,
                         resources=[
                             f'arn:aws:logs:{dataset.region}:{dataset.AwsAccountId}:log-group:/aws-glue/crawlers*',
+                            f'arn:aws:logs:{dataset.region}:{dataset.AwsAccountId}:log-group:/aws-glue/jobs/*',
                         ],
                     ),
                     iam.PolicyStatement(
-                        sid="LoggingGlueCrawler",
+                        sid="LoggingGlue",
                         actions=[
                             'logs:PutLogEvents',
                         ],
                         effect=iam.Effect.ALLOW,
                         resources=[
                             f'arn:aws:logs:{dataset.region}:{dataset.AwsAccountId}:log-group:/aws-glue/crawlers:log-stream:{dataset.GlueCrawlerName}',
+                            f'arn:aws:logs:{dataset.region}:{dataset.AwsAccountId}:log-group:/aws-glue/jobs/*',
                         ],
                     ),
                     iam.PolicyStatement(
@@ -443,7 +445,8 @@ def __init__(self, scope, id, target_uri: str = None, **kwargs):
                 'CreateTableDefaultPermissions': [],
                 'Imported': 'IMPORTED-' if dataset.imported else 'CREATED-'
             },
-            'DatabaseAdministrators': dataset_admins
+            'DatabaseAdministrators': dataset_admins,
+            'TriggerUpdate': True
         },
     )
 
@@ -484,6 +487,7 @@
             '--enable-metrics': 'true',
             '--enable-continuous-cloudwatch-log': 'true',
             '--enable-glue-datacatalog': 'true',
+            '--SPARK_VERSION': '3.1',
         }
 
         job = glue.CfnJob(
diff --git a/documentation/userguide/docs/tables.md b/documentation/userguide/docs/tables.md
index 192757b51..cb8396a28 100644
--- a/documentation/userguide/docs/tables.md
+++ b/documentation/userguide/docs/tables.md
@@ -70,6 +70,9 @@ By selecting the **Metrics** tab of your data table you can run a profiling job
 
 ![](pictures/tables/table_metrics.png#zoom#shadow)
 
+!!! warning "Profiling Job Prerequisite"
+    Before running the profiling job you will need to ensure that the **default** Glue Database exists in the AWS Account where the data exists (by default this database exists for new accounts). This is required to enable the Glue profiling job to use the metadata stored in the Glue Catalog.
+
 ### :material-trash-can-outline: **Delete a table**
 
 Deleting a table means deleting it from the data.all Catalog, but it will be still available on the AWS Glue Catalog. Moreover, when data owners
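For reviewers reproducing the Lake Formation fix outside the custom resource: the new `default_db_exists` branch boils down to a DESCRIBE grant on the `default` database for each dataset administrator role. A minimal standalone sketch, assuming the caller is a Lake Formation administrator; the role ARN is a placeholder:

```python
import boto3
from botocore.exceptions import ClientError

glue = boto3.client("glue")
lakeformation = boto3.client("lakeformation")

role_arn = "arn:aws:iam::111122223333:role/dataset-role"  # placeholder principal

try:
    # Probe first: not every account/region has a 'default' Glue database.
    glue.get_database(Name="default")
except ClientError:
    pass  # no 'default' database, so there is nothing to grant on
else:
    # DESCRIBE lets the profiling job resolve the 'default' database through
    # Lake Formation instead of failing with an insufficient-permissions error.
    lakeformation.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": role_arn},
        Resource={"Database": {"Name": "default"}},
        Permissions=["DESCRIBE"],
    )
```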