- Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathpg_qs_test_runner.py
146 lines (127 loc) · 3.78 KB
/
pg_qs_test_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'''
pg_qs_test_runner.py
Copyright (c) 2016-2024, Postgres Professional
'''
import argparse
import getpass
import os
import sys

# Extend the module search path before the imports below: first with the
# directory containing this file, then with 'tmp/env' resolved against the
# current working directory.
# NOTE(review): presumably the first entry makes test_cases/tpcds importable
# and the second points at a locally provisioned environment - confirm.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(os.path.abspath('tmp/env'))

import psycopg2
from test_cases import *
import tpcds
class PasswordPromptAction(argparse.Action):
    ''' argparse action that asks for the password interactively '''

    def __call__(self, parser, args, values, option_string=None):
        ''' Prompts with getpass and stores the entered text on the namespace under self.dest '''
        # the value is read from the prompt, so `values` from the command line is ignored
        password = getpass.getpass()
        setattr(args, self.dest, password)
class SetupException(Exception):
    ''' Raised by setup() when preparing the test database fails '''
class TeardownException(Exception):
    ''' Raised by teardown() when dropping the test objects fails '''
# SQL text that creates (or replaces) the plpgsql function
# unlock_if_eq_1(x integer): when x = 1 it releases advisory lock 1, then
# acquires advisory lock 2 and returns 1; for any other x it returns x
# unchanged. The statement is the last entry of setup_cmd.
unlock_if_eq_1="""
CREATE OR REPLACE FUNCTION unlock_if_eq_1(x integer) RETURNS integer AS $$
BEGIN
IF x = 1 THEN
perform pg_advisory_unlock(1);
perform pg_advisory_lock(2);
return 1;
ELSE
return x;
END IF;
END;
$$ LANGUAGE plpgsql
"""
# Statements executed in order by setup(), all inside one transaction.
setup_cmd = [
    # start from a clean slate: remove leftovers of a previous run
    'drop extension if exists pg_query_state cascade',
    'drop table if exists foo cascade',
    'drop table if exists bar cascade',

    # extension under test and the two tables the test cases query
    'create extension pg_query_state',
    'create table foo(c1 integer, c2 text)',
    'create table bar(c1 integer, c2 boolean)',

    # 1,000,000 rows in foo, 500,000 rows in bar, then refresh statistics
    'insert into foo select i, md5(random()::text) from generate_series(1, 1000000) as i',
    'insert into bar select i, i%2=1 from generate_series(1, 500000) as i',
    'analyze foo',
    'analyze bar',

    # helper function built on advisory locks
    unlock_if_eq_1,
]
# Statements executed in order by teardown(): drop both test tables,
# then the extension itself.
teardown_cmd = [
    'drop table foo cascade',
    'drop table bar cascade',
    'drop extension pg_query_state cascade',
]
# Test callables run by main() in exactly this order; each one is invoked
# with the connection parameters dict. The names come from the star import
# of test_cases.
tests = [
    test_deadlock,
    test_simple_query,
    test_concurrent_access,
    test_nested_call,
    test_trigger,
    test_costs,
    test_buffers,
    test_timing,
    test_formats,
    test_timing_buffers_conflicts,
    test_insert_on_conflict,
]
def setup(con):
    ''' Creates pg_query_state extension, creates tables for tests, fills it with data

    Executes every statement of setup_cmd on one cursor of `con` and commits
    once after the last statement. Any error is re-raised as SetupException
    with the original exception attached as its cause.
    '''
    print('setting up...')
    try:
        cur = con.cursor()
        try:
            for cmd in setup_cmd:
                cur.execute(cmd)
            con.commit()
        finally:
            # release the cursor even when a statement or the commit fails
            cur.close()
    except Exception as e:
        raise SetupException('Setup failed: %s' % e) from e
    print('done!')
def teardown(con):
    ''' Drops table and extension

    Executes every statement of teardown_cmd on one cursor of `con` and
    commits once after the last statement. Any error is re-raised as
    TeardownException with the original exception attached as its cause.
    '''
    print('tearing down...')
    try:
        cur = con.cursor()
        try:
            for cmd in teardown_cmd:
                cur.execute(cmd)
            con.commit()
        finally:
            # release the cursor even when a statement or the commit fails
            cur.close()
    except Exception as e:
        raise TeardownException('Teardown failed: %s' % e) from e
    print('done!')
def main(config):
    ''' Main test function

    `config` must expose the attributes host, port, user, database,
    password, tpcds_setup and tpcds_run (the namespace built by the
    argument parser at the bottom of this file does).

    Depending on the flags it either prepares the TPC-DS database, runs the
    TPC-DS stress test, or runs the default test list between setup() and
    teardown(). An exception from setup, a test or teardown propagates to
    the caller.
    '''
    conn_params = {
        key: getattr(config, key) for key in ('host', 'port', 'user', 'database', 'password')
    }

    if config.tpcds_setup:
        print('Setup database for TPC-DS bench')
        tpcds.setup_tpcds(conn_params)
        print('Database is setup successfully')
        return

    if config.tpcds_run:
        print('Starting stress test')
        tpcds.run_tpcds(conn_params)
        print('Stress finished successfully')
        return

    # run default tests
    init_conn = psycopg2.connect(**conn_params)
    try:
        setup(init_conn)
        for i, test in enumerate(tests):
            # use the test's docstring as its title, else a numbered fallback
            if test.__doc__:
                descr = test.__doc__
            else:
                descr = 'test case %d' % (i + 1)
            print("%s..." % descr)
            # show the title before the test starts running
            sys.stdout.flush()
            test(conn_params)
            print('ok!')
        teardown(init_conn)
    finally:
        # close the setup/teardown connection even when something above fails
        init_conn.close()
# Script entry point: parse connection options and mode flags, then run main().
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Query state of running backends tests')

    parser.add_argument('--host', default='localhost', help='postgres server host')
    parser.add_argument('--port', type=int, default=5432, help='postgres server port')
    parser.add_argument('--user', dest='user', default='postgres', help='user name')
    parser.add_argument('--database', dest='database', default='postgres', help='database name')
    # nargs=0: the flag takes no value on the command line; when present,
    # PasswordPromptAction asks for the password interactively. Without the
    # flag the password stays at its default, the empty string.
    parser.add_argument('--password', dest='password', nargs=0, action=PasswordPromptAction, default='', help='password')
    parser.add_argument('--tpc-ds-setup', dest='tpcds_setup', action='store_true', help='setup database to run TPC-DS benchmark')
    parser.add_argument('--tpc-ds-run', dest='tpcds_run', action='store_true', help='run only stress test based on TPC-DS benchmark')

    args = parser.parse_args()
    main(args)