- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy pathutil.py
85 lines (67 loc) · 2.36 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
importcontextlib
importfunctools
importio
importre
importsqlite3
importtest.support
importunittest
# Helper for temporary memory databases
def memory_database(*args, **kwargs):
    """Return a context manager wrapping a new in-memory connection.

    Every positional and keyword argument is forwarded to
    sqlite3.connect() after the ":memory:" database name.  The result is
    a contextlib.closing() object, so leaving its ``with`` block closes
    the connection.
    """
    return contextlib.closing(sqlite3.connect(":memory:", *args, **kwargs))
# Temporarily limit a database connection parameter
@contextlib.contextmanager
defcx_limit(cx, category=sqlite3.SQLITE_LIMIT_SQL_LENGTH, limit=128):
try:
_prev=cx.setlimit(category, limit)
yieldlimit
finally:
cx.setlimit(category, _prev)
def with_tracebacks(exc, regex="", name=""):
    """Decorator factory for tests that exercise callback tracebacks.

    The decorated test method is executed twice: once inside
    check_tracebacks(), which verifies the captured unraisable exception
    against *exc*, *regex* and *name*, and then once more on its own.
    An empty *regex* or *name* disables the corresponding check.
    """
    def decorator(func):
        # Compile the pattern once per decorated function.
        if regex:
            pattern = re.compile(regex)
        else:
            pattern = None

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # First pass: tracebacks enabled, unraisable exception checked.
            with (test.support.catch_unraisable_exception() as unraisable_cm,
                  check_tracebacks(self, unraisable_cm, exc, pattern, name)):
                func(self, *args, **kwargs)

            # Second pass: tracebacks disabled again.
            func(self, *args, **kwargs)

        return wrapper

    return decorator
@contextlib.contextmanager
def check_tracebacks(self, cm, exc, regex, obj_name):
    """Context manager that verifies the unraisable exception held by *cm*.

    Callback tracebacks are enabled while the body runs (with stderr
    redirected to a throwaway buffer) and disabled again on exit.  After
    the body completes, ``cm.unraisable`` is checked through the test
    case *self*:

    * its ``exc_type`` must equal *exc*;
    * if *regex* is truthy, ``regex.search()`` must match the string form
      of its ``exc_value``;
    * if *obj_name* is truthy, ``object.__name__`` must equal *obj_name*.
    """
    sqlite3.enable_callback_tracebacks(True)
    try:
        # Swallow whatever is written to stderr while the body runs.
        with contextlib.redirect_stderr(io.StringIO()):
            yield

        unraisable = cm.unraisable
        self.assertEqual(unraisable.exc_type, exc)
        if regex:
            self.assertIsNotNone(regex.search(str(unraisable.exc_value)))
        if obj_name:
            self.assertEqual(unraisable.object.__name__, obj_name)
    finally:
        sqlite3.enable_callback_tracebacks(False)
class MemoryDatabaseMixin:
    """Mixin that gives each test an in-memory connection and a cursor.

    setUp() stores them as ``self.con`` and ``self.cur``; tearDown()
    closes the cursor and then the connection.  The read-only ``cx`` and
    ``cu`` properties are short aliases for the same two objects.
    """

    def setUp(self):
        connection = sqlite3.connect(":memory:")
        self.con = connection
        self.cur = connection.cursor()

    def tearDown(self):
        # Cursor first, then the connection it was created from.
        for attr in ("cur", "con"):
            getattr(self, attr).close()

    @property
    def cx(self):
        """Alias for ``self.con``."""
        return self.con

    @property
    def cu(self):
        """Alias for ``self.cur``."""
        return self.cur
def requires_virtual_table(module):
    """Return a unittest skip decorator keyed on virtual table support.

    The rows of ``PRAGMA module_list`` are read from a temporary
    in-memory database; the test is skipped unless ``(module,)`` is one
    of them.
    """
    with memory_database() as db:
        available = db.execute("PRAGMA module_list").fetchall()
    is_supported = (module,) in available
    return unittest.skipUnless(
        is_supported, f"Requires {module!r} virtual table support"
    )