import fcntl
import json
import os
import subprocess
import tempfile
import time
from typing import List

import pytest
import requests

from smoke_tests.docker import docker_utils

from sky import sky_logging

# Initialize logger at the top level
logger = sky_logging.init_logger(__name__)

# We need to import all the mock functions here, so that the smoke
# tests can access them.
from common_test_fixtures import aws_config_region
from common_test_fixtures import enable_all_clouds
from common_test_fixtures import mock_aws_backend
from common_test_fixtures import mock_client_requests
from common_test_fixtures import mock_controller_accessible
from common_test_fixtures import mock_job_table_no_job
from common_test_fixtures import mock_job_table_one_job
from common_test_fixtures import mock_queue
from common_test_fixtures import mock_redirect_log_file
from common_test_fixtures import mock_services_no_service
from common_test_fixtures import mock_services_one_service
from common_test_fixtures import mock_stream_utils
from common_test_fixtures import skyignore_dir

from sky.server import common as server_common

# Usage: use
# @pytest.mark.slow
# to mark a test as slow and to skip by default.
# https://docs.pytest.org/en/latest/example/simple.html#control-skipping-of-tests-according-to-command-line-option

# By default, only run generic tests and cloud-specific tests for AWS and Azure,
# due to the cloud credit limit for the development account.
#
# A "generic test" tests a generic functionality (e.g., autostop) that
# should work on any cloud we support. The cloud used for such a test
# is controlled by `--generic-cloud` (typically you do not need to set it).
#
# To only run tests for a specific cloud (as well as generic tests), use
# --aws, --gcp, --azure, or --lambda.
#
# To only run tests for managed jobs (without generic tests), use
# --managed-jobs.
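#
# Illustrative invocations (hypothetical paths; adjust to where the smoke
# tests actually live in this repo):
#   pytest tests/test_smoke.py --generic-cloud gcp   # generic tests on GCP
#   pytest tests/test_smoke.py --aws                 # AWS-specific + generic tests
#   pytest tests/test_smoke.py --managed-jobs        # managed-jobs tests only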
all_clouds_in_smoke_tests = [
    'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', 'do',
    'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod',
    'vast', 'nebius'
]
default_clouds_to_run = ['aws', 'azure']

# Translate cloud name to pytest keyword. We need this because
# @pytest.mark.lambda is not allowed, so we use @pytest.mark.lambda_cloud
# instead.
cloud_to_pytest_keyword = {
    'aws': 'aws',
    'gcp': 'gcp',
    'azure': 'azure',
    'lambda': 'lambda_cloud',
    'cloudflare': 'cloudflare',
    'ibm': 'ibm',
    'scp': 'scp',
    'oci': 'oci',
    'kubernetes': 'kubernetes',
    'vsphere': 'vsphere',
    'runpod': 'runpod',
    'fluidstack': 'fluidstack',
    'cudo': 'cudo',
    'paperspace': 'paperspace',
    'do': 'do',
    'vast': 'vast',
    'nebius': 'nebius'
}
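
# Illustrative marker usage in a test file (not part of this conftest).
# `lambda` is a Python keyword, so the marker is spelled `lambda_cloud`:
#
#   @pytest.mark.lambda_cloud
#   def test_lambda_only_feature(generic_cloud: str):
#       ...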


def pytest_addoption(parser):
    # Tests marked as `slow` are skipped by default; use --runslow to run them.
    parser.addoption('--runslow',
                     action='store_true',
                     default=False,
                     help='run slow tests.')
    for cloud in all_clouds_in_smoke_tests:
        parser.addoption(f'--{cloud}',
                         action='store_true',
                         default=False,
                         help=f'Only run {cloud.upper()} tests.')
    parser.addoption('--managed-jobs',
                     action='store_true',
                     default=False,
                     help='Only run tests for managed jobs.')
    parser.addoption('--serve',
                     action='store_true',
                     default=False,
                     help='Only run tests for sky serve.')
    parser.addoption('--tpu',
                     action='store_true',
                     default=False,
                     help='Only run tests for TPU.')
    parser.addoption(
        '--generic-cloud',
        type=str,
        choices=all_clouds_in_smoke_tests,
        help='Cloud to use for generic tests. If the generic cloud is '
        'not within the clouds to be run, it will be reset to the first '
        'cloud in the list of the clouds to be run.')
    parser.addoption('--terminate-on-failure',
                     dest='terminate_on_failure',
                     action='store_true',
                     default=True,
                     help='Terminate test VMs on failure.')
    parser.addoption('--no-terminate-on-failure',
                     dest='terminate_on_failure',
                     action='store_false',
                     help='Do not terminate test VMs on failure.')
    parser.addoption(
        '--remote-server',
        action='store_true',
        default=False,
        help='Run tests against a remote server in a Docker container.')
    # Custom options for backward compatibility tests
    parser.addoption(
        '--need-launch',
        action='store_true',
        default=False,
        help='Whether to launch clusters in tests',
    )
    parser.addoption(
        '--base-branch',
        type=str,
        default='master',
        help='Base branch to test backward compatibility against',
    )


def pytest_configure(config):
    config.addinivalue_line('markers', 'slow: mark test as slow to run')
    config.addinivalue_line('markers',
                            'local: mark test to run only on local API server')
    for cloud in all_clouds_in_smoke_tests:
        cloud_keyword = cloud_to_pytest_keyword[cloud]
        config.addinivalue_line(
            'markers', f'{cloud_keyword}: mark test as {cloud} specific')
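    # The registrations above keep pytest from warning about these custom
    # marks (PytestUnknownMarkWarning) and make them show up in `pytest --markers`.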
    pytest.terminate_on_failure = config.getoption('--terminate-on-failure')


def _get_cloud_to_run(config) -> List[str]:
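    # Examples of what this returns, based on the option handling below:
    #   (no cloud flags)              -> default_clouds_to_run, i.e. ['aws', 'azure']
    #   --gcp --generic-cloud azure   -> ['gcp', 'azure']
    #   --cloudflare                  -> ['aws'] (Cloudflare tests run against
    #                                    the first default cloud)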
    cloud_to_run = []
    for cloud in all_clouds_in_smoke_tests:
        if config.getoption(f'--{cloud}'):
            if cloud == 'cloudflare':
                cloud_to_run.append(default_clouds_to_run[0])
            else:
                cloud_to_run.append(cloud)

    generic_cloud_option = config.getoption('--generic-cloud')
    if generic_cloud_option is not None and generic_cloud_option not in cloud_to_run:
        cloud_to_run.append(generic_cloud_option)

    if len(cloud_to_run) == 0:
        cloud_to_run = default_clouds_to_run
    return cloud_to_run


def pytest_collection_modifyitems(config, items):
    skip_marks = {}
    skip_marks['slow'] = pytest.mark.skip(reason='need --runslow option to run')
    skip_marks['managed_jobs'] = pytest.mark.skip(
        reason='skipped, because --managed-jobs option is set')
    skip_marks['serve'] = pytest.mark.skip(
        reason='skipped, because --serve option is set')
    skip_marks['tpu'] = pytest.mark.skip(
        reason='skipped, because --tpu option is set')
    skip_marks['local'] = pytest.mark.skip(
        reason='test requires local API server')
    for cloud in all_clouds_in_smoke_tests:
        skip_marks[cloud] = pytest.mark.skip(
            reason=f'tests for {cloud} are skipped, try setting --{cloud}')

    cloud_to_run = _get_cloud_to_run(config)
    generic_cloud = _generic_cloud(config)
    generic_cloud_keyword = cloud_to_pytest_keyword[generic_cloud]

    for item in items:
        if 'smoke_tests' not in item.location[0]:
            # Only mark smoke test cases
            continue
        if 'slow' in item.keywords and not config.getoption('--runslow'):
            item.add_marker(skip_marks['slow'])
        if 'local' in item.keywords and not server_common.is_api_server_local():
            item.add_marker(skip_marks['local'])
        if _is_generic_test(
                item) and f'no_{generic_cloud_keyword}' in item.keywords:
            item.add_marker(skip_marks[generic_cloud])
        for cloud in all_clouds_in_smoke_tests:
            cloud_keyword = cloud_to_pytest_keyword[cloud]
            if (cloud_keyword in item.keywords and cloud not in cloud_to_run):
                # Need to check both conditions as the first default cloud is
                # added to cloud_to_run when tested for cloudflare
                if config.getoption('--cloudflare') and cloud == 'cloudflare':
                    continue
                item.add_marker(skip_marks[cloud])
        if ('managed_jobs'
                not in item.keywords) and config.getoption('--managed-jobs'):
            item.add_marker(skip_marks['managed_jobs'])
        if ('tpu' not in item.keywords) and config.getoption('--tpu'):
            item.add_marker(skip_marks['tpu'])
        if ('serve' not in item.keywords) and config.getoption('--serve'):
            item.add_marker(skip_marks['serve'])

    # Check if tests need to be run serially for Kubernetes and Lambda Cloud.
    # We run Lambda Cloud tests serially because Lambda Cloud rate limits its
    # launch API to one launch every 10 seconds.
    # We run Kubernetes tests serially because the Kubernetes cluster may have
    # limited resources (e.g., just 8 cpus).
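    # pytest-xdist sends every test that shares an `xdist_group` name to the
    # same worker when run with `--dist loadgroup`, which effectively
    # serializes those tests.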
    serial_mark = pytest.mark.xdist_group(
        name=f'serial_{generic_cloud_keyword}')
    # Handle generic tests
    if generic_cloud in ['lambda']:
        for item in items:
            if (_is_generic_test(item) and
                    f'no_{generic_cloud_keyword}' not in item.keywords):
                item.add_marker(serial_mark)
                # Adding the serial mark does not update the item.nodeid,
                # but item.nodeid is important for pytest.xdist_group, e.g.
                # https://github.com/pytest-dev/pytest-xdist/blob/master/src/xdist/scheduler/loadgroup.py
                # This is a hack to update item.nodeid
                item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}'
    # Handle generic cloud specific tests
    for item in items:
        if generic_cloud in ['lambda', 'kubernetes']:
            if generic_cloud_keyword in item.keywords:
                item.add_marker(serial_mark)
                # See the comment on item.nodeid above.
                item._nodeid = f'{item.nodeid}@serial_{generic_cloud_keyword}'

    if config.option.collectonly:
        for item in items:
            full_name = item.nodeid
            marks = [mark.name for mark in item.iter_markers()]
            print(f"Collected {full_name} with marks: {marks}")


def _is_generic_test(item) -> bool:
    for cloud in all_clouds_in_smoke_tests:
        if cloud_to_pytest_keyword[cloud] in item.keywords:
            return False
    return True


def _generic_cloud(config) -> str:
    generic_cloud_option = config.getoption('--generic-cloud')
    if generic_cloud_option is not None:
        return generic_cloud_option
    return _get_cloud_to_run(config)[0]


@pytest.fixture
def generic_cloud(request) -> str:
    return _generic_cloud(request.config)
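
# Illustrative use of the fixture in a smoke test (not part of this conftest):
#
#   def test_autostop(generic_cloud: str):
#       # `generic_cloud` resolves to --generic-cloud, or to the first cloud
#       # selected to run.
#       ...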


@pytest.fixture(scope='session', autouse=True)
def setup_docker_container(request):
    """Set up a Docker container for remote server testing if --remote-server is specified."""
    if not request.config.getoption('--remote-server'):
        yield
        return

    # Set environment variable to indicate we're using a remote server
    os.environ['PYTEST_SKYPILOT_REMOTE_SERVER_TEST'] = '1'

    # Docker image and container names
    dockerfile_path = 'tests/smoke_tests/docker/Dockerfile_test'
    default_user = os.environ.get('USER', 'buildkite')

    # Create a lockfile and counter file in a temporary directory that all
    # processes can access
    lock_file = os.path.join(tempfile.gettempdir(), 'sky_docker_setup.lock')
    counter_file = os.path.join(tempfile.gettempdir(), 'sky_docker_workers.txt')
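    # The exclusive lock serializes container setup across pytest-xdist
    # workers; the counter records how many workers share the container so
    # that only the last one to finish tears it down (see the finally block).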
    lock_fd = open(lock_file, 'w')
    fcntl.flock(lock_fd, fcntl.LOCK_EX)

    try:
        try:
            with open(counter_file, 'r') as f:
                worker_count = int(f.read().strip())
        except (FileNotFoundError, ValueError):
            worker_count = 0
        worker_count += 1
        with open(counter_file, 'w') as f:
            f.write(str(worker_count))

        # Check if the container is already running (another worker might have
        # started it)
        try:
            # Use docker ps with filter to check for running container
            result = subprocess.run([
                'docker', 'ps', '--filter',
                f'name={docker_utils.get_container_name()}', '--format',
                '{{.Names}}'
            ],
                                    check=True,
                                    capture_output=True,
                                    text=True)
            if docker_utils.get_container_name() in result.stdout:
                fcntl.flock(lock_fd, fcntl.LOCK_UN)
                yield docker_utils.get_container_name()
                return
        except subprocess.CalledProcessError:
            pass

        # Use docker images with filter to check for existing image
        result = subprocess.run([
            'docker', 'images', '--filter',
            f'reference={docker_utils.IMAGE_NAME}', '--format',
            '{{.Repository}}'
        ],
                                check=True,
                                capture_output=True,
                                text=True)
        if docker_utils.IMAGE_NAME in result.stdout:
            logger.info(
                f'Docker image {docker_utils.IMAGE_NAME} already exists')
        else:
            in_container = docker_utils.is_inside_docker()
            if in_container:
                # We're inside a container, so we can't build the Docker image
                raise Exception(
                    f"Docker image {docker_utils.IMAGE_NAME} must be built on "
                    f"the host first when running inside a container. Please "
                    f"run 'docker build -t {docker_utils.IMAGE_NAME} "
                    f"--build-arg USERNAME={default_user} -f "
                    f"tests/smoke_tests/docker/Dockerfile_test .' on the host "
                    f"machine.")
            else:
                logger.info(
                    f'Docker image {docker_utils.IMAGE_NAME} not found, '
                    'building...')
                subprocess.run([
                    'docker', 'build', '-t', docker_utils.IMAGE_NAME,
                    '--build-arg', f'USERNAME={default_user}', '-f',
                    dockerfile_path, '.'
                ],
                               check=True)
                logger.info(
                    f'Successfully built Docker image {docker_utils.IMAGE_NAME}'
                )

        # Start new container
        logger.info(
            f'Starting Docker container {docker_utils.get_container_name()}...'
        )
        # Use create_and_setup_new_container to create and start the container
        docker_utils.create_and_setup_new_container(
            target_container_name=docker_utils.get_container_name(),
            host_port=docker_utils.get_host_port(),
            container_port=46580,
            username=default_user)
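        # container_port=46580 matches the SkyPilot API server's default port
        # inside the container; the host port comes from
        # docker_utils.get_host_port().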
        logger.info(f'Container {docker_utils.get_container_name()} started')

        # Wait for container to be ready
        logger.info('Waiting for container to be ready...')
        url = docker_utils.get_api_server_endpoint_inside_docker()
        health_endpoint = f'{url}/api/health'
        max_retries = 40
        retry_count = 0
        while retry_count < max_retries:
            try:
                response = requests.get(health_endpoint)
                response.raise_for_status()
                # Parse JSON response
                if response.json().get('status') == 'healthy':
                    logger.info('Container is ready!')
                    break
                retry_count += 1
                time.sleep(1)
            except Exception as e:
                logger.error(
                    f'Error connecting to container: {e}, retrying...')
                retry_count += 1
                time.sleep(10)
        else:
            raise Exception(
                'Container failed to start properly - health check did not pass'
            )
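        # Note: the `else` above belongs to the `while` loop; it only runs if
        # the loop exhausts max_retries without reaching `break`.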

        # Release the lock before yielding
        fcntl.flock(lock_fd, fcntl.LOCK_UN)

        yield docker_utils.get_container_name()
    except Exception as e:
        logger.exception(f'Error in Docker setup: {e}')
        raise
    finally:
        # Reacquire lock for file operations
        fcntl.flock(lock_fd, fcntl.LOCK_EX)
        # Decrement worker counter and cleanup if this is the last worker
        with open(counter_file, 'r') as f:
            worker_count = int(f.read().strip())
        worker_count -= 1
        with open(counter_file, 'w') as f:
            f.write(str(worker_count))

        if worker_count == 0:
            logger.info('Last worker finished, cleaning up container...')
            subprocess.run([
                'docker', 'stop', '-t', '600',
                docker_utils.get_container_name()
            ],
                           check=False)
            subprocess.run(['docker', 'rm',
                            docker_utils.get_container_name()],
                           check=False)
            try:
                os.remove(counter_file)
            except OSError:
                pass

        # Release the lock and close the file
        fcntl.flock(lock_fd, fcntl.LOCK_UN)
        lock_fd.close()