-
Notifications
You must be signed in to change notification settings - Fork 26
Add TPC-DS stress tests #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
957d83f
bbc89f5
459a060
37bedbe
166ead3
91d7754
5bc7769
8c1a8d7
7118ea2
7bb3c33
087b0c8
4d4734a
80a5e1a
302350c
18f5a38
02049e1
a4a2ec3
ba34af1
b2f8e82
772062a
2ac1f8a
61cf837
c1776bc
8545705
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,3 +9,5 @@ pg_query_state--*.sql | |
cscope.out | ||
tags | ||
Dockerfile | ||
tmp_stress | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
''' | ||
common.py | ||
Copyright (c) 2016-2019, Postgres Professional | ||
''' | ||
|
||
import psycopg2 | ||
import psycopg2.extensions | ||
import select | ||
import time | ||
|
||
# Notice strings that pg_query_state emits about the target backend's state.
# They are matched verbatim against entries of connection.notices, so the
# trailing newline is significant.
BACKEND_IS_IDLE_INFO = 'INFO: state of backend is idle\n'
BACKEND_IS_ACTIVE_INFO = 'INFO: state of backend is active\n'
|
||
def wait(conn):
    """Block until the pending operation on an async postgres connection completes."""
    while True:
        status = conn.poll()
        if status == psycopg2.extensions.POLL_OK:
            return
        if status == psycopg2.extensions.POLL_WRITE:
            # the connection wants to send data: wait for the socket to be writable
            select.select([], [conn.fileno()], [])
        elif status == psycopg2.extensions.POLL_READ:
            # the connection expects data: wait for the socket to be readable
            select.select([conn.fileno()], [], [])
        else:
            raise psycopg2.OperationalError("poll() returned %s" % status)
|
||
def n_async_connect(config, n=1):
    """Establish n asynchronous connections to postgres with the specified config."""
    async_config = dict(config)
    async_config['async'] = True

    connections = []
    for _ in range(n):
        connection = psycopg2.connect(**async_config)
        wait(connection)
        connections.append(connection)
    return connections
|
||
def n_close(conns):
    """Close every postgres connection in the given iterable."""
    for connection in conns:
        connection.close()
|
||
def pg_query_state(config, pid, verbose=False, costs=False, timing=False,
                   buffers=False, triggers=False, format='text'):
    """
    Call pg_query_state() against the backend with the specified pid.

    Opens a dedicated connection, invokes the extension function with the given
    options and returns a tuple (result, notices): the fetched rows plus a copy
    of any warning/info/notice/log messages collected on the connection.
    NOTE: 'format' shadows the builtin of the same name; kept for caller
    compatibility.
    """
    conn = psycopg2.connect(**config)
    try:
        curs = conn.cursor()
        curs.callproc('pg_query_state', (pid, verbose, costs, timing, buffers, triggers, format))
        result = curs.fetchall()
        notices = conn.notices[:]
    finally:
        # always release the helper connection, even if the call above raises
        conn.close()

    return result, notices
|
||
def onetime_query_state(config, async_conn, query, args=None, num_workers=0):
    """
    Get an intermediate state of 'query' running on connection 'async_conn'.

    Starts the query with merge joins disabled and the requested number of
    parallel workers, then polls pg_query_state() until it returns rows or
    MAX_PG_QS_RETRIES attempts are exhausted (which most likely means the query
    already finished). 'args' holds extra keyword arguments forwarded to
    pg_query_state(). Returns the (result, notices) pair of the last call.
    """
    acurs = async_conn.cursor()

    set_guc(async_conn, 'enable_mergejoin', 'off')
    set_guc(async_conn, 'max_parallel_workers_per_gather', num_workers)
    acurs.execute(query)

    # extract current state of query progress
    MAX_PG_QS_RETRIES = 10
    DELAY_BETWEEN_RETRIES = 0.1
    pg_qs_args = {
        'config': config,
        'pid': async_conn.get_backend_pid()
    }
    # 'args' used to default to a shared mutable {}; defaulting to None avoids
    # the classic mutable-default-argument pitfall while keeping callers intact
    pg_qs_args.update(args or {})
    n_retries = 0
    while True:
        result, notices = pg_query_state(**pg_qs_args)
        n_retries += 1
        if len(result) > 0:
            break
        if n_retries >= MAX_PG_QS_RETRIES:
            # pg_query_state calls return no result; most likely the query
            # has already completed
            break
        time.sleep(DELAY_BETWEEN_RETRIES)
    wait(async_conn)

    set_guc(async_conn, 'enable_mergejoin', 'on')
    return result, notices
|
||
def set_guc(async_conn, param, value):
    """Set a GUC configuration parameter on the given asynchronous connection."""
    cursor = async_conn.cursor()
    # GUC names cannot be bound as query parameters, hence plain string formatting
    cursor.execute('set %s to %s' % (param, value))
    wait(async_conn)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,13 +1,17 @@ | ||
''' | ||
pg_qs_test_cases.py | ||
Tests extract query state from running backend (including concurrent extracts) | ||
Copyright (c) 2016-2016, Postgres Professional | ||
pg_qs_test_runner.py | ||
Copyright (c) 2016-2019, Postgres Professional | ||
''' | ||
|
||
import argparse | ||
import getpass | ||
import os | ||
import psycopg2 | ||
import sys | ||
|
||
sys.path.append(os.path.dirname(os.path.abspath(__file__))) | ||
from test_cases import * | ||
import tpcds | ||
|
||
class PasswordPromptAction(argparse.Action): | ||
def __call__(self, parser, args, values, option_string=None): | ||
|
@@ -28,77 +32,97 @@ class TeardownException(Exception): pass | |
'insert into bar select i, i%2=1 from generate_series(1, 500000) as i', | ||
'analyze foo', | ||
'analyze bar', | ||
] | ||
] | ||
|
||
# SQL commands run by teardown() to drop the test objects again.
teardown_cmd = [
    'drop table foo cascade',
    'drop table bar cascade',
    'drop extension pg_query_state cascade',
]

# Test callables imported via 'from test_cases import *'; executed in order by main().
tests = [
    test_deadlock,
    test_simple_query,
    test_concurrent_access,
    test_nested_call,
    test_trigger,
    test_costs,
    test_buffers,
    test_timing,
    test_formats,
    test_timing_buffers_conflicts,
    test_insert_on_conflict,
]
|
||
def setup(con):
    ''' Creates pg_query_state extension, creates tables for tests, fills it with data '''
    print('setting up...')
    try:
        cur = con.cursor()
        for cmd in setup_cmd:
            cur.execute(cmd)
        con.commit()
        cur.close()
    except Exception as e:
        # wrap any failure so callers see a single setup-specific error type
        raise SetupException('Setup failed: %s' % e)
    print('done!')
|
||
def teardown(con):
    ''' Drops table and extension '''
    print('tearing down...')
    try:
        cur = con.cursor()
        for cmd in teardown_cmd:
            cur.execute(cmd)
        con.commit()
        cur.close()
    except Exception as e:
        # wrap any failure so callers see a single teardown-specific error type
        raise TeardownException('Teardown failed: %s' % e)
    print('done!')
|
||
def main(config):
    ''' Main test function: dispatches to TPC-DS setup/run or the default suite '''
    # config is the argparse Namespace; extract only the connection keywords
    conn_params = {
        key: config.__dict__[key] for key in ('host', 'port', 'user', 'database', 'password')
    }

    if config.tpcds_setup:
        print('Setup database for TPC-DS bench')
        tpcds.setup_tpcds(conn_params)
        print('Database is setup successfully')
        return

    if config.tpcds_run:
        print('Starting stress test')
        tpcds.run_tpcds(conn_params)
        print('Stress finished successfully')
        return

    # run default tests
    init_conn = psycopg2.connect(**conn_params)
    setup(init_conn)
    for i, test in enumerate(tests):
        # prefer the test's docstring as its description, fall back to its index
        if test.__doc__:
            descr = test.__doc__
        else:
            descr = 'test case %d' % (i + 1)
        print(("%s..." % descr))
        sys.stdout.flush()
        test(conn_params)
        print('ok!')
    teardown(init_conn)
    init_conn.close()
|
||
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Query state of running backends tests')

    parser.add_argument('--host', default='localhost', help='postgres server host')
    parser.add_argument('--port', type=int, default=5432, help='postgres server port')
    parser.add_argument('--user', dest='user', default='postgres', help='user name')
    parser.add_argument('--database', dest='database', default='postgres', help='database name')
    # --password takes no value: PasswordPromptAction prompts interactively instead
    parser.add_argument('--password', dest='password', nargs=0, action=PasswordPromptAction, default='', help='password')
    parser.add_argument('--tpc-ds-setup', dest='tpcds_setup', action='store_true', help='setup database to run TPC-DS benchmark')
    parser.add_argument('--tpc-ds-run', dest='tpcds_run', action='store_true', help='run only stress test based on TPC-DS benchmark')
Comment on lines
+125
to
+126
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you separate TPC-DS run and setup? You cannot run tests without a setup I guess. And what is the point of a setup without a run? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the whole task (running TPC-DS) is huge and it takes a lot of time. Clearly, it has to be decomposed into separate subtasks so that they can be run at a finer granularity. This separation is the first step toward that. In particular, it makes it possible to run the TPC-DS bench multiple times for debugging purposes without initializing the database every time. |
||
|
||
args = parser.parse_args() | ||
main(args.__dict__) | ||
main(args) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
#!/usr/bin/env sh
# Prepare the TPC-DS toolkit: fetch sources, build, generate data and queries.
set -e  # abort on the first failing command

mkdir -p tmp_stress
cd tmp_stress
rm -rf ./*

git clone --depth 1 --single-branch --branch master https://github.com/gregrahn/tpcds-kit.git
git clone --depth 1 --single-branch --branch master https://github.com/cwida/tpcds-result-reproduction.git

cd tpcds-kit/tools
make -s

# Generate data
./dsdgen -FORCE -VERBOSE -SCALE 1

# Prepare data: strip the trailing '|' field separator so COPY accepts the rows
mkdir -p tables
for i in *.dat; do
	echo "Preparing file" "$i"
	sed 's/|$//' "$i" > "tables/$i"
done

# Generate queries
./dsqgen -DIRECTORY ../query_templates \
	-INPUT ../query_templates/templates.lst \
	-VERBOSE Y \
	-QUALIFY Y \
	-SCALE 1 \
	-DIALECT netezza
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
PyYAML | ||
psycopg2 | ||
progressbar2 |
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
''' | ||
test_cases.py | ||
Copyright (c) 2016-2019, Postgres Professional | ||
''' | ||
|
||
import common | ||
import os | ||
import progressbar | ||
import psycopg2.extensions | ||
import subprocess | ||
import time | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer following order (built-in library first, extra modules second, local modules last): import os
import time
import subprocess
import psycopg2.extensions
import progressbar
from . import common Just for cleanliness There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thx, accepted |
||
|
||
class DataLoadException(Exception):
    """Raised when loading TPC-DS data into the database fails."""
    pass


class StressTestException(Exception):
    """Raised when the TPC-DS stress test itself fails."""
    pass


TPC_DS_EXCLUDE_LIST = []          # actual numbers of TPC-DS tests to exclude
TPC_DS_STATEMENT_TIMEOUT = 20000  # statement_timeout in ms
|
||
def setup_tpcds(config):
    """Build the TPC-DS kit, create the schema and load generated data into postgres."""
    print('Setting up TPC-DS test...')
    subprocess.call(['./tests/prepare_stress.sh'])

    conn = None
    try:
        conn = psycopg2.connect(**config)
        cur = conn.cursor()

        # Create pg_query_state extension
        cur.execute('CREATE EXTENSION IF NOT EXISTS pg_query_state')

        # Create tables
        with open('tmp_stress/tpcds-kit/tools/tpcds.sql', 'r') as f:
            cur.execute(f.read())

        # Copy table data from files
        for table_datafile in os.listdir('tmp_stress/tpcds-kit/tools/'):
            if table_datafile.endswith('.dat'):
                table_name = os.path.splitext(os.path.basename(table_datafile))[0]

                print('Loading table', table_name)
                with open('tmp_stress/tpcds-kit/tools/tables/%s' % table_datafile) as f:
                    cur.copy_from(f, table_name, sep='|', null='')

        conn.commit()
    except Exception as e:
        raise DataLoadException('Load failed: %s' % e)
    finally:
        # original referenced 'cur'/'conn' in the except block before they were
        # assigned (NameError if connect() failed) and leaked the connection on
        # success; close it here unconditionally instead
        if conn is not None:
            conn.close()

    print('done!')
|
||
def run_tpcds(config):
    """TPC-DS stress test: run every query while polling pg_query_state on its backend."""
    # Polling parameters (hoisted out of the loop: they are constants, the
    # original redefined them on every iteration)
    MAX_FIRST_GETTING_QS_RETRIES = 10
    PG_QS_DELAY, BEFORE_GETTING_QS_DELAY = 0.1, 0.1
    BEFORE_GETTING_QS, GETTING_QS = range(2)

    print('Preparing TPC-DS queries...')
    queries = []
    for query_file in sorted(os.listdir('tmp_stress/tpcds-result-reproduction/query_qualification/')):
        with open('tmp_stress/tpcds-result-reproduction/query_qualification/%s' % query_file, 'r') as f:
            queries.append(f.read())

    acon, = common.n_async_connect(config)
    pid = acon.get_backend_pid()

    print('Starting TPC-DS queries...')
    timeout_list = []
    bar = progressbar.ProgressBar(max_value=len(queries))
    for i, query in enumerate(queries):
        bar.update(i + 1)
        if i + 1 in TPC_DS_EXCLUDE_LIST:
            continue
        try:
            # Set query timeout to TPC_DS_STATEMENT_TIMEOUT / 1000 seconds
            common.set_guc(acon, 'statement_timeout', TPC_DS_STATEMENT_TIMEOUT)

            # run query
            acurs = acon.cursor()
            acurs.execute(query)

            # periodically run pg_query_state on the running backend trying to
            # provoke a crash of PostgreSQL
            state, n_first_getting_qs_retries = BEFORE_GETTING_QS, 0
            while True:
                result, notices = common.pg_query_state(config, pid)
                # state machine: detect the first successful query-state fetch,
                # then detect query completion
                if state == BEFORE_GETTING_QS:
                    if len(result) > 0 or common.BACKEND_IS_ACTIVE_INFO in notices:
                        state = GETTING_QS
                        continue
                    n_first_getting_qs_retries += 1
                    if n_first_getting_qs_retries >= MAX_FIRST_GETTING_QS_RETRIES:
                        # pg_query_state calls return no result; most likely
                        # the query has already completed
                        break
                    time.sleep(BEFORE_GETTING_QS_DELAY)
                elif state == GETTING_QS:
                    if common.BACKEND_IS_IDLE_INFO in notices:
                        break
                    time.sleep(PG_QS_DELAY)

            # wait for real query completion
            common.wait(acon)

        except psycopg2.extensions.QueryCanceledError:
            timeout_list.append(i + 1)

    common.n_close((acon,))

    if len(timeout_list) > 0:
        # TPC_DS_STATEMENT_TIMEOUT is in milliseconds; convert so the reported
        # unit ('s') matches the value (original printed the raw ms number)
        print('\nThere were pg_query_state timeouts (%s s) on queries:'
              % (TPC_DS_STATEMENT_TIMEOUT / 1000), timeout_list)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you can put 2020 everywhere already :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done