- Notifications
You must be signed in to change notification settings - Fork 417
/
Copy pathtest_adversity.py
73 lines (61 loc) · 2.54 KB
/
test_adversity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Copyright (C) 2016-present the asyncpg authors and contributors
# <see AUTHORS file>
#
# This module is part of asyncpg and is released under
# the Apache 2.0 License: http://www.apache.org/licenses/LICENSE-2.0
"""Tests how asyncpg behaves in non-ideal conditions."""
importasyncio
importos
importplatform
importunittest
importsys
fromasyncpgimport_testbaseastb
@unittest.skipIf(os.environ.get('PGHOST'), 'using remote cluster for testing')
@unittest.skipIf(
    platform.system() == 'Windows' and
    sys.version_info >= (3, 8),
    'not compatible with ProactorEventLoop which is default in Python 3.8')
class TestConnectionLoss(tb.ProxiedClusterTestCase):
    """Exercise asyncpg behavior when TCP connectivity is abruptly lost.

    Uses the TCP proxy provided by ProxiedClusterTestCase to sever and
    restore connectivity between the client and the test cluster.
    """

    @tb.with_timeout(30.0)
    async def test_connection_close_timeout(self):
        """close() must respect its timeout when the peer is unreachable."""
        con = await self.connect()
        self.proxy.trigger_connectivity_loss()
        # With connectivity gone, a graceful close cannot complete and
        # must surface TimeoutError instead of hanging.
        with self.assertRaises(asyncio.TimeoutError):
            await con.close(timeout=0.5)

    @tb.with_timeout(30.0)
    async def test_pool_release_timeout(self):
        """Releasing a pool connection must time out on connectivity loss."""
        pool = await self.create_pool(
            database='postgres', min_size=2, max_size=2)
        try:
            with self.assertRaises(asyncio.TimeoutError):
                async with pool.acquire(timeout=0.5):
                    # Sever connectivity while the connection is held so
                    # that the implicit release on context exit times out.
                    self.proxy.trigger_connectivity_loss()
        finally:
            self.proxy.restore_connectivity()
            pool.terminate()

    @tb.with_timeout(30.0)
    async def test_pool_handles_abrupt_connection_loss(self):
        """A pool under load must recover promptly from connection loss."""
        pool_size = 3
        query_runtime = 0.5
        pool_timeout = cmd_timeout = 1.0
        concurrency = 9
        # Number of sequential "waves" needed to push `concurrency`
        # workers through a pool of `pool_size` connections.
        pool_concurrency = (concurrency - 1) // pool_size + 1

        # Worst expected runtime + 20% to account for other latencies.
        worst_runtime = (pool_timeout + cmd_timeout) * pool_concurrency * 1.2

        async def worker(pool):
            # Hold a pooled connection for roughly query_runtime seconds.
            async with pool.acquire(timeout=pool_timeout) as con:
                await con.fetch('SELECT pg_sleep($1)', query_runtime)

        def kill_connectivity():
            self.proxy.trigger_connectivity_loss()

        new_pool = self.create_pool(
            database='postgres', min_size=pool_size, max_size=pool_size,
            timeout=cmd_timeout, command_timeout=cmd_timeout)

        with self.assertRunUnder(worst_runtime):
            pool = await new_pool
            try:
                workers = [worker(pool) for _ in range(concurrency)]
                # Cut connectivity mid-run; workers are expected to fail
                # fast rather than stall past their timeouts.
                self.loop.call_later(1, kill_connectivity)
                await asyncio.gather(
                    *workers, return_exceptions=True)
            finally:
                pool.terminate()