diff --git a/functions/allure_report/mapper.py b/functions/allure_report/mapper.py index 35104b2..c843c28 100644 --- a/functions/allure_report/mapper.py +++ b/functions/allure_report/mapper.py @@ -73,9 +73,9 @@ def get_test_name(file): def get_suit_name(file, i): if "column" in i["expectation_config"]["kwargs"]: column = i["expectation_config"]["kwargs"]["column"] - data_asset_name = file["meta"]["batch_kwargs"]["data_asset_name"] + data_asset_name = file["meta"]["active_batch_definition"]["data_asset_name"] return f"{data_asset_name}.{column}" - return file["meta"]["batch_kwargs"]["data_asset_name"] + return file["meta"]["active_batch_definition"]["data_asset_name"] def get_jira_ticket(file): @@ -103,11 +103,14 @@ def get_stop_suit_time(): def parse_datetime(date_str): - return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000 + if '+00:00' in date_str: + return datetime.timestamp(datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%f+00:00')) * 1000 + else: + return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000 def get_start_test_time(file): - return parse_datetime(file['meta']['run_id']['run_name']) + return parse_datetime(file['meta']['run_id']['run_time']) def get_stop_test_time(file): @@ -116,7 +119,7 @@ def get_stop_test_time(file): def get_params(file): params = file['expectation_config']['kwargs'] - del params['result_format'] + del params['batch_id'] results = [] for param in params: if isinstance(params[param], list): @@ -140,7 +143,7 @@ def get_test_description(file): for f in file["result"]: if str(f) != "observed_value": result = result + "\n" + \ - f"{str(f)}: {str(file['result'][f])}" + "\n" + f"{str(f)}: {str(file['result'][f])}" + "\n" return result @@ -198,7 +201,7 @@ def create_categories_json(json_name, key): result = json.dumps(data) s3.Object(qa_bucket, f"allure/{json_name}{key}/result/categories.json").put( - Body=bytes(result.encode("UTF-8"))) + Body=bytes(result.encode("UTF-8"))) def get_uuid(i, json_name, key): @@ -242,7 +245,7 @@ def create_suit_json(json_name, key, validate_id): { "name": "severity", "value": get_severity(i) - } + } ], "links": [get_jira_ticket(i)], "name": get_test_name(i), diff --git a/functions/data_test/Expectation_report_new.py b/functions/data_test/Expectation_report_new.py index c033656..2575fb1 100644 --- a/functions/data_test/Expectation_report_new.py +++ b/functions/data_test/Expectation_report_new.py @@ -3,13 +3,11 @@ import pandas as pd from great_expectations.core import ExpectationConfiguration -from pandas_profiling.expectations_report import ExpectationHandler +from ydata_profiling.expectations_report import ExpectationHandler from visions import VisionsTypeset -from pandas_profiling.config import Settings -from pandas_profiling.model import expectation_algorithms -from pandas_profiling.model.handler import Handler -from pandas_profiling.utils.dataframe import slugify +from ydata_profiling.config import Settings +from ydata_profiling.model.handler import Handler import re @@ -22,18 +20,16 @@ def typeset(self) -> Optional[VisionsTypeset]: return None def to_expectation_suite( - self, - run_name: Optional[str] = None, - suite_name: Optional[str] = None, - data_context: Optional[Any] = None, - mapping_config: Optional[dict] = None, - save_suite: bool = True, - reuse_suite: bool = False, - run_validation: bool = True, - build_data_docs: bool = True, - old_suite_name: Optional[str] = None, - use_old_suite: Optional[str] = None, - handler: Optional[Handler] = None, + self, + run_name: Optional[str] = None, + suite_name: Optional[str] = None, + data_context: Optional[Any] = None, + mapping_config: Optional[dict] = None, + save_suite: bool = True, + reuse_suite: bool = False, + old_suite_name: Optional[str] = None, + use_old_suite: Optional[str] = None, + handler: Optional[Handler] = None, ) -> Any: """ All parameters default to True to make it easier to access the full functionality of Great Expectations out of @@ -59,10 +55,6 @@ def to_expectation_suite( "Please install great expectations before using the expectation functionality" ) - # Use report title if suite is empty - if suite_name is None: - suite_name = slugify(self.config.title) - # Use the default handler if none if handler is None: handler = ExpectationHandler(self.typeset) @@ -77,32 +69,45 @@ def to_expectation_suite( except KeyError: mapping_schema = None + data_asset = data_context.get_datasource( + "cloud").get_asset(f"{suite_name}_{run_name}") + batch_request = data_asset.build_batch_request() + if reuse_suite: if use_old_suite: - suite_old = data_context.get_expectation_suite(f"{suite_name}_{old_suite_name}") - data_context.save_expectation_suite(expectation_suite=suite_old, - expectation_suite_name=f"{suite_name}_{run_name}", - overwrite_existing=True) + suite_old = data_context.get_expectation_suite( + f"{suite_name}_{old_suite_name}") + data_context.add_or_update_expectation_suite( + expectations=suite_old.expectations, + expectation_suite_name=f"{suite_name}_{run_name}") else: schema_list = list(mapping_schema.keys()) - dict_keys = [i for i in mapping_schema if isinstance(mapping_schema[i], dict)] + dict_keys = [ + i for i in mapping_schema if isinstance( + mapping_schema[i], dict)] if not dict_keys: - suite_old = data_context.get_expectation_suite(f"{suite_name}_{old_suite_name}") + suite_old = data_context.get_expectation_suite( + f"{suite_name}_{old_suite_name}") schema_list.append("_nocolumn") r = re.compile("new_col_added") - new_column_in_mapping_keys = list(filter(r.match, schema_list)) + new_column_in_mapping_keys = list( + filter(r.match, schema_list)) for key in new_column_in_mapping_keys: - new_column_in_mapping.update({key: mapping_schema[key]}) + new_column_in_mapping.update( + {key: mapping_schema[key]}) if new_column_in_mapping_keys: - schema_list = [x for x in schema_list if - x not in new_column_in_mapping_keys and x not in ignored_columns] - old_schema_list = list(suite_old.get_grouped_and_ordered_expectations_by_column()[0].keys()) - new_schema_list = [x for x in old_schema_list if x not in schema_list] + schema_list = [ + x for x in schema_list if x not in new_column_in_mapping_keys and x not in ignored_columns] + old_schema_list = list( + suite_old.get_grouped_and_ordered_expectations_by_column()[0].keys()) + new_schema_list = [ + x for x in old_schema_list if x not in schema_list] for key in new_schema_list: exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + exp_conf.append( + suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) for exps in exp_conf: for exp in exps: suite_old.remove_expectation( @@ -110,71 +115,93 @@ def to_expectation_suite( match_type="runtime", ) schema_values = list(mapping_schema.values()) - for key, v in list(itertools.zip_longest(schema_list, schema_values)): + for key, v in list( + itertools.zip_longest( + schema_list, schema_values)): exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) - for exps in exp_conf: - for exp in exps: - if (exp["expectation_type"] == "expect_table_columns_to_match_set"): - suite_old.patch_expectation( - exp, - op="replace", - path="/column_set", - value=schema_values, - match_type="runtime", - ) - elif (exp["expectation_type"] != "expect_table_row_count_to_equal"): - suite_old.patch_expectation( - exp, - op="replace", - path="/column", - value=v, - match_type="runtime", - ) - data_context.save_expectation_suite(expectation_suite=suite_old, - expectation_suite_name=f"{suite_name}_{run_name}", - overwrite_existing=True) + if key in suite_old.get_grouped_and_ordered_expectations_by_column()[ + 0]: + exp_conf.append( + suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + if (exp["expectation_type"] == + "expect_table_columns_to_match_set"): + suite_old.patch_expectation( + exp, + op="replace", + path="/column_set", + value=schema_values, + match_type="runtime", + ) + elif (exp["expectation_type"] != "expect_table_row_count_to_equal"): + suite_old.patch_expectation( + exp, + op="replace", + path="/column", + value=v, + match_type="runtime", + ) + data_context.add_or_update_expectation_suite( + expectations=suite_old.expectations, + expectation_suite_name=f"{suite_name}_{run_name}") if new_column_in_mapping: - suite_old = data_context.get_expectation_suite(f"{suite_name}_{run_name}") - batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite_old) + suite_old = data_context.get_expectation_suite( + f"{suite_name}_{run_name}") + validator = data_context.get_validator( + batch_request=batch_request, + expectation_suite=suite_old, + ) summary = self.get_description() - for name, variable_summary in summary["variables"].items(): + for name, variable_summary in summary.variables.items(): if name in list(new_column_in_mapping.values()): - handler.handle(variable_summary["type"], name, variable_summary, batch) - suite = batch.get_expectation_suite(discard_failed_expectations=False) - data_context.save_expectation_suite(expectation_suite=suite, - expectation_suite_name=f"{suite_name}_{run_name}", - overwrite_existing=True) - + handler.handle( + variable_summary["type"], name, variable_summary, validator) + suite = validator.get_expectation_suite( + discard_failed_expectations=False) + data_context.add_or_update_expectation_suite( + expectations=suite.expectations, + expectation_suite_name=f"{suite_name}_{run_name}") else: # if we have nested tables r = re.compile("new_col_added") - new_column_in_mapping_keys = list(filter(r.match, schema_list)) - schema_list = [x for x in schema_list if x not in new_column_in_mapping_keys] - schema_list = [x for x in schema_list if - x not in dict_keys] # subtract original suite list keys from nested + new_column_in_mapping_keys = list( + filter(r.match, schema_list)) + schema_list = [ + x for x in schema_list if x not in new_column_in_mapping_keys] + # subtract original suite list keys from nested + schema_list = [ + x for x in schema_list if x not in dict_keys] dict_keys_schema_list = [] dict_values_schema_list = [] for key in dict_keys: + # if nested tables is empty then use just from mapping if not mapping_schema[key]: + # create list of lists for nested suites columns dict_keys_schema_list.append( - list(mapping_config[key].keys())) # create list of lists for nested suites columns - dict_values_schema_list.append(list(mapping_config[key].values())) - else: + list(mapping_config[key].keys())) + dict_values_schema_list.append( + list(mapping_config[key].values())) + else: # in other way use key-values from nested mapping + # if nested table has renaming dict_keys_schema_list.append( - list(mapping_schema[key].keys())) # if nested table has renaming - dict_values_schema_list.append(list(mapping_schema[key].values())) + list(mapping_schema[key].keys())) + dict_values_schema_list.append( + list(mapping_schema[key].values())) dict_suites = [] for d_key in dict_keys: - suite_old = data_context.get_expectation_suite(f"{d_key}_{old_suite_name}") + suite_old = data_context.get_expectation_suite( + f"{d_key}_{old_suite_name}") old_schema_list = list(suite_old.get_grouped_and_ordered_expectations_by_column()[ - 0].keys()) # get schema from original nested suite - dict_keys_schema_list[list(dict_keys).index(d_key)].append("_nocolumn") + 0].keys()) # get schema from original nested suite + dict_keys_schema_list[list(dict_keys).index( + d_key)].append("_nocolumn") new_schema_list = [x for x in old_schema_list if x not in dict_keys_schema_list[ list(dict_keys).index(d_key)]] # subtract mapping schema from original schema for key in new_schema_list: # delete not necessary tests based on mapping exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + exp_conf.append( + suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) for exps in exp_conf: for exp in exps: suite_old.remove_expectation( @@ -183,106 +210,123 @@ def to_expectation_suite( ) # schema_values = list(mapping_config[d_key].values()) - schema_values = dict_values_schema_list[list(dict_keys).index(d_key)] - for key, v in zip(dict_keys_schema_list[list(dict_keys).index(d_key)], - schema_values): # remove table schema test and replace columns name in tests + schema_values = dict_values_schema_list[list( + dict_keys).index(d_key)] + for key, v in zip(dict_keys_schema_list[list(dict_keys).index( + d_key)], schema_values): # remove table schema test and replace columns name in tests exp_conf = [] - exp_conf.append(suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) - for exps in exp_conf: - for exp in exps: - if (exp["expectation_type"] == "expect_table_columns_to_match_set" or exp[ - "expectation_type"] == "expect_table_row_count_to_equal"): - suite_old.remove_expectation( - exp, - match_type="runtime", - ) - else: - suite_old.patch_expectation( - exp, - op="replace", - path="/column", - value=v, - match_type="runtime", - ) + if key in suite_old.get_grouped_and_ordered_expectations_by_column()[ + 0]: + exp_conf.append( + suite_old.get_grouped_and_ordered_expectations_by_column()[0][key]) + for exps in exp_conf: + for exp in exps: + if (exp["expectation_type"] == "expect_table_columns_to_match_set" or exp[ + "expectation_type"] == "expect_table_row_count_to_equal"): + suite_old.remove_expectation( + exp, + match_type="runtime", + ) + else: + suite_old.patch_expectation( + exp, + op="replace", + path="/column", + value=v, + match_type="runtime", + ) dict_suites.append(suite_old) # ##run tests generation against new columns - suite_old = data_context.create_expectation_suite(f"{suite_name}_{run_name}", - overwrite_existing=True) - batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite_old) + suite_old = data_context.add_or_update_expectation_suite( + expectation_suite_name=f"{suite_name}_{run_name}" + ) + validator = data_context.get_validator( + batch_request=batch_request, + expectation_suite=suite_old, + ) summary = self.get_description() - for name, variable_summary in summary["variables"].items(): + for name, variable_summary in summary.variables.items(): if name in schema_list and name not in ignored_columns: - handler.handle(variable_summary["type"], name, variable_summary, batch) - suite = batch.get_expectation_suite(discard_failed_expectations=False) + handler.handle( + variable_summary["type"], name, variable_summary, validator) + suite = validator.get_expectation_suite( + discard_failed_expectations=False) - ## join all suites to one - ## new version + # join all suites to one + # new version # for dict_suite in dict_suites: # for (key, value) in dict_suite.get_grouped_and_ordered_expectations_by_column()[0].items(): # suite.add_expectation_configurations(value) for dict_suite in dict_suites: - for (key, values) in dict_suite.get_grouped_and_ordered_expectations_by_column()[0].items(): + for (key, values) in dict_suite.get_grouped_and_ordered_expectations_by_column()[ + 0].items(): for value in values: - suite.add_expectation(ExpectationConfiguration(kwargs=value["kwargs"], - expectation_type=value[ - "expectation_type"], - meta=value["meta"])) + suite.add_expectation( + ExpectationConfiguration( + kwargs=value["kwargs"], + expectation_type=value["expectation_type"], + meta=value["meta"])) - ## add expected_table_columns_to_match_set - final_schema = sum(dict_values_schema_list, [mapping_schema[x] for x in schema_list]) + # add expected_table_columns_to_match_set + final_schema = sum( + dict_values_schema_list, [ + mapping_schema[x] for x in schema_list]) suite.add_expectation( - expectation_configuration=ExpectationConfiguration(kwargs={"column_set": final_schema}, - expectation_type="expect_table_columns_to_match_set")) + expectation_configuration=ExpectationConfiguration( + kwargs={ + "column_set": final_schema}, + expectation_type="expect_table_columns_to_match_set")) suite.add_expectation( - expectation_configuration=ExpectationConfiguration(kwargs={"value": summary['table']['n']}, - expectation_type="expect_table_row_count_to_equal")) - data_context.save_expectation_suite(expectation_suite=suite, - expectation_suite_name=f"{suite_name}_{run_name}", - overwrite_existing=True, discard_failed_expectations=False) + expectation_configuration=ExpectationConfiguration( + kwargs={ + "value": summary.table['n']}, + expectation_type="expect_table_row_count_to_equal")) + data_context.add_or_update_expectation_suite( + expectations=suite.expectations, + expectation_suite_name=f"{suite_name}_{run_name}") else: - suite = data_context.create_expectation_suite( - f"{suite_name}_{run_name}", overwrite_existing=True, + + suite = data_context.add_or_update_expectation_suite( + expectation_suite_name=f"{suite_name}_{run_name}" ) # Instantiate an in-memory pandas dataset - batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite) - + validator = data_context.get_validator( + batch_request=batch_request, + expectation_suite=suite, + ) # Obtain the profiling summary summary = self.get_description() # type: ignore # Dispatch to expectations per semantic variable type name_list = [] - for name, variable_summary in summary["variables"].items(): + for name, variable_summary in summary.variables.items(): name_list.append(name) if mapping_schema is not None: - if name in list(mapping_schema.keys()) and name not in ignored_columns: - handler.handle(variable_summary["type"], name, variable_summary, batch) + if name in list( + mapping_schema.keys()) and name not in ignored_columns: + handler.handle( + variable_summary["type"], + name, + variable_summary, + validator) else: if name not in ignored_columns: - handler.handle(variable_summary["type"], name, variable_summary, batch) - batch.expect_table_columns_to_match_set( + handler.handle( + variable_summary["type"], + name, + variable_summary, + validator) + validator.expect_table_columns_to_match_set( column_set=name_list) - batch.expect_table_row_count_to_equal(value=summary['table']['n']) - suite = batch.get_expectation_suite(discard_failed_expectations=False) - - validation_result_identifier = None - if run_validation: - batch = ge.dataset.PandasDataset(self.df, expectation_suite=suite) - - results = data_context.run_validation_operator( - "action_list_operator", assets_to_validate=[batch] - ) - validation_result_identifier = results.list_validation_result_identifiers()[ - 0 - ] - if save_suite or build_data_docs: - data_context.save_expectation_suite(suite) + validator.expect_table_row_count_to_equal(value=summary.table['n']) + suite = validator.get_expectation_suite( + discard_failed_expectations=False) - if build_data_docs: - data_context.build_data_docs() - data_context.open_data_docs(validation_result_identifier) + if save_suite: + data_context.update_expectation_suite(suite) - return batch.get_expectation_suite() + return validator.get_expectation_suite() diff --git a/functions/data_test/data_source_factory.py b/functions/data_test/data_source_factory.py index 15db8f2..c7278b0 100644 --- a/functions/data_test/data_source_factory.py +++ b/functions/data_test/data_source_factory.py @@ -5,16 +5,24 @@ ENV = os.environ['ENVIRONMENT'] + class DataSourceFactory: - + @staticmethod - def create_data_source(engine, qa_bucket_name, extension, run_name, table_name, coverage_config): + def create_data_source( + engine, + qa_bucket_name, + extension, + run_name, + table_name, + coverage_config): if engine == 's3': return S3DataSource(extension) elif engine == 'athena': return AthenaDataSource(qa_bucket_name, table_name) elif engine == 'redshift': - return RedshiftDataSource(qa_bucket_name, run_name, table_name, coverage_config) + return RedshiftDataSource( + qa_bucket_name, run_name, table_name, coverage_config) elif engine == 'hudi': return HudiDataSource(qa_bucket_name, run_name, table_name) else: @@ -43,7 +51,7 @@ def read(self, source): return wr.s3.read_json(path=source, lines=True), source else: return wr.s3.read_parquet(path=source), source - + class AthenaDataSource(DataSource): def __init__(self, qa_bucket_name, table_name): @@ -76,9 +84,10 @@ def read(self, source): redshift_db = os.environ['REDSHIFT_DB'] redshift_secret = os.environ['REDSHIFT_SECRET'] try: - sort_keys_config = json.loads( - wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json()) - sort_key = list(map(str.lower, sort_keys_config[self.table_name]["sortKey"])) + sort_keys_config = json.loads(wr.s3.read_json( + path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json()) + sort_key = list(map(str.lower, + sort_keys_config[self.table_name]["sortKey"])) except KeyError: sort_key = ['update_dt'] try: @@ -87,21 +96,26 @@ def read(self, source): target_table = None if target_table: table_name = target_table - con = wr.redshift.connect(secret_id=redshift_secret, dbname=redshift_db) + con = wr.redshift.connect( + secret_id=redshift_secret, dbname=redshift_db) try: nunique = final_df.nunique()[sort_key][0] - except (KeyError,IndexError) as e: + except (KeyError, IndexError) as e: nunique = final_df.nunique()[sort_key] if nunique > 1: min_key = final_df[sort_key].min() max_key = final_df[sort_key].max() - if type(min_key) != str or type(max_key) != str: + if not isinstance( + min_key, + str) or not isinstance( + max_key, + str): min_key = final_df[sort_key].min()[0] max_key = final_df[sort_key].max()[0] sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'" else: key = final_df[sort_key].values[0] - if type(key) != str: + if not isinstance(key, str): key = str(key[0]) sql_query = f"SELECT * FROM {table_name}.{table_name} WHERE {sort_key[0]}=\\'{key}\\'" path = f"s3://{self.qa_bucket_name}/redshift/{self.table_name}/" @@ -109,7 +123,7 @@ def read(self, source): return final_df, source def unload_final_df(self, sql_query, con, path): - try: + try: final_df = wr.redshift.unload( sql=sql_query, con=con, @@ -119,6 +133,7 @@ def unload_final_df(self, sql_query, con, path): con.close() return final_df + class HudiDataSource(DataSource): def __init__(self, qa_bucket_name, run_name, table_name): self.qa_bucket_name = qa_bucket_name @@ -126,13 +141,19 @@ def __init__(self, qa_bucket_name, run_name, table_name): self.table_name = table_name def read(self, source): - columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key', - '_hoodie_partition_path', '_hoodie_file_name'] - pk_config = wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/pks.json") + columns_to_drop = [ + '_hoodie_commit_time', + '_hoodie_commit_seqno', + '_hoodie_record_key', + '_hoodie_partition_path', + '_hoodie_file_name'] + pk_config = wr.s3.read_json( + path=f"s3://{self.qa_bucket_name}/test_configs/pks.json") parquet_args = { 'timestamp_as_object': True } - df = wr.s3.read_parquet(path=source, pyarrow_additional_kwargs=parquet_args) + df = wr.s3.read_parquet(path=source, + pyarrow_additional_kwargs=parquet_args) try: primary_key = pk_config[self.table_name][0] except KeyError: @@ -147,12 +168,16 @@ def read(self, source): ctas_approach=False, s3_output=f"s3://{self.qa_bucket_name}/athena_results/" ) - final_df = data[data['dms_load_at'].isin(keys)].reset_index(drop=True) + final_df = data[data['dms_load_at'].isin( + keys)].reset_index(drop=True) try: path = final_df['_hoodie_commit_time'].iloc[0] except IndexError: raise IndexError('Keys from CDC not found in HUDI table') - final_df = final_df.drop(columns_to_drop, axis=1).reset_index(drop=True) + final_df = final_df.drop( + columns_to_drop, + axis=1).reset_index( + drop=True) return final_df, path else: keys = df.groupby(primary_key)['dms_load_at'].max().tolist() diff --git a/functions/data_test/data_test.py b/functions/data_test/data_test.py index 353c09d..ad4ccc6 100755 --- a/functions/data_test/data_test.py +++ b/functions/data_test/data_test.py @@ -32,7 +32,7 @@ def handler(event, context): mapping_config = json.loads( s3.Object(qa_bucket_name, "test_configs/mapping.json").get()["Body"] .read().decode("utf-8")) - if type(source_input) is not list: + if not isinstance(source_input, list): source = [source_input] else: source = source_input @@ -54,11 +54,13 @@ def handler(event, context): final_ds, path = prepare_final_ds(source, engine, source_root, run_name, source_name, suite_coverage_config) - profile_link, folder_key, config = profile_data(final_ds, suite_name, - cloudfront, source_root, - source_covered, - mapping_config, run_name) - validate_id = validate_data(final_ds, suite_name, config) + profile_link, folder_key, saved_context, data_asset = profile_data( + final_ds, suite_name, cloudfront, source_root, source_covered, mapping_config, run_name) + validate_id = validate_data( + final_ds, + suite_name, + saved_context, + data_asset) test_suite = f"{cloudfront}/data_docs/validations/{validate_id}.html" report = { diff --git a/functions/data_test/profiling.py b/functions/data_test/profiling.py index 9eeac1e..8dc0110 100755 --- a/functions/data_test/profiling.py +++ b/functions/data_test/profiling.py @@ -1,14 +1,14 @@ import json -from pandas_profiling import ProfileReport +from ydata_profiling import ProfileReport import os import boto3 import awswrangler as wr -from pandas_profiling.model import expectation_algorithms -from pandas_profiling.model.handler import Handler +from ydata_profiling.model import expectation_algorithms +from ydata_profiling.model.handler import Handler from Expectation_report_new import ExpectationsReportNew -from pandas_profiling.expectations_report import ExpectationsReport +from ydata_profiling.expectations_report import ExpectationsReport from datetime import datetime -from great_expectations.data_context import BaseDataContext +from great_expectations.data_context import EphemeralDataContext from great_expectations.data_context.types.base import (DataContextConfig, S3StoreBackendDefaults) import yaml @@ -44,21 +44,25 @@ def __init__(self, typeset, *args, **kwargs): ], "Categorical": [expectation_algorithms.categorical_expectations, expectations_null, - ], + "Text": [expectation_algorithms.categorical_expectations, + expectations_null + ], "Boolean": [expectations_null, ], - "Numeric": [generic_expectations_without_null, expectations_null, + "Numeric": [generic_expectations_without_null, + expectations_null, ], - "URL": [expectation_algorithms.url_expectations, expectations_null, + "URL": [expectation_algorithms.url_expectations, + expectations_null, ], - "File": [expectation_algorithms.file_expectations, + "File": [expectation_algorithms.file_expectations, expectations_null, ], - "Path": [expectation_algorithms.path_expectations, + "Path": [expectation_algorithms.path_expectations, expectations_null, ], - "DateTime": [expectation_algorithms.datetime_expectations, + "DateTime": [expectation_algorithms.datetime_expectations, expectations_null, ], - "Image": [expectation_algorithms.image_expectations, + "Image": [expectation_algorithms.image_expectations, expectations_null, ], } super().__init__(mapping, typeset, *args, **kwargs) @@ -66,63 +70,41 @@ def __init__(self, typeset, *args, **kwargs): def change_ge_config(datasource_root): configfile = read_gx_config_file() - datasources = { - "pandas_s3": { - "class_name": "PandasDatasource", - "batch_kwargs_generators": { - "pandas_s3_generator": { - "class_name": "S3GlobReaderBatchKwargsGenerator", - "bucket": datasource_root, - "assets": { - "your_first_data_asset_name": { - "prefix": "/", - "regex_filter": ".*" - } - } - } - }, - "module_name": "great_expectations.datasource", - "data_asset_type": { - "class_name": "PandasDataset", - "module_name": "great_expectations.dataset" - } - } - } - if os.environ['ENVIRONMENT'] == 'local': stores = configfile["stores"] new_stores = add_local_s3_to_stores(stores, endpoint_url) data_docs_sites = configfile["data_docs_sites"] new_data_docs_sites = add_local_s3_to_data_docs(data_docs_sites, endpoint_url) - config = DataContextConfig(config_version=configfile["config_version"], - datasources=datasources, - stores=new_stores, - data_docs_sites=new_data_docs_sites, - expectations_store_name=configfile["expectations_store_name"], - validations_store_name=configfile["validations_store_name"], - evaluation_parameter_store_name=configfile["evaluation_parameter_store_name"], - plugins_directory="/great_expectations/plugins", - validation_operators=configfile["validation_operators"], - config_variables_file_path=configfile["config_variables_file_path"], - anonymous_usage_statistics=configfile["anonymous_usage_statistics"], - store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name=qa_bucket_name, - expectations_store_prefix=f"{qa_bucket_name}/great_expectations/expectations/", - validations_store_prefix=f"{qa_bucket_name}/great_expectations/uncommitted/validations/")) + config = DataContextConfig( + config_version=configfile["config_version"], + stores=new_stores, + data_docs_sites=new_data_docs_sites, + expectations_store_name=configfile["expectations_store_name"], + validations_store_name=configfile["validations_store_name"], + evaluation_parameter_store_name=configfile["evaluation_parameter_store_name"], + plugins_directory="/great_expectations/plugins", + validation_operators=configfile["validation_operators"], + config_variables_file_path=configfile["config_variables_file_path"], + anonymous_usage_statistics=configfile["anonymous_usage_statistics"], + store_backend_defaults=S3StoreBackendDefaults( + default_bucket_name=qa_bucket_name, + expectations_store_prefix=f"{qa_bucket_name}/great_expectations/expectations/", + validations_store_prefix=f"{qa_bucket_name}/great_expectations/uncommitted/validations/")) else: - config = DataContextConfig(config_version=configfile["config_version"], datasources=datasources, - expectations_store_name=configfile["expectations_store_name"], - validations_store_name=configfile["validations_store_name"], - evaluation_parameter_store_name=configfile["evaluation_parameter_store_name"], - plugins_directory="/great_expectations/plugins", - validation_operators=configfile["validation_operators"], - config_variables_file_path=configfile["config_variables_file_path"], - anonymous_usage_statistics=configfile["anonymous_usage_statistics"], - store_backend_defaults=S3StoreBackendDefaults( - default_bucket_name=qa_bucket_name, - expectations_store_prefix=f"{qa_bucket_name}/great_expectations/expectations/", - validations_store_prefix=f"{qa_bucket_name}/great_expectations/uncommitted/validations/")) + config = DataContextConfig( + config_version=configfile["config_version"], + expectations_store_name=configfile["expectations_store_name"], + validations_store_name=configfile["validations_store_name"], + evaluation_parameter_store_name=configfile["evaluation_parameter_store_name"], + plugins_directory="/great_expectations/plugins", + validation_operators=configfile["validation_operators"], + config_variables_file_path=configfile["config_variables_file_path"], + anonymous_usage_statistics=configfile["anonymous_usage_statistics"], + store_backend_defaults=S3StoreBackendDefaults( + default_bucket_name=qa_bucket_name, + expectations_store_prefix=f"{qa_bucket_name}/great_expectations/expectations/", + validations_store_prefix=f"{qa_bucket_name}/great_expectations/uncommitted/validations/")) return config @@ -158,7 +140,9 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered, mapping_config, run_name): qa_bucket = s3.Bucket(qa_bucket_name) config = change_ge_config(datasource_root) - context_ge = BaseDataContext(project_config=config) + context_ge = EphemeralDataContext(project_config=config) + datasource = context_ge.sources.add_pandas(name="cloud") + data_asset = datasource.add_dataframe_asset(name=suite_name, dataframe=df) try: profile = ProfileReport(df, title=f"{suite_name} Profiling Report", minimal=True, pool_size=1) @@ -169,8 +153,8 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered, report = profile.to_html() if not source_covered: try: - pipeline_config = json.loads( - wr.s3.read_json(path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()) + pipeline_config = json.loads(wr.s3.read_json( + path=f"s3://{qa_bucket_name}/test_configs/pipeline.json").to_json()) reuse_suite = pipeline_config[run_name]['reuse_suite'] use_old_suite_only = pipeline_config[run_name]['use_old_suite_only'] old_suite_name = pipeline_config[run_name]['old_suite_name'] @@ -183,9 +167,6 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered, data_context=context_ge, suite_name=remove_suffix(suite_name, f"_{run_name}"), run_name=run_name, - save_suite=True, - run_validation=False, - build_data_docs=False, reuse_suite=reuse_suite, mapping_config=mapping_config, use_old_suite=use_old_suite_only, @@ -196,9 +177,9 @@ def profile_data(df, suite_name, cloudfront, datasource_root, source_covered, now = datetime.now() date_time = now.strftime("%y%m%dT%H%M%S") folder = f"{folder}{suite_name}/{str(date_time)}/" - + qa_bucket.put_object(Key=folder) qa_bucket.put_object(Key=f"{folder}{suite_name}_profiling.html", Body=report, ContentType='text/html') profile_link = f"{cloudfront}/{folder}{suite_name}_profiling.html" - return profile_link, date_time, config + return profile_link, date_time, context_ge, data_asset diff --git a/functions/data_test/requirements.txt b/functions/data_test/requirements.txt index 8960b58..eab4527 100755 --- a/functions/data_test/requirements.txt +++ b/functions/data_test/requirements.txt @@ -1,10 +1,10 @@ boto3==1.26.66 botocore==1.29.66 importlib-metadata==6.0.0 -great-expectations==0.14.13 +great-expectations==0.16.14 s3fs==0.4.2 python-dateutil==2.8.2 fastparquet==0.8.1 awswrangler==2.19.0 -pandas-profiling==3.6.3 -jinja2==3.0.3 \ No newline at end of file +ydata-profiling==4.2.0 +jinja2==3.0.3 diff --git a/functions/data_test/suite_run.py b/functions/data_test/suite_run.py index 1dd8211..a278665 100644 --- a/functions/data_test/suite_run.py +++ b/functions/data_test/suite_run.py @@ -1,24 +1,32 @@ from pathlib import Path -from great_expectations.data_context import BaseDataContext +from great_expectations.data_context import EphemeralDataContext +from great_expectations.checkpoint import SimpleCheckpoint BASE_DIR = Path(__file__).resolve().parent -def validate_data(file, suite_name, config): - context_ge = BaseDataContext(project_config=config) - +def validate_data(file, suite_name, saved_context, data_asset): + context_ge = saved_context expectation_suite_name = suite_name - batch_kwargs = {'dataset': file, - 'datasource': "pandas_s3", - 'data_asset_name': expectation_suite_name} - batch = context_ge.get_batch( - batch_kwargs=batch_kwargs, - expectation_suite_name=expectation_suite_name + batch_request = data_asset.build_batch_request() + checkpoint_config = { + "class_name": "SimpleCheckpoint", + "validations": [ + { + "batch_request": batch_request, + "expectation_suite_name": expectation_suite_name + } + ] + } + checkpoint = SimpleCheckpoint( + f"_tmp_checkpoint_{expectation_suite_name}", + context_ge, + **checkpoint_config ) - results = context_ge.run_validation_operator( - "action_list_operator", assets_to_validate=[batch]) - identifiers = results.list_validation_result_identifiers() - validation_result_identifier = identifiers[0] - if (not results['success']): + results = checkpoint.run(result_format="SUMMARY", run_name=suite_name) + validation_result_identifier = results.list_validation_result_identifiers()[ + 0] + + if not results['success']: context_ge.build_data_docs( site_names='s3_site', resource_identifiers=[validation_result_identifier]