Skip to content

Commit

Permalink
Merge pull request ibis-project#33 from amyskov/santander
Browse files Browse the repository at this point in the history
[Santander]: ETL query implementation correction
  • Loading branch information
semelianova authored Mar 24, 2020
2 parents 8c20d10 + 2790a5e commit 6570934
Showing 1 changed file with 129 additions and 189 deletions.
318 changes: 129 additions & 189 deletions santander/santander_pandas_ibis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
import warnings
from timeit import default_timer as timer

import pandas as pd

import ibis
import mysql.connector
import pandas as pd

sys.path.append(os.path.join(os.path.dirname(__file__), ".."))

from report import DbReport
from server import OmnisciServer
from server_worker import OmnisciServerWorker
Expand Down Expand Up @@ -316,8 +316,9 @@ def etl_ibis(args, run_import_queries, columns_names, columns_types, validation=
# nested sql requests
t0 = timer()
count_cols = []
orig_cols = ["ID_code"] + ["var_%s" % i for i in range(200)]
orig_cols = ["ID_code", "target"] + ['var_%s'%i for i in range(200)]
cast_cols = []
cast_cols.append(table["target"].cast("int64").name("target0"))
gt1_cols = []
for i in range(200):
col = "var_%d" % i
Expand Down Expand Up @@ -349,16 +350,14 @@ def etl_ibis(args, run_import_queries, columns_names, columns_types, validation=
t0 = timer()
training_part, validation_part = table_df[:-10000], table_df[-10000:]
etl_times["t_train_test_split"] = timer() - t0

etl_times["t_etl"] = (
etl_times["t_groupby_merge_where"] + etl_times["t_train_test_split"]
)

x_train = training_part.drop(["target"], axis=1)
y_train = training_part["target"]
x_valid = validation_part.drop(["target"], axis=1)
y_valid = validation_part["target"]


etl_times["t_etl"] = etl_times["t_groupby_merge_where"] + etl_times["t_train_test_split"]

x_train = training_part.drop(['target0'],axis=1)
y_train = training_part['target0']
x_valid = validation_part.drop(['target0'],axis=1)
y_valid = validation_part['target0']

omnisci_server.terminate()
omnisci_server = None

Expand Down Expand Up @@ -753,192 +752,133 @@ def main():
columns_types_ibis_val = ["string", "string"] + ["string" for _ in range(200)]
columns_types_pd_val = ["object", "object"] + ["object" for _ in range(200)]

# try:
db_reporter = None
if args.db_user is not "":
print("Connecting to database")
db = mysql.connector.connect(
host=args.db_server,
port=args.db_port,
user=args.db_user,
passwd=args.db_pass,
db=args.db_name,
)
db_reporter = DbReport(
db,
args.db_table,
{
"QueryName": "VARCHAR(500) NOT NULL",
"FirstExecTimeMS": "BIGINT UNSIGNED",
"WorstExecTimeMS": "BIGINT UNSIGNED",
"BestExecTimeMS": "BIGINT UNSIGNED",
"AverageExecTimeMS": "BIGINT UNSIGNED",
"TotalTimeMS": "BIGINT UNSIGNED",
"IbisCommitHash": "VARCHAR(500) NOT NULL",
"BackEnd": "VARCHAR(100) NOT NULL",
},
{
"ScriptName": "santander_pandas_ibis.py",
"CommitHash": args.commit_omnisci,
"IbisCommitHash": args.commit_ibis,
},
)

if not args.no_ibis:
if args.omnisci_executable is None:
parser.error("Omnisci executable should be specified with -e/--executable")
try:
db_reporter = None
if args.db_user is not "":
print("Connecting to database")
db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user,
passwd=args.db_pass, db=args.db_name)
db_reporter = DbReport(db, args.db_table, {
'QueryName': 'VARCHAR(500) NOT NULL',
'FirstExecTimeMS': 'BIGINT UNSIGNED',
'WorstExecTimeMS': 'BIGINT UNSIGNED',
'BestExecTimeMS': 'BIGINT UNSIGNED',
'AverageExecTimeMS': 'BIGINT UNSIGNED',
'TotalTimeMS': 'BIGINT UNSIGNED',
'IbisCommitHash': 'VARCHAR(500) NOT NULL',
'BackEnd': 'VARCHAR(100) NOT NULL'
}, {
'ScriptName': 'santander_pandas_ibis.py',
'CommitHash': args.commit_omnisci,
'IbisCommitHash': args.commit_ibis
})

if not args.no_ibis:
if args.omnisci_executable is None:
parser.error("Omnisci executable should be specified with -e/--executable")

etl_ibis_args = {'args': args, 'run_import_queries': "False",
'columns_names': columns_names, 'columns_types': columns_types_ibis,
'validation': "False"}
x_train_ibis, y_train_ibis, x_valid_ibis, y_valid_ibis, etl_times_ibis = query_measurement_etl(etl_ibis,
etl_ibis_args,
args.iterations,
"etl_ibis")

print_times_nested(etl_times_ibis, name='Ibis')
if db_reporter is not None:
submit_results_to_db(db_reporter=db_reporter, args=args, backend='etl_ibis', results=etl_times_ibis)

import_pandas_into_module_namespace(main.__globals__,
args.pandas_mode, args.ray_tmpdir, args.ray_memory)

etl_pandas_args = {'filename': args.file, 'columns_names': columns_names, 'columns_types': columns_types_pd}
x_train_pandas, y_train_pandas, x_valid_pandas, y_valid_pandas, etl_times_pandas = query_measurement_etl(etl_pandas,
etl_pandas_args,
args.iterations,
"etl_pandas")

print_times_nested(etl_times_pandas, name=args.pandas_mode)

etl_ibis_args = {
"args": args,
"run_import_queries": "False",
"columns_names": columns_names,
"columns_types": columns_types_ibis,
"validation": "False",
}
(
x_train_ibis,
y_train_ibis,
x_valid_ibis,
y_valid_ibis,
etl_times_ibis,
) = query_measurement_etl(etl_ibis, etl_ibis_args, args.iterations, "etl_ibis")

print_times_nested(etl_times_ibis, name="Ibis")
if db_reporter is not None:
submit_results_to_db(
db_reporter=db_reporter,
args=args,
backend="etl_ibis",
results=etl_times_ibis,
)

import_pandas_into_module_namespace(
main.__globals__, args.pandas_mode, args.ray_tmpdir, args.ray_memory
)

etl_pandas_args = {
"filename": args.file,
"columns_names": columns_names,
"columns_types": columns_types_pd,
}
(
x_train_pandas,
y_train_pandas,
x_valid_pandas,
y_valid_pandas,
etl_times_pandas,
) = query_measurement_etl(
etl_pandas, etl_pandas_args, args.iterations, "etl_pandas"
)

print_times_nested(etl_times_pandas, name=args.pandas_mode)

if db_reporter is not None:
submit_results_to_db(
db_reporter=db_reporter,
args=args,
backend="etl_pandas",
results=etl_times_pandas,
)

if not args.no_ml:
ml_args_pd = {
"x_train": x_train_pandas,
"y_train": y_train_pandas,
"x_valid": x_valid_pandas,
"y_valid": y_valid_pandas,
}
score_mse_pandas, score_cod_pandas, ml_times_pandas = query_measurement_ml(
ml, ml_args_pd, args.iterations, "ml"
)
print("Scores with etl_pandas ML inputs: ")
print(" mse = ", score_mse_pandas)
print(" cod = ", score_cod_pandas)
print_times_nested(ml_times_pandas)
if db_reporter is not None:
submit_results_to_db(
db_reporter=db_reporter,
args=args,
backend="ml_pandas",
results=ml_times_pandas,
)

ml_args_ibis = {
"x_train": x_train_ibis,
"y_train": y_train_ibis,
"x_valid": x_valid_ibis,
"y_valid": y_valid_ibis,
}
score_mse_ibis, score_cod_ibis, ml_times_ibis = query_measurement_ml(
ml, ml_args_ibis, args.iterations, "ml"
)
print("Scores with etl_ibis ML inputs: ")
print(" mse = ", score_mse_ibis)
print(" cod = ", score_cod_ibis)
print_times_nested(ml_times_ibis)
if db_reporter is not None:
submit_results_to_db(
db_reporter=db_reporter,
args=args,
backend="ml_ibis",
results=ml_times_pandas,
)

# Results validation block (comparison of etl_ibis and etl_pandas outputs)
if args.val:
print("Validation of ETL query results with original input table ...")
cols_to_sort = ["var_0", "var_1", "var_2", "var_3", "var_4"]

x_ibis = pd.concat([x_train_ibis, x_valid_ibis])
x_ibis = x_ibis.sort_values(by=cols_to_sort)
x_pandas = pd.concat([x_train_pandas, x_valid_pandas])
x_pandas = x_pandas.sort_values(by=cols_to_sort)

print("Validating queries results (var_xx columns) ...")
compare_result1 = compare_dataframes(
ibis_df=[x_ibis[var_cols]], pandas_df=[x_pandas[var_cols]]
)
print("Validating queries results (var_xx_count columns) ...")
compare_result2 = compare_dataframes(
ibis_df=[x_ibis[count_cols]], pandas_df=[x_pandas[count_cols]]
)
print("Validating queries results (var_xx_gt1 columns) ...")
compare_result3 = compare_dataframes(
ibis_df=[x_ibis[gt1_cols]], pandas_df=[x_pandas[gt1_cols]]
)
submit_results_to_db(db_reporter=db_reporter, args=args, backend='etl_pandas', results=etl_times_pandas)

if not args.no_ml:
print("Validation of ML queries results ...")
if score_mse_ibis == score_mse_pandas:
print("Scores mse are equal!")
else:
print(
"Scores mse are unequal, score mse Ibis =",
score_mse_ibis,
"score mse Pandas =",
score_mse_pandas,
)

if score_mse_ibis == score_mse_pandas:
print("Scores cod are equal!")
else:
print(
"Scores cod are unequal, score cod Ibis =",
score_cod_ibis,
"score cod Pandas =",
score_cod_pandas,
)

"""
ml_args_pd = {'x_train': x_train_pandas, 'y_train': y_train_pandas,
'x_valid': x_valid_pandas, 'y_valid': y_valid_pandas}
score_mse_pandas, score_cod_pandas, ml_times_pandas = query_measurement_ml(ml,
ml_args_pd,
args.iterations,
"ml")
print('Scores with etl_pandas ML inputs: ')
print(' mse = ', score_mse_pandas)
print(' cod = ', score_cod_pandas)
print_times_nested(ml_times_pandas)
if db_reporter is not None:
submit_results_to_db(db_reporter=db_reporter, args=args, backend='ml_pandas', results=ml_times_pandas)

if not args.no_ibis:
ml_args_ibis = {'x_train': x_train_ibis, 'y_train': y_train_ibis,
'x_valid': x_valid_ibis, 'y_valid': y_valid_ibis}
score_mse_ibis, score_cod_ibis, ml_times_ibis = query_measurement_ml(ml,
ml_args_ibis,
args.iterations,
"ml")
print('Scores with etl_ibis ML inputs: ')
print(' mse = ', score_mse_ibis)
print(' cod = ', score_cod_ibis)
print_times_nested(ml_times_ibis)
if db_reporter is not None:
submit_results_to_db(db_reporter=db_reporter, args=args, backend='ml_ibis', results=ml_times_pandas)


# Results validation block (comparison of etl_ibis and etl_pandas outputs)
if args.val and not args.no_ibis:
print("Validation of ETL query results with original input table ...")
cols_to_sort = ['var_0', 'var_1', 'var_2', 'var_3', 'var_4']

x_ibis = pd.concat([x_train_ibis, x_valid_ibis])
y_ibis = pd.concat([y_train_ibis, y_valid_ibis])
etl_ibis_res = pd.concat([x_ibis, y_ibis], axis=1)
etl_ibis_res = etl_ibis_res.sort_values(by=cols_to_sort)
x_pandas = pd.concat([x_train_pandas, x_valid_pandas])
y_pandas = pd.concat([y_train_pandas, y_valid_pandas])
etl_pandas_res = pd.concat([x_pandas, y_pandas], axis=1)
etl_pandas_res = etl_pandas_res.sort_values(by=cols_to_sort)

print("Validating queries results (var_xx columns) ...")
compare_result1 = compare_dataframes(ibis_df=[etl_ibis_res[var_cols]],
pandas_df=[etl_pandas_res[var_cols]])
print("Validating queries results (var_xx_count columns) ...")
compare_result2 = compare_dataframes(ibis_df=[etl_ibis_res[count_cols]],
pandas_df=[etl_pandas_res[count_cols]])
print("Validating queries results (var_xx_gt1 columns) ...")
compare_result3 = compare_dataframes(ibis_df=[etl_ibis_res[gt1_cols]],
pandas_df=[etl_pandas_res[gt1_cols]])
print("Validating queries results (target column) ...")
compare_result4 = compare_dataframes(ibis_df=[etl_ibis_res['target0']],
pandas_df=[etl_pandas_res['target']])

if not args.no_ml:
print("Validation of ML queries results ...")
if score_mse_ibis == score_mse_pandas:
print("Scores mse are equal!")
else:
print("Scores mse are unequal, score mse Ibis =", score_mse_ibis,
"score mse Pandas =", score_mse_pandas)

if score_cod_ibis == score_cod_pandas:
print("Scores cod are equal!")
else:
print("Scores cod are unequal, score cod Ibis =", score_cod_ibis,
"score cod Pandas =", score_cod_pandas)

except Exception as err:
print("Failed: ", err)
sys.exit(1)
finally:
if omnisci_server:
omnisci_server.terminate()

"""


if __name__ == "__main__":
main()

0 comments on commit 6570934

Please sign in to comment.