- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy pathsignals.py
71 lines (59 loc) · 2.35 KB
/
signals.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
importsignal
importweakref
fromfunctoolsimportwraps
__unittest=True
class_InterruptHandler(object):
def__init__(self, default_handler):
self.called=False
self.original_handler=default_handler
ifisinstance(default_handler, int):
ifdefault_handler==signal.SIG_DFL:
# Pretend it's signal.default_int_handler instead.
default_handler=signal.default_int_handler
elifdefault_handler==signal.SIG_IGN:
# Not quite the same thing as SIG_IGN, but the closest we
# can make it: do nothing.
defdefault_handler(unused_signum, unused_frame):
pass
else:
raiseTypeError("expected SIGINT signal handler to be "
"signal.SIG_IGN, signal.SIG_DFL, or a "
"callable object")
self.default_handler=default_handler
def__call__(self, signum, frame):
installed_handler=signal.getsignal(signal.SIGINT)
ifinstalled_handlerisnotself:
# if we aren't the installed handler, then delegate immediately
# to the default handler
self.default_handler(signum, frame)
ifself.called:
self.default_handler(signum, frame)
self.called=True
forresultin_results.keys():
result.stop()
_results=weakref.WeakKeyDictionary()
defregisterResult(result):
_results[result] =1
defremoveResult(result):
returnbool(_results.pop(result, None))
_interrupt_handler=None
definstallHandler():
global_interrupt_handler
if_interrupt_handlerisNone:
default_handler=signal.getsignal(signal.SIGINT)
_interrupt_handler=_InterruptHandler(default_handler)
signal.signal(signal.SIGINT, _interrupt_handler)
defremoveHandler(method=None):
ifmethodisnotNone:
@wraps(method)
definner(*args, **kwargs):
initial=signal.getsignal(signal.SIGINT)
removeHandler()
try:
returnmethod(*args, **kwargs)
finally:
signal.signal(signal.SIGINT, initial)
returninner
global_interrupt_handler
if_interrupt_handlerisnotNone:
signal.signal(signal.SIGINT, _interrupt_handler.original_handler)