- Notifications
You must be signed in to change notification settings - Fork 31.7k
/
Copy pathpty_helper.py
80 lines (74 loc) · 2.98 KB
/
pty_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
"""
Helper to run a script in a pseudo-terminal.
"""
importos
importselectors
importsubprocess
importsys
fromcontextlibimportExitStack
fromerrnoimportEIO
fromtest.support.import_helperimportimport_module
def run_pty(script, input=b"dummy input\r", env=None):
    """Run *script* in a child interpreter attached to a pseudo-terminal.

    The child is started as ``sys.executable -c script`` with its stdin,
    stdout and stderr all connected to the slave end of a freshly opened
    pty.  *input* is written to the master end, and everything that can be
    read back from the master end is collected.

    Parameters:
        script: Python source handed to the child through ``-c``.
        input: Bytes written to the master end of the pty.
        env: Passed through unchanged as ``subprocess.Popen(env=...)``.

    Returns:
        A ``bytearray`` with all bytes read from the master end up to the
        first empty read (or ``EIO``, which is treated as end-of-file).

    Raises:
        OSError: If reading from or writing to the master end fails with
            an errno other than ``EIO``.
        Exceptions raised while importing ``pty``, opening the pty or
        starting the child propagate unchanged; both pty descriptors are
        closed before a child start-up failure is re-raised.
    """
    pty = import_module('pty')
    output = bytearray()
    master, slave = pty.openpty()
    args = (sys.executable, '-c', script)
    try:
        proc = subprocess.Popen(args, stdin=slave, stdout=slave,
                                stderr=slave, env=env)
    except BaseException:
        # No cleanup handler owns the master end yet, so release it here
        # instead of leaking it when the child cannot be started.
        os.close(master)
        raise
    finally:
        # This process never uses the slave end itself; close it whether
        # or not the child was started.
        os.close(slave)

    with ExitStack() as cleanup:
        cleanup.enter_context(proc)

        def terminate(proc):
            try:
                proc.terminate()
            except ProcessLookupError:
                # Workaround for Open/Net BSD bug (Issue 16762)
                pass

        # Callbacks run in reverse order of registration: the master end
        # is closed first, then the child is terminated, then waited for.
        cleanup.callback(terminate, proc)
        cleanup.callback(os.close, master)

        # Avoid using DefaultSelector and PollSelector. Kqueue() does not
        # work with pseudo-terminals on OS X < 10.9 (Issue 20365) and Open
        # BSD (Issue 20667). Poll() does not work with OS X 10.6 or 10.4
        # either (Issue 20472). Hopefully the file descriptor is low enough
        # to use with select().
        sel = cleanup.enter_context(selectors.SelectSelector())
        sel.register(master, selectors.EVENT_READ | selectors.EVENT_WRITE)
        os.set_blocking(master, False)

        while True:
            for _, events in sel.select():
                if events & selectors.EVENT_READ:
                    try:
                        chunk = os.read(master, 0x10000)
                    except OSError as err:
                        # Linux raises EIO when slave is closed (Issue 5380)
                        if err.errno != EIO:
                            raise
                        chunk = b""
                    if not chunk:
                        return output
                    output.extend(chunk)
                if events & selectors.EVENT_WRITE:
                    try:
                        # os.write() may accept only part of the data;
                        # keep the unwritten tail for the next round.
                        input = input[os.write(master, input):]
                    except OSError as err:
                        # Apparently EIO means the slave was closed
                        if err.errno != EIO:
                            raise
                        input = b""  # Stop writing
                    if not input:
                        # Nothing left to send: only wait for output now.
                        sel.modify(master, selectors.EVENT_READ)
######################################################################
## Fake stdin (for testing interactive debugging)
######################################################################
class FakeInput:
    """
    Scripted stand-in for an input stream, for driving pdb's interactive
    debugger from tests.

    Every readline() call takes the next queued line, echoes it with
    print() (as though the user had just typed it) and hands it back with
    a newline appended.  The queued lines are given to the constructor
    and should not carry trailing newlines.
    """

    def __init__(self, lines):
        # The caller's list is stored as-is (not copied); readline()
        # consumes it from the front.
        self.lines = lines

    def readline(self):
        """Remove the first queued line, echo it, and return it plus a newline."""
        pending = self.lines
        entry = pending.pop(0)
        print(entry)
        return entry + '\n'