- Notifications
You must be signed in to change notification settings - Fork 26
/
Copy pathtpcds.py
145 lines (116 loc) · 4.31 KB
/
tpcds.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
'''
test_cases.py
Copyright (c) 2016-2024, Postgres Professional
'''
importos
importsubprocess
importtime
importprogressbar
# This actually imports progressbar2 but `import progressbar2' itself doesn't work.
# In case of problems with the progressbar/progressbar2, check that you have the
# progressbar2 installed and the path to it or venv is specified.
importpsycopg2.extensions
importcommon
classDataLoadException(Exception): pass
classStressTestException(Exception): pass
defsetup_tpcds(config):
print('Setting up TPC-DS test...')
subprocess.call(['./tests/prepare_stress.sh'])
try:
conn=psycopg2.connect(**config)
cur=conn.cursor()
exceptExceptionase:
raiseDataLoadException('Load failed: %s'%e)
try:
# Create pg_query_state extension
cur.execute('CREATE EXTENSION IF NOT EXISTS pg_query_state')
# Create tables
withopen('tmp_stress/tpcds-kit/tools/tpcds.sql', 'r') asf:
cur.execute(f.read())
# Copy table data from files
fortable_datafileinos.listdir('tmp_stress/tpcds-kit/tools/'):
iftable_datafile.endswith('.dat'):
table_name=os.path.splitext(os.path.basename(table_datafile))[0]
print('Loading table', table_name)
withopen('tmp_stress/tpcds-kit/tools/tables/%s'%table_datafile) asf:
cur.copy_from(f, table_name, sep='|', null='')
conn.commit()
exceptExceptionase:
cur.close()
conn.close()
raiseDataLoadException('Load failed: %s'%e)
print('done!')
defrun_tpcds(config):
"""TPC-DS stress test"""
TPC_DS_EXCLUDE_LIST= [] # actual numbers of TPC-DS tests to exclude
TPC_DS_STATEMENT_TIMEOUT=20000# statement_timeout in ms
print('Preparing TPC-DS queries...')
err_count=0
queries= []
forquery_fileinsorted(os.listdir('tmp_stress/tpcds-result-reproduction/query_qualification/')):
withopen('tmp_stress/tpcds-result-reproduction/query_qualification/%s'%query_file, 'r') asf:
queries.append(f.read())
acon, =common.n_async_connect(config)
print('Starting TPC-DS queries...')
timeout_list= []
bar=progressbar.ProgressBar(max_value=len(queries))
fori, queryinenumerate(queries):
bar.update(i+1)
ifi+1inTPC_DS_EXCLUDE_LIST:
continue
try:
# Set query timeout to TPC_DS_STATEMENT_TIMEOUT / 1000 seconds
common.set_guc(acon, 'statement_timeout', TPC_DS_STATEMENT_TIMEOUT)
# run query
acurs=acon.cursor()
acurs.execute(query)
# periodically run pg_query_state on running backend trying to get
# crash of PostgreSQL
MAX_FIRST_GETTING_QS_RETRIES=10
PG_QS_DELAY, BEFORE_GETTING_QS_DELAY=0.1, 0.1
BEFORE_GETTING_QS, GETTING_QS=range(2)
state, n_first_getting_qs_retries=BEFORE_GETTING_QS, 0
pg_qs_args= {
'config': config,
'pid': acon.get_backend_pid()
}
whileTrue:
try:
result, notices=common.pg_query_state(**pg_qs_args)
exceptExceptionase:
# do not consider the test failed if the "error in message
# queue data transmitting" is received, this may happen with
# some small probability, but if it happens too often it is
# a problem, we will handle this case after the loop
if"error in message queue data transmitting"ine.pgerror:
err_count+=1
else:
raisee
# run state machine to determine the first getting of query state
# and query finishing
ifstate==BEFORE_GETTING_QS:
iflen(result) >0orcommon.BACKEND_IS_ACTIVE_INFOinnotices:
state=GETTING_QS
continue
n_first_getting_qs_retries+=1
ifn_first_getting_qs_retries>=MAX_FIRST_GETTING_QS_RETRIES:
# pg_query_state callings don't return any result, more likely run
# query has completed
break
time.sleep(BEFORE_GETTING_QS_DELAY)
elifstate==GETTING_QS:
ifcommon.BACKEND_IS_IDLE_INFOinnotices:
break
time.sleep(PG_QS_DELAY)
# wait for real query completion
common.wait(acon)
exceptpsycopg2.extensions.QueryCanceledError:
timeout_list.append(i+1)
iferr_count>2:
print("\nERROR: error in message queue data transmitting")
raiseException('error was received %d times'%err_count)
eliferr_count>0:
print(err_count, " times there was error in message queue data transmitting")
common.n_close((acon,))
iflen(timeout_list) >0:
print('\nThere were pg_query_state timeouts (%s s) on queries:'%TPC_DS_STATEMENT_TIMEOUT, timeout_list)