# socket_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import contextlib
import errno
import os.path
import socket
import subprocess
import sys
import tempfile
import unittest

from .. import support
# Loopback host name and its literal IPv4 / IPv6 addresses.
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"

# WASI SDK 15.0 does not provide gethostname(); its stub raises
# OSError(ENOTSUP), so gethostname() is considered unavailable on WASI.
has_gethostname = not support.is_wasi
def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
    """Return an unused port that should be suitable for binding.

    This is achieved by creating a temporary socket with the given *family*
    and *socktype* (defaults are AF_INET and SOCK_STREAM) and passing it to
    bind_port(), which binds it to bind_port()'s default host (HOST, i.e.
    "localhost") with the port set to 0, eliciting an unused ephemeral port
    from the OS.  The temporary socket is then closed and the ephemeral port
    is returned.

    Either this method or bind_port() should be used for any tests where a
    server socket needs to be bound to a particular port for the duration of
    the test.  Which one to use depends on whether the calling code is
    creating a python socket, or if an unused port needs to be provided in a
    constructor or passed to an external program (i.e. the -accept argument
    to openssl's s_server mode).  Always prefer bind_port() over
    find_unused_port() where possible.  Hard coded ports should *NEVER* be
    used.  As soon as a server socket is bound to a hard coded port, the
    ability to run multiple instances of the test simultaneously on the same
    host is compromised, which makes the test a ticking time bomb in a
    buildbot environment.  On Unix buildbots, this may simply manifest as a
    failed test, which can be recovered from without intervention in most
    cases, but on Windows, the entire python process can completely and
    utterly wedge, requiring someone to log in to the buildbot and manually
    kill the affected process.

    (This is easy to reproduce on Windows, unfortunately, and can be traced
    to the SO_REUSEADDR socket option having different semantics on Windows
    versus Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM
    sockets bind, listen and then accept connections on identical
    host/ports.  An EADDRINUSE OSError will be raised at some point
    (depending on the platform and the order bind and listen were called on
    each socket).

    However, on Windows, if SO_REUSEADDR is set on the sockets, no
    EADDRINUSE will ever be raised when attempting to bind two identical
    host/ports.  When accept() is called on each socket, the second caller's
    process will steal the port from the first caller, leaving them both in
    an awkwardly wedged state where they'll no longer respond to any signals
    or graceful kills, and must be forcibly killed via
    OpenProcess()/TerminateProcess().

    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
    instead of SO_REUSEADDR, which effectively affords the same semantics as
    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the
    Open Source world compared to Windows ones, this is a common mistake.  A
    quick look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR
    when openssl.exe is called with the 's_server' option, for example.  See
    http://bugs.python.org/issue2550 for more info.  The following site also
    has a very thorough description about the implications of both REUSEADDR
    and EXCLUSIVEADDRUSE on Windows:
    https://learn.microsoft.com/windows/win32/winsock/using-so-reuseaddr-and-so-exclusiveaddruse

    XXX: although this approach is a vast improvement on previous attempts
    to elicit unused ports, it rests heavily on the assumption that the
    ephemeral port returned to us by the OS won't immediately be dished back
    out to some other process when we close our temporary socket but before
    our calling code has a chance to bind the returned port.  We can deal
    with this issue if/when we come across it.
    """
    with socket.socket(family, socktype) as tempsock:
        # Leaving the 'with' block closes the temporary socket; only the
        # port number it was given by the OS is handed back to the caller.
        return bind_port(tempsock)
def bind_port(sock, host=HOST):
    """Bind the socket to a free port and return the port number.

    Relies on ephemeral ports in order to ensure we are using an unbound
    port.  This is important as many tests may be running simultaneously,
    especially in a buildbot environment.

    Raises support.TestFailed if sock.family is AF_INET and sock.type is
    SOCK_STREAM, *and* the socket has SO_REUSEADDR or SO_REUSEPORT set on it.
    Tests should *never* set these socket options for TCP/IP sockets.  The
    only case for setting these options is testing multicasting via multiple
    UDP sockets.

    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
    on Windows), it will be set on the socket.  This will prevent anyone else
    from bind()'ing to our host/port for the duration of the test.

    sock -- the socket to bind.
    host -- address to bind to; the port is always 0 so the OS picks one.
    """
    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
        # NOTE(review): only an option value of exactly 1 is treated as
        # "set" in the two checks below -- confirm that no supported platform
        # reports an enabled boolean option as a different non-zero value.
        if hasattr(socket, 'SO_REUSEADDR'):
            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
                raise support.TestFailed("tests should never set the "
                                         "SO_REUSEADDR socket option on "
                                         "TCP/IP sockets!")
        if hasattr(socket, 'SO_REUSEPORT'):
            try:
                if sock.getsockopt(socket.SOL_SOCKET,
                                   socket.SO_REUSEPORT) == 1:
                    raise support.TestFailed("tests should never set the "
                                             "SO_REUSEPORT socket option on "
                                             "TCP/IP sockets!")
            except OSError:
                # Python's socket module was compiled using modern headers
                # thus defining SO_REUSEPORT but this process is running
                # under an older kernel that does not support SO_REUSEPORT.
                pass
        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)

    # Port 0 asks the OS for an ephemeral port; read back what it chose.
    sock.bind((host, 0))
    port = sock.getsockname()[1]
    return port
def bind_unix_socket(sock, addr):
    """Bind a unix socket, raising SkipTest if PermissionError is raised.

    sock -- an AF_UNIX socket (asserted).
    addr -- the address passed straight to sock.bind().

    When bind() raises PermissionError the socket is closed before
    unittest.SkipTest is raised; any other exception propagates unchanged
    and leaves the socket open.
    """
    assert sock.family == socket.AF_UNIX
    try:
        sock.bind(addr)
    except PermissionError:
        sock.close()
        raise unittest.SkipTest('cannot bind AF_UNIX sockets')
def _is_ipv6_enabled():
    """Check whether IPv6 is enabled on this host.

    Returns True only when the socket module reports IPv6 support *and* an
    AF_INET6 stream socket can actually be bound to the IPv6 loopback
    address (HOSTv6); an OSError while creating or binding the probe socket
    yields False.
    """
    if not socket.has_ipv6:
        return False
    try:
        # The context manager closes the probe socket on every path.
        with socket.socket(socket.AF_INET6, socket.SOCK_STREAM) as sock:
            sock.bind((HOSTv6, 0))
            return True
    except OSError:
        return False


# Probed once, when the module is imported.
IPV6_ENABLED = _is_ipv6_enabled()
# Cached outcome of the one-time AF_UNIX bind() probe:
# None = not probed yet, False = bind() worked, otherwise the OSError raised.
_bind_nix_socket_error = None


def skip_unless_bind_unix_socket(test):
    """Decorator for tests requiring a functional bind() for unix sockets.

    Returns *test* unchanged when an AF_UNIX socket can be bound, otherwise
    returns *test* wrapped by unittest.skip().  The bind() probe runs at most
    once per process; its outcome is cached in _bind_nix_socket_error.
    """
    if not hasattr(socket, 'AF_UNIX'):
        return unittest.skip('No UNIX Sockets')(test)

    global _bind_nix_socket_error
    if _bind_nix_socket_error is None:
        from .os_helper import TESTFN, unlink
        path = TESTFN + "can_bind_unix_socket"
        with socket.socket(socket.AF_UNIX) as sock:
            try:
                sock.bind(path)
            except OSError as e:
                _bind_nix_socket_error = e
            else:
                _bind_nix_socket_error = False
            finally:
                # Remove the socket file whether or not bind() succeeded.
                unlink(path)

    if not _bind_nix_socket_error:
        return test
    msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
    return unittest.skip(msg)(test)
def get_socket_conn_refused_errs():
    """
    Get the different socket error numbers ('errno') which can be received
    when a connection is refused.

    Returns a new list on every call.  ECONNREFUSED always comes first;
    the remaining entries are added only when the errno module defines them
    (EAFNOSUPPORT only when IPv6 is not enabled on this host).
    """
    errors = [errno.ECONNREFUSED]
    optional_names = (
        # On Solaris, ENETUNREACH is returned sometimes instead of
        # ECONNREFUSED
        'ENETUNREACH',
        # bpo-31910: socket.create_connection() fails randomly
        # with EADDRNOTAVAIL on Travis CI
        'EADDRNOTAVAIL',
        # bpo-37583: The destination host cannot be reached
        'EHOSTUNREACH',
    )
    for name in optional_names:
        if hasattr(errno, name):
            errors.append(getattr(errno, name))
    if not IPV6_ENABLED:
        errors.append(errno.EAFNOSUPPORT)
    return errors
# Sentinel distinguishing "timeout not passed" from an explicit timeout=None.
_NOT_SET = object()


@contextlib.contextmanager
def transient_internet(resource_name, *, timeout=_NOT_SET, errnos=()):
    """Return a context manager that raises ResourceDenied when various issues
    with the internet connection manifest themselves as exceptions.

    resource_name -- name interpolated into the ResourceDenied message.
    timeout -- default socket timeout installed while the block runs;
        support.INTERNET_TIMEOUT when omitted.  With timeout=None the
        default socket timeout is left as it is.
    errnos -- errno values to treat as transient.  When empty, built-in
        lists of socket errnos and getaddrinfo() error codes are used; when
        given, no getaddrinfo() error codes are matched.

    The default socket timeout in effect on entry is restored on exit.
    An OSError that is not recognised as transient is re-raised unchanged.
    """
    import urllib.error
    if timeout is _NOT_SET:
        timeout = support.INTERNET_TIMEOUT

    # (name, fallback number) pairs: the number is used when the errno /
    # socket module on this platform does not define the name.
    default_errnos = [
        ('ECONNREFUSED', 111),
        ('ECONNRESET', 104),
        ('EHOSTUNREACH', 113),
        ('ENETUNREACH', 101),
        ('ETIMEDOUT', 110),
        # socket.create_connection() fails randomly with
        # EADDRNOTAVAIL on Travis CI.
        ('EADDRNOTAVAIL', 99),
    ]
    default_gai_errnos = [
        ('EAI_AGAIN', -3),
        ('EAI_FAIL', -4),
        ('EAI_NONAME', -2),
        ('EAI_NODATA', -5),
        # Encountered when trying to resolve IPv6-only hostnames
        ('WSANO_DATA', 11004),
    ]

    denied = support.ResourceDenied("Resource %r is not available"
                                    % resource_name)
    captured_errnos = errnos
    gai_errnos = []
    if not captured_errnos:
        captured_errnos = [getattr(errno, name, num)
                           for (name, num) in default_errnos]
        gai_errnos = [getattr(socket, name, num)
                      for (name, num) in default_gai_errnos]

    def filter_error(err):
        """Raise `denied` (chained from err) if err looks transient."""
        n = getattr(err, 'errno', None)
        if (isinstance(err, TimeoutError) or
            (isinstance(err, socket.gaierror) and n in gai_errnos) or
            (isinstance(err, urllib.error.HTTPError) and
             500 <= err.code <= 599) or
            (isinstance(err, urllib.error.URLError) and
                 (("ConnectionRefusedError" in err.reason) or
                  ("TimeoutError" in err.reason) or
                  ("EOFError" in err.reason))) or
            n in captured_errnos):
            if not support.verbose:
                sys.stderr.write(denied.args[0] + "\n")
            raise denied from err

    old_timeout = socket.getdefaulttimeout()
    try:
        if timeout is not None:
            socket.setdefaulttimeout(timeout)
        yield
    except OSError as err:
        # urllib can wrap original socket errors multiple times (!), we must
        # unwrap to get at the original error.
        while True:
            a = err.args
            if len(a) >= 1 and isinstance(a[0], OSError):
                err = a[0]
            # The error can also be wrapped as args[1]:
            #    except socket.error as msg:
            #        raise OSError('socket error', msg) from msg
            elif len(a) >= 2 and isinstance(a[1], OSError):
                err = a[1]
            else:
                break
        filter_error(err)
        # Not a transient error: let the original exception propagate.
        raise
    # XXX should we catch generic exceptions and look for their
    # __cause__ or __context__?
    finally:
        socket.setdefaulttimeout(old_timeout)
def create_unix_domain_name():
    """
    Create a UNIX domain name: socket.bind() argument of a AF_UNIX socket.

    Return a path relative to the current directory to get a short path
    (around 27 ASCII characters).  Only a name is generated; nothing is
    created on disk.
    """
    return tempfile.mktemp(prefix="test_python_", suffix='.sock',
                           dir=os.path.curdir)
# consider that sysctl values should not change while tests are running
_sysctl_cache = {}


def _get_sysctl(name):
    """Get a sysctl value as an integer.

    Runs ``sysctl -n <name>`` and parses its output with int().  Returns
    None, after printing a warning through support.print_warning(), when the
    command exits with a non-zero status or its output cannot be parsed.
    Results, including failures, are cached per name in _sysctl_cache, so
    each name is queried (and warned about) at most once.
    """
    try:
        return _sysctl_cache[name]
    except KeyError:
        pass

    # At least Linux and FreeBSD support the "-n" option
    cmd = ['sysctl', '-n', name]
    # Joined once up front: nesting the same quote type inside an f-string
    # replacement field is only accepted by Python 3.12 and newer.
    cmd_text = ' '.join(cmd)
    proc = subprocess.run(cmd,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          text=True)

    value = None
    if proc.returncode:
        support.print_warning(f'{cmd_text!r} command failed with '
                              f'exit code {proc.returncode}')
    else:
        output = proc.stdout
        # Parse '0\n' to get '0'
        try:
            value = int(output.strip())
        except Exception as exc:
            support.print_warning(f'Failed to parse {cmd_text!r} '
                                  f'command output {output!r}: {exc!r}')

    # A failure is cached as None so the warning is only logged once.
    _sysctl_cache[name] = value
    return value
def tcp_blackhole():
    """Return True if the FreeBSD TCP blackhole is enabled.

    Always False when sys.platform does not start with 'freebsd', and also
    False when the net.inet.tcp.blackhole sysctl value cannot be obtained.
    """
    if not sys.platform.startswith('freebsd'):
        return False

    # gh-109015: test if FreeBSD TCP blackhole is enabled
    value = _get_sysctl('net.inet.tcp.blackhole')
    if value is None:
        # don't skip if we fail to get the sysctl value
        return False
    return value != 0
def skip_if_tcp_blackhole(test):
    """Decorator skipping test if TCP blackhole is enabled.

    tcp_blackhole() is evaluated when the decorator is applied, not each
    time the decorated test runs.
    """
    skip_if = unittest.skipIf(
        tcp_blackhole(),
        "TCP blackhole is enabled (sysctl net.inet.tcp.blackhole)"
    )
    return skip_if(test)