diff --git a/README.md b/README.md index aff547669f76f..547310ebeb3c1 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,11 @@ # Benchmarking scripts that are used to run OmniSciDB benchmarks in automated way in TeamCity and for performance analyzes in development cycle. ## Requirements -Scripts require the following python3 packages to be installed: -pymapd, braceexpand, mysql-connector-python. OmnisciDB server often +Scripts require to be installed: +* the following python3 packages: pymapd, braceexpand, mysql-connector-python; +* conda or miniconda for ibis tests and benchmarks. + +OmnisciDB server often requires a lot of open files, so it is a good idea to run it with `ulimit -n 10000`. @@ -91,3 +94,72 @@ Sample script command line: ``` python3 taxi/taxibench_pandas.py -df 2 -i 5 -dp '/datadir/taxi/trips_*.csv.gz' ``` + +## Ibis script + +Ibis build, tests and benchmarks run through `run_ibis_test.py`. It has three distinct +modes of operation: +* build and install ibis; +* run ibis tests using pytest; +* run benchmarks using Omnisci. + +Parameters which can be used: + +Switch | Default value | Meaning +------ | ------------- | ------- +-t, --task | | Task for execute from supported list [build, test, benchmark]. Use "," separator for multiple tasks. +-en, --env_name | ibis-tests | Conda env name. +-ec, --env_check | False | Check if env exists. If it exists don't recreate. +-s, --save_env | False | Save conda env after executing. +-r, --report_path | parent dir of omnscripts | Path to report file. +-ci, --ci_requirements | ci_requirements.yml | File with ci requirements for conda env. +-py, --python_version | 3.7 | File with ci requirements for conda env. +-i, --ibis_path | | Path to ibis directory. +-e, --executable | | Path to omnisci_server executable. +-w, --workdir | | Path to omnisci working directory. By default parent directory of executable location is used. Data directory is used in this location. 
+-o, --omnisci_port | 6274 | TCP port number to run omnisci_server on. +-u, --user | admin | User name to use on omniscidb server. +-p, --password | HyperInteractive | User password to use on omniscidb server. +-n, --name | agent_test_ibis | Database name to use in omniscidb server. +-commit_omnisci | 123456... | Omnisci commit hash to use for tests. +-commit_ibis | 123456... | Ibis commit hash to use for tests. + +For benchmark task and recording its results in a MySQL database: + +Switch | Default value | Meaning +------ | ------------- | ------- +-bn, --bench_name | | Benchmark name from supported list [ny_taxi, santander] +-db-server | localhost | Host name of MySQL server. +-db-port | 3306 | Port number of MySQL server. +-db-user | | Username to use to connect to MySQL database. If user name is specified, script attempts to store results in MySQL database using other -db-* parameters. +-db-pass | omniscidb | Password to use to connect to MySQL database. +-db-name | omniscidb | MySQL database to use to store benchmark results. +-db-table | | Table to use to store results for this benchmark. +-df, --dfiles_num | 1 | Number of datafiles to input into database for processing. +-dp, --dpattern | | Wildcard pattern of datafiles that should be loaded. +-it, --iters | 5 | Number of iterations to run every query. Best result is selected. + +Script automatically creates conda environment if it doesn't exist or you want to recreate it, +starts up omniscidb server, creates and initializes data directory if it doesn't exist or it is not +initialized. All subsequent work is being done in created conda environment. Environment can be +removed or saved after executing. 
+ +Sample build ibis command line: +``` +python3 run_ibis_tests.py --env_name ibis-test --env_check False --save_env True --python_version 3.7 --task build --name agent_test_ibis --ci_requirements /localdisk/username/omniscripts/ci_requirements.yml --ibis_path /localdisk/username/ibis/ --executable /localdisk/username/omniscidb/release/bin/omnisci_server +``` + +Sample run ibis tests command line: +``` +python3 run_ibis_tests.py --env_name ibis-test --env_check True --save_env True --python_version 3.7 --task test --name agent_test_ibis --report /localdisk/username/ --ibis_path /localdisk/username/ibis/ --executable /localdisk/username/omniscidb/build/bin/omnisci_server --user admin --password HyperInteractive +``` + +Sample run taxi benchmark command line: +``` +python3 run_ibis_tests.py --env_name ibis-test --env_check True --python_version 3.7 --task benchmark --ci_requirements /localdisk/username/omniscripts/ci_requirements.yml --save_env True --report /localdisk/username/ --ibis_path /localdisk/username/ibis/ --executable /localdisk/username/omniscidb/build/bin/omnisci_server -u admin -p HyperInteractive -n agent_test_ibis --bench_name ny_taxi --dfiles_num 20 --dpattern '/localdisk/username/benchmark_datasets/taxi/trips_xa{a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t}.csv.gz' --iters 5 -db-server localhost -db-port 3306 -db-user user -db-pass omniscidb -db-name omniscidb -db-table taxibench_ibis +``` + +Sample run santander benchmark command line: +``` +python3 run_ibis_tests.py --env_name ibis-test --env_check True --python_version 3.7 --task benchmark --ci_requirements /localdisk/username/omniscripts/ci_requirements.yml --save_env True --report /localdisk/username/ --ibis_path /localdisk/username/ibis/ --executable /localdisk/username/omniscidb/build/bin/omnisci_server -u admin -p HyperInteractive -n agent_test_ibis --bench_name santander --dpattern '/localdisk/benchmark_datasets/santander/train.csv.gz' --iters 5 -db-server localhost -db-port 3306 -db-user user 
-db-pass omniscidb -db-name omniscidb -db-table santander_ibis +``` \ No newline at end of file diff --git a/ci_requirements.yml b/ci_requirements.yml new file mode 100644 index 0000000000000..017ef83c4e58d --- /dev/null +++ b/ci_requirements.yml @@ -0,0 +1,5 @@ + - pytest-html + - braceexpand + - mysql + - mysql-connector-python + \ No newline at end of file diff --git a/omnisci.conf b/omnisci.conf index fc139bec93930..6d4aa5bc375c0 100644 --- a/omnisci.conf +++ b/omnisci.conf @@ -5,6 +5,7 @@ data = "data" read-only = false verbose = false enable-watchdog = false +allow-cpu-retry = true [web] port = 62073 diff --git a/report/__init__ .py b/report/__init__ .py new file mode 100644 index 0000000000000..46a3b2b2f404d --- /dev/null +++ b/report/__init__ .py @@ -0,0 +1 @@ +from .report import DbReport diff --git a/report/report.py b/report/report.py index 40d628b709beb..1a90ad60fa5c6 100644 --- a/report/report.py +++ b/report/report.py @@ -4,6 +4,7 @@ import platform import subprocess + class DbReport: "Initialize and submit reports to MySQL database" @@ -123,4 +124,3 @@ def submit(self, benchmark_specific_values): print("Executing statement", sql_statement) self.__database.cursor().execute(sql_statement) self.__database.commit() - diff --git a/run_ibis_tests.py b/run_ibis_tests.py new file mode 100644 index 0000000000000..2b7d0ac2077cb --- /dev/null +++ b/run_ibis_tests.py @@ -0,0 +1,279 @@ +import os +import sys +import argparse +from server import OmnisciServer +from server import execute_process + + +def str_arg_to_bool(v): + if isinstance(v, bool): + return v + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise argparse.ArgumentTypeError('Cannot recognize boolean value.') + + +def add_conda_execution(cmdline): + cmd_res = ['conda', 'run', '-n', args.env_name] + cmd_res.extend(cmdline) + return cmd_res + + +def combinate_requirements(ibis, ci, res): + with open(res, "w") 
as f_res: + with open(ibis) as f_ibis: + for line in f_ibis: + f_res.write(line) + with open(ci) as f_ci: + for line in f_ci: + f_res.write(line) + + +omniscript_path = os.path.dirname(__file__) +omnisci_server = None +args = None + +parser = argparse.ArgumentParser(description='Run internal tests from ibis project') +optional = parser._action_groups.pop() +required = parser.add_argument_group("required arguments") +parser._action_groups.append(optional) + +possible_tasks = ['build', 'test', 'benchmark'] +benchmarks = {'ny_taxi': os.path.join(omniscript_path, "taxi", "taxibench_ibis.py"), + 'santander': os.path.join(omniscript_path, "santander", "santander_ibis.py")} +# Task +required.add_argument("-t", "--task", dest="task", required=True, + help=f"Task for execute {possible_tasks}. Use , separator for multiple tasks") + +# Environment +required.add_argument('-en', '--env_name', dest="env_name", default="ibis-tests", + help="Conda env name.") +optional.add_argument('-ec', '--env_check', dest="env_check", default=False, type=str_arg_to_bool, + help="Check if env exists. 
If it exists don't recreate.") +optional.add_argument('-s', '--save_env', dest="save_env", default=False, type=str_arg_to_bool, + help="Save conda env after executing.") +optional.add_argument('-r', '--report_path', dest="report_path", + default=os.path.join(omniscript_path, ".."), help="Path to report file.") +optional.add_argument('-ci', '--ci_requirements', dest="ci_requirements", + default=os.path.join(omniscript_path, "ci_requirements.yml"), + help="File with ci requirements for conda env.") +optional.add_argument('-py', '--python_version', dest="python_version", default="3.7", + help="File with ci requirements for conda env.") +# Ibis +required.add_argument('-i', '--ibis_path', dest="ibis_path", required=True, + help="Path to ibis directory.") +# Benchmarks +optional.add_argument('-bn', '--bench_name', dest="bench_name", choices=list(benchmarks.keys()), + help=f"Benchmark name.") +optional.add_argument('-df', '--dfiles_num', dest="dfiles_num", default=1, type=int, + help="Number of datafiles to input into database for processing.") +optional.add_argument('-dp', '--dpattern', dest="dpattern", + help="Wildcard pattern of datafiles that should be loaded.") +optional.add_argument('-it', '--iters', default=5, type=int, dest="iters", + help="Number of iterations to run every query. Best result is selected.") +# MySQL database parameters +optional.add_argument('-db-server', dest="db_server", default="localhost", + help="Host name of MySQL server.") +optional.add_argument('-db-port', dest="db_port", default=3306, type=int, + help="Port number of MySQL server.") +optional.add_argument('-db-user', dest="db_user", default="", + help="Username to use to connect to MySQL database. 
" + "If user name is specified, script attempts to store results in MySQL " + "database using other -db-* parameters.") +optional.add_argument('-db-pass', dest="db_password", default="omniscidb", + help="Password to use to connect to MySQL database.") +optional.add_argument('-db-name', dest="db_name", default="omniscidb", + help="MySQL database to use to store benchmark results.") +optional.add_argument('-db-table', dest="db_table", + help="Table to use to store results for this benchmark.") +# Omnisci server parameters +optional.add_argument("-e", "--executable", dest="omnisci_executable", required=True, + help="Path to omnisci_server executable.") +optional.add_argument("-w", "--workdir", dest="omnisci_cwd", + help="Path to omnisci working directory. " + "By default parent directory of executable location is used. " + "Data directory is used in this location.") +optional.add_argument("-o", "--omnisci_port", dest="omnisci_port", default=6274, type=int, + help="TCP port number to run omnisci_server on.") +optional.add_argument("-u", "--user", dest="user", default="admin", + help="User name to use on omniscidb server.") +optional.add_argument("-p", "--password", dest="password", default="HyperInteractive", + help="User password to use on omniscidb server.") +optional.add_argument("-n", "--name", dest="name", default="agent_test_ibis", required=True, + help="Database name to use in omniscidb server.") + +optional.add_argument("-commit_omnisci", dest="commit_omnisci", + default="1234567890123456789012345678901234567890", + help="Omnisci commit hash to use for tests.") +optional.add_argument("-commit_ibis", dest="commit_ibis", + default="1234567890123456789012345678901234567890", + help="Ibis commit hash to use for tests.") + +try: + args = parser.parse_args() + + os.environ["IBIS_TEST_OMNISCIDB_DATABASE"] = args.name + os.environ["IBIS_TEST_DATA_DB"] = args.name + + required_tasks = args.task.split(',') + tasks = {} + task_checker = False + for task in possible_tasks: 
+ if task in required_tasks: + tasks[task] = True + task_checker = True + else: + tasks[task] = False + if not task_checker: + print(f"Only {list(tasks.keys())} are supported, {required_tasks} cannot find possible tasks") + sys.exit(1) + + if args.python_version not in ['3.7', '3,6']: + print(f"Only 3.7 and 3.6 python versions are supported, {args.python_version} is not supported") + sys.exit(1) + ibis_requirements = os.path.join(args.ibis_path, "ci", + f"requirements-{args.python_version}-dev.yml") + ibis_data_script = os.path.join(args.ibis_path, "ci", "datamgr.py") + + requirements_file = "requirements.yml" + report_file_name = f"report-{args.commit_ibis[:8]}-{args.commit_omnisci[:8]}.html" + if not os.path.isdir(args.report_path): + os.makedirs(args.report_path) + report_file_path = os.path.join(args.report_path, report_file_name) + + install_ibis_cmdline = ['python3', + os.path.join('setup.py'), + 'install', + '--user'] + + check_env_cmdline = ['conda', + 'env', + 'list'] + + create_env_cmdline = ['conda', + 'env', + 'create', + '--name', args.env_name, + '--file', requirements_file] + + remove_env_cmdline = ['conda', + 'env', + 'remove', + '--name', args.env_name] + + dataset_download_cmdline = ['python3', + ibis_data_script, + 'download'] + + dataset_import_cmdline = ['python3', + ibis_data_script, + 'omniscidb', + '-P', str(args.omnisci_port), + '--database', args.name] + + ibis_tests_cmdline = ['pytest', + '-m', 'omniscidb', + '--disable-pytest-warnings', + f'--html={report_file_path}'] + + if tasks['benchmark']: + if not args.bench_name or args.bench_name not in benchmarks.keys(): + print(f"Benchmark {args.bench_name} is not supported, only {list(benchmarks.keys())} are supported") + sys.exit(1) + + if not args.dpattern: + print(f"Parameter --dpattern was received empty, but it is required for benchmarks") + sys.exit(1) + + benchmarks_cmd = {} + + ny_taxi_bench_cmdline = ['python3', + benchmarks[args.bench_name], + '-e', args.omnisci_executable, + 
'-port', str(args.omnisci_port), + '-db-port', str(args.db_port), + '-df', str(args.dfiles_num), + '-dp', f"'{args.dpattern}'", + '-i', str(args.iters), + '-u', args.user, + '-p', args.password, + '-db-server', args.db_server, + '-n', args.name, + f'-db-user={args.db_user}', + '-db-pass', args.db_password, + '-db-name', args.db_name, + '-db-table', + args.db_table if args.db_table else 'taxibench_ibis', + '-commit_omnisci', args.commit_omnisci, + '-commit_ibis', args.commit_ibis] + + benchmarks_cmd['ny_taxi'] = ny_taxi_bench_cmdline + + santander_bench_cmdline = ['python3', + benchmarks[args.bench_name], + '-e', args.omnisci_executable, + '-port', str(args.omnisci_port), + '-db-port', str(args.db_port), + '-dp', f"'{args.dpattern}'", + '-i', str(args.iters), + '-u', args.user, + '-p', args.password, + '-db-server', args.db_server, + '-n', args.name, + f'-db-user={args.db_user}', + '-db-pass', args.db_password, + '-db-name', args.db_name, + '-db-table', + args.db_table if args.db_table else 'santander_ibis', + '-commit_omnisci', args.commit_omnisci, + '-commit_ibis', args.commit_ibis] + + benchmarks_cmd['santander'] = santander_bench_cmdline + + print("PREPARING ENVIRONMENT") + combinate_requirements(ibis_requirements, args.ci_requirements, requirements_file) + _, envs = execute_process(check_env_cmdline) + if args.env_name in envs: + if args.env_check is False: + execute_process(remove_env_cmdline) + execute_process(create_env_cmdline, print_output=False) + else: + execute_process(create_env_cmdline, print_output=False) + + if tasks['build']: + print("IBIS INSTALLATION") + execute_process(add_conda_execution(install_ibis_cmdline), cwd=args.ibis_path, + print_output=False) + + if tasks['test']: + print("STARTING OMNISCI SERVER") + omnisci_server = OmnisciServer(omnisci_executable=args.omnisci_executable, + omnisci_port=args.omnisci_port, database_name=args.name, + omnisci_cwd=args.omnisci_cwd, user=args.user, + password=args.password) + omnisci_server.launch() + + 
if tasks['test']: + print("PREPARING DATA") + execute_process(add_conda_execution(dataset_download_cmdline)) + execute_process(add_conda_execution(dataset_import_cmdline)) + + print("RUNNING TESTS") + execute_process(add_conda_execution(ibis_tests_cmdline), cwd=args.ibis_path) + + if tasks['benchmark']: + print(f"RUNNING BENCHMARK {args.bench_name}") + execute_process(add_conda_execution(benchmarks_cmd[args.bench_name])) + +except Exception as err: + print("Failed", err) + sys.exit(1) + +finally: + if omnisci_server: + omnisci_server.terminate() + if args and args.save_env is False: + execute_process(remove_env_cmdline) diff --git a/run_omnisci_benchmark.py b/run_omnisci_benchmark.py index 02464bfb7113f..f82250f119539 100644 --- a/run_omnisci_benchmark.py +++ b/run_omnisci_benchmark.py @@ -1,29 +1,19 @@ from braceexpand import braceexpand import mysql.connector -import subprocess -import threading import argparse import pathlib -import signal import glob -import time import json import copy import sys import os -import io +from server import OmnisciServer +from server import execute_process +from report import DbReport -def execute_process(cmdline, cwd=None): - try: - process = subprocess.Popen(cmdline, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - out = process.communicate()[0].strip().decode() - print(out) - except OSError as err: - print("Failed to start", omnisciCmdLine, err) - if process.returncode != 0: - raise Exception("Command returned {}".format(process.returncode)) -def execute_benchmark(datafiles, import_cmdline, benchmark_cwd, benchmark_cmdline, fragment_size, results_file_name, report): +def execute_benchmark(datafiles, import_cmdline, benchmark_cwd, benchmark_cmdline, + fragment_size, results_file_name, report): if import_cmdline is not None: ic = copy.copy(import_cmdline) # Import dataset mode @@ -71,16 +61,10 @@ def execute_benchmark(datafiles, import_cmdline, benchmark_cwd, benchmark_cmdlin 'AverageTotalTimeMS': 
result['results']['query_total_avg'] }) -def print_omnisci_output(stdout): - for line in iter(stdout.readline, b''): - print("OMNISCI>>", line.decode().strip()) +omnisci_server = None -# Load database reporting functions -pathToReportDir = os.path.join(pathlib.Path(__file__).parent, "report") -sys.path.insert(1, pathToReportDir) -import report - -parser = argparse.ArgumentParser(description='Run arbitrary omnisci benchmark and submit report values to MySQL database') +parser = argparse.ArgumentParser(description='Run arbitrary omnisci benchmark and submit report ' + 'values to MySQL database') optional = parser._action_groups.pop() required = parser.add_argument_group("required arguments") parser._action_groups.append(optional) @@ -94,7 +78,8 @@ def print_omnisci_output(stdout): required.add_argument("-e", "--executable", dest="omnisci_executable", required=True, help="Path to omnisci_server executable.") optional.add_argument("-w", "--workdir", dest="omnisci_cwd", - help="Path to omnisci working directory. By default parent directory of executable location is used. Data directory is used in this location.") + help="Path to omnisci working directory. By default parent directory of " + "executable location is used. Data directory is used in this location.") optional.add_argument("-o", "--port", dest="omnisci_port", default=62274, type=int, help="TCP port number to run omnisci_server on.") required.add_argument("-u", "--user", dest="user", default="admin", required=True, @@ -104,45 +89,68 @@ def print_omnisci_output(stdout): required.add_argument("-n", "--name", dest="name", default="omnisci", required=True, help="Database name to use on omniscidb server.") required.add_argument("-t", "--import-table-name", dest="import_table_name", required=True, - help="Name of table to import data to. NOTE: This table will be dropped before and after the import test.") + help="Name of table to import data to. 
NOTE: This table will be dropped " + "before and after the import test.") # Required by omnisci benchmark scripts required.add_argument("-l", "--label", dest="label", required=True, help="Benchmark run label.") required.add_argument("-i", "--iterations", dest="iterations", type=int, required=True, help="Number of iterations per query. Must be > 1") required.add_argument("-m", "--mode", dest="mode", choices=['synthetic', 'dataset'], required=True, - help="Select benchmark mode. It is either synthetic or dataset. Required switches for synthetic benchmark are --synthetic-query, --num-fragments and --fragment-size. Required switches for dataset benchmark are --import-file, --table-schema-file and --queries-dir and --fragment-size is optional.") + help="Select benchmark mode. It is either synthetic or dataset. Required " + "switches for synthetic benchmark are --synthetic-query, --num-fragments" + " and --fragment-size. Required switches for dataset benchmark are " + "--import-file, --table-schema-file and --queries-dir and " + "--fragment-size is optional.") # Fragment size optional.add_argument('-fs', '--fragment-size', dest="fragment_size", action='append', type=int, - help="Fragment size to use for created table. Multiple values are allowed and encouraged. If no -fs switch is specified, default fragment size is used and templated CREATE TABLE sql files cannot be used.") + help="Fragment size to use for created table. Multiple values are allowed " + "and encouraged. If no -fs switch is specified, default fragment size " + "is used and templated CREATE TABLE sql files cannot be used.") # Required for synthetic benchmarks optional.add_argument("-nf", "--num-fragments", dest="num_synthetic_fragments", - help="Number of fragments to generate for synthetic benchmark. 
Dataset size is fragment_size * num_fragments.") -optional.add_argument("-sq", "--synthetic-query", choices=['BaselineHash', 'MultiStep', 'NonGroupedAgg', 'PerfectHashMultiCol', 'PerfectHashSingleCol', 'Sort'], dest="synthetic_query", + help="Number of fragments to generate for synthetic benchmark. Dataset size " + "is fragment_size * num_fragments.") +optional.add_argument("-sq", "--synthetic-query", choices=['BaselineHash', 'MultiStep', + 'NonGroupedAgg', 'PerfectHashMultiCol', + 'PerfectHashSingleCol', 'Sort'], + dest="synthetic_query", help="Synthetic benchmark query group.") # Required for traditional data benchmarks optional.add_argument("-f", "--import-file", dest="import_file", - help="Absolute path to file or wildcard on omnisci_server machine with data for import test. If wildcard is used, all files are imported in one COPY statement. Limiting number of files is possible using curly braces wildcard, e.g. trips_xa{a,b,c}.csv.gz.") + help="Absolute path to file or wildcard on omnisci_server machine with data " + "for import test. If wildcard is used, all files are imported in one " + "COPY statement. Limiting number of files is possible using curly " + "braces wildcard, e.g. trips_xa{a,b,c}.csv.gz.") optional.add_argument("-c", "--table-schema-file", dest="table_schema_file", - help="Path to local file with CREATE TABLE sql statement for the import table.") + help="Path to local file with CREATE TABLE sql statement for " + "the import table.") optional.add_argument("-d", "--queries-dir", dest="queries_dir", help='Absolute path to dir with query files.') # MySQL database parameters optional.add_argument("-db-server", default="localhost", help="Host name of MySQL server.") optional.add_argument("-db-port", default=3306, type=int, help="Port number of MySQL server.") -optional.add_argument("-db-user", default="", help="Username to use to connect to MySQL database. 
If user name is specified, script attempts to store results in MySQL database using other -db-* parameters.") -optional.add_argument("-db-pass", default="omniscidb", help="Password to use to connect to MySQL database.") -optional.add_argument("-db-name", default="omniscidb", help="MySQL database to use to store benchmark results.") +optional.add_argument("-db-user", default="", + help="Username to use to connect to MySQL database. " + "If user name is specified, script attempts to store results in MySQL " + "database using other -db-* parameters.") +optional.add_argument("-db-pass", default="omniscidb", + help="Password to use to connect to MySQL database.") +optional.add_argument("-db-name", default="omniscidb", + help="MySQL database to use to store benchmark results.") optional.add_argument("-db-table", help="Table to use to store results for this benchmark.") -optional.add_argument("-commit", default="1234567890123456789012345678901234567890", help="Commit hash to use to record this benchmark results.") +optional.add_argument("-commit", default="1234567890123456789012345678901234567890", + help="Commit hash to use to record this benchmark results.") args = parser.parse_args() +args.import_file=args.import_file.replace("'","") + if args.omnisci_cwd is not None: server_cwd = args.omnisci_cwd else: @@ -157,13 +165,6 @@ def print_omnisci_output(stdout): initdb_executable = os.path.join(pathlib.Path(args.omnisci_executable).parent, "initdb") execute_process([initdb_executable, '-f', '--data', data_dir]) -server_cmdline = [args.omnisci_executable, - 'data', - '--port', str(args.omnisci_port), - '--http-port', "62278", - '--calcite-port', "62279", - '--config', 'omnisci.conf'] - dataset_import_cmdline = ['python3', os.path.join(args.benchmarks_path, 'run_benchmark_import.py'), '-u', args.user, @@ -213,16 +214,20 @@ def print_omnisci_output(stdout): '--result_dir', 'synthetic_results'] if args.mode == 'synthetic': - if args.synthetic_query is None or 
args.num_synthetic_fragments is None or args.fragment_size is None: - print("For synthetic type of benchmark the following parameters are mandatory: --synthetic-query, --num-fragments and --fragment-size.") + if args.synthetic_query is None or args.num_synthetic_fragments is None \ + or args.fragment_size is None: + print("For synthetic type of benchmark the following parameters are mandatory: " + "--synthetic-query, --num-fragments and --fragment-size.") sys.exit(3) datafiles = 0 - results_file_name = os.path.join(args.benchmarks_path, 'synthetic_results', args.label, 'CPU', 'Benchmarks', args.synthetic_query + '.json') + results_file_name = os.path.join(args.benchmarks_path, 'synthetic_results', args.label, + 'CPU', 'Benchmarks', args.synthetic_query + '.json') import_cmdline = None benchmark_cmdline = synthetic_benchmark_cmdline else: if args.import_file is None or args.table_schema_file is None or args.queries_dir is None: - print("For dataset type of benchmark the following parameters are mandatory: --import-file, --table-schema-file and --queries-dir and --fragment-size is optional.") + print("For dataset type of benchmark the following parameters are mandatory: --import-file," + " --table-schema-file and --queries-dir and --fragment-size is optional.") sys.exit(3) datafiles_names = list(braceexpand(args.import_file)) datafiles_names = sorted([x for f in datafiles_names for x in glob.glob(f)]) @@ -238,8 +243,9 @@ def print_omnisci_output(stdout): print("--db-table parameter is mandatory to store results in MySQL database") sys.exit(4) print("CONNECTING TO DATABASE") - db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user, passwd=args.db_pass, db=args.db_name); - db_reporter = report.DbReport(db, args.db_table, { + db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user, + passwd=args.db_pass, db=args.db_name) + db_reporter = DbReport(db, args.db_table, { 'FilesNumber': 'INT UNSIGNED NOT NULL', 
'FragmentSize': 'BIGINT UNSIGNED NOT NULL', 'BenchName': 'VARCHAR(500) NOT NULL', @@ -254,22 +260,18 @@ def print_omnisci_output(stdout): 'CommitHash': args.commit }) -try: - server_process = subprocess.Popen(server_cmdline, cwd=server_cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) -except OSError as err: - print("Failed to start", omnisciCmdLine, err) - sys.exit(1) try: - pt = threading.Thread(target=print_omnisci_output, args=(server_process.stdout,), daemon=True) - pt.start() - - # Allow server to start up. It has to open TCP port and start - # listening, otherwise the following benchmarks fail. - time.sleep(5) + omnisci_server = OmnisciServer(omnisci_executable=args.omnisci_executable, + omnisci_port=args.omnisci_port,database_name=args.name, + omnisci_cwd=args.omnisci_cwd, user=args.user, + password=args.passwd) + omnisci_server.launch() with open(args.report, "w") as report: - print("datafiles,fragment_size,query,query_exec_min,query_total_min,query_exec_max,query_total_max,query_exec_avg,query_total_avg,query_error_info", file=report, flush=True) + print("datafiles,fragment_size,query,query_exec_min,query_total_min,query_exec_max," + "query_total_max,query_exec_avg,query_total_avg,query_error_info", + file=report, flush=True) if args.fragment_size is not None: for fs in args.fragment_size: print("RUNNING WITH FRAGMENT SIZE", fs) @@ -281,8 +283,5 @@ def print_omnisci_output(stdout): benchmark_cmdline, None, results_file_name, report) finally: print("TERMINATING SERVER") - server_process.send_signal(signal.SIGINT) - time.sleep(2) - server_process.kill() - time.sleep(1) - server_process.terminate() + if omnisci_server: + omnisci_server.terminate() diff --git a/santander/santander_ibis.py b/santander/santander_ibis.py index 56c0a5fdd7316..e4a6430e72ea6 100644 --- a/santander/santander_ibis.py +++ b/santander/santander_ibis.py @@ -1,251 +1,82 @@ import mysql.connector -import pandas as pd -import numpy as np -import subprocess -import threading import 
argparse -import pathlib import time -import glob import sys import os - -omnisci_executable = "build/bin/omnisql" -datafile_directory = "/localdisk/work/train.csv" -database_name = "santanderdb" -train_table_name = "train_table" - -# Load database reporting, server and Ibis modules -path_to_report_dir = os.path.join(pathlib.Path(__file__).parent, "..", "report") -path_to_server_dir = os.path.join(pathlib.Path(__file__).parent, "..", "server") -path_to_ibis_dir = os.path.join(pathlib.Path(__file__).parent.parent, "..", "ibis/build/lib") -sys.path.insert(1, path_to_report_dir) -sys.path.insert(1, path_to_server_dir) -sys.path.insert(1, path_to_ibis_dir) -import report -import server import ibis -parser = argparse.ArgumentParser(description='Run Santander benchmark using Ibis.') - -parser.add_argument('-e', default=omnisci_executable, help='Path to executable "omnisql".') -parser.add_argument('-r', default="report_santander_ibis.csv", help="Report file name.") -parser.add_argument('-dp', default=datafile_directory, help="Datafile that should be loaded.") -parser.add_argument('-i', default=5, type=int, help="Number of iterations to run every query. Best result is selected.") -parser.add_argument('-dnd', action='store_true', help="Do not delete old table.") -parser.add_argument('-dni', action='store_true', help="Do not create new table and import any data from CSV files.") -parser.add_argument("-port", default=62074, type=int, help="TCP port that omnisql client should use to connect to server.") - -parser.add_argument("-db-server", default="localhost", help="Host name of MySQL server.") -parser.add_argument("-db-port", default=3306, type=int, help="Port number of MySQL server.") -parser.add_argument("-db-user", default="", help="Username to use to connect to MySQL database. 
If user name is specified, script attempts to store results in MySQL database using other -db-* parameters.") -parser.add_argument("-db-pass", default="omniscidb", help="Password to use to connect to MySQL database.") -parser.add_argument("-db-name", default="omniscidb", help="MySQL database to use to store benchmark results.") -parser.add_argument("-db-table", help="Table to use to store results for this benchmark.") - -parser.add_argument("-commit", default="1234567890123456789012345678901234567890", help="Commit hash to use to record this benchmark results.") - -args = parser.parse_args() - -if args.i < 1: - print("Bad number of iterations specified", args.i) - -def print_omnisci_output(stdout): - for line in iter(stdout.readline, b''): - print("OMNISCI>>", line.decode().strip()) - -datafile_columns_names = ["ID_code", "target"] + ["var_" + str(index) for index in range(200)] -datafile_columns_types = ["string", "int16"] + ["float32" for _ in range(200)] - -schema_train = ibis.Schema( - names = datafile_columns_names, - types = datafile_columns_types -) - -omnisci_server = server.Omnisci_server(omnisci_executable=args.e, omnisci_port=args.port, database_name=database_name) -omnisci_server.launch() - -time.sleep(2) -conn = omnisci_server.connect_to_server() - -db_reporter = None -if args.db_user is not "": - print("Connecting to database") - db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user, passwd=args.db_pass, db=args.db_name) - db_reporter = report.DbReport(db, args.db_table, { - 'QueryName': 'VARCHAR(500) NOT NULL', - 'FirstExecTimeMS': 'BIGINT UNSIGNED', - 'WorstExecTimeMS': 'BIGINT UNSIGNED', - 'BestExecTimeMS': 'BIGINT UNSIGNED', - 'AverageExecTimeMS': 'BIGINT UNSIGNED', - 'TotalTimeMS': 'BIGINT UNSIGNED' - }, { - 'ScriptName': 'santander_ibis.py', - 'CommitHash': args.commit - }) - -# Delete old table -if not args.dnd: - print("Deleting", database_name ,"old database") - try: - conn.drop_database(database_name, 
force=True) - time.sleep(2) - conn = omnisci_server.connect_to_server() - except Exception as err: - print("Failed to delete", database_name, "old database: ", err) - +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from server import OmnisciServer +from report import DbReport +from server_worker import OmnisciServerWorker -print("Creating new database") -try: - conn.create_database(database_name) # Ibis list_databases method is not supported yet -except Exception as err: - print("Database creation is skipped, because of error:", err) - - -cast_dict_train = {('var_%s'%str(i)):'float32' for i in range(200)} -cast_dict_train['target'] = 'int16' - -if not args.dni: - # Datafiles import - t_import_pandas, t_import_ibis = omnisci_server.import_data_by_ibis(table_name = train_table_name, data_files_names=args.dp, files_limit=1, columns_names=datafile_columns_names, columns_types=datafile_columns_types, cast_dict=cast_dict_train, header=0) - print("Pandas import time:", t_import_pandas) - print("Ibis import time:", t_import_ibis) -try: - db = conn.database(database_name) -except Exception as err: - print("Failed to connect to database:", err) - -try: - tables_names = db.list_tables() - print("Database tables:", tables_names) -except Exception as err: - print("Failed to read database tables:", err) - -try: - df = db.table(train_table_name) -except Exception as err: - print("Failed to access", train_table_name, "table:", err) - - - -# Since OmniSciDB doesn't support JOIN operation for tables with non-integer -# values, tables for filter and split queries were reproduced by Pandas (as it -# it was done in the similar Pandas benchmark https://gitlab.devtools.intel.com/jianminl/rapids-response-e2e-workloads/blob/master/e2e/santander/santander_cpu.py) - -train_pd = omnisci_server.get_pd_df(table_name=train_table_name) - -for i in range(200): - col = 'var_%d'%i - var_count = train_pd.groupby(col).agg({col:'count'}) - var_count.columns = ['%s_count'%col] - 
var_count = var_count.reset_index() - train_pd = train_pd.merge(var_count,on=col,how='left') - -for i in range(200): - col = 'var_%d'%i - mask = train_pd['%s_count'%col]>1 - train_pd.loc[mask,'%s_gt1'%col] = train_pd.loc[mask,col] - - -datafile_columns_names_train_pd = datafile_columns_names + ["var_" + str(index) + "_count" for index in range(200)] + ["var_" + str(index) + "_gt1" for index in range(200)] -datafile_columns_types_train_pd = datafile_columns_types + ["float32" for _ in range(200)] + ["float32" for _ in range(200)] - -schema_train_pd = ibis.Schema( - names = datafile_columns_names_train_pd, - types = datafile_columns_types_train_pd -) - -cast_dict = {} -cast_dict['target'] = 'int16' -for i in range(200): - cast_dict['var_%s'%str(i)] = 'float32' - cast_dict['var_%s'%str(i)+'_count'] = 'float32' - cast_dict['var_%s'%str(i)+'_gt1'] = 'float32' - -train_pd = train_pd.astype(dtype=cast_dict, copy=False) - -conn.create_table(table_name = 'train_pd_table', schema=schema_train_pd, database=database_name) -conn.load_data('train_pd_table', train_pd) -train_pd_ibis = db.table('train_pd_table') - - -table_name_where = 'train_where_table' -datafile_columns_names_train_where = datafile_columns_names + ["var_" + str(index) + "_count" for index in range(200)] -datafile_columns_types_train_where = datafile_columns_types + ["float32" for _ in range(200)] - -schema_train_where = ibis.Schema( - names = datafile_columns_names_train_where, - types = datafile_columns_types_train_where -) - -train = train_pd.copy() -train_selected = train[datafile_columns_names_train_where] -conn.create_table(table_name = table_name_where, schema=schema_train_where, database=database_name) -conn.load_data(table_name_where, train_selected) -train_where_ibis = db.table(table_name_where) - - - -# Queries definitions -tmp_table_name = 'tmp_table' def q1(): - t_import = 0 t0 = time.time() - _, _ = omnisci_server.import_data_by_ibis(table_name = tmp_table_name, data_files_names=args.dp, 
files_limit=1, columns_names=datafile_columns_names, columns_types=datafile_columns_types, cast_dict=cast_dict_train, header=0) + _, _ = omnisci_server_worker.import_data_by_ibis(table_name=tmp_table_name, + data_files_names=args.dp, files_limit=1, + columns_names=datafile_columns_names, + columns_types=datafile_columns_types, + cast_dict=cast_dict_train, header=0) t_import = time.time() - t0 - omnisci_server.drop_table(tmp_table_name) - + omnisci_server_worker.drop_table(tmp_table_name) + return t_import + def q2(): for i in range(200): t_groupby = 0 - col = 'var_%d'%i + col = 'var_%d' % i t0 = time.time() - metric = df[col].count().name('%s_count'%col) + metric = df[col].count().name('%s_count' % col) group_by_expr = df.group_by(col).aggregate(metric) _ = group_by_expr.execute() t_groupby += time.time() - t0 - return t_groupby + def q3(): t_where = 0 global train_where_ibis - for c,col in enumerate(['var_0','var_1','var_2']): - for i in range(1,4): + for c, col in enumerate(['var_0', 'var_1', 'var_2']): + for i in range(1, 4): t0 = time.time() - train_where_ibis_filtered = train_where_ibis[train_where_ibis['%s_count'%col]==i].execute() + train_where_ibis[ + train_where_ibis['%s_count' % col] == i].execute() t_where += time.time() - t0 t0 = time.time() - train_where_ibis_filtered = train_where_ibis[train_where_ibis['%s_count'%col]>i].execute() + train_where_ibis[ + train_where_ibis['%s_count' % col] > i].execute() t_where += time.time() - t0 - col_to_sel = datafile_columns_names + ["var_" + str(index) + "_count" for index in range(200)] + col_to_sel = datafile_columns_names + ["var_" + str(index) + "_count" for index in + range(200)] train_where_ibis2 = train_pd_ibis[col_to_sel] for i in range(200): - col = 'var_%d'%i + col = 'var_%d' % i t0 = time.time() - mask = train_where_ibis2['%s_count'%col]>1 + mask = train_where_ibis2['%s_count' % col] > 1 t_where += time.time() - t0 - col_to_sel += ['%s_gt1'%col] + col_to_sel += ['%s_gt1' % col] train_where_ibis2 = 
train_pd_ibis[col_to_sel] - + return t_where + def q4(): - t_split = 0 t0 = time.time() - train,valid = train_pd_ibis[0:190000].execute(),train_pd_ibis[190000:200000].execute() + train_pd_ibis[0:190000].execute() + train_pd_ibis[190000:200000].execute() t_split = time.time() - t0 - + return t_split + queries_list = [q1, q2, q3, q4] queries_description = {} queries_description[1] = 'Santander data file import query' @@ -253,50 +84,244 @@ def q4(): queries_description[3] = 'Rows filtration query' queries_description[4] = 'Rows split query' +omnisci_executable = "../omnisci/build/bin/omnisci_server" +datafile_directory = "/localdisk/work/train.csv" +train_table_name = "train_table" +omnisci_server = None + +parser = argparse.ArgumentParser(description='Run Santander benchmark using Ibis.') + +parser.add_argument('-e', default=omnisci_executable, help='Path to executable "omnisql".') +parser.add_argument('-r', default="report_santander_ibis.csv", help="Report file name.") +parser.add_argument('-dp', default=datafile_directory, help="Datafile that should be loaded.") +parser.add_argument('-i', default=5, type=int, + help="Number of iterations to run every query. 
Best result is selected.") +parser.add_argument('-dnd', action='store_true', help="Do not delete old table.") +parser.add_argument('-dni', action='store_true', + help="Do not create new table and import any data from CSV files.") +parser.add_argument("-port", default=62074, type=int, + help="TCP port that omnisql client should use to connect to server.") +parser.add_argument("-u", default="admin", + help="User name to use on omniscidb server.") +parser.add_argument("-p", default="HyperInteractive", + help="User password to use on omniscidb server.") +parser.add_argument("-n", default="agent_test_ibis", + help="Database name to use on omniscidb server.") + +parser.add_argument("-db-server", default="localhost", help="Host name of MySQL server.") +parser.add_argument("-db-port", default=3306, type=int, help="Port number of MySQL server.") +parser.add_argument("-db-user", default="", + help="Username to use to connect to MySQL database. " + "If user name is specified, script attempts to store results in " + "MySQL database using other -db-* parameters.") +parser.add_argument("-db-pass", default="omniscidb", + help="Password to use to connect to MySQL database.") +parser.add_argument("-db-name", default="omniscidb", + help="MySQL database to use to store benchmark results.") +parser.add_argument("-db-table", help="Table to use to store results for this benchmark.") + +parser.add_argument("-commit_omnisci", dest="commit_omnisci", + default="1234567890123456789012345678901234567890", + help="Omnisci commit hash to use for tests.") +parser.add_argument("-commit_ibis", dest="commit_ibis", + default="1234567890123456789012345678901234567890", + help="Ibis commit hash to use for tests.") + try: - pt = threading.Thread(target=print_omnisci_output, args=(omnisci_server.server_process.stdout,), daemon=True) - pt.start() - - with open(args.r, "w") as report: - t_begin = time.time() - for query_number in range(0,4): - exec_times = [None]*5 - best_exec_time = float("inf") - 
worst_exec_time = 0.0 - first_exec_time = float("inf") - times_sum = 0.0 - for iteration in range(1, args.i + 1): - print("Running query number:", query_number + 1, "Iteration number:", iteration) - time_tmp = int(round(queries_list[query_number]() * 1000)) - exec_times[iteration - 1] = time_tmp - if iteration == 1: - first_exec_time = exec_times[iteration - 1] - if best_exec_time > exec_times[iteration - 1]: - best_exec_time = exec_times[iteration - 1] - if iteration != 1 and worst_exec_time < exec_times[iteration - 1]: - worst_exec_time = exec_times[iteration - 1] - if iteration != 1: - times_sum += exec_times[iteration - 1] - average_exec_time = times_sum/(args.i - 1) - total_exec_time = int(round(time.time() - t_begin)) - print("Query", query_number + 1, "Exec time (ms):", best_exec_time, "Total time (s):", total_exec_time) - print("QueryName: ", queries_description[query_number + 1], ",", - "FirstExecTimeMS: ", first_exec_time, ",", - "WorstExecTimeMS: ", worst_exec_time, ",", - "BestExecTimeMS: ", best_exec_time, ",", - "AverageExecTimeMS: ", average_exec_time, ",", - "TotalTimeMS: ", total_exec_time, ",", - "", '\n', file=report, sep='', end='', flush=True) - if db_reporter is not None: - db_reporter.submit({ - 'QueryName': queries_description[query_number + 1], - 'FirstExecTimeMS': first_exec_time, - 'WorstExecTimeMS': worst_exec_time, - 'BestExecTimeMS': best_exec_time, - 'AverageExecTimeMS': average_exec_time, - 'TotalTimeMS': total_exec_time - }) -except IOError as err: - print("Failed writing report file", args.r, err) + args = parser.parse_args() + + if args.i < 1: + print("Bad number of iterations specified", args.i) + + datafile_columns_names = ["ID_code", "target"] + ["var_" + str(index) for index in range(200)] + datafile_columns_types = ["string", "int16"] + ["float32" for _ in range(200)] + + schema_train = ibis.Schema( + names=datafile_columns_names, + types=datafile_columns_types + ) + + database_name = args.n + omnisci_server = 
OmnisciServer(omnisci_executable=args.e, omnisci_port=args.port, + database_name=database_name, user=args.u, + password=args.p) + omnisci_server.launch() + omnisci_server_worker = OmnisciServerWorker(omnisci_server) + + time.sleep(2) + conn = omnisci_server_worker.connect_to_server() + + db_reporter = None + if args.db_user is not "": + print("Connecting to database") + db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user, + passwd=args.db_pass, db=args.db_name) + db_reporter = DbReport(db, args.db_table, { + 'QueryName': 'VARCHAR(500) NOT NULL', + 'FirstExecTimeMS': 'BIGINT UNSIGNED', + 'WorstExecTimeMS': 'BIGINT UNSIGNED', + 'BestExecTimeMS': 'BIGINT UNSIGNED', + 'AverageExecTimeMS': 'BIGINT UNSIGNED', + 'TotalTimeMS': 'BIGINT UNSIGNED' + }, { + 'ScriptName': 'santander_ibis.py', + 'CommitHash': args.commit + }) + + # Delete old table + if not args.dnd: + print("Deleting", database_name, "old database") + try: + conn.drop_database(database_name, force=True) + time.sleep(2) + conn = omnisci_server_worker.connect_to_server() + except Exception as err: + print("Failed to delete", database_name, "old database: ", err) + + print("Creating new database") + try: + conn.create_database(database_name) # Ibis list_databases method is not supported yet + except Exception as err: + print("Database creation is skipped, because of error:", err) + + cast_dict_train = {('var_%s' % str(i)): 'float32' for i in range(200)} + cast_dict_train['target'] = 'int16' + + args.dp = args.dp.replace("'", "") + if not args.dni: + # Datafiles import + t_import_pandas, t_import_ibis = omnisci_server_worker.import_data_by_ibis( + table_name=train_table_name, data_files_names=args.dp, files_limit=1, + columns_names=datafile_columns_names, columns_types=datafile_columns_types, + cast_dict=cast_dict_train, header=0) + print("Pandas import time:", t_import_pandas) + print("Ibis import time:", t_import_ibis) + + try: + db = conn.database(database_name) + except 
Exception as err: + print("Failed to connect to database:", err) + + try: + tables_names = db.list_tables() + print("Database tables:", tables_names) + except Exception as err: + print("Failed to read database tables:", err) + + try: + df = db.table(train_table_name) + except Exception as err: + print("Failed to access", train_table_name, "table:", err) + + # Since OmniSciDB doesn't support JOIN operation for tables with non-integer + # values, tables for filter and split queries were reproduced by Pandas (as it + # it was done in the similar Pandas benchmark + # https://gitlab.devtools.intel.com/jianminl/rapids-response-e2e-workloads/blob/master/e2e/santander/santander_cpu.py) + + train_pd = omnisci_server_worker.get_pd_df(table_name=train_table_name) + + for i in range(200): + col = 'var_%d' % i + var_count = train_pd.groupby(col).agg({col: 'count'}) + var_count.columns = ['%s_count' % col] + var_count = var_count.reset_index() + train_pd = train_pd.merge(var_count, on=col, how='left') + + for i in range(200): + col = 'var_%d' % i + mask = train_pd['%s_count' % col] > 1 + train_pd.loc[mask, '%s_gt1' % col] = train_pd.loc[mask, col] + + datafile_columns_names_train_pd = datafile_columns_names + [ + "var_" + str(index) + "_count" for index in range(200)] + [ + "var_" + str(index) + "_gt1" for index in range(200)] + datafile_columns_types_train_pd = datafile_columns_types + [ + "float32" for _ in range(200)] + [ + "float32" for _ in range(200)] + + schema_train_pd = ibis.Schema( + names=datafile_columns_names_train_pd, + types=datafile_columns_types_train_pd + ) + + cast_dict = {} + cast_dict['target'] = 'int16' + for i in range(200): + cast_dict['var_%s' % str(i)] = 'float32' + cast_dict['var_%s' % str(i) + '_count'] = 'float32' + cast_dict['var_%s' % str(i) + '_gt1'] = 'float32' + + train_pd = train_pd.astype(dtype=cast_dict, copy=False) + + conn.create_table(table_name='train_pd_table', schema=schema_train_pd, database=database_name) + 
conn.load_data('train_pd_table', train_pd) + train_pd_ibis = db.table('train_pd_table') + + table_name_where = 'train_where_table' + datafile_columns_names_train_where = datafile_columns_names + ["var_" + str(index) + "_count" + for index in range(200)] + datafile_columns_types_train_where = datafile_columns_types + ["float32" for _ in range(200)] + + schema_train_where = ibis.Schema( + names=datafile_columns_names_train_where, + types=datafile_columns_types_train_where + ) + + train = train_pd.copy() + train_selected = train[datafile_columns_names_train_where] + conn.create_table(table_name=table_name_where, schema=schema_train_where, + database=database_name) + conn.load_data(table_name_where, train_selected) + train_where_ibis = db.table(table_name_where) + + # Queries definitions + tmp_table_name = 'tmp_table' + + try: + with open(args.r, "w") as report: + t_begin = time.time() + for query_number in range(0, 4): + exec_times = [None] * 5 + best_exec_time = float("inf") + worst_exec_time = 0.0 + first_exec_time = float("inf") + times_sum = 0.0 + for iteration in range(1, args.i + 1): + print("Running query number:", query_number + 1, "Iteration number:", iteration) + time_tmp = int(round(queries_list[query_number]() * 1000)) + exec_times[iteration - 1] = time_tmp + if iteration == 1: + first_exec_time = exec_times[iteration - 1] + if best_exec_time > exec_times[iteration - 1]: + best_exec_time = exec_times[iteration - 1] + if iteration != 1 and worst_exec_time < exec_times[iteration - 1]: + worst_exec_time = exec_times[iteration - 1] + if iteration != 1: + times_sum += exec_times[iteration - 1] + average_exec_time = times_sum / (args.i - 1) + total_exec_time = int(round(time.time() - t_begin)) + print("Query", query_number + 1, "Exec time (ms):", best_exec_time, + "Total time (s):", total_exec_time) + print("QueryName: ", queries_description[query_number + 1], ",", + "FirstExecTimeMS: ", first_exec_time, ",", + "WorstExecTimeMS: ", worst_exec_time, ",", + 
"BestExecTimeMS: ", best_exec_time, ",", + "AverageExecTimeMS: ", average_exec_time, ",", + "TotalTimeMS: ", total_exec_time, ",", + "", '\n', file=report, sep='', end='', flush=True) + if db_reporter is not None: + db_reporter.submit({ + 'QueryName': queries_description[query_number + 1], + 'FirstExecTimeMS': first_exec_time, + 'WorstExecTimeMS': worst_exec_time, + 'BestExecTimeMS': best_exec_time, + 'AverageExecTimeMS': average_exec_time, + 'TotalTimeMS': total_exec_time + }) + except IOError as err: + print("Failed writing report file", args.r, err) +except Exception as exc: + print("Failed: ", exc) finally: - omnisci_server.terminate() + if omnisci_server: + omnisci_server.terminate() diff --git a/server/__init__.py b/server/__init__.py new file mode 100644 index 0000000000000..421a0aea3c4f6 --- /dev/null +++ b/server/__init__.py @@ -0,0 +1,2 @@ +from .server import OmnisciServer +from .server import execute_process diff --git a/server/server.py b/server/server.py index 4a617313e0d42..16db886feb98c 100644 --- a/server/server.py +++ b/server/server.py @@ -1,6 +1,4 @@ -import glob import os -import pandas as pd import pathlib import signal import sys @@ -8,24 +6,47 @@ import threading import time -path_to_ibis_dir = os.path.join(pathlib.Path(__file__).parent.parent, "..", "ibis/build/lib") -sys.path.insert(1, path_to_ibis_dir) -import ibis -class Omnisci_server: +def execute_process(cmdline, cwd=None, shell=False, daemon=False, print_output=True): + "Execute cmdline in user-defined directory by creating separated process" + try: + print("CMD: ", " ".join(cmdline)) + output = "" + process = subprocess.Popen(cmdline, cwd=cwd, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, shell=shell) + if not daemon: + output = process.communicate()[0].strip().decode() + if print_output: + print(output) + if process.returncode != 0 and process.returncode is not None: + raise Exception("Command returned {}".format(process.returncode)) + except OSError as err: + print("Failed 
to start", cmdline, err) + + return process, output + + +class OmnisciServer: "Manage interactions with OmniSciDB server (launch/termination, connection establishing, etc.)" - _http_port = 62278 - _calcite_port = 62279 server_process = None _header_santander_train = False - _imported_pd_df = {} - def __init__(self, omnisci_executable, omnisci_port, database_name, omnisci_cwd=None): + def __init__(self, omnisci_executable, omnisci_port, database_name, + omnisci_cwd=None, user="admin", password="HyperInteractive", http_port=62278, + calcite_port=62279): + self.omnisci_executable = omnisci_executable + self.server_port = omnisci_port + self.user = user + self.password = password + self.database_name = database_name + self._http_port = http_port + self._calcite_port = calcite_port + if omnisci_cwd is not None: self._server_cwd = omnisci_cwd else: - self._server_cwd = pathlib.Path(omnisci_executable).parent.parent + self._server_cwd = pathlib.Path(self.omnisci_executable).parent.parent self._data_dir = os.path.join(self._server_cwd, "data") if not os.path.isdir(self._data_dir): @@ -33,53 +54,43 @@ def __init__(self, omnisci_executable, omnisci_port, database_name, omnisci_cwd= os.makedirs(self._data_dir) if not os.path.isdir(os.path.join(self._data_dir, "mapd_data")): print("INITIALIZING DATA DIR", self._data_dir) - self._initdb_executable = os.path.join(pathlib.Path(omnisci_executable).parent, "initdb") - _ = self._execute_process([self._initdb_executable, '-f', '--data', self._data_dir]) - - self._server_port = omnisci_port - self._database_name = database_name - self._omnisci_server_executable = os.path.join(pathlib.Path(omnisci_executable).parent, "omnisci_server") - self._server_start_cmdline = [self._omnisci_server_executable, - "data", - '--port', str(omnisci_port), - '--http-port', str(self._http_port), - '--calcite-port', str(self._calcite_port), - '--config', "omnisci.conf"] - - self._omnisci_cmd_line = [omnisci_executable] + [str(self._database_name), "-u", 
"admin", "-p", "HyperInteractive"] + ["--port", str(self._server_port)] - self._command_2_import_CSV = "COPY %s FROM '%s' WITH (header='%s');" - self._conn = None - - def _execute_process(self, cmdline, cwd=None): - "Execute cmdline in user-defined directory by creating separated process" - - try: - process = subprocess.Popen(cmdline, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - if process.returncode != 0 and process.returncode != None: - raise Exception("Command returned {}".format(process.returncode)) - except OSError as err: - print("Failed to start", cmdline, err) - - return process - - def _read_csv_datafile(self, file_name, columns_names, header=None, compression_type='gzip', nrows=200000): - "Read csv by Pandas. Function returns Pandas DataFrame, which can be used by ibis load_data function" - - print("Reading datafile", file_name) - return pd.read_csv(file_name, compression=compression_type, header=header, names=columns_names, nrows=nrows) - - def connect_to_server(self): - "Connect to Omnisci server using Ibis framework" - - self._conn = ibis.omniscidb.connect(host="localhost", port=self._server_port, user="admin", password="HyperInteractive") - return self._conn + self._initdb_executable = os.path.join(pathlib.Path(self.omnisci_executable).parent, + "initdb") + execute_process([self._initdb_executable, '-f', '--data', self._data_dir]) + + self.omnisci_sql_executable = os.path.join(pathlib.Path(self.omnisci_executable).parent, + "omnisql") + self._server_start_cmdline = [self.omnisci_executable, + "data", + '--port', str(omnisci_port), + '--http-port', str(self._http_port), + '--calcite-port', str(self._calcite_port), + '--config', "omnisci.conf", + '--enable-watchdog=false', + '--allow-cpu-retry'] def launch(self): "Launch OmniSciDB server" print("Launching server ...") - self.server_process = self._execute_process(self._server_start_cmdline, cwd=self._server_cwd) + self.server_process, _ = execute_process(self._server_start_cmdline, 
cwd=self._server_cwd, + daemon=True) print("Server is launched") + try: + pt = threading.Thread(target=self._print_omnisci_output, + args=(self.server_process.stdout,), daemon=True) + pt.start() + + # Allow server to start up. It has to open TCP port and start + # listening, otherwise the following benchmarks fail. + time.sleep(5) + except Exception as err: + print("Failed", err) + sys.exit(1) + + def _print_omnisci_output(self, stdout): + for line in iter(stdout.readline, b''): + print("OMNISCI>>", line.decode().strip()) def terminate(self): "Terminate OmniSci server" @@ -87,102 +98,13 @@ def terminate(self): print("Terminating server ...") try: - #self._conn.close() - self.server_process.send_signal(signal.SIGINT) - time.sleep(2) - self.server_process.kill() - time.sleep(1) - self.server_process.terminate() + if self.server_process: + self.server_process.send_signal(signal.SIGINT) + time.sleep(2) + self.server_process.kill() + time.sleep(1) except Exception as err: print("Failed to terminate server, error occured:", err) sys.exit(1) print("Server is terminated") - - def import_data(self, table_name, data_files_names, files_limit, columns_names, columns_types, header=False): - "Import CSV files using COPY SQL statement" - - if header == True: - header_value = 'true' - elif header == False: - header_value = 'false' - else: - print("Wrong value of header argument!") - sys.exit(2) - - schema_table = ibis.Schema( - names = columns_names, - types = columns_types - ) - - if not self._conn.exists_table(name=table_name, database=self._database_name): - try: - self._conn.create_table(table_name = table_name, schema=schema_table, database=self._database_name) - except Exception as err: - print("Failed to create table:", err) - - for f in data_files_names[:files_limit]: - print("Importing datafile", f) - copy_str = self._command_2_import_CSV % (table_name, f, header_value) - - try: - import_process = subprocess.Popen(self._omnisci_cmd_line, stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT, stdin=subprocess.PIPE) - output = import_process.communicate(copy_str.encode()) - except OSError as err: - print("Failed to start", self._omnisci_cmd_line, err) - - print(str(output[0].strip().decode())) - print("Command returned", import_process.returncode) - - def import_data_by_ibis(self, table_name, data_files_names, files_limit, columns_names, columns_types, cast_dict, header=None): - "Import CSV files using Ibis load_data from the Pandas.DataFrame" - - schema_table = ibis.Schema( - names = columns_names, - types = columns_types - ) - - if not self._conn.exists_table(name=table_name, database=self._database_name): - try: - self._conn.create_table(table_name = table_name, schema=schema_table, database=self._database_name) - except Exception as err: - print("Failed to create table:", err) - - t0 = time.time() - if files_limit > 1: - pandas_df_from_each_file = (self._read_csv_datafile(file_name, columns_names, header) for file_name in data_files_names[:files_limit]) - self._imported_pd_df[table_name] = pd.concat(pandas_df_from_each_file, ignore_index=True) - else: - self._imported_pd_df[table_name] = self._read_csv_datafile(data_files_names, columns_names, header) - - t_import_pandas = time.time() - t0 - - pandas_concatenated_df_casted = self._imported_pd_df[table_name].astype(dtype=cast_dict, copy=True) - - t0 = time.time() - self._conn.load_data(table_name=table_name, obj=pandas_concatenated_df_casted, database=self._database_name) - t_import_ibis = time.time() - t0 - - return t_import_pandas, t_import_ibis - - def drop_table(self, table_name): - "Drop table by table_name using Ibis framework" - - if self._conn.exists_table(name=table_name, database=self._database_name): - db = self._conn.database(self._database_name) - df = db.table(table_name) - df.drop() - if table_name in self._imported_pd_df: - del self._imported_pd_df[table_name] - else: - print("Table", table_name, "doesn't exist!") - sys.exit(3) - - def get_pd_df(self, 
table_name): - "Get already imported Pandas DataFrame" - - if self._conn.exists_table(name=table_name, database=self._database_name): - return self._imported_pd_df[table_name] - else: - print("Table", table_name, "doesn't exist!") - sys.exit(4) diff --git a/server_worker/__init__.py b/server_worker/__init__.py new file mode 100644 index 0000000000000..b4894ebc60024 --- /dev/null +++ b/server_worker/__init__.py @@ -0,0 +1 @@ +from .server_worker import OmnisciServerWorker diff --git a/server_worker/server_worker.py b/server_worker/server_worker.py new file mode 100644 index 0000000000000..2ef017cdd9580 --- /dev/null +++ b/server_worker/server_worker.py @@ -0,0 +1,139 @@ +import pandas as pd +import subprocess +import time +import os +import sys +sys.path.append(os.path.join(os.path.dirname(__file__), "..")) +from server import OmnisciServer +import ibis + +class OmnisciServerWorker: + _imported_pd_df = {} + + def __init__(self, omnisci_server): + self.omnisci_server = omnisci_server + self._omnisci_cmd_line = [self.omnisci_server.omnisci_sql_executable] \ + + [str(self.omnisci_server.database_name), + "-u", self.omnisci_server.user, + "-p", self.omnisci_server.password] \ + + ["--port", str(self.omnisci_server.server_port)] + self._command_2_import_CSV = "COPY %s FROM '%s' WITH (header='%s');" + self._conn = None + + def _read_csv_datafile(self, file_name, columns_names, header=None, compression_type='gzip', + nrows=200000): + "Read csv by Pandas. 
Function returns Pandas DataFrame,\ + which can be used by ibis load_data function" + + print("Reading datafile", file_name) + return pd.read_csv(file_name, compression=compression_type, header=header, + names=columns_names, nrows=nrows) + + def connect_to_server(self): + "Connect to Omnisci server using Ibis framework" + + self._conn = ibis.omniscidb.connect(host="localhost", port=self.omnisci_server.server_port, + user=self.omnisci_server.user, + password=self.omnisci_server.password) + return self._conn + + def terminate(self): + self.omnisci_server.terminate() + + def import_data(self, table_name, data_files_names, files_limit, columns_names, columns_types, + header=False): + "Import CSV files using COPY SQL statement" + + if header: + header_value = 'true' + elif not header: + header_value = 'false' + else: + print("Wrong value of header argument!") + sys.exit(2) + + schema_table = ibis.Schema( + names=columns_names, + types=columns_types + ) + + if not self._conn.exists_table(name=table_name, database=self.omnisci_server.database_name): + try: + self._conn.create_table(table_name=table_name, schema=schema_table, + database=self.omnisci_server.database_name) + except Exception as err: + print("Failed to create table:", err) + + for f in data_files_names[:files_limit]: + print("Importing datafile", f) + copy_str = self._command_2_import_CSV % (table_name, f, header_value) + + try: + import_process = subprocess.Popen(self._omnisci_cmd_line, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, stdin=subprocess.PIPE) + output = import_process.communicate(copy_str.encode()) + except OSError as err: + print("Failed to start", self._omnisci_cmd_line, err) + + print(str(output[0].strip().decode())) + print("Command returned", import_process.returncode) + + def import_data_by_ibis(self, table_name, data_files_names, files_limit, columns_names, + columns_types, cast_dict, header=None): + "Import CSV files using Ibis load_data from the Pandas.DataFrame" + + schema_table = 
ibis.Schema( + names=columns_names, + types=columns_types + ) + + if not self._conn.exists_table(name=table_name, database=self.omnisci_server.database_name): + try: + self._conn.create_table(table_name=table_name, schema=schema_table, + database=self.omnisci_server.database_name) + except Exception as err: + print("Failed to create table:", err) + + t0 = time.time() + if files_limit > 1: + pandas_df_from_each_file = (self._read_csv_datafile(file_name, columns_names, header) + for file_name in data_files_names[:files_limit]) + self._imported_pd_df[table_name] = pd.concat(pandas_df_from_each_file, + ignore_index=True) + else: + self._imported_pd_df[table_name] = self._read_csv_datafile(data_files_names, + columns_names, header) + + t_import_pandas = time.time() - t0 + + pandas_concatenated_df_casted = self._imported_pd_df[table_name].astype(dtype=cast_dict, + copy=True) + + t0 = time.time() + self._conn.load_data(table_name=table_name, obj=pandas_concatenated_df_casted, + database=self.omnisci_server.database_name) + t_import_ibis = time.time() - t0 + + return t_import_pandas, t_import_ibis + + def drop_table(self, table_name): + "Drop table by table_name using Ibis framework" + + if self._conn.exists_table(name=table_name, database=self.omnisci_server.database_name): + db = self._conn.database(self.omnisci_server.database_name) + df = db.table(table_name) + df.drop() + if table_name in self._imported_pd_df: + del self._imported_pd_df[table_name] + else: + print("Table", table_name, "doesn't exist!") + sys.exit(3) + + def get_pd_df(self, table_name): + "Get already imported Pandas DataFrame" + + if self._conn.exists_table(name=table_name, database=self.omnisci_server.database_name): + return self._imported_pd_df[table_name] + else: + print("Table", table_name, "doesn't exist!") + sys.exit(4) diff --git a/taxi/taxibench_ibis.py b/taxi/taxibench_ibis.py index 94dd99bb0a154..8f84662a4f7c3 100644 --- a/taxi/taxibench_ibis.py +++ b/taxi/taxibench_ibis.py @@ -1,155 +1,42 
@@ from braceexpand import braceexpand
 import mysql.connector
-import pandas as pd
-import numpy as np
-import subprocess
-import threading
 import argparse
-import pathlib
 import time
 import glob
-import sys
 import os
+import sys
+sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
+from server import OmnisciServer
+from report import DbReport
+from server_worker import OmnisciServerWorker
 
-omnisci_executable = "build/bin/omnisql"
-taxi_trips_directory = "/localdisk/work/trips_x*.csv"
-database_name = "taxibenchdb"
-taxibench_table_name = "trips"
-
-# Load database reporting, server and Ibis modules
-path_to_report_dir = os.path.join(pathlib.Path(__file__).parent, "..", "report")
-path_to_server_dir = os.path.join(pathlib.Path(__file__).parent, "..", "server")
-path_to_ibis_dir = os.path.join(pathlib.Path(__file__).parent.parent, "..", "ibis/build/lib")
-sys.path.insert(1, path_to_report_dir)
-sys.path.insert(1, path_to_server_dir)
-sys.path.insert(1, path_to_ibis_dir)
-import report
-import server
-import ibis
-
-parser = argparse.ArgumentParser(description='Run NY Taxi benchmark using Ibis.')
-
-parser.add_argument('-e', default=omnisci_executable, help='Path to executable "omnisql".')
-parser.add_argument('-r', default="report_taxibench_ibis.csv", help="Report file name.")
-parser.add_argument('-df', default=1, type=int, help="Number of datafiles to input into database for processing.")
-parser.add_argument('-dp', default=taxi_trips_directory, help="Wildcard pattern of datafiles that should be loaded.")
-parser.add_argument('-i', default=5, type=int, help="Number of iterations to run every query. Best result is selected.")
-parser.add_argument('-dnd', action='store_true', help="Do not delete old table.")
-parser.add_argument('-dni', action='store_true', help="Do not create new table and import any data from CSV files.")
-parser.add_argument("-port", default=62074, type=int, help="TCP port that omnisql client should use to connect to server.")
-
-parser.add_argument("-db-server", default="localhost", help="Host name of MySQL server.")
-parser.add_argument("-db-port", default=3306, type=int, help="Port number of MySQL server.")
-parser.add_argument("-db-user", default="", help="Username to use to connect to MySQL database. If user name is specified, script attempts to store results in MySQL database using other -db-* parameters.")
-parser.add_argument("-db-pass", default="omniscidb", help="Password to use to connect to MySQL database.")
-parser.add_argument("-db-name", default="omniscidb", help="MySQL database to use to store benchmark results.")
-parser.add_argument("-db-table", help="Table to use to store results for this benchmark.")
-
-parser.add_argument("-commit", default="1234567890123456789012345678901234567890", help="Commit hash to use to record this benchmark results.")
-
-args = parser.parse_args()
-
-if args.df <= 0:
-    print("Bad number of data files specified", args.df)
-    sys.exit(1)
-
-if args.i < 1:
-    print("Bad number of iterations specified", args.i)
-
-def print_omnisci_output(stdout):
-    for line in iter(stdout.readline, b''):
-        print("OMNISCI>>", line.decode().strip())
-
-omnisci_server = server.Omnisci_server(omnisci_executable=args.e, omnisci_port=args.port, database_name=database_name)
-omnisci_server.launch()
-
-time.sleep(2)
-conn = omnisci_server.connect_to_server()
-
-taxibench_columns_names = ["trip_id","vendor_id","pickup_datetime","dropoff_datetime","store_and_fwd_flag","rate_code_id","pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude","passenger_count","trip_distance","fare_amount","extra","mta_tax","tip_amount","tolls_amount","ehail_fee","improvement_surcharge","total_amount","payment_type","trip_type","pickup","dropoff","cab_type","precipitation","snow_depth","snowfall","max_temperature","min_temperature","average_wind_speed","pickup_nyct2010_gid","pickup_ctlabel","pickup_borocode","pickup_boroname","pickup_ct2010","pickup_boroct2010","pickup_cdeligibil","pickup_ntacode","pickup_ntaname","pickup_puma","dropoff_nyct2010_gid","dropoff_ctlabel","dropoff_borocode","dropoff_boroname","dropoff_ct2010","dropoff_boroct2010","dropoff_cdeligibil","dropoff_ntacode","dropoff_ntaname", "dropoff_puma"]
-taxibench_columns_types = ['int32','string','timestamp','timestamp','string','int16','decimal','decimal','decimal','decimal','int16','decimal','decimal','decimal','decimal','decimal','decimal','decimal','decimal','decimal','string','int16','string','string','string','int16','int16','int16','int16','int16','int16','int16','string','int16','string','string','string','string','string','string','string','int16','string','int16','string','string','string','string','string','string','string']
-
-db_reporter = None
-if args.db_user is not "":
-    print("Connecting to database")
-    db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user, passwd=args.db_pass, db=args.db_name)
-    db_reporter = report.DbReport(db, args.db_table, {
-        'FilesNumber': 'INT UNSIGNED NOT NULL',
-        'QueryName': 'VARCHAR(500) NOT NULL',
-        'FirstExecTimeMS': 'BIGINT UNSIGNED',
-        'WorstExecTimeMS': 'BIGINT UNSIGNED',
-        'BestExecTimeMS': 'BIGINT UNSIGNED',
-        'AverageExecTimeMS': 'BIGINT UNSIGNED',
-        'TotalTimeMS': 'BIGINT UNSIGNED'
-    }, {
-        'ScriptName': 'taxibench_ibis.py',
-        'CommitHash': args.commit
-    })
-
-# Delete old table
-if not args.dnd:
-    print("Deleting", database_name ,"old database")
-    try:
-        conn.drop_database(database_name, force=True)
-        time.sleep(2)
-        conn = omnisci_server.connect_to_server()
-    except Exception as err:
-        print("Failed to delete", database_name, "old database: ", err)
-
-
-data_files_names = list(braceexpand(args.dp))
-data_files_names = sorted([x for f in data_files_names for x in glob.glob(f)])
-data_files_number = len(data_files_names[:args.df])
-
-try:
-    print("Creating", database_name ,"new database")
-    conn.create_database(database_name) # Ibis list_databases method is not supported yet
-except Exception as err:
-    print("Database creation is skipped, because of error:", err)
-
-if len(data_files_names) == 0:
-    print("Could not find any data files matching", args.dp)
-    sys.exit(2)
-
-# Create table and import data
-if not args.dni:
-    # Datafiles import
-    omnisci_server.import_data(table_name=taxibench_table_name, data_files_names=data_files_names, files_limit=args.df, columns_names=taxibench_columns_names, columns_types=taxibench_columns_types, header=False)
-
-try:
-    db = conn.database(database_name)
-except Exception as err:
-    print("Failed to connect to database:", err)
-
-try:
-    tables_names = db.list_tables()
-    print("Database tables:", tables_names)
-except Exception as err:
-    print("Failed to read database tables:", err)
-
-try:
-    df = db.table(taxibench_table_name)
-except Exception as err:
-    print("Failed to access", taxibench_table_name,"table:", err)
 
 # Queries definitions
 def q1(df):
     df.groupby('cab_type')[['cab_type']].count().execute()
+
 def q2(df):
-    df.groupby('passenger_count').aggregate(total_amount=df.total_amount.mean())[['passenger_count','total_amount']].execute()
+    df.groupby('passenger_count').aggregate(
+        total_amount=df.total_amount.mean())[['passenger_count','total_amount']].execute()
+
 def q3(df):
     df.groupby([df.passenger_count, df.pickup_datetime.year().name('pickup_datetime')]).aggregate(count=df.passenger_count.count()).execute()
+
 def q4(df):
-    df.groupby([df.passenger_count, df.pickup_datetime.year().name('pickup_datetime'), df.trip_distance]).size().sort_by([('pickup_datetime', True), ('count', False)]).execute()
+    df.groupby([df.passenger_count, df.pickup_datetime.year().name('pickup_datetime'),
+                df.trip_distance]).size().sort_by([('pickup_datetime', True),
+                                                   ('count', False)]).execute()
+
 def timeq(q):
     t = time.time()
     q(df)
     return time.time()-t
+
 def queries_exec(index):
     if index == 1:
         return timeq(q1)
@@ -164,50 +51,208 @@ def queries_exec(index):
         sys.exit(3)
     return None
+
+
+omnisci_executable = "../omnisci/build/bin/omnisci_server"
+taxi_trips_directory = "/localdisk/work/trips_x*.csv"
+taxibench_table_name = "trips"
+omnisci_server = None
+
+parser = argparse.ArgumentParser(description='Run NY Taxi benchmark using Ibis.')
+
+parser.add_argument('-e', default=omnisci_executable, help='Path to executable "omnisci_server".')
+parser.add_argument('-r', default="report_taxibench_ibis.csv", help="Report file name.")
+parser.add_argument('-df', default=1, type=int,
+                    help="Number of datafiles to input into database for processing.")
+parser.add_argument('-dp', default=taxi_trips_directory,
+                    help="Wildcard pattern of datafiles that should be loaded.")
+parser.add_argument('-i', default=5, type=int,
+                    help="Number of iterations to run every query. Best result is selected.")
+parser.add_argument('-dnd', action='store_true',
+                    help="Do not delete old table.")
+parser.add_argument('-dni', action='store_true',
+                    help="Do not create new table and import any data from CSV files.")
+parser.add_argument("-port", default=62074, type=int,
+                    help="TCP port that omnisql client should use to connect to server.")
+parser.add_argument("-u", default="admin",
+                    help="User name to use on omniscidb server.")
+parser.add_argument("-p", default="HyperInteractive",
+                    help="User password to use on omniscidb server.")
+parser.add_argument("-n", default="agent_test_ibis",
+                    help="Database name to use on omniscidb server.")
+parser.add_argument("-db-server", default="localhost", help="Host name of MySQL server.")
+parser.add_argument("-db-port", default=3306, type=int, help="Port number of MySQL server.")
+parser.add_argument("-db-user", default="",
+                    help="Username to use to connect to MySQL database. If user name is specified,\
+                    script attempts to store results in MySQL database using other -db-* parameters.")
+parser.add_argument("-db-pass", default="omniscidb",
+                    help="Password to use to connect to MySQL database.")
+parser.add_argument("-db-name", default="omniscidb",
+                    help="MySQL database to use to store benchmark results.")
+parser.add_argument("-db-table", help="Table to use to store results for this benchmark.")
+
+parser.add_argument("-commit_omnisci", dest="commit_omnisci",
+                    default="1234567890123456789012345678901234567890",
+                    help="Omnisci commit hash to use for tests.")
+parser.add_argument("-commit_ibis", dest="commit_ibis",
+                    default="1234567890123456789012345678901234567890",
+                    help="Ibis commit hash to use for tests.")
+
 try:
-    pt = threading.Thread(target=print_omnisci_output, args=(omnisci_server.server_process.stdout,), daemon=True)
-    pt.start()
-    with open(args.r, "w") as report:
-        t_begin = time.time()
-        for bench_number in range(1,5):
-            exec_times = [None]*5
-            best_exec_time = float("inf")
-            worst_exec_time = 0.0
-            first_exec_time = float("inf")
-            times_sum = 0.0
-            for iteration in range(1, args.i + 1):
-                print("RUNNING QUERY NUMBER", bench_number, "ITERATION NUMBER", iteration)
-                exec_times[iteration - 1] = int(round(queries_exec(bench_number) * 1000))
-                if iteration == 1:
-                    first_exec_time = exec_times[iteration - 1]
-                if best_exec_time > exec_times[iteration - 1]:
-                    best_exec_time = exec_times[iteration - 1]
-                if iteration != 1 and worst_exec_time < exec_times[iteration - 1]:
-                    worst_exec_time = exec_times[iteration - 1]
-                if iteration != 1:
-                    times_sum += exec_times[iteration - 1]
-            average_exec_time = times_sum/(args.i - 1)
-            total_exec_time = int(round((time.time() - t_begin)*1000))
-            print("QUERY", bench_number, "EXEC TIME MS", best_exec_time, "TOTAL TIME MS", total_exec_time)
-            print("FilesNumber: ", data_files_number, ",",
-                  "QueryName: ", 'Query' + str(bench_number), ",",
-                  "FirstExecTimeMS: ", first_exec_time, ",",
-                  "WorstExecTimeMS: ", worst_exec_time, ",",
-                  "BestExecTimeMS: ", best_exec_time, ",",
-                  "AverageExecTimeMS: ", average_exec_time, ",",
-                  "TotalTimeMS: ", total_exec_time, ",",
-                  "", '\n', file=report, sep='', end='', flush=True)
-            if db_reporter is not None:
-                db_reporter.submit({
-                    'FilesNumber': data_files_number,
-                    'QueryName': 'Query' + str(bench_number),
-                    'FirstExecTimeMS': first_exec_time,
-                    'WorstExecTimeMS': worst_exec_time,
-                    'BestExecTimeMS': best_exec_time,
-                    'AverageExecTimeMS': average_exec_time,
-                    'TotalTimeMS': total_exec_time
-                })
-except IOError as err:
-    print("Failed writing report file", args.r, err)
+    args = parser.parse_args()
+    if args.df <= 0:
+        print("Bad number of data files specified", args.df)
+        sys.exit(1)
+
+    if args.i < 1:
+        print("Bad number of iterations specified", args.i)
+        sys.exit(1)
+
+    database_name = args.n
+    omnisci_server = OmnisciServer(omnisci_executable=args.e, omnisci_port=args.port,
+                                   database_name=database_name, user=args.u,
+                                   password=args.p)
+    omnisci_server.launch()
+    omnisci_server_worker = OmnisciServerWorker(omnisci_server)
+
+    time.sleep(2)
+    conn = omnisci_server_worker.connect_to_server()
+
+    taxibench_columns_names = ["trip_id", "vendor_id", "pickup_datetime", "dropoff_datetime",
+                               "store_and_fwd_flag", "rate_code_id", "pickup_longitude",
+                               "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
+                               "passenger_count", "trip_distance", "fare_amount", "extra",
+                               "mta_tax", "tip_amount", "tolls_amount", "ehail_fee",
+                               "improvement_surcharge", "total_amount", "payment_type",
+                               "trip_type", "pickup", "dropoff", "cab_type", "precipitation",
+                               "snow_depth", "snowfall", "max_temperature", "min_temperature",
+                               "average_wind_speed", "pickup_nyct2010_gid", "pickup_ctlabel",
+                               "pickup_borocode", "pickup_boroname", "pickup_ct2010",
+                               "pickup_boroct2010", "pickup_cdeligibil", "pickup_ntacode",
+                               "pickup_ntaname", "pickup_puma", "dropoff_nyct2010_gid",
+                               "dropoff_ctlabel", "dropoff_borocode", "dropoff_boroname",
+                               "dropoff_ct2010", "dropoff_boroct2010", "dropoff_cdeligibil",
+                               "dropoff_ntacode", "dropoff_ntaname", "dropoff_puma"]
+    taxibench_columns_types = ['int32', 'string', 'timestamp', 'timestamp', 'string', 'int16',
+                               'decimal', 'decimal', 'decimal', 'decimal', 'int16', 'decimal',
+                               'decimal', 'decimal', 'decimal', 'decimal', 'decimal', 'decimal',
+                               'decimal', 'decimal', 'string', 'int16', 'string', 'string',
+                               'string', 'int16', 'int16', 'int16', 'int16', 'int16', 'int16',
+                               'int16', 'string', 'int16', 'string', 'string', 'string', 'string',
+                               'string', 'string', 'string', 'int16', 'string', 'int16', 'string',
+                               'string', 'string', 'string', 'string', 'string', 'string']
+
+    db_reporter = None
+    if args.db_user != "":
+        print("Connecting to database")
+        db = mysql.connector.connect(host=args.db_server, port=args.db_port, user=args.db_user,
+                                     passwd=args.db_pass, db=args.db_name)
+        db_reporter = DbReport(db, args.db_table, {
+            'FilesNumber': 'INT UNSIGNED NOT NULL',
+            'QueryName': 'VARCHAR(500) NOT NULL',
+            'FirstExecTimeMS': 'BIGINT UNSIGNED',
+            'WorstExecTimeMS': 'BIGINT UNSIGNED',
+            'BestExecTimeMS': 'BIGINT UNSIGNED',
+            'AverageExecTimeMS': 'BIGINT UNSIGNED',
+            'TotalTimeMS': 'BIGINT UNSIGNED'
+        }, {
+            'ScriptName': 'taxibench_ibis.py',
+            'CommitHash': f"{args.commit_omnisci}-{args.commit_ibis}"
+        })
+
+    # Delete old table
+    if not args.dnd:
+        print("Deleting", database_name, "old database")
+        try:
+            conn.drop_database(database_name, force=True)
+            time.sleep(2)
+            conn = omnisci_server_worker.connect_to_server()
+        except Exception as err:
+            print("Failed to delete", database_name, "old database:", err)
+
+    args.dp = args.dp.replace("'", "")
+    data_files_names = list(braceexpand(args.dp))
+    data_files_names = sorted([x for f in data_files_names for x in glob.glob(f)])
+    data_files_number = len(data_files_names[:args.df])
+
+    try:
+        print("Creating", database_name, "new database")
+        conn.create_database(database_name)  # Ibis list_databases method is not supported yet
+    except Exception as err:
+        print("Database creation is skipped because of error:", err)
+
+    if len(data_files_names) == 0:
+        print("Could not find any data files matching", args.dp)
+        sys.exit(2)
+
+    # Create table and import data
+    if not args.dni:
+        # Datafiles import
+        omnisci_server_worker.import_data(table_name=taxibench_table_name,
+                                          data_files_names=data_files_names, files_limit=args.df,
+                                          columns_names=taxibench_columns_names,
+                                          columns_types=taxibench_columns_types, header=False)
+
+    try:
+        db = conn.database(database_name)
+    except Exception as err:
+        print("Failed to connect to database:", err)
+
+    try:
+        tables_names = db.list_tables()
+        print("Database tables:", tables_names)
+    except Exception as err:
+        print("Failed to read database tables:", err)
+
+    try:
+        df = db.table(taxibench_table_name)
+    except Exception as err:
+        print("Failed to access", taxibench_table_name, "table:", err)
+
+    try:
+        with open(args.r, "w") as report:
+            t_begin = time.time()
+            for bench_number in range(1, 5):
+                exec_times = [None] * args.i
+                best_exec_time = float("inf")
+                worst_exec_time = 0.0
+                first_exec_time = float("inf")
+                times_sum = 0.0
+                for iteration in range(1, args.i + 1):
+                    print("RUNNING QUERY NUMBER", bench_number, "ITERATION NUMBER", iteration)
+                    exec_times[iteration - 1] = int(round(queries_exec(bench_number) * 1000))
+                    if iteration == 1:
+                        first_exec_time = exec_times[iteration - 1]
+                    if best_exec_time > exec_times[iteration - 1]:
+                        best_exec_time = exec_times[iteration - 1]
+                    if iteration != 1 and worst_exec_time < exec_times[iteration - 1]:
+                        worst_exec_time = exec_times[iteration - 1]
+                    if iteration != 1:
+                        times_sum += exec_times[iteration - 1]
+                average_exec_time = times_sum/(args.i - 1)
+                total_exec_time = int(round((time.time() - t_begin)*1000))
+                print("QUERY", bench_number, "EXEC TIME MS", best_exec_time,
+                      "TOTAL TIME MS", total_exec_time)
+                print("FilesNumber: ", data_files_number, ",",
+                      "QueryName: ", 'Query' + str(bench_number), ",",
+                      "FirstExecTimeMS: ", first_exec_time, ",",
+                      "WorstExecTimeMS: ", worst_exec_time, ",",
+                      "BestExecTimeMS: ", best_exec_time, ",",
+                      "AverageExecTimeMS: ", average_exec_time, ",",
+                      "TotalTimeMS: ", total_exec_time, ",",
+                      "", '\n', file=report, sep='', end='', flush=True)
+                if db_reporter is not None:
+                    db_reporter.submit({
+                        'FilesNumber': data_files_number,
+                        'QueryName': 'Query' + str(bench_number),
+                        'FirstExecTimeMS': first_exec_time,
+                        'WorstExecTimeMS': worst_exec_time,
+                        'BestExecTimeMS': best_exec_time,
+                        'AverageExecTimeMS': average_exec_time,
+                        'TotalTimeMS': total_exec_time
+                    })
+    except IOError as err:
+        print("Failed writing report file", args.r, err)
+except Exception as exc:
+    print("Failed: ", exc)
 finally:
-    omnisci_server.terminate()
+    if omnisci_server:
+        omnisci_server.terminate()
diff --git a/teamcity_build_scripts/01-build.sh b/teamcity_build_scripts/01-build.sh
index 6ccfb6882ec23..a130d6a0e9449 100644
--- a/teamcity_build_scripts/01-build.sh
+++ b/teamcity_build_scripts/01-build.sh
@@ -9,9 +9,4 @@ cp
/localdisk/benchmark_datasets/omnisci.conf .
 mkdir data
 ./bin/initdb --data data
-mkdir ibis
-cd ibis
-git clone https://github.com/ibis-project/ibis.git .
-pip install -r requirements.txt
-python3 setup.py build
-cd ..
+
diff --git a/teamcity_build_scripts/02-build_ibis.sh b/teamcity_build_scripts/02-build_ibis.sh
new file mode 100644
index 0000000000000..af8e5ff5395ef
--- /dev/null
+++ b/teamcity_build_scripts/02-build_ibis.sh
@@ -0,0 +1 @@
+python3 run_ibis_tests.py --env_name ibis-test --env_check False --save_env True --python_version 3.7 --task build --name agent_test_ibis --ci_requirements "${PWD}"/ci_requirements.yml --ibis_path $PWD/../ibis/ --executable $PWD/../omniscidb/build/bin/omnisci_server
\ No newline at end of file
diff --git a/teamcity_build_scripts/02-sanity_tests.sh b/teamcity_build_scripts/03-sanity_tests.sh
similarity index 100%
rename from teamcity_build_scripts/02-sanity_tests.sh
rename to teamcity_build_scripts/03-sanity_tests.sh
diff --git a/teamcity_build_scripts/04-ibis_test.sh b/teamcity_build_scripts/04-ibis_test.sh
new file mode 100644
index 0000000000000..e5357f6998931
--- /dev/null
+++ b/teamcity_build_scripts/04-ibis_test.sh
@@ -0,0 +1 @@
+python3 run_ibis_tests.py --env_name ibis-test --env_check True --save_env True --python_version 3.7 --task test --name agent_test_ibis --report "${PWD}"/.. --ibis_path "${PWD}"/../ibis/ --executable "${PWD}"/../omniscidb/build/bin/omnisci_server --user admin --password HyperInteractive -commit_omnisci ${BUILD_REVISION} -commit_ibis ${BUILD_IBIS_REVISION}
\ No newline at end of file
diff --git a/teamcity_build_scripts/03-synthetic_BaselineHash.sh b/teamcity_build_scripts/05-synthetic_BaselineHash.sh
similarity index 100%
rename from teamcity_build_scripts/03-synthetic_BaselineHash.sh
rename to teamcity_build_scripts/05-synthetic_BaselineHash.sh
diff --git a/teamcity_build_scripts/04-synthetic_MultiStep.sh b/teamcity_build_scripts/06-synthetic_MultiStep.sh
similarity index 100%
rename from teamcity_build_scripts/04-synthetic_MultiStep.sh
rename to teamcity_build_scripts/06-synthetic_MultiStep.sh
diff --git a/teamcity_build_scripts/05-synthetic_NonGroupedAgg.sh b/teamcity_build_scripts/07-synthetic_NonGroupedAgg.sh
similarity index 100%
rename from teamcity_build_scripts/05-synthetic_NonGroupedAgg.sh
rename to teamcity_build_scripts/07-synthetic_NonGroupedAgg.sh
diff --git a/teamcity_build_scripts/06-synthetic_PerfectHashMultiCol.sh b/teamcity_build_scripts/08-synthetic_PerfectHashMultiCol.sh
similarity index 100%
rename from teamcity_build_scripts/06-synthetic_PerfectHashMultiCol.sh
rename to teamcity_build_scripts/08-synthetic_PerfectHashMultiCol.sh
diff --git a/teamcity_build_scripts/07-synthetic_PerfectHashSingleCol.sh b/teamcity_build_scripts/09-synthetic_PerfectHashSingleCol.sh
similarity index 100%
rename from teamcity_build_scripts/07-synthetic_PerfectHashSingleCol.sh
rename to teamcity_build_scripts/09-synthetic_PerfectHashSingleCol.sh
diff --git a/teamcity_build_scripts/08-synthetic_Sort.sh b/teamcity_build_scripts/10-synthetic_Sort.sh
similarity index 100%
rename from teamcity_build_scripts/08-synthetic_Sort.sh
rename to teamcity_build_scripts/10-synthetic_Sort.sh
diff --git a/teamcity_build_scripts/09-flights_7M.sh b/teamcity_build_scripts/11-flights_7M.sh
similarity index 100%
rename from teamcity_build_scripts/09-flights_7M.sh
rename to teamcity_build_scripts/11-flights_7M.sh
diff --git a/teamcity_build_scripts/10-ny_taxi_1_file.sh b/teamcity_build_scripts/12-ny_taxi_1_file.sh
similarity index 100%
rename from teamcity_build_scripts/10-ny_taxi_1_file.sh
rename to teamcity_build_scripts/12-ny_taxi_1_file.sh
diff --git a/teamcity_build_scripts/13-ny_taxi_ibis_20_files.sh b/teamcity_build_scripts/13-ny_taxi_ibis_20_files.sh
deleted file mode 100644
index cb55025c3c2fc..0000000000000
--- a/teamcity_build_scripts/13-ny_taxi_ibis_20_files.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-ROOT_DIR="${PWD}"
-cd omniscripts/taxi
-python3 taxibench_ibis.py -e="${ROOT_DIR}"/omniscidb/build/bin/omnisql --port 61274 -df 20 -dp '/localdisk/benchmark_datasets/taxi/trips_xa{a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t}.csv.gz' -i 5 -db-server=ansatlin07.an.intel.com -db-user=gashiman -db-pass=omniscidb -db-name=omniscidb -db-table=taxibench_ibis -commit ${BUILD_REVISION}
diff --git a/teamcity_build_scripts/11-ny_taxi_pandas_1_file.sh b/teamcity_build_scripts/13-ny_taxi_pandas_1_file.sh
similarity index 100%
rename from teamcity_build_scripts/11-ny_taxi_pandas_1_file.sh
rename to teamcity_build_scripts/13-ny_taxi_pandas_1_file.sh
diff --git a/teamcity_build_scripts/12-ny_taxi_20_files.sh b/teamcity_build_scripts/14-ny_taxi_20_files.sh
similarity index 100%
rename from teamcity_build_scripts/12-ny_taxi_20_files.sh
rename to teamcity_build_scripts/14-ny_taxi_20_files.sh
diff --git a/teamcity_build_scripts/14-santander_ibis.sh b/teamcity_build_scripts/14-santander_ibis.sh
deleted file mode 100644
index 1f0b8dfe0cd02..0000000000000
--- a/teamcity_build_scripts/14-santander_ibis.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-ROOT_DIR="${PWD}"
-cd omniscripts/santander
-python3 santander_ibis.py -e="${ROOT_DIR}"/omniscidb/build/bin/omnisql -port 61274 -dp '/localdisk/benchmark_datasets/santander/train.csv.gz' -i 5 -db-server=ansatlin07.an.intel.com -db-user=gashiman -db-pass=omniscidb -db-name=omniscidb -db-table=santander_ibis -commit ${BUILD_REVISION}
diff --git a/teamcity_build_scripts/15-ny_taxi_ibis_20_files.sh b/teamcity_build_scripts/15-ny_taxi_ibis_20_files.sh
new file mode 100644
index 0000000000000..c9b59c282e8c5
--- /dev/null
+++ b/teamcity_build_scripts/15-ny_taxi_ibis_20_files.sh
@@ -0,0 +1 @@
+python3 run_ibis_tests.py --env_name ibis-test --env_check True --python_version 3.7 --task benchmark --ci_requirements "${PWD}"/ci_requirements.yml --save_env True --report "${PWD}"/.. --ibis_path "${PWD}"/../ibis/ --executable "${PWD}"/../omniscidb/build/bin/omnisci_server -u admin -p HyperInteractive -n agent_test_ibis --bench_name ny_taxi --dfiles_num 20 --dpattern '/localdisk/benchmark_datasets/taxi/trips_xa{a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t}.csv.gz' --iters 5 -db-server ansatlin07.an.intel.com -db-port 3306 -db-user gashiman -db-pass omniscidb -db-name omniscidb -db-table taxibench_ibis -commit_omnisci ${BUILD_REVISION} -commit_ibis ${BUILD_IBIS_REVISION}
\ No newline at end of file
diff --git a/teamcity_build_scripts/16-santander_ibis.sh b/teamcity_build_scripts/16-santander_ibis.sh
new file mode 100644
index 0000000000000..a78c17440267e
--- /dev/null
+++ b/teamcity_build_scripts/16-santander_ibis.sh
@@ -0,0 +1 @@
+python3 run_ibis_tests.py --env_name ibis-test --env_check True --python_version 3.7 --task benchmark --ci_requirements "${PWD}"/ci_requirements.yml --save_env True --report "${PWD}"/.. --ibis_path "${PWD}"/../ibis/ --executable "${PWD}"/../omniscidb/build/bin/omnisci_server -u admin -p HyperInteractive -n agent_test_ibis --bench_name santander --dpattern '/localdisk/benchmark_datasets/santander/train.csv.gz' --iters 5 -db-server ansatlin07.an.intel.com -db-port 3306 -db-user gashiman -db-pass omniscidb -db-name omniscidb -db-table santander_ibis -commit_omnisci ${BUILD_REVISION} -commit_ibis ${BUILD_IBIS_REVISION}
\ No newline at end of file
diff --git a/teamcity_build_scripts/README.md b/teamcity_build_scripts/README.md
index a76ce4ac325d0..be461b0b82340 100644
--- a/teamcity_build_scripts/README.md
+++ b/teamcity_build_scripts/README.md
@@ -9,3 +9,8 @@ pymapd, braceexpand, mysql-connector-python.
 4. User should have permissions to connect to MySQL host and insert records
 there. Modify test scripts with user credentials and MySQL server host name.
+5. The user should have conda or miniconda installed. Ibis tests and benchmarks
+run through the run_ibis_tests.py script, which first creates a conda environment with
+the combined requirements of [ibis](https://github.com/ibis-project/ibis/blob/master/ci/requirements-3.7-dev.yml)
+and a personal requirements file (ci_requirements.yml by default).
+All subsequent tasks, such as building, testing, and running benchmarks, run in the created conda environment.
\ No newline at end of file
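The benchmark loop in taxibench_ibis.py above records four statistics per query: the first ("cold") run time, the best time over all iterations, and the worst and average times over the warm iterations only. That aggregation can be sketched in isolation as follows; the helper name and returned dict layout are illustrative, not part of the scripts:

```python
import time

def collect_query_stats(query, iters):
    # Time `iters` runs of `query` in whole milliseconds, mirroring the
    # aggregation in taxibench_ibis.py: best is taken over all runs,
    # while worst and average skip the first ("cold") iteration.
    exec_times = []
    for _ in range(iters):
        t0 = time.time()
        query()
        exec_times.append(int(round((time.time() - t0) * 1000)))
    warm_runs = exec_times[1:]
    return {
        "FirstExecTimeMS": exec_times[0],
        "BestExecTimeMS": min(exec_times),
        "WorstExecTimeMS": max(warm_runs) if warm_runs else 0,
        "AverageExecTimeMS": sum(warm_runs) / len(warm_runs) if warm_runs else 0.0,
    }
```

Excluding the cold run from worst/average keeps one-off warm-up costs (data paging, code generation) from skewing the steady-state numbers, while BestExecTimeMS is what the reports above select as the headline result.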