diff --git a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
index a99b013..d37da00 100644
--- a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
+++ b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml
@@ -12,6 +12,7 @@ parameters:
   S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
   S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
   ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
+  GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
   GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
   AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
 stack_tags:
diff --git a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml
index d91ff9a..93ea716 100644
--- a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml
+++ b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml
@@ -12,6 +12,7 @@ parameters:
  S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
   S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
   ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
+  GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
   GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
   AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
 stack_tags:
diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py
index da6b003..f84d602 100644
--- a/src/glue/jobs/run_great_expectations_on_parquet.py
+++ b/src/glue/jobs/run_great_expectations_on_parquet.py
@@ -1,25 +1,16 @@
 import json
 import logging
 import os
-import subprocess
 import sys
 from datetime import datetime
 from typing import Dict
 
 import boto3
 import great_expectations as gx
+import pyspark
+import yaml
 from awsglue.context import GlueContext
 from awsglue.utils import getResolvedOptions
-from great_expectations.core.batch import RuntimeBatchRequest
-from great_expectations.core.expectation_configuration import ExpectationConfiguration
-from great_expectations.core.run_identifier import RunIdentifier
-from great_expectations.core.yaml_handler import YAMLHandler
-from great_expectations.data_context.types.base import DataContextConfig
-from great_expectations.data_context.types.resource_identifiers import (
-    ExpectationSuiteIdentifier,
-    ValidationResultIdentifier,
-)
-from pyspark.context import SparkContext
 
 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -64,35 +55,56 @@ def validate_args(value: str) -> None:
     return None
 
 
-def update_data_docs_sites(
-    context: gx.data_context.AbstractDataContext,
-    s3_bucket: str,
+def configure_gx_config(
+    gx_config_bucket: str,
+    gx_config_key: str,
+    shareable_artifacts_bucket: str,
     namespace: str,
-) -> gx.data_context.AbstractDataContext:
-    """
-    Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace
+) -> dict:
+    """Download and configure a `great_expectations.yml` file locally
+
+    This function will download a `great_expectations.yml` file from S3 to a `gx` directory.
+    This file will automatically be used to configure the GX data context when calling
+    `gx.get_context()`.
 
     Args:
-        context (gx.data_context.AbstractDataContext): The GX data context to update
-        s3_bucket (str): The S3 bucket where data docs are written
+        gx_config_bucket (str): S3 bucket containing the `great_expectations.yml` file.
+        gx_config_key (str): S3 key where this file is located.
+        shareable_artifacts_bucket (str): S3 bucket where shareable artifacts are written.
         namespace (str): The current namespace
-
-    Returns:
-        gx.data_context.AbstractDataContext: The updated GX data context object
     """
-    context.update_data_docs_site(
-        site_name="s3_site",
-        site_config={
-            "class_name": "SiteBuilder",
-            "store_backend": {
-                "class_name": "TupleS3StoreBackend",
-                "bucket": s3_bucket,
-                "prefix": f"{namespace}/great_expectation_reports/parquet/",
-            },
-            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
-        },
+    gx_config_path = "gx/great_expectations.yml"
+    os.makedirs("gx", exist_ok=True)
+    s3_client = boto3.client("s3")
+    logger.info(
+        f"Downloading s3://{gx_config_bucket}/{gx_config_key} to {gx_config_path}"
     )
-    return context
+    s3_client.download_file(
+        Bucket=gx_config_bucket, Key=gx_config_key, Filename=gx_config_path
+    )
+    with open(gx_config_path, "rb") as gx_config_obj:
+        gx_config = yaml.safe_load(gx_config_obj)
+    # fmt: off
+    gx_config["stores"]["validations_store"]["store_backend"]["bucket"] = (
+        shareable_artifacts_bucket
+    )
+    gx_config["stores"]["validations_store"]["store_backend"]["prefix"] = (
+        gx_config["stores"]["validations_store"]["store_backend"]["prefix"].format(
+            namespace=namespace
+        )
+    )
+    gx_config["data_docs_sites"]["s3_site"]["store_backend"]["bucket"] = (
+        shareable_artifacts_bucket
+    )
+    gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"] = (
+        gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"].format(
+            namespace=namespace
+        )
+    )
+    # fmt: on
+    with open(gx_config_path, "w", encoding="utf-8") as gx_config_obj:
+        yaml.dump(gx_config, gx_config_obj)
+    return gx_config
 
 
 def get_spark_df(
@@ -195,7 +207,7 @@ def add_expectations_from_json(
 
     # Convert new expectations from dict to ExpectationConfiguration objects
     new_expectations_configs = [
-        ExpectationConfiguration(
+        gx.core.expectation_configuration.ExpectationConfiguration(
            expectation_type=exp["expectation_type"], kwargs=exp["kwargs"]
         )
         for exp in new_expectations
@@ -211,29 +223,30 @@ def add_expectations_from_json(
 
 def main():
     args = read_args()
+    # args = {
+    #     "cfn_bucket": "recover-dev-cloudformation",
+    #     "data_type": "fitbitdailydata",
+    #     "expectation_suite_key": "etl-686/src/glue/resources/data_values_expectations.json",
+    #     "gx_config_key": "etl-686/src/glue/resources/great_expectations.yml",
+    #     "namespace": "etl-686",
+    #     "parquet_bucket": "recover-dev-processed-data",
+    #     "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
+    # }
     s3 = boto3.client("s3")
-    # Download GX stores and configuration
-    subprocess.run(
-        args=[
-            "aws",
-            "s3",
-            "sync",
-            f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}",
-            ".",
-        ],
-        check=True,
+    run_id = gx.core.run_identifier.RunIdentifier(
+        run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
     )
-    run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
     expectation_suite_name = f"{args['data_type']}_expectations"
 
     # Set up Great Expectations
-    gx_context = gx.get_context()
-    logger.info("update_data_docs_site")
-    gx_context = update_data_docs_sites(
-        context=gx_context,
-        s3_bucket=args["shareable_artifacts_bucket"],
+    logger.info("configure_gx_config")
+    configure_gx_config(
+        gx_config_bucket=args["cfn_bucket"],
+        gx_config_key=args["gx_config_key"],
+        shareable_artifacts_bucket=args["shareable_artifacts_bucket"],
         namespace=args["namespace"],
     )
+    gx_context = gx.get_context()
     logger.info("reads_expectations_from_json")
     expectations_data = read_json(
         s3=s3,
@@ -247,7 +260,7 @@
     )
 
     # Set up Spark
-    glue_context = GlueContext(SparkContext.getOrCreate())
+    glue_context = GlueContext(pyspark.context.SparkContext.getOrCreate())
     logger.info("get_spark_df")
     spark_df = get_spark_df(
         glue_context=glue_context,
diff --git a/src/scripts/manage_artifacts/artifacts.py b/src/scripts/manage_artifacts/artifacts.py
index b616708..3622f06 100755
--- a/src/scripts/manage_artifacts/artifacts.py
+++ b/src/scripts/manage_artifacts/artifacts.py
@@ -47,33 +47,29 @@ def upload(namespace: str, cfn_bucket: str):
     execute_command(cmd)
 
 
-def sync(namespace: str, shareable_artifacts_bucket: str):
-    """Sync resources which are not version controlled to this namespace.
-
-    In some cases, we do not want to version control some data (like Great Expectations artifacts)
-    but we need to duplicate this data from the main namespace to a development namespace.
-
-    Args:
-        namespace (str): The development namespace
-        shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts
-    """
-    # Copy Great Expectations artifacts to this namespace
-    source_gx_artifacts = os.path.join(
-        "s3://", shareable_artifacts_bucket, "main/great_expectation_resources/"
-    )
-    target_gx_artifacts = os.path.join(
-        "s3://", shareable_artifacts_bucket, namespace, "great_expectation_resources/"
-    )
-    gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts]
-    execute_command(gx_artifacts_clean_up_cmd)
-    gx_artifacts_sync_cmd = [
-        "aws",
-        "s3",
-        "sync",
-        source_gx_artifacts,
-        target_gx_artifacts,
-    ]
-    execute_command(gx_artifacts_sync_cmd)
+# def sync(namespace: str, shareable_artifacts_bucket: str):
+#     """Sync resources which are not version controlled to this namespace.
+
+#     In some cases, we do not want to version control some data (like Great Expectations artifacts)
+#     but we need to duplicate this data from the main namespace to a development namespace.
+
+#     Args:
+#         namespace (str): The development namespace
+#         shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts
+#     """
+#     # Copy Great Expectations artifacts to this namespace
+#     source_gx_artifacts = os.path.join("s3://", shareable_artifacts_bucket, "main/")
+#     target_gx_artifacts = os.path.join("s3://", shareable_artifacts_bucket, namespace)
+#     gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts]
+#     execute_command(gx_artifacts_clean_up_cmd)
+#     gx_artifacts_sync_cmd = [
+#         "aws",
+#         "s3",
+#         "sync",
+#         source_gx_artifacts,
+#         target_gx_artifacts,
+#     ]
+#     execute_command(gx_artifacts_sync_cmd)
 
 
 def delete(namespace: str, cfn_bucket: str):
@@ -93,11 +89,11 @@ def list_namespaces(cfn_bucket: str):
 def main(args):
     if args.upload:
         upload(args.namespace, args.cfn_bucket)
-        if args.namespace != "main":
-            sync(
-                namespace=args.namespace,
-                shareable_artifacts_bucket=args.shareable_artifacts_bucket,
-            )
+        # if args.namespace != "main":
+        #     sync(
+        #         namespace=args.namespace,
+        #         shareable_artifacts_bucket=args.shareable_artifacts_bucket,
+        #     )
     elif args.remove:
         delete(args.namespace, args.cfn_bucket)
     else:
diff --git a/templates/glue-job-run-great-expectations-on-parquet.j2 b/templates/glue-job-run-great-expectations-on-parquet.j2
index a6c4046..8fc2d97 100644
--- a/templates/glue-job-run-great-expectations-on-parquet.j2
+++ b/templates/glue-job-run-great-expectations-on-parquet.j2
@@ -55,7 +55,11 @@ Parameters:
 
   ExpectationSuiteKey:
     Type: String
-    Description: The s3 key prefix of the expectation suite.
+    Description: The S3 key of the GX expectation file.
+
+  GXConfigKey:
+    Type: String
+    Description: The S3 key of the GX configuration file/template.
 
   MaxRetries:
     Type: Number
@@ -106,6 +110,7 @@ Resources:
         --parquet-bucket: !Ref ParquetBucket
         --shareable-artifacts-bucket: !Ref ShareableArtifactsBucket
         --expectation-suite-key: !Ref ExpectationSuiteKey
+        --gx-config-key: !Ref GXConfigKey
         --gx-resources-key-prefix: !Sub "${Namespace}/great_expectation_resources/parquet/"
       Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
       GlueVersion: !Ref GlueVersion
diff --git a/tests/test_run_great_expectations_on_parquet.py b/tests/test_run_great_expectations_on_parquet.py
index fbf078e..42bdba4 100644
--- a/tests/test_run_great_expectations_on_parquet.py
+++ b/tests/test_run_great_expectations_on_parquet.py
@@ -1,12 +1,16 @@
+import os
+import shutil
 import unittest
 
+import boto3
 import great_expectations as gx
 import pyspark
 import pytest
+import yaml
+from moto import mock_s3
 
 from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq
 
-
 @pytest.fixture
 def gx_context(scope="function"):
     context = gx.get_context()
@@ -18,6 +22,140 @@ def spark_session():
     yield pyspark.sql.SparkSession.builder.appName("BatchRequestTest").getOrCreate()
 
 
+@pytest.fixture()
+def cloudformation_bucket():
+    with mock_s3():
+        # Create a mock S3 client
+        s3 = boto3.client("s3")
+
+        # Define the bucket name
+        bucket_name = "test-great-expectations-bucket"
+
+        # Create the mock bucket
+        s3.create_bucket(Bucket=bucket_name)
+
+        # Create a sample great_expectations.yml with just the components we modify
+        great_expectations_content = """
+        config_version: 3.0
+        stores:
+            validations_store:
+                class_name: ValidationsStore
+                store_backend:
+                    class_name: TupleS3StoreBackend
+                    suppress_store_backend_id: true
+                    bucket: "{shareable_artifacts_bucket}"
+                    prefix: "{namespace}/great_expectation_reports/parquet/validations/"
+        data_docs_sites:
+            s3_site:
+                class_name: SiteBuilder
+                store_backend:
+                    class_name: TupleS3StoreBackend
+                    bucket: "{shareable_artifacts_bucket}"
+                    prefix: "{namespace}/great_expectation_reports/parquet/"
+                site_index_builder:
+                    class_name: DefaultSiteIndexBuilder
+        """
+
+        # Upload the great_expectations.yml file to the mocked bucket
+        s3.put_object(
+            Bucket=bucket_name,
+            Key="great_expectations.yml",
+            Body=great_expectations_content,
+        )
+
+        # Yield the bucket details for use in tests
+        yield {
+            "bucket": bucket_name,
+            "great_expectations_configuration_key": "great_expectations.yml",
+            "great_expectations_content": great_expectations_content,
+        }
+
+
+@pytest.fixture()
+def clean_up_after_configure_gx_config():
+    """Remove artifacts of the `configure_gx_config` function"""
+    yield
+    if os.path.isdir("gx"):
+        shutil.rmtree("gx")
+
+
+def test_configure_gx_config_validations_store_bucket(
+    cloudformation_bucket, clean_up_after_configure_gx_config
+):
+    gx_config = run_gx_on_pq.configure_gx_config(
+        gx_config_bucket=cloudformation_bucket["bucket"],
+        gx_config_key=cloudformation_bucket["great_expectations_configuration_key"],
+        shareable_artifacts_bucket="shareable_artifacts_bucket",
+        namespace="namespace",
+    )
+    assert (
+        gx_config["stores"]["validations_store"]["store_backend"]["bucket"]
+        == "shareable_artifacts_bucket"
+    )
+
+
+def test_configure_gx_config_validations_store_prefix(
+    cloudformation_bucket, clean_up_after_configure_gx_config
+):
+    gx_config = run_gx_on_pq.configure_gx_config(
+        gx_config_bucket=cloudformation_bucket["bucket"],
+        gx_config_key=cloudformation_bucket["great_expectations_configuration_key"],
+        shareable_artifacts_bucket="shareable_artifacts_bucket",
+        namespace="namespace",
+    )
+    original_gx_config = yaml.safe_load(
+        cloudformation_bucket["great_expectations_content"]
+    )
+    # fmt: off
+    assert (
+        gx_config["stores"]["validations_store"]["store_backend"]["prefix"]
+        == original_gx_config["stores"]["validations_store"]["store_backend"]["prefix"].format(
+            namespace="namespace"
+        )
+    )
+    # fmt: on
+
+
+def test_configure_gx_config_data_docs_sites_bucket(
+    cloudformation_bucket, clean_up_after_configure_gx_config
+):
+    gx_config = run_gx_on_pq.configure_gx_config(
+        gx_config_bucket=cloudformation_bucket["bucket"],
+        gx_config_key=cloudformation_bucket["great_expectations_configuration_key"],
+        shareable_artifacts_bucket="shareable_artifacts_bucket",
+        namespace="namespace",
+    )
+    original_gx_config = yaml.safe_load(
+        cloudformation_bucket["great_expectations_content"]
+    )
+    assert (
+        gx_config["data_docs_sites"]["s3_site"]["store_backend"]["bucket"]
+        == "shareable_artifacts_bucket"
+    )
+
+
+def test_configure_gx_config_data_docs_sites_prefix(
+    cloudformation_bucket, clean_up_after_configure_gx_config
+):
+    gx_config = run_gx_on_pq.configure_gx_config(
+        gx_config_bucket=cloudformation_bucket["bucket"],
+        gx_config_key=cloudformation_bucket["great_expectations_configuration_key"],
+        shareable_artifacts_bucket="shareable_artifacts_bucket",
+        namespace="namespace",
+    )
+    original_gx_config = yaml.safe_load(
+        cloudformation_bucket["great_expectations_content"]
+    )
+    # fmt: off
+    assert (
+        gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"]
+        == original_gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"].format(
+            namespace="namespace"
+        )
+    )
+    # fmt: on
+
+
 def test_get_spark_df_has_expected_calls():
     glue_context = unittest.mock.MagicMock()
     mock_dynamic_frame = unittest.mock.MagicMock()
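A minimal sketch of how the new hook is consumed at runtime, per the `configure_gx_config` docstring: the function rewrites `./gx/great_expectations.yml`, and `gx.get_context()` then picks that file up. The bucket and key values below mirror the commented-out args dict in `main()` and are illustrative only, not requirements of the job.

import great_expectations as gx

from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq

# Download the template config and rewrite its store/data-docs buckets and
# prefixes for this namespace (values are placeholders from the dev account).
run_gx_on_pq.configure_gx_config(
    gx_config_bucket="recover-dev-cloudformation",
    gx_config_key="etl-686/src/glue/resources/great_expectations.yml",
    shareable_artifacts_bucket="recover-dev-shareable-artifacts-vpn",
    namespace="etl-686",
)

# gx.get_context() discovers ./gx/great_expectations.yml, so the returned
# context is already wired to the namespaced validations store and data docs site.
gx_context = gx.get_context()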