[MAINTENANCE] Update PandasProfiling and GreatExpectations (#87)
* updated PandasProfiling (pp) and GreatExpectations (gx)

---------

Co-authored-by: bvolodarskiy <bvolodarskiy@provectus.com>
bvolodarskiy authored Jun 9, 2023
1 parent 3bda82c commit 05440e9
Showing 7 changed files with 332 additions and 269 deletions.
19 changes: 11 additions & 8 deletions functions/allure_report/mapper.py
@@ -73,9 +73,9 @@ def get_test_name(file)
def get_suit_name(file, i):
if "column" in i["expectation_config"]["kwargs"]:
column = i["expectation_config"]["kwargs"]["column"]
data_asset_name = file["meta"]["batch_kwargs"]["data_asset_name"]
data_asset_name = file["meta"]["active_batch_definition"]["data_asset_name"]
return f"{data_asset_name}.{column}"
return file["meta"]["batch_kwargs"]["data_asset_name"]
return file["meta"]["active_batch_definition"]["data_asset_name"]


def get_jira_ticket(file):
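Note on this hunk: newer Great Expectations versions no longer expose batch_kwargs in validation-result metadata; the data asset name now sits under meta.active_batch_definition, which is what get_suit_name reads. A minimal sketch of the relevant result shape (the table name is a made-up example):

    # Relevant slice of a validation result in newer GX versions; "my_table" is hypothetical.
    validation_result = {
        "meta": {
            "active_batch_definition": {"data_asset_name": "my_table"}
        }
    }
    data_asset_name = validation_result["meta"]["active_batch_definition"]["data_asset_name"]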
@@ -103,11 +103,14 @@ def get_stop_suit_time():


def parse_datetime(date_str):
return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000
if '+00:00' in date_str:
return datetime.timestamp(datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%f+00:00')) * 1000
else:
return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000


def get_start_test_time(file):
return parse_datetime(file['meta']['run_id']['run_name'])
return parse_datetime(file['meta']['run_id']['run_time'])


def get_stop_test_time(file):
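Note on this hunk: run identifiers in newer Great Expectations carry an ISO 8601 run_time with a +00:00 offset instead of the old compact run_name timestamp, so parse_datetime now handles both formats and get_start_test_time switches to run_id['run_time']. A quick check of both branches (the timestamp values are made up):

    from datetime import datetime

    new_style = "2023-06-09T12:30:45.123456+00:00"  # run_time format in newer GX
    old_style = "20230609T123045.123456Z"           # legacy compact format
    for value, fmt in [(new_style, "%Y-%m-%dT%H:%M:%S.%f+00:00"),
                       (old_style, "%Y%m%dT%H%M%S.%fZ")]:
        # Each branch yields a millisecond epoch value for the Allure timing fields.
        print(datetime.timestamp(datetime.strptime(value, fmt)) * 1000)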
@@ -116,7 +119,7 @@ def get_stop_test_time(file):

def get_params(file):
params = file['expectation_config']['kwargs']
del params['result_format']
del params['batch_id']
results = []
for param in params:
if isinstance(params[param], list):
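Note on this hunk: in the updated result format each expectation's kwargs include an internal batch_id (and no longer a result_format entry), so get_params now strips batch_id before turning the remaining kwargs into Allure parameters. A small illustration with made-up kwargs:

    # Hypothetical expectation kwargs from a newer GX validation result.
    kwargs = {"column": "user_id", "mostly": 0.95, "batch_id": "demo-batch-id"}
    params = dict(kwargs)
    del params["batch_id"]  # internal plumbing, not a user-facing test parameter
    # params is now {'column': 'user_id', 'mostly': 0.95}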
@@ -140,7 +143,7 @@ def get_test_description(file):
for f in file["result"]:
if str(f) != "observed_value":
result = result + "\n" + \
f"{str(f)}: {str(file['result'][f])}" + "\n"
f"{str(f)}: {str(file['result'][f])}" + "\n"
return result


@@ -198,7 +201,7 @@ def create_categories_json(json_name, key):
result = json.dumps(data)
s3.Object(qa_bucket,
f"allure/{json_name}{key}/result/categories.json").put(
Body=bytes(result.encode("UTF-8")))
Body=bytes(result.encode("UTF-8")))


def get_uuid(i, json_name, key):
@@ -242,7 +245,7 @@ def create_suit_json(json_name, key, validate_id):
{
"name": "severity",
"value": get_severity(i)
}
}
],
"links": [get_jira_ticket(i)],
"name": get_test_name(i),
342 changes: 193 additions & 149 deletions functions/data_test/Expectation_report_new.py

Large diffs are not rendered by default.

61 changes: 43 additions & 18 deletions functions/data_test/data_source_factory.py
@@ -5,16 +5,24 @@

ENV = os.environ['ENVIRONMENT']


class DataSourceFactory:

@staticmethod
def create_data_source(engine, qa_bucket_name, extension, run_name, table_name, coverage_config):
def create_data_source(
engine,
qa_bucket_name,
extension,
run_name,
table_name,
coverage_config):
if engine == 's3':
return S3DataSource(extension)
elif engine == 'athena':
return AthenaDataSource(qa_bucket_name, table_name)
elif engine == 'redshift':
return RedshiftDataSource(qa_bucket_name, run_name, table_name, coverage_config)
return RedshiftDataSource(
qa_bucket_name, run_name, table_name, coverage_config)
elif engine == 'hudi':
return HudiDataSource(qa_bucket_name, run_name, table_name)
else:
@@ -43,7 +51,7 @@ def read(self, source):
return wr.s3.read_json(path=source, lines=True), source
else:
return wr.s3.read_parquet(path=source), source


class AthenaDataSource(DataSource):
def __init__(self, qa_bucket_name, table_name):
@@ -76,9 +84,10 @@ def read(self, source):
redshift_db = os.environ['REDSHIFT_DB']
redshift_secret = os.environ['REDSHIFT_SECRET']
try:
sort_keys_config = json.loads(
wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower, sort_keys_config[self.table_name]["sortKey"]))
sort_keys_config = json.loads(wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower,
sort_keys_config[self.table_name]["sortKey"]))
except KeyError:
sort_key = ['update_dt']
try:
@@ -87,29 +96,34 @@ def read(self, source):
target_table = None
if target_table:
table_name = target_table
con = wr.redshift.connect(secret_id=redshift_secret, dbname=redshift_db)
con = wr.redshift.connect(
secret_id=redshift_secret, dbname=redshift_db)
try:
nunique = final_df.nunique()[sort_key][0]
except (KeyError,IndexError) as e:
except (KeyError, IndexError) as e:
nunique = final_df.nunique()[sort_key]
if nunique > 1:
min_key = final_df[sort_key].min()
max_key = final_df[sort_key].max()
if type(min_key) != str or type(max_key) != str:
if not isinstance(
min_key,
str) or not isinstance(
max_key,
str):
min_key = final_df[sort_key].min()[0]
max_key = final_df[sort_key].max()[0]
sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'"
else:
key = final_df[sort_key].values[0]
if type(key) != str:
if not isinstance(key, str):
key = str(key[0])
sql_query = f"SELECT * FROM {table_name}.{table_name} WHERE {sort_key[0]}=\\'{key}\\'"
path = f"s3://{self.qa_bucket_name}/redshift/{self.table_name}/"
final_df = self.unload_final_df(sql_query, con, path)
return final_df, source

def unload_final_df(self, sql_query, con, path):
try:
try:
final_df = wr.redshift.unload(
sql=sql_query,
con=con,
@@ -119,20 +133,27 @@ def unload_final_df(self, sql_query, con, path):
con.close()
return final_df


class HudiDataSource(DataSource):
def __init__(self, qa_bucket_name, run_name, table_name):
self.qa_bucket_name = qa_bucket_name
self.run_name = run_name
self.table_name = table_name

def read(self, source):
columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
'_hoodie_partition_path', '_hoodie_file_name']
pk_config = wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/pks.json")
columns_to_drop = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
'_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name']
pk_config = wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/pks.json")
parquet_args = {
'timestamp_as_object': True
}
df = wr.s3.read_parquet(path=source, pyarrow_additional_kwargs=parquet_args)
df = wr.s3.read_parquet(path=source,
pyarrow_additional_kwargs=parquet_args)
try:
primary_key = pk_config[self.table_name][0]
except KeyError:
@@ -147,12 +168,16 @@ def read(self, source):
ctas_approach=False,
s3_output=f"s3://{self.qa_bucket_name}/athena_results/"
)
final_df = data[data['dms_load_at'].isin(keys)].reset_index(drop=True)
final_df = data[data['dms_load_at'].isin(
keys)].reset_index(drop=True)
try:
path = final_df['_hoodie_commit_time'].iloc[0]
except IndexError:
raise IndexError('Keys from CDC not found in HUDI table')
final_df = final_df.drop(columns_to_drop, axis=1).reset_index(drop=True)
final_df = final_df.drop(
columns_to_drop,
axis=1).reset_index(
drop=True)
return final_df, path
else:
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
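Note on this file: most of the data_source_factory.py changes are mechanical autopep8 re-wrapping plus replacing type(...) != str comparisons with isinstance checks; the HudiDataSource logic still picks the latest CDC record per primary key via the maximum dms_load_at and filters the Athena snapshot on those timestamps. A rough pandas illustration of that dedup step (all data below is made up):

    import pandas as pd

    # df: the incoming CDC batch; data: the full Hudi table read back via Athena.
    df = pd.DataFrame({"pk": ["a", "b"],
                       "dms_load_at": ["2023-06-09 11:00", "2023-06-09 10:30"]})
    data = pd.DataFrame({"pk": ["a", "a", "b"],
                         "dms_load_at": ["2023-06-09 10:00", "2023-06-09 11:00",
                                         "2023-06-09 10:30"],
                         "value": [1, 2, 3]})
    # Latest load timestamp per primary key from the CDC batch...
    keys = df.groupby("pk")["dms_load_at"].max().tolist()
    # ...used to select the matching rows from the snapshot, as in HudiDataSource.read.
    final_df = data[data["dms_load_at"].isin(keys)].reset_index(drop=True)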
14 changes: 8 additions & 6 deletions functions/data_test/data_test.py
@@ -32,7 +32,7 @@ def handler(event, context):
mapping_config = json.loads(
s3.Object(qa_bucket_name, "test_configs/mapping.json").get()["Body"]
.read().decode("utf-8"))
if type(source_input) is not list:
if not isinstance(source_input, list):
source = [source_input]
else:
source = source_input
@@ -54,11 +54,13 @@ def handler(event, context):
final_ds, path = prepare_final_ds(source, engine, source_root, run_name,
source_name, suite_coverage_config)

profile_link, folder_key, config = profile_data(final_ds, suite_name,
cloudfront, source_root,
source_covered,
mapping_config, run_name)
validate_id = validate_data(final_ds, suite_name, config)
profile_link, folder_key, saved_context, data_asset = profile_data(
final_ds, suite_name, cloudfront, source_root, source_covered, mapping_config, run_name)
validate_id = validate_data(
final_ds,
suite_name,
saved_context,
data_asset)
test_suite = f"{cloudfront}/data_docs/validations/{validate_id}.html"

report = {
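Note on this file: handler() now gets a saved Great Expectations context and a data asset back from profile_data and passes them straight into validate_data, replacing the previous config-dict hand-off. One plausible shape of that context/data-asset pairing under the GX fluent API is sketched below; the datasource and asset names are assumptions, not taken from this repository.

    import great_expectations as gx
    import pandas as pd

    # Build a context, register an in-memory pandas asset, and derive a batch request
    # from the profiled dataframe: the kind of objects the new hand-off passes around.
    context = gx.get_context()
    asset = context.sources.add_pandas("runtime_pandas").add_dataframe_asset(name="final_ds")
    batch_request = asset.build_batch_request(dataframe=pd.DataFrame({"x": [1, 2, 3]}))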