diff --git a/pandas/join-pandas.py b/pandas/join-pandas.py index 5c3b45b0..5ad2c1fa 100755 --- a/pandas/join-pandas.py +++ b/pandas/join-pandas.py @@ -1,12 +1,10 @@ -#!/usr/bin/env python - -print("# join-pandas.py") - +#!/usr/bin/env python3 import os import gc import timeit import pandas as pd -# import pydoop.hdfs as hd +import sys +import argparse exec(open("./helpers.py").read()) @@ -23,9 +21,9 @@ fun = "merge" cache = "TRUE" -if os.path.basename(src_x)=="X1e9_2c.csv": - print("# join with pandas skipped for 1e9 x 1e9 (20GB x 20GB) due to memory error on 125GB mem machine") - exit(0) +if os.path.basename(src_x) == "X1e9_2c.csv": + print("# join with pandas skipped for 1e9 x 1e9 (20GB x 20GB) due to memory error on 125GB mem machine") + sys.exit(0) print("loading datasets...") @@ -39,40 +37,25 @@ print("joining...") -gc.collect() -t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) -del ans +def run_benchmark(run: int): + '''Run benchark for join operation. + + run: The iteration number of current run. + ''' + t_start = timeit.default_timer() + ans = x.merge(y, how='inner', on='KEY') + print('Shape:', ans.shape) + m = memory_usage() + t_start = timeit.default_timer() + chk = [ans['X2'].sum(), ans['Y2'].sum()] + chkt = timeit.default_timer() - t_start + write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=run, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) -gc.collect() -t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) -del ans -gc.collect() -t_start = timeit.default_timer() -ans = x.merge(y, how='inner', on='KEY') -print(ans.shape) -t = timeit.default_timer() - t_start -m = memory_usage() -t_start = timeit.default_timer() -chk = [ans['X2'].sum(), ans['Y2'].sum()] -chkt = timeit.default_timer() - t_start -write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=3, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt) -del ans +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Run pandas join operation.') + parser.add_argument('--run', type=int, required=True) + args = parser.parse_args() -exit(0) + run_benchmark(args.run) + sys.exit(0)