- Notifications
You must be signed in to change notification settings - Fork 417
/
Copy pathtest_cursor.py
161 lines (124 loc) · 5.78 KB
/
test_cursor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
import asyncpg
import inspect

from asyncpg import _testbase as tb
class TestIterableCursor(tb.ConnectedTestCase):
    """Tests for cursors consumed through ``async for`` iteration."""

    async def test_cursor_iterable_01(self):
        # Iterating a statement cursor inside a transaction must yield the
        # same records as a plain fetch(), for each prefetch value 1..24.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        expected = await st.fetch()

        for prefetch in range(1, 25):
            with self.subTest(prefetch=prefetch):
                async with self.con.transaction():
                    result = []
                    async for rec in st.cursor(prefetch=prefetch):
                        result.append(rec)

                self.assertEqual(
                    result, expected,
                    'result != expected for prefetch={}'.format(prefetch))

    async def test_cursor_iterable_02(self):
        # Test that it's not possible to create a cursor without hold
        # outside of a transaction
        s = await self.con.prepare(
            'DECLARE t BINARY CURSOR WITHOUT HOLD FOR SELECT 1')
        with self.assertRaises(asyncpg.NoActiveSQLTransactionError):
            await s.fetch()

        # Now test that statement.cursor() does not let you
        # iterate over it outside of a transaction
        st = await self.con.prepare('SELECT generate_series(0, 20)')

        it = st.cursor(prefetch=5).__aiter__()
        # __aiter__() may hand back an awaitable instead of the iterator
        # itself; resolve it before calling __anext__().
        if inspect.isawaitable(it):
            it = await it

        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await it.__anext__()

    async def test_cursor_iterable_03(self):
        # An iterator obtained *before* the statement is marked closed must
        # still raise InterfaceError once iteration is attempted.
        st = await self.con.prepare('SELECT generate_series(0, 20)')

        it = st.cursor().__aiter__()
        # __aiter__() may hand back an awaitable instead of the iterator
        # itself; resolve it before iterating.
        if inspect.isawaitable(it):
            it = await it

        st._state.mark_closed()

        with self.assertRaisesRegex(asyncpg.InterfaceError,
                                    'statement is closed'):
            async for _ in it:  # NOQA
                pass

    async def test_cursor_iterable_04(self):
        # Creating and iterating a cursor *after* the statement is marked
        # closed must raise InterfaceError.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        st._state.mark_closed()

        with self.assertRaisesRegex(asyncpg.InterfaceError,
                                    'statement is closed'):
            async for _ in st.cursor():  # NOQA
                pass

    async def test_cursor_iterable_05(self):
        # Prefetch values of -1 and 0 must be rejected when the cursor
        # is iterated.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        for prefetch in range(-1, 1):
            with self.subTest(prefetch=prefetch):
                with self.assertRaisesRegex(asyncpg.InterfaceError,
                                            'must be greater than zero'):
                    async for _ in st.cursor(prefetch=prefetch):  # NOQA
                        pass

    async def test_cursor_iterable_06(self):
        # Fully iterate a connection-level cursor over a freshly created
        # table, then drop that table while the transaction is still open.
        recs = []

        async with self.con.transaction():
            await self.con.execute('''
                CREATE TABLE cursor_iterable_06 (id int);
                INSERT INTO cursor_iterable_06 VALUES (0), (1);
            ''')
            try:
                cur = self.con.cursor('SELECT * FROM cursor_iterable_06')
                async for rec in cur:
                    recs.append(rec)
            finally:
                # Check that after iteration has exhausted the cursor,
                # its associated portal is closed properly, unlocking
                # the table.
                await self.con.execute('DROP TABLE cursor_iterable_06')

        self.assertEqual(recs, [(i,) for i in range(2)])
class TestCursor(tb.ConnectedTestCase):
    """Tests for cursors obtained by awaiting and driven explicitly."""

    async def test_cursor_01(self):
        # Awaiting statement.cursor() outside of a transaction must fail.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        with self.assertRaisesRegex(asyncpg.NoActiveSQLTransactionError,
                                    'cursor cannot be created.*transaction'):
            await st.cursor()

    async def test_cursor_02(self):
        # Walk a cursor over the series 0..20 with fetch(), fetchrow() and
        # forward(), checking its position and repr() along the way.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        async with self.con.transaction():
            cur = await st.cursor()

            # fetch() must reject counts of -1 and 0.
            for i in range(-1, 1):
                with self.assertRaisesRegex(asyncpg.InterfaceError,
                                            'greater than zero'):
                    await cur.fetch(i)

            res = await cur.fetch(2)
            self.assertEqual(res, [(0,), (1,)])

            rec = await cur.fetchrow()
            self.assertEqual(rec, (2,))

            # Rows remain, so repr() must not report exhaustion yet.
            r = repr(cur)
            self.assertTrue(r.startswith('<asyncpg.Cursor '))
            self.assertNotIn(' exhausted ', r)
            self.assertIn('"SELECT generate', r)

            # Skip rows 3..7; the next row read is therefore 8.
            moved = await cur.forward(5)
            self.assertEqual(moved, 5)

            rec = await cur.fetchrow()
            self.assertEqual(rec, (8,))

            # Asking for more rows than remain returns just the remainder.
            res = await cur.fetch(100)
            self.assertEqual(res, [(i,) for i in range(9, 21)])

            # Past the end: fetchrow() gives None and fetch() gives [].
            self.assertIsNone(await cur.fetchrow())
            self.assertEqual(await cur.fetch(5), [])

            r = repr(cur)
            self.assertTrue(r.startswith('<asyncpg.Cursor '))
            self.assertIn(' exhausted ', r)
            self.assertIn('"SELECT generate', r)

    async def test_cursor_03(self):
        # Passing prefetch to an awaited (non-iterated) cursor must be
        # rejected with InterfaceError.
        st = await self.con.prepare('SELECT generate_series(0, 20)')
        async with self.con.transaction():
            with self.assertRaisesRegex(asyncpg.InterfaceError,
                                        'prefetch argument can only'):
                await st.cursor(prefetch=10)

    async def test_cursor_04(self):
        # A cursor created directly from the connection supports forward():
        # after skipping 42 rows of 0..100 the next row is 42.
        async with self.con.transaction():
            st = await self.con.cursor('SELECT generate_series(0, 100)')
            await st.forward(42)
            self.assertEqual(await st.fetchrow(), (42,))