From ccb12fe84dbb9093df3e76379fcaba97b209962a Mon Sep 17 00:00:00 2001 From: Phil Snyder Date: Fri, 18 Oct 2024 15:23:24 -0700 Subject: [PATCH] consolidate GX docs to a single docsite --- ...job-run-great-expectations-on-parquet.yaml | 5 +- config/develop/namespaced/glue-workflow.yaml | 2 - ...job-run-great-expectations-on-parquet.yaml | 5 +- config/prod/namespaced/glue-workflow.yaml | 2 - .../jobs/run_great_expectations_on_parquet.py | 367 ++++++------------ src/glue/resources/great_expectations.yml | 58 +++ ...e-job-run-great-expectations-on-parquet.j2 | 28 +- templates/glue-workflow.j2 | 17 +- .../test_run_great_expectations_on_parquet.py | 6 +- 9 files changed, 210 insertions(+), 280 deletions(-) create mode 100644 src/glue/resources/great_expectations.yml diff --git a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml index 1e6d461..a99b013 100644 --- a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml +++ b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -7,12 +7,15 @@ parameters: Namespace: {{ stack_group_config.namespace }} JobDescription: Runs great expectations on a set of data JobRole: !stack_output_external glue-job-role::RoleArn - TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + ParquetBucket: {{ stack_group_config.processed_data_bucket_name }} + ShareableArtifactsBucket: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} S3ScriptBucket: {{ stack_group_config.template_bucket_name }} S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" AdditionalPythonModules: "great_expectations~=0.18,urllib3<2" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/config/develop/namespaced/glue-workflow.yaml b/config/develop/namespaced/glue-workflow.yaml index 6861a72..1f51a5b 100644 --- a/config/develop/namespaced/glue-workflow.yaml +++ b/config/develop/namespaced/glue-workflow.yaml @@ -20,8 +20,6 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} - ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} - ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml index f0e8dd2..d91ff9a 100644 --- a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml +++ b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -7,12 +7,15 @@ parameters: Namespace: {{ stack_group_config.namespace }} JobDescription: Runs great expectations on a set of data JobRole: !stack_output_external glue-job-role::RoleArn - TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + ParquetBucket: {{ stack_group_config.processed_data_bucket_name }} + 
ShareableArtifactsBucket: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} S3ScriptBucket: {{ stack_group_config.template_bucket_name }} S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" AdditionalPythonModules: "great_expectations~=0.18,urllib3<2" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/config/prod/namespaced/glue-workflow.yaml b/config/prod/namespaced/glue-workflow.yaml index 3223adb..ca20b3f 100644 --- a/config/prod/namespaced/glue-workflow.yaml +++ b/config/prod/namespaced/glue-workflow.yaml @@ -20,8 +20,6 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} - ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} - ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py index 68c0e99..f6e043f 100644 --- a/src/glue/jobs/run_great_expectations_on_parquet.py +++ b/src/glue/jobs/run_great_expectations_on_parquet.py @@ -1,13 +1,17 @@ import json import logging +import os +import subprocess import sys from datetime import datetime from typing import Dict import boto3 -import great_expectations as gx from awsglue.context import GlueContext from awsglue.utils import getResolvedOptions +from pyspark.context import SparkContext + +import great_expectations as gx from great_expectations.core.batch import RuntimeBatchRequest from great_expectations.core.expectation_configuration import ExpectationConfiguration from great_expectations.core.run_identifier import RunIdentifier @@ -17,7 +21,6 @@ ExpectationSuiteIdentifier, ValidationResultIdentifier, ) -from pyspark.context import SparkContext logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -39,6 +42,7 @@ def read_args() -> dict: "namespace", "data-type", "expectation-suite-key", + "gx-resources-key-prefix", ], ) for arg in args: @@ -61,145 +65,51 @@ def validate_args(value: str) -> None: return None -def create_context( - s3_bucket: str, namespace: str, key_prefix: str -) -> "EphemeralDataContext": - """Creates the data context and adds stores, - datasource and data docs configurations - - Args: - s3_bucket (str): name of s3 bucket to store to - namespace (str): namespace - key_prefix (str): s3 key prefix - - Returns: - EphemeralDataContext: context object with all - configurations - """ - context = gx.get_context() - add_datasource(context) - add_validation_stores(context, s3_bucket, namespace, key_prefix) - add_data_docs_sites(context, s3_bucket, namespace, key_prefix) - return context - - -def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext": - """Adds the spark datasource - - Args: - context (EphemeralDataContext): data context to add to - - Returns: - EphemeralDataContext: data context object with datasource configuration - added - """ - yaml = YAMLHandler() - 
context.add_datasource( - **yaml.load( - """ - name: spark_datasource - class_name: Datasource - execution_engine: - class_name: SparkDFExecutionEngine - force_reuse_spark_context: true - data_connectors: - runtime_data_connector: - class_name: RuntimeDataConnector - batch_identifiers: - - batch_identifier - """ - ) - ) - return context - - -def add_validation_stores( - context: "EphemeralDataContext", +def update_data_docs_sites( + context: gx.data_context.AbstractDataContext, s3_bucket: str, namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": - """Adds the validation store configurations to the context object - - Args: - context (EphemeralDataContext): data context to add to - s3_bucket (str): name of the s3 bucket to save validation results to - namespace (str): name of the namespace - key_prefix (str): s3 key prefix to save the - validation results to - - Returns: - EphemeralDataContext: data context object with validation stores' - configuration added +) -> gx.data_context.AbstractDataContext: """ - # Programmatically configure the validation result store and - # DataDocs to use S3 - context.add_store( - "validation_result_store", - { - "class_name": "ValidationsStore", - "store_backend": { - "class_name": "TupleS3StoreBackend", - "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", - }, - }, - ) - return context - - -def add_data_docs_sites( - context: "EphemeralDataContext", - s3_bucket: str, - namespace: str, - key_prefix: str, -) -> "EphemeralDataContext": - """Adds the data docs sites configuration to the context object - so data docs can be saved to a s3 location. This is a special - workaround to add the data docs because we're using EphemeralDataContext - context objects and they don't store to memory. + Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace Args: - context (EphemeralDataContext): data context to add to - s3_bucket (str): name of the s3 bucket to save gx docs to - namespace (str): name of the namespace - key_prefix (str): s3 key prefix to save the - gx docs to + context (gx.data_context.AbstractDataContext): The GX data context to update + s3_bucket (str): The S3 bucket where data docs are written + namespace (str): The current namespace Returns: - EphemeralDataContext: data context object with data docs sites' - configuration added + gx.data_context.AbstractDataContext: The updated GX data context object """ - data_context_config = DataContextConfig() - data_context_config["data_docs_sites"] = { - "s3_site": { + context.update_data_docs_site( + site_name="s3_site", + site_config={ "class_name": "SiteBuilder", "store_backend": { "class_name": "TupleS3StoreBackend", "bucket": s3_bucket, - "prefix": f"{namespace}/{key_prefix}", + "prefix": f"{namespace}/great_expectation_reports/parquet/", }, "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"}, - } - } - context._project_config["data_docs_sites"] = data_context_config["data_docs_sites"] + }, + ) return context def get_spark_df( glue_context: GlueContext, parquet_bucket: str, namespace: str, data_type: str ) -> "pyspark.sql.dataframe.DataFrame": - """Reads in the parquet dataset as a Dynamic Frame and converts it - to a spark dataframe + """ + Read a data-type-specific Parquet dataset Args: - glue_context (GlueContext): the aws glue context object - parquet_bucket (str): the name of the bucket holding parquet files - namespace (str): the namespace - data_type (str): the data type name + glue_context (GlueContext): The AWS Glue context 
object + parquet_bucket (str): The S3 bucket containing the data-type-specific Parquet dataset + namespace (str): The associated namespace + data_type (str): The associated data type Returns: - pyspark.sql.dataframe.DataFrame: spark dataframe of the read in parquet dataset + pyspark.sql.dataframe.DataFrame: A Spark dataframe over our data-type-specific Parquet dataset """ s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{data_type}/" dynamic_frame = glue_context.create_dynamic_frame_from_options( @@ -212,29 +122,24 @@ def get_spark_df( def get_batch_request( + gx_context: gx.data_context.AbstractDataContext, spark_dataset: "pyspark.sql.dataframe.DataFrame", data_type: str, - run_id: RunIdentifier, -) -> RuntimeBatchRequest: - """Retrieves the unique metadata for this batch request +) -> gx.datasource.fluent.batch_request: + """ + Get a GX batch request over a Spark dataframe Args: - spark_dataset (pyspark.sql.dataframe.DataFrame): parquet dataset as spark df - data_type (str): data type name - run_id (RunIdentifier): contains the run name and - run time metadata of this batch run + spark_dataset (pyspark.sql.dataframe.DataFrame): A Spark dataframe + data_type (str): The data type Returns: - RuntimeBatchRequest: contains metadata for the batch run request - to identify this great expectations run + RuntimeBatchRequest: A batch request which can be used in conjunction + with an expectation suite to validate our data. """ - batch_request = RuntimeBatchRequest( - datasource_name="spark_datasource", - data_connector_name="runtime_data_connector", - data_asset_name=f"{data_type}-parquet-data-asset", - runtime_parameters={"batch_data": spark_dataset}, - batch_identifiers={"batch_identifier": f"{data_type}_{run_id.run_name}_batch"}, - ) + data_source = gx_context.sources.add_or_update_spark(name="parquet") + data_asset = data_source.add_dataframe_asset(name=f"{data_type}_spark_dataframe") + batch_request = data_asset.build_batch_request(dataframe=spark_dataset) return batch_request @@ -243,13 +148,13 @@ def read_json( s3_bucket: str, key: str, ) -> Dict[str, str]: - """Reads in a json object + """ + Read a JSON file from an S3 bucket Args: - s3 (boto3.client): s3 client connection - s3_bucket (str): name of the s3 bucket to read from - key (str): s3 key prefix of the - location of the json to read from + s3 (boto3.client): An S3 client + s3_bucket (str): The S3 bucket containing the JSON file + key (str): The S3 key of the JSON file Returns: Dict[str, str]: the data read in from json @@ -263,112 +168,73 @@ def read_json( def add_expectations_from_json( expectations_data: Dict[str, str], - context: "EphemeralDataContext", - data_type: str, -) -> "EphemeralDataContext": - """Adds in the read in expectations to the context object - - Args: - expectations_data (Dict[str, str]): expectations - context (EphemeralDataContext): context object - data_type (str): name of the data type - - Raises: - ValueError: thrown when no expectations exist for this data type - - Returns: - EphemeralDataContext: context object with expectations added + context: gx.data_context.AbstractDataContext, +) -> gx.data_context.AbstractDataContext: """ - # Ensure the data type exists in the JSON file - if data_type not in expectations_data: - raise ValueError(f"No expectations found for data type '{data_type}'") - - # Extract the expectation suite and expectations for the dataset - suite_data = expectations_data[data_type] - expectation_suite_name = suite_data["expectation_suite_name"] - new_expectations = 
suite_data["expectations"] - - # Convert new expectations from JSON format to ExpectationConfiguration objects - new_expectations_configs = [ - ExpectationConfiguration( - expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] - ) - for exp in new_expectations - ] - - # Update the expectation suite in the data context - context.add_or_update_expectation_suite( - expectation_suite_name=expectation_suite_name, - expectations=new_expectations_configs, - ) - return context - - -def add_validation_results_to_store( - context: "EphemeralDataContext", - expectation_suite_name: str, - validation_result: Dict[str, str], - batch_identifier: RuntimeBatchRequest, - run_identifier: RunIdentifier, -) -> "EphemeralDataContext": - """Adds the validation results manually to the validation store. - This is a workaround for a EphemeralDataContext context object, - and for us to avoid complicating our folder structure to include - checkpoints/other more persistent data context object types - until we need that feature + Add an expectation suite with expectations to our GX data context for each data type. Args: - context (EphemeralDataContext): context object to add results to - expectation_suite_name (str): name of expectation suite - validation_result (Dict[str, str]): results outputted by gx - validator to be stored - batch_identifier (RuntimeBatchRequest): metadata containing details of - the batch request - run_identifier (RunIdentifier): metadata containing details of the gx run + expectations_data (Dict[str, str]): A mapping of data types to their expectations. + The expectations should be formatted like so: + + { + "expectation_suite_name": "string", + "expectations": { + "expectation_type": "str", + "kwargs": "readable by `ExpectationConfiguration`" + } + } + context (gx.data_context.AbstractDataContext): context object Returns: - EphemeralDataContext: context object with validation results added to + gx.data_context.AbstractDataContext: A GX data context object with expectation suites added """ - expectation_suite = context.get_expectation_suite(expectation_suite_name) - # Create an ExpectationSuiteIdentifier - expectation_suite_identifier = ExpectationSuiteIdentifier( - expectation_suite_name=expectation_suite.expectation_suite_name - ) - - # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier - validation_result_identifier = ValidationResultIdentifier( - expectation_suite_identifier=expectation_suite_identifier, - batch_identifier=batch_identifier, - run_id=run_identifier, - ) - - context.validations_store.set(validation_result_identifier, validation_result) + for data_type in expectations_data: + suite_data = expectations_data[data_type] + expectation_suite_name = suite_data["expectation_suite_name"] + new_expectations = suite_data["expectations"] + + # Convert new expectations from dict to ExpectationConfiguration objects + new_expectations_configs = [ + ExpectationConfiguration( + expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + ) + for exp in new_expectations + ] + + # Update the expectation suite in the data context + context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, + expectations=new_expectations_configs, + ) return context def main(): args = read_args() + s3 = boto3.client("s3") + # Download GX stores and configuration + subprocess.run( + args=[ + "aws", + "s3", + "sync", + f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}", + ".", + ], + 
check=True, + ) run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}") expectation_suite_name = f"{args['data_type']}_expectations" - s3 = boto3.client("s3") - context = create_context( + + # Set up Great Expectations + gx_context = gx.get_context() + logger.info("update_data_docs_site") + gx_context = update_data_docs_sites( + context=gx_context, s3_bucket=args["shareable_artifacts_bucket"], namespace=args["namespace"], - key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/", - ) - glue_context = GlueContext(SparkContext.getOrCreate()) - logger.info("get_spark_df") - spark_df = get_spark_df( - glue_context=glue_context, - parquet_bucket=args["parquet_bucket"], - namespace=args["namespace"], - data_type=args["data_type"], ) - logger.info("get_batch_request") - batch_request = get_batch_request(spark_df, args["data_type"], run_id) - logger.info("add_expectations") - - # Load the JSON file with the expectations logger.info("reads_expectations_from_json") expectations_data = read_json( s3=s3, @@ -376,32 +242,37 @@ def main(): key=args["expectation_suite_key"], ) logger.info("adds_expectations_from_json") - add_expectations_from_json( + gx_context = add_expectations_from_json( expectations_data=expectations_data, - context=context, - data_type=args["data_type"], - ) - logger.info("get_validator") - validator = context.get_validator( - batch_request=batch_request, - expectation_suite_name=expectation_suite_name, + context=gx_context, ) - logger.info("validator.validate") - validation_result = validator.validate() - logger.info("validation_result: %s", validation_result) + # Set up Spark + glue_context = GlueContext(SparkContext.getOrCreate()) + logger.info("get_spark_df") + spark_df = get_spark_df( + glue_context=glue_context, + parquet_bucket=args["parquet_bucket"], + namespace=args["namespace"], + data_type=args["data_type"], + ) - add_validation_results_to_store( - context, - expectation_suite_name, - validation_result, - batch_identifier=batch_request["batch_identifiers"]["batch_identifier"], - run_identifier=run_id, + # Put the two together and validate the GX expectations + logger.info("get_batch_request") + batch_request = get_batch_request( + gx_context=gx_context, spark_dataset=spark_df, data_type=args["data_type"] ) - context.build_data_docs( - site_names=["s3_site"], + logger.info("add_or_update_checkpoint") + # The default checkpoint action list is: + # StoreValidationResultAction, StoreEvaluationParametersAction, UpdateDataDocsAction + checkpoint = gx_context.add_or_update_checkpoint( + name=f"{args['data_type']}-checkpoint", + expectation_suite_name=expectation_suite_name, + batch_request=batch_request, ) - logger.info("data docs saved!") + logger.info("run checkpoint") + checkpoint_result = checkpoint.run(run_id=run_id) + logger.info("data docs updated!") if __name__ == "__main__": diff --git a/src/glue/resources/great_expectations.yml b/src/glue/resources/great_expectations.yml new file mode 100644 index 0000000..9ce83cd --- /dev/null +++ b/src/glue/resources/great_expectations.yml @@ -0,0 +1,58 @@ +config_version: 3.0 +stores: + expectations_store: + class_name: ExpectationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: expectations/ + validations_store: + class_name: ValidationsStore + store_backend: + class_name: TupleFilesystemStoreBackend + base_directory: uncommitted/validations/ + evaluation_parameter_store: + class_name: EvaluationParameterStore + checkpoint_store: + class_name: 
CheckpointStore + store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: checkpoints/ + profiler_store: + class_name: ProfilerStore + store_backend: + class_name: TupleFilesystemStoreBackend + suppress_store_backend_id: true + base_directory: profilers/ +expectations_store_name: expectations_store +validations_store_name: validations_store +evaluation_parameter_store_name: evaluation_parameter_store +checkpoint_store_name: checkpoint_store +datasources: + spark_datasource: + class_name: Datasource + execution_engine: + class_name: SparkDFExecutionEngine + force_reuse_spark_context: true + data_connectors: + runtime_data_connector: + class_name: RuntimeDataConnector + batch_identifiers: + - batch_identifier +fluent_datasources: + parquet: + type: spark + assets: {} +data_docs_sites: + s3_site: + class_name: SiteBuilder + store_backend: + class_name: TupleS3StoreBackend + bucket: recover-shareable-artifacts-vpn + prefix: main/great_expectation_reports/parquet + site_index_builder: + class_name: DefaultSiteIndexBuilder +include_rendered_content: + globally: false + expectation_suite: false + expectation_validation_result: false diff --git a/templates/glue-job-run-great-expectations-on-parquet.j2 b/templates/glue-job-run-great-expectations-on-parquet.j2 index 455b5e8..a6c4046 100644 --- a/templates/glue-job-run-great-expectations-on-parquet.j2 +++ b/templates/glue-job-run-great-expectations-on-parquet.j2 @@ -19,9 +19,14 @@ Parameters: Type: String Description: The name or ARN of the IAM role that will run this job. - TempS3Bucket: + ParquetBucket: Type: String - Description: The name of the S3 bucket where temporary files and logs are written. + Description: The name of the S3 bucket where Parquet data is written. This is + also where we will write temporary files and logs. + + ShareableArtifactsBucket: + Type: String + Description: The name of the bucket where shareable artifacts are stored. S3ScriptBucket: Type: String @@ -48,6 +53,10 @@ Parameters: FitbitIntradayCombined and HealthKitV2Samples. Default: 1 + ExpectationSuiteKey: + Type: String + Description: The s3 key prefix of the expectation suite. + MaxRetries: Type: Number Description: How many times to retry the job if it fails (integer). 
@@ -71,12 +80,11 @@ Resources: {% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %} {% set dataset = {} %} {% do dataset.update({"type": v}) %} - {% do dataset.update({"table_name": "dataset_" + v.lower()})%} {% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %} {% do datasets.append(dataset) %} {% endfor %} - {% for dataset in datasets %} + {% for dataset in datasets if dataset["type"].lower() in sceptre_user_data.data_values_expectations %} {{ dataset["stackname_prefix"] }}GreatExpectationsParquetJob: Type: AWS::Glue::Job Properties: @@ -84,15 +92,21 @@ Resources: Name: glueetl ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey} DefaultArguments: - --TempDir: !Sub s3://${TempS3Bucket}/tmp + --TempDir: !Sub s3://${ParquetBucket}/tmp --enable-continuous-cloudwatch-log: true --enable-metrics: true --enable-spark-ui: true - --spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/ + --spark-event-logs-path: !Sub s3://${ParquetBucket}/spark-logs/${AWS::StackName}/ --job-bookmark-option: job-bookmark-disable --job-language: python --additional-python-modules: !Ref AdditionalPythonModules - # --conf spark.sql.adaptive.enabled + --data-type: {{ dataset["type"].lower() }} + --namespace: !Ref Namespace + --cfn-bucket: !Ref S3ScriptBucket + --parquet-bucket: !Ref ParquetBucket + --shareable-artifacts-bucket: !Ref ShareableArtifactsBucket + --expectation-suite-key: !Ref ExpectationSuiteKey + --gx-resources-key-prefix: !Sub "${Namespace}/great_expectation_resources/parquet/" Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}" GlueVersion: !Ref GlueVersion MaxRetries: !Ref MaxRetries diff --git a/templates/glue-workflow.j2 b/templates/glue-workflow.j2 index 16d8950..5483368 100644 --- a/templates/glue-workflow.j2 +++ b/templates/glue-workflow.j2 @@ -40,7 +40,7 @@ Parameters: ParquetKeyPrefix: Type: String - Description: S3 key prefix where JSON datasets are stored. + Description: S3 key prefix where Parquet datasets are stored. Default: parquet GlueDatabase: @@ -82,13 +82,6 @@ Parameters: Description: >- The name of the bucket where the cloudformation and artifacts are stored. - ShareableArtifactsBucketName: - Type: String - Description: The name of the bucket where shareable artifacts are stored. - - ExpectationSuiteKey: - Type: String - Description: The s3 key prefix of the expectation suite. 
Conditions: IsMainNamespace: !Equals [!Ref Namespace, "main"] @@ -316,14 +309,6 @@ Resources: Name: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger" Actions: - JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob - Arguments: - "--data-type": {{ dataset["data_type"].lower() }} - "--namespace": !Ref Namespace - "--cfn-bucket": !Ref CloudformationBucketName - "--parquet-bucket": !Ref ParquetBucketName - "--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName - "--expectation-suite-key": !Ref ExpectationSuiteKey - "--additional-python-modules": "great_expectations~=0.18,urllib3<2" Description: This trigger runs the great expectation parquet job for this data type after completion of the JSON to Parquet job for this data type Type: CONDITIONAL Predicate: diff --git a/tests/test_run_great_expectations_on_parquet.py b/tests/test_run_great_expectations_on_parquet.py index 074e140..df57edc 100644 --- a/tests/test_run_great_expectations_on_parquet.py +++ b/tests/test_run_great_expectations_on_parquet.py @@ -1,7 +1,9 @@ from unittest.mock import MagicMock, patch -import great_expectations as gx import pytest +from pyspark.sql import SparkSession + +import great_expectations as gx from great_expectations.core.batch import RuntimeBatchRequest from great_expectations.core.run_identifier import RunIdentifier from great_expectations.core.yaml_handler import YAMLHandler @@ -9,8 +11,6 @@ ExpectationSuiteIdentifier, ValidationResultIdentifier, ) -from pyspark.sql import SparkSession - from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq
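
A minimal local sketch of the checkpoint-based flow this patch introduces, for anyone who wants to try the pattern outside of Glue. It assumes great_expectations~=0.18 and pyspark are installed locally; the toy dataframe, the "healthkitv2samples" data type name, and the single expectation are placeholders rather than the real Parquet datasets or the suites shipped in data_values_expectations.json, and it uses an ephemeral context instead of the great_expectations.yml project directory that the job syncs down from the shareable artifacts bucket.

    # Local smoke test (sketch): mirrors the fluent-datasource + checkpoint flow
    # used by run_great_expectations_on_parquet.py. Dataframe contents, column
    # names, and the data type below are placeholders.
    import great_expectations as gx
    from great_expectations.core.expectation_configuration import ExpectationConfiguration
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark_df = spark.createDataFrame(
        [("participant_0", 1), ("participant_1", 2)],
        ["ParticipantIdentifier", "Value"],
    )

    # Ephemeral context for local experimentation; the Glue job instead picks up
    # the synced great_expectations.yml and its file-backed stores.
    context = gx.get_context()

    # Same shape as the suites in data_values_expectations.json: a suite name
    # plus a list of expectation_type/kwargs pairs.
    context.add_or_update_expectation_suite(
        expectation_suite_name="healthkitv2samples_expectations",
        expectations=[
            ExpectationConfiguration(
                expectation_type="expect_column_values_to_not_be_null",
                kwargs={"column": "ParticipantIdentifier"},
            )
        ],
    )

    # Same pattern as get_batch_request(): register a Spark datasource, wrap the
    # dataframe as an asset, and build a runtime batch request from it.
    data_source = context.sources.add_or_update_spark(name="parquet")
    data_asset = data_source.add_dataframe_asset(name="healthkitv2samples_spark_dataframe")
    batch_request = data_asset.build_batch_request(dataframe=spark_df)

    # The checkpoint's default action list stores the validation result and,
    # when a data docs site is configured, rebuilds data docs.
    checkpoint = context.add_or_update_checkpoint(
        name="healthkitv2samples-checkpoint",
        expectation_suite_name="healthkitv2samples_expectations",
        batch_request=batch_request,
    )
    result = checkpoint.run()
    print(result.success)

In the job itself the suite name is derived as f"{data_type}_expectations", the dataframe comes from the data-type-specific Parquet dataset read through Glue, and the context is the file-backed one defined by the synced great_expectations.yml, so the checkpoint's default actions publish validation results and data docs to the s3_site that update_data_docs_sites() points at the shareable artifacts bucket.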