Skip to content

Commit

Permalink
get GX config template from S3
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Oct 29, 2024
1 parent 9b8abbf commit 22ae810
Show file tree
Hide file tree
Showing 6 changed files with 240 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ parameters:
S3ScriptBucket: {{ stack_group_config.template_bucket_name }}
S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py'
ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json"
GXConfigKey: "{{ stack_group_config.namespace }}/src/glue/resources/great_expectations.yml"
GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}"
AdditionalPythonModules: "great_expectations~=0.18,urllib3<2"
stack_tags:
Expand Down
117 changes: 65 additions & 52 deletions src/glue/jobs/run_great_expectations_on_parquet.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,16 @@
import json
import logging
import os
import subprocess
import sys
from datetime import datetime
from typing import Dict

import boto3
import great_expectations as gx
import pyspark
import yaml
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from great_expectations.core.run_identifier import RunIdentifier
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.data_context.types.base import DataContextConfig
from great_expectations.data_context.types.resource_identifiers import (
ExpectationSuiteIdentifier,
ValidationResultIdentifier,
)
from pyspark.context import SparkContext

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
Expand Down Expand Up @@ -64,35 +55,56 @@ def validate_args(value: str) -> None:
return None


def update_data_docs_sites(
context: gx.data_context.AbstractDataContext,
s3_bucket: str,
def configure_gx_config(
gx_config_bucket: str,
gx_config_key: str,
shareable_artifacts_bucket: str,
namespace: str,
) -> gx.data_context.AbstractDataContext:
"""
Updates the `data_docs_sites` configuration to reflect the appropriate environment and namespace
) -> dict:
"""Download and configure a `great_expectations.yml` file locally
This function will download a `great_expectations.yml` file from S3 to a `gx` directory.
This file will be automatically be used to configue the GX data context when calling
`gx.get_context()`.
Args:
context (gx.data_context.AbstractDataContext): The GX data context to update
s3_bucket (str): The S3 bucket where data docs are written
gx_config_bucket (str): S3 bucket containing the `great_expectations.yml` file.
gx_config_key (str): S3 key where this file is located.
shareable_artifacts_bucket (str): S3 bucket where shareable artifacts are written.
namespace (str): The current namespace
Returns:
gx.data_context.AbstractDataContext: The updated GX data context object
"""
context.update_data_docs_site(
site_name="s3_site",
site_config={
"class_name": "SiteBuilder",
"store_backend": {
"class_name": "TupleS3StoreBackend",
"bucket": s3_bucket,
"prefix": f"{namespace}/great_expectation_reports/parquet/",
},
"site_index_builder": {"class_name": "DefaultSiteIndexBuilder"},
},
gx_config_path = "gx/great_expectations.yml"
os.makedirs("gx", exist_ok=True)
s3_client = boto3.client("s3")
logger.info(
f"Downloading s3://{gx_config_bucket}/{gx_config_key} to {gx_config_path}"
)
return context
s3_client.download_file(
Bucket=gx_config_bucket, Key=gx_config_key, Filename=gx_config_path
)
with open(gx_config_path, "rb") as gx_config_obj:
gx_config = yaml.safe_load(gx_config_obj)
# fmt: off
gx_config["stores"]["validations_store"]["store_backend"]["bucket"] = (
shareable_artifacts_bucket
)
gx_config["stores"]["validations_store"]["store_backend"]["prefix"] = (
gx_config["stores"]["validations_store"]["store_backend"]["prefix"].format(
namespace=namespace
)
)
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["bucket"] = (
shareable_artifacts_bucket
)
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"] = (
gx_config["data_docs_sites"]["s3_site"]["store_backend"]["prefix"].format(
namespace=namespace
)
)
# fmt: on
with open(gx_config_path, "w", encoding="utf-8") as gx_config_obj:
yaml.dump(gx_config, gx_config_obj)
return gx_config


def get_spark_df(
Expand Down Expand Up @@ -195,7 +207,7 @@ def add_expectations_from_json(

# Convert new expectations from dict to ExpectationConfiguration objects
new_expectations_configs = [
ExpectationConfiguration(
gx.core.expectation_configuration.ExpectationConfiguration(
expectation_type=exp["expectation_type"], kwargs=exp["kwargs"]
)
for exp in new_expectations
Expand All @@ -211,29 +223,30 @@ def add_expectations_from_json(

def main():
args = read_args()
# args = {
# "cfn_bucket": "recover-dev-cloudformation",
# "data_type": "fitbitdailydata",
# "expectation_suite_key": "etl-686/src/glue/resources/data_values_expectations.json",
# "gx_config_key": "etl-686/src/glue/resources/great_expectations.yml",
# "namespace": "etl-686",
# "parquet_bucket": "recover-dev-processed-data",
# "shareable_artifacts_bucket": "recover-dev-shareable-artifacts-vpn",
# }
s3 = boto3.client("s3")
# Download GX stores and configuration
subprocess.run(
args=[
"aws",
"s3",
"sync",
f"s3://{os.path.join(args['shareable_artifacts_bucket'], args['gx_resources_key_prefix'])}",
".",
],
check=True,
run_id = gx.core.run_identifier.RunIdentifier(
run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
)
run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}")
expectation_suite_name = f"{args['data_type']}_expectations"

# Set up Great Expectations
gx_context = gx.get_context()
logger.info("update_data_docs_site")
gx_context = update_data_docs_sites(
context=gx_context,
s3_bucket=args["shareable_artifacts_bucket"],
logger.info("configure_gx_config")
configure_gx_config(
gx_config_bucket=args["cfn_bucket"],
gx_config_key=args["gx_config_key"],
shareable_artifacts_bucket=args["shareable_artifacts_bucket"],
namespace=args["namespace"],
)
gx_context = gx.get_context()
logger.info("reads_expectations_from_json")
expectations_data = read_json(
s3=s3,
Expand All @@ -247,7 +260,7 @@ def main():
)

# Set up Spark
glue_context = GlueContext(SparkContext.getOrCreate())
glue_context = GlueContext(pyspark.context.SparkContext.getOrCreate())
logger.info("get_spark_df")
spark_df = get_spark_df(
glue_context=glue_context,
Expand Down
60 changes: 28 additions & 32 deletions src/scripts/manage_artifacts/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,33 +47,29 @@ def upload(namespace: str, cfn_bucket: str):
execute_command(cmd)


def sync(namespace: str, shareable_artifacts_bucket: str):
"""Sync resources which are not version controlled to this namespace.
In some cases, we do not want to version control some data (like Great Expectations artifacts)
but we need to duplicate this data from the main namespace to a development namespace.
Args:
namespace (str): The development namespace
shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts
"""
# Copy Great Expectations artifacts to this namespace
source_gx_artifacts = os.path.join(
"s3://", shareable_artifacts_bucket, "main/great_expectation_resources/"
)
target_gx_artifacts = os.path.join(
"s3://", shareable_artifacts_bucket, namespace, "great_expectation_resources/"
)
gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts]
execute_command(gx_artifacts_clean_up_cmd)
gx_artifacts_sync_cmd = [
"aws",
"s3",
"sync",
source_gx_artifacts,
target_gx_artifacts,
]
execute_command(gx_artifacts_sync_cmd)
# def sync(namespace: str, shareable_artifacts_bucket: str):
# """Sync resources which are not version controlled to this namespace.

# In some cases, we do not want to version control some data (like Great Expectations artifacts)
# but we need to duplicate this data from the main namespace to a development namespace.

# Args:
# namespace (str): The development namespace
# shareable_artifacts_bucket (str): The S3 bucket containing shareable artifacts
# """
# # Copy Great Expectations artifacts to this namespace
# source_gx_artifacts = os.path.join("s3://", shareable_artifacts_bucket, "main/")
# target_gx_artifacts = os.path.join("s3://", shareable_artifacts_bucket, namespace)
# gx_artifacts_clean_up_cmd = ["aws", "s3", "rm", "--recursive", target_gx_artifacts]
# execute_command(gx_artifacts_clean_up_cmd)
# gx_artifacts_sync_cmd = [
# "aws",
# "s3",
# "sync",
# source_gx_artifacts,
# target_gx_artifacts,
# ]
# execute_command(gx_artifacts_sync_cmd)


def delete(namespace: str, cfn_bucket: str):
Expand All @@ -93,11 +89,11 @@ def list_namespaces(cfn_bucket: str):
def main(args):
if args.upload:
upload(args.namespace, args.cfn_bucket)
if args.namespace != "main":
sync(
namespace=args.namespace,
shareable_artifacts_bucket=args.shareable_artifacts_bucket,
)
# if args.namespace != "main":
# sync(
# namespace=args.namespace,
# shareable_artifacts_bucket=args.shareable_artifacts_bucket,
# )
elif args.remove:
delete(args.namespace, args.cfn_bucket)
else:
Expand Down
7 changes: 6 additions & 1 deletion templates/glue-job-run-great-expectations-on-parquet.j2
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ Parameters:

ExpectationSuiteKey:
Type: String
Description: The s3 key prefix of the expectation suite.
Description: The S3 key of the GX expectation file.

GXConfigKey:
Type: String
Description: The S3 key of the GX configuration file/template.

MaxRetries:
Type: Number
Expand Down Expand Up @@ -106,6 +110,7 @@ Resources:
--parquet-bucket: !Ref ParquetBucket
--shareable-artifacts-bucket: !Ref ShareableArtifactsBucket
--expectation-suite-key: !Ref ExpectationSuiteKey
--gx-config-key: !Ref GXConfigKey
--gx-resources-key-prefix: !Sub "${Namespace}/great_expectation_resources/parquet/"
Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}"
GlueVersion: !Ref GlueVersion
Expand Down
Loading

0 comments on commit 22ae810

Please sign in to comment.