diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py
index da6b003..406f8d8 100644
--- a/src/glue/jobs/run_great_expectations_on_parquet.py
+++ b/src/glue/jobs/run_great_expectations_on_parquet.py
@@ -1,25 +1,19 @@
 import json
 import logging
 import os
-import subprocess
 import sys
 from datetime import datetime
 from typing import Dict

 import boto3
-import great_expectations as gx
+import yaml
 from awsglue.context import GlueContext
 from awsglue.utils import getResolvedOptions
-from great_expectations.core.batch import RuntimeBatchRequest
+from pyspark.context import SparkContext
+
+import great_expectations as gx
 from great_expectations.core.expectation_configuration import ExpectationConfiguration
 from great_expectations.core.run_identifier import RunIdentifier
-from great_expectations.core.yaml_handler import YAMLHandler
-from great_expectations.data_context.types.base import DataContextConfig
-from great_expectations.data_context.types.resource_identifiers import (
-    ExpectationSuiteIdentifier,
-    ValidationResultIdentifier,
-)
-from pyspark.context import SparkContext

 logger = logging.getLogger(__name__)
 logger.setLevel(logging.DEBUG)
@@ -64,35 +58,47 @@ def validate_args(value: str) -> None:
     return None


-def update_data_docs_sites(
-    context: gx.data_context.AbstractDataContext,
-    s3_bucket: str,
+def configure_gx_config(
+    gx_config_bucket: str,
+    gx_config_key: str,
+    shareable_artifacts_bucket: str,
     namespace: str,
-) -> gx.data_context.AbstractDataContext:
-    """
-    Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace
+) -> None:
+    """Download and configure a `great_expectations.yml` file locally
+
+    This function downloads a `great_expectations.yml` file from S3 to a `gx` directory.
+    This file will automatically be used to configure the GX data context when calling
+    `gx.get_context()`.

     Args:
-        context (gx.data_context.AbstractDataContext): The GX data context to update
-        s3_bucket (str): The S3 bucket where data docs are written
+        gx_config_bucket (str): S3 bucket containing the `great_expectations.yml` file.
+        gx_config_key (str): S3 key where this file is located.
+        shareable_artifacts_bucket (str): S3 bucket where shareable artifacts are written.
         namespace (str): The current namespace
-
-    Returns:
-        gx.data_context.AbstractDataContext: The updated GX data context object
     """
-    context.update_data_docs_site(
-        site_name="s3_site",
-        site_config={
-            "class_name": "SiteBuilder",
-            "store_backend": {
-                "class_name": "TupleS3StoreBackend",
-                "bucket": s3_bucket,
-                "prefix": f"{namespace}/great_expectation_reports/parquet/",
-            },
-            "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
-        },
+    gx_config_path = "gx/great_expectations.yml"
+    os.makedirs("gx", exist_ok=True)
+    s3_client = boto3.client("s3")
+    logger.info(
+        f"Downloading s3://{gx_config_bucket}/{gx_config_key} to {gx_config_path}"
     )
-    return context
+    s3_client.download_file(
+        Bucket=gx_config_bucket, Key=gx_config_key, Filename=gx_config_path
+    )
+    with open(gx_config_path, "rb") as gx_config_obj:
+        gx_config = yaml.safe_load(gx_config_obj)
+    # fmt: off
+    gx_config["stores"]["validations_store"]["store_backend"]["bucket"] = shareable_artifacts_bucket
+    gx_config["stores"]["validations_store"]["store_backend"]["prefix"] = (
+        gx_config["stores"]["validations_store"]["store_backend"]["prefix"].format(namespace=namespace)
+    )
+    gx_config["data_docs_sites"]["s3_site"]["store_backend"]["bucket"] = shareable_artifacts_bucket
+    gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"] = (
+        gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"].format(namespace=namespace)
+    )
+    # fmt: on
+    with open(gx_config_path, "w", encoding="utf-8") as gx_config_obj:
+        yaml.dump(gx_config, gx_config_obj)


 def get_spark_df(
@@ -211,29 +217,28 @@ def add_expectations_from_json(

 def main():
     args = read_args()
+    # args = {
+    #     "cfn_bucket": "recover-dev-cloudformation",
+    #     "data_type": "fitbitdailydata",
+    #     "expectation_suite_key": "etl-686/src/glue/resources/data_values_expectations.json",
+    #     "gx_config_key": "etl-686/src/glue/resources/great_expectations.yml",
+    #     "namespace": "etl-686",
+    #     "parquet_bucket": "recover-dev-processed-data",
+    #     "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
+    # }
     s3 = boto3.client("s3")
-    # Download GX stores and configuration
-    subprocess.run(
-        args=[
-            "aws",
-            "s3",
-            "sync",
-            f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}",
-            ".",
-        ],
-        check=True,
-    )
     run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
     expectation_suite_name = f"{args['data_type']}_expectations"

     # Set up Great Expectations
-    gx_context = gx.get_context()
-    logger.info("update_data_docs_site")
-    gx_context = update_data_docs_sites(
-        context=gx_context,
-        s3_bucket=args["shareable_artifacts_bucket"],
+    logger.info("configure_gx_config")
+    configure_gx_config(
+        gx_config_bucket=args["cfn_bucket"],
+        gx_config_key=args["gx_config_key"],
+        shareable_artifacts_bucket=args["shareable_artifacts_bucket"],
         namespace=args["namespace"],
     )
+    gx_context = gx.get_context()
     logger.info("reads_expectations_from_json")
     expectations_data = read_json(
         s3=s3,
diff --git a/tests/test_run_great_expectations_on_parquet.py b/tests/test_run_great_expectations_on_parquet.py
index fbf078e..ca98193 100644
--- a/tests/test_run_great_expectations_on_parquet.py
+++ b/tests/test_run_great_expectations_on_parquet.py
@@ -1,9 +1,9 @@
 import unittest

-import great_expectations as gx
 import pyspark
 import pytest
+import great_expectations as gx

 from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq
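
Note: the patched `great_expectations.yml` itself is not part of this diff, so the exact layout that `configure_gx_config` expects is an assumption. Below is a minimal sketch of that assumed shape (as the dict `yaml.safe_load` would return) and of the bucket/prefix substitution the function performs; all bucket names, prefixes, and the `validations_store` class name here are illustrative placeholders, not the project's real configuration.

import yaml

# Hypothetical minimal great_expectations.yml, showing only the keys that
# configure_gx_config rewrites. Values are placeholders for illustration.
assumed_config = yaml.safe_load(
    """
stores:
  validations_store:
    class_name: ValidationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: placeholder-bucket
      prefix: "{namespace}/great_expectation_reports/parquet/validations/"
data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: placeholder-bucket
      prefix: "{namespace}/great_expectation_reports/parquet/"
"""
)

# The same substitution configure_gx_config applies before writing the file
# back: point both store backends at the shareable artifacts bucket and fill
# in the namespace placeholder in each prefix.
shareable_artifacts_bucket = "example-shareable-artifacts-bucket"  # illustrative
namespace = "example-namespace"  # illustrative
for backend in (
    assumed_config["stores"]["validations_store"]["store_backend"],
    assumed_config["data_docs_sites"]["s3_site"]["store_backend"],
):
    backend["bucket"] = shareable_artifacts_bucket
    backend["prefix"] = backend["prefix"].format(namespace=namespace)

print(yaml.dump(assumed_config, sort_keys=False))

Formatting `{namespace}` into the prefixes lets a single shared config file serve every namespace, which is what replaces the previous per-run `aws s3 sync` of GX stores and configuration.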