
[MAINTENANCE] Update PandasProfiling and GreatExpectations #87

Merged (17 commits, Jun 9, 2023)
19 changes: 11 additions & 8 deletions functions/allure_report/mapper.py
@@ -73,9 +73,9 @@ def get_test_name(file):
def get_suit_name(file, i):
if "column" in i["expectation_config"]["kwargs"]:
column = i["expectation_config"]["kwargs"]["column"]
data_asset_name = file["meta"]["batch_kwargs"]["data_asset_name"]
data_asset_name = file["meta"]["active_batch_definition"]["data_asset_name"]
return f"{data_asset_name}.{column}"
return file["meta"]["batch_kwargs"]["data_asset_name"]
return file["meta"]["active_batch_definition"]["data_asset_name"]


def get_jira_ticket(file):
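For orientation, a trimmed sketch of the validation result the mapper now reads; newer Great Expectations releases expose the asset name under meta.active_batch_definition instead of meta.batch_kwargs (all values below are invented):

# Hypothetical, heavily trimmed Great Expectations validation payload.
file = {"meta": {"active_batch_definition": {"data_asset_name": "orders"}}}
result = {"expectation_config": {"kwargs": {"column": "order_id"}}}

# With the change above, get_suit_name(file, result) returns "orders.order_id";
# without a "column" kwarg it would return just "orders".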
@@ -103,11 +103,14 @@ def get_stop_suit_time():


def parse_datetime(date_str):
return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000
if '+00:00' in date_str:
return datetime.timestamp(datetime.strptime(date_str, '%Y-%m-%dT%H:%M:%S.%f+00:00')) * 1000
else:
return datetime.timestamp(datetime.strptime(date_str, '%Y%m%dT%H%M%S.%fZ')) * 1000


def get_start_test_time(file):
return parse_datetime(file['meta']['run_id']['run_name'])
return parse_datetime(file['meta']['run_id']['run_time'])


def get_stop_test_time(file):
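Put together, the parser now accepts both timestamp shapes that show up in run_id metadata; a self-contained sketch with invented inputs:

from datetime import datetime

def parse_datetime(date_str):
    # Newer run_time values look like "2023-06-09T12:34:56.789000+00:00";
    # the legacy compact style looks like "20230609T123456.789000Z".
    if '+00:00' in date_str:
        fmt = '%Y-%m-%dT%H:%M:%S.%f+00:00'
    else:
        fmt = '%Y%m%dT%H%M%S.%fZ'
    return datetime.timestamp(datetime.strptime(date_str, fmt)) * 1000

parse_datetime('2023-06-09T12:34:56.789000+00:00')  # epoch milliseconds
parse_datetime('20230609T123456.789000Z')            # legacy format still works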
@@ -116,7 +119,7 @@ def get_stop_test_time(file):

def get_params(file):
params = file['expectation_config']['kwargs']
del params['result_format']
del params['batch_id']
results = []
for param in params:
if isinstance(params[param], list):
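As a rough illustration of why the deletion target changed: in newer Great Expectations results the expectation kwargs carry an internal batch_id, which is noise in an Allure report, so it is dropped before the remaining kwargs become report parameters (values are invented and the output shape is assumed, not taken from the repo):

kwargs = {
    "column": "order_id",
    "mostly": 0.95,
    "batch_id": "datasource-orders-20230609",  # internal identifier, dropped
}
del kwargs["batch_id"]
params = [{"name": k, "value": str(v)} for k, v in kwargs.items()]
# Only the reader-facing arguments (column, mostly) remain.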
@@ -140,7 +143,7 @@ def get_test_description(file):
for f in file["result"]:
if str(f) != "observed_value":
result = result + "\n" + \
f"{str(f)}: {str(file['result'][f])}" + "\n"
f"{str(f)}: {str(file['result'][f])}" + "\n"
return result


@@ -198,7 +201,7 @@ def create_categories_json(json_name, key):
result = json.dumps(data)
s3.Object(qa_bucket,
f"allure/{json_name}{key}/result/categories.json").put(
Body=bytes(result.encode("UTF-8")))
Body=bytes(result.encode("UTF-8")))


def get_uuid(i, json_name, key):
@@ -242,7 +245,7 @@ def create_suit_json(json_name, key, validate_id):
{
"name": "severity",
"value": get_severity(i)
}
}
],
"links": [get_jira_ticket(i)],
"name": get_test_name(i),
342 changes: 193 additions & 149 deletions functions/data_test/Expectation_report_new.py

Large diffs are not rendered by default.

61 changes: 43 additions & 18 deletions functions/data_test/data_source_factory.py
@@ -5,16 +5,24 @@

ENV = os.environ['ENVIRONMENT']


class DataSourceFactory:

@staticmethod
def create_data_source(engine, qa_bucket_name, extension, run_name, table_name, coverage_config):
def create_data_source(
engine,
qa_bucket_name,
extension,
run_name,
table_name,
coverage_config):
if engine == 's3':
return S3DataSource(extension)
elif engine == 'athena':
return AthenaDataSource(qa_bucket_name, table_name)
elif engine == 'redshift':
return RedshiftDataSource(qa_bucket_name, run_name, table_name, coverage_config)
return RedshiftDataSource(
qa_bucket_name, run_name, table_name, coverage_config)
elif engine == 'hudi':
return HudiDataSource(qa_bucket_name, run_name, table_name)
else:
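A minimal sketch of how the reworked factory might be invoked; bucket, run, and table names are invented, and the extension/coverage_config values are only guesses at plausible arguments:

source = DataSourceFactory.create_data_source(
    engine="redshift",
    qa_bucket_name="my-qa-bucket",
    extension=None,
    run_name="nightly",
    table_name="orders",
    coverage_config=None,
)
final_df, path = source.read("s3://my-qa-bucket/redshift/orders/20230609/")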
@@ -43,7 +51,7 @@ def read(self, source):
return wr.s3.read_json(path=source, lines=True), source
else:
return wr.s3.read_parquet(path=source), source


class AthenaDataSource(DataSource):
def __init__(self, qa_bucket_name, table_name):
@@ -76,9 +84,10 @@ def read(self, source):
redshift_db = os.environ['REDSHIFT_DB']
redshift_secret = os.environ['REDSHIFT_SECRET']
try:
sort_keys_config = json.loads(
wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower, sort_keys_config[self.table_name]["sortKey"]))
sort_keys_config = json.loads(wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/sort_keys.json").to_json())
sort_key = list(map(str.lower,
sort_keys_config[self.table_name]["sortKey"]))
except KeyError:
sort_key = ['update_dt']
try:
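For context, the lookup above implies a sort-key config of roughly this shape (table and column names are hypothetical); values are lower-cased before use, and a table missing from the file falls back to ['update_dt']:

# Inferred layout of s3://<qa_bucket>/test_configs/sort_keys.json
sort_keys_json = {
    "orders": {"sortKey": ["Update_DT"]},
    "customers": {"sortKey": ["modified_at"]},
}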
@@ -87,29 +96,34 @@ def read(self, source):
target_table = None
if target_table:
table_name = target_table
con = wr.redshift.connect(secret_id=redshift_secret, dbname=redshift_db)
con = wr.redshift.connect(
secret_id=redshift_secret, dbname=redshift_db)
try:
nunique = final_df.nunique()[sort_key][0]
except (KeyError,IndexError) as e:
except (KeyError, IndexError) as e:
nunique = final_df.nunique()[sort_key]
if nunique > 1:
min_key = final_df[sort_key].min()
max_key = final_df[sort_key].max()
if type(min_key) != str or type(max_key) != str:
if not isinstance(
min_key,
str) or not isinstance(
max_key,
str):
min_key = final_df[sort_key].min()[0]
max_key = final_df[sort_key].max()[0]
sql_query = f"SELECT * FROM public.{self.table_name} WHERE {sort_key[0]} between \\'{min_key}\\' and \\'{max_key}\\'"
else:
key = final_df[sort_key].values[0]
if type(key) != str:
if not isinstance(key, str):
key = str(key[0])
sql_query = f"SELECT * FROM {table_name}.{table_name} WHERE {sort_key[0]}=\\'{key}\\'"
path = f"s3://{self.qa_bucket_name}/redshift/{self.table_name}/"
final_df = self.unload_final_df(sql_query, con, path)
return final_df, source
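The two branches above assemble differently shaped filters, roughly as follows (identifiers and dates are invented; the quotes are escaped exactly as the f-strings emit them, presumably because awswrangler later wraps the statement in a Redshift UNLOAD):

# More than one distinct sort-key value -> range filter on the first sort key
range_query = "SELECT * FROM public.orders WHERE update_dt between \\'2023-06-01\\' and \\'2023-06-09\\'"
# Exactly one value -> point filter (note the schema mirrors the table name here)
point_query = "SELECT * FROM orders.orders WHERE update_dt=\\'2023-06-09\\'"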

def unload_final_df(self, sql_query, con, path):
try:
try:
final_df = wr.redshift.unload(
sql=sql_query,
con=con,
@@ -119,20 +133,27 @@ def unload_final_df(self, sql_query, con, path):
con.close()
return final_df


class HudiDataSource(DataSource):
def __init__(self, qa_bucket_name, run_name, table_name):
self.qa_bucket_name = qa_bucket_name
self.run_name = run_name
self.table_name = table_name

def read(self, source):
columns_to_drop = ['_hoodie_commit_time', '_hoodie_commit_seqno', '_hoodie_record_key',
'_hoodie_partition_path', '_hoodie_file_name']
pk_config = wr.s3.read_json(path=f"s3://{self.qa_bucket_name}/test_configs/pks.json")
columns_to_drop = [
'_hoodie_commit_time',
'_hoodie_commit_seqno',
'_hoodie_record_key',
'_hoodie_partition_path',
'_hoodie_file_name']
pk_config = wr.s3.read_json(
path=f"s3://{self.qa_bucket_name}/test_configs/pks.json")
parquet_args = {
'timestamp_as_object': True
}
df = wr.s3.read_parquet(path=source, pyarrow_additional_kwargs=parquet_args)
df = wr.s3.read_parquet(path=source,
pyarrow_additional_kwargs=parquet_args)
try:
primary_key = pk_config[self.table_name][0]
except KeyError:
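The primary-key lookup suggests a pks.json that maps each table to its key column list (names invented); a table absent from the file lands in the KeyError branch:

# Inferred layout of s3://<qa_bucket>/test_configs/pks.json
pks_json = {
    "orders": ["order_id"],
    "customers": ["customer_id", "region"],
}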
@@ -147,12 +168,16 @@ def read(self, source):
ctas_approach=False,
s3_output=f"s3://{self.qa_bucket_name}/athena_results/"
)
final_df = data[data['dms_load_at'].isin(keys)].reset_index(drop=True)
final_df = data[data['dms_load_at'].isin(
keys)].reset_index(drop=True)
try:
path = final_df['_hoodie_commit_time'].iloc[0]
except IndexError:
raise IndexError('Keys from CDC not found in HUDI table')
final_df = final_df.drop(columns_to_drop, axis=1).reset_index(drop=True)
final_df = final_df.drop(
columns_to_drop,
axis=1).reset_index(
drop=True)
return final_df, path
else:
keys = df.groupby(primary_key)['dms_load_at'].max().tolist()
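A tiny, self-contained illustration of the de-duplication used in read(): per primary key, only the newest dms_load_at survives, and those timestamps are what later get matched against the Hudi snapshot via isin(keys). All data below is invented:

import pandas as pd

df = pd.DataFrame({
    "order_id":    [1, 1, 2],
    "dms_load_at": ["2023-06-08 10:00:00", "2023-06-09 10:00:00", "2023-06-09 11:00:00"],
})
keys = df.groupby("order_id")["dms_load_at"].max().tolist()
# keys == ["2023-06-09 10:00:00", "2023-06-09 11:00:00"]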
14 changes: 8 additions & 6 deletions functions/data_test/data_test.py
@@ -32,7 +32,7 @@ def handler(event, context):
mapping_config = json.loads(
s3.Object(qa_bucket_name, "test_configs/mapping.json").get()["Body"]
.read().decode("utf-8"))
if type(source_input) is not list:
if not isinstance(source_input, list):
source = [source_input]
else:
source = source_input
@@ -54,11 +54,13 @@
final_ds, path = prepare_final_ds(source, engine, source_root, run_name,
source_name, suite_coverage_config)

profile_link, folder_key, config = profile_data(final_ds, suite_name,
cloudfront, source_root,
source_covered,
mapping_config, run_name)
validate_id = validate_data(final_ds, suite_name, config)
profile_link, folder_key, saved_context, data_asset = profile_data(
final_ds, suite_name, cloudfront, source_root, source_covered, mapping_config, run_name)
validate_id = validate_data(
final_ds,
suite_name,
saved_context,
data_asset)
test_suite = f"{cloudfront}/data_docs/validations/{validate_id}.html"

report = {