Skip to content
This repository has been archived by the owner on Jul 7, 2023. It is now read-only.

hacked a deltas phase to play with applying deltas #1

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions tpch4pgsql/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,33 @@ def run_throughput_test(query_root, data_dir, update_dir, delete_dir, generated_
return 1
return 0

def apply_deltas(data_dir, update_dir,
host, port, database, user, password,
stream_number, verbose):
"""

:param data_dir: subdirectory with data to be loaded
:param update_dir: subdirectory with data to be updated
:param host: hostname where the Postgres database is running
:param port: port number where the Postgres database is listening
:param database: database name, where the benchmark will be run
:param user: username of the Postgres user with full access to the benchmark DB
:param password: password for the Postgres user
:param stream_number: stream to execute based on -n during prepare, indexed by zero
:param verbose: True if more verbose output is required
without (re)loading the data, e.g. while developing
:return: 0 if successful, 1 otherwise
"""
try:
print("apply_deltas started ...")
conn = pgdb.PGDB(host, port, database, user, password)
stream = int(stream_number)
if refresh_func1(conn, data_dir, update_dir, stream, 0, verbose):
return 1
except Exception as e:
print("unable to apply deltas. DB connection failed: %s" % e)
return 1
return 0

def get_json_files_from(path):
"""Get list of all JSON file names in path
Expand Down
21 changes: 14 additions & 7 deletions tpch_pgsql.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# default values for command line arguments:
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 5432
DEFAULT_USERNAME = "postgres"
DEFAULT_PASSWORD = "test123"
DEFAULT_DBNAME = "tpch"
DEFAULT_USERNAME = "tpch"
DEFAULT_PASSWORD = "tpch"
DEFAULT_DBNAME = "tpchdb"
DEFAULT_DATA_DIR = os.path.join(".", "data")
DEFAULT_QUERY_ROOT = os.path.join(".", "query_root")
DEFAULT_DBGEN_DIR = os.path.join(".", "tpch-dbgen")
Expand Down Expand Up @@ -78,7 +78,7 @@ def scale_to_num_streams(scale):

def main(phase, host, port, user, password, database,
dbgen_dir, data_dir, query_root,
scale, num_streams, verbose, read_only):
scale, num_streams, verbose, read_only, delta_stream):
# TODO: unify doctsring, some is in reStructuredText, some is Google style
# TODO: finish sphinx integration
"""Runs main code for three different phases.
Expand Down Expand Up @@ -157,12 +157,16 @@ def main(phase, host, port, user, password, database,
exit(1)
print("done performance tests")
query.calc_metrics(RESULTS_DIR, run_timestamp, scale, num_streams)

elif phase == "deltas":
if query.apply_deltas(data_dir, UPDATE_DIR, host, port, database, user, password, delta_stream, verbose):
print("applying deltas failed")
exit(1)
print("done applying deltas")

if __name__ == "__main__":
parser = argparse.ArgumentParser(description="tpch_pgsql")

parser.add_argument("phase", choices=["prepare", "load", "query"],
parser.add_argument("phase", choices=["prepare", "load", "query", "deltas"],
help="Phase of TPC-H benchmark to run.")
parser.add_argument("-H", "--host", default=DEFAULT_HOST,
help="Address of host on which PostgreSQL instance runs; default is %s" % DEFAULT_HOST)
Expand Down Expand Up @@ -190,6 +194,8 @@ def main(phase, host, port, user, password, database,
parser.add_argument("-r", "--read-only", action="store_true",
help="Do not execute refresh functions during the query phase, " +
"which allows for running it repeatedly")
parser.add_argument("-x", "--delta-stream", type=int, default=0,
help="Delta stream to run indexed by 0, based on num_streams set during prepare")
args = parser.parse_args()

# Extract all arguments into variables
Expand All @@ -206,10 +212,11 @@ def main(phase, host, port, user, password, database,
password = args.password
verbose = args.verbose
read_only = args.read_only
delta_stream = args.delta_stream

# if no num_streams was provided, then calculate default based on scale factor
if num_streams == 0:
num_streams = scale_to_num_streams(scale)

# main
main(phase, host, port, user, password, database, dbgen_dir, data_dir, query_root, scale, num_streams, verbose, read_only)
main(phase, host, port, user, password, database, dbgen_dir, data_dir, query_root, scale, num_streams, verbose, read_only, delta_stream)