- Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathDATestRun.py
305 lines (271 loc) · 11.6 KB
/
DATestRun.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
importboto3
importuuid
importjson
importos
importsubprocess
importre
importrandom
fromtimeimportsleep
##############################################
# Cleanup Certificates and Things and created certificate and private key file
defdelete_thing_with_certi(thingName, certiId, certiArn):
client.detach_thing_principal(
thingName=thingName,
principal=certiArn)
client.update_certificate(
certificateId=certiId,
newStatus='INACTIVE')
client.delete_certificate(certificateId=certiId, forceDelete=True)
client.delete_thing(thingName=thingName)
os.remove(os.environ["DA_CERTI"])
os.remove(os.environ["DA_KEY"])
# Export the testing log and upload it to S3 bucket
defprocess_logs(log_group, log_stream, thing_name):
logs_client=boto3.client('logs')
response=logs_client.get_log_events(
logGroupName=log_group,
logStreamName=log_stream
)
log_file="DA_Log_Python_"+thing_name+".log"
f=open(log_file, 'w')
foreventinresponse["events"]:
f.write(event['message'])
f.close()
try:
secrets_client=boto3.client(
"secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"])
s3_bucket_name=secrets_client.get_secret_value(SecretId="ci/DeviceAdvisor/s3bucket")["SecretString"]
s3.Bucket(s3_bucket_name).upload_file(log_file, log_file)
print("[Device Advisor] Device Advisor Log file uploaded to "+log_file)
exceptException:
print ("[Device Advisor] Error: could not store log in S3 bucket!")
os.remove(log_file)
print("[Device Advisor] Device Advisor Log file uploaded to "+log_file)
# Sleep for a random time between base and bax
defsleep_with_backoff(base, max):
sleep(random.randint(base,max))
##############################################
# Initialize variables
# create aws clients
try:
client=boto3.client('iot', region_name="us-east-1")
dataClient=boto3.client('iot-data', region_name="us-east-1")
deviceAdvisor=boto3.client('iotdeviceadvisor', region_name="us-east-1")
s3=boto3.resource('s3')
exceptExceptionasex:
print ("[Device Advisor] Error: could not create boto3 clients.")
print (ex)
exit(-1)
# const
BACKOFF_BASE=5
BACKOFF_MAX=10
# 60 minutes divided by the maximum back-off = longest time a DA run can last with this script.
MAXIMUM_CYCLE_COUNT= (3600/BACKOFF_MAX)
# Did Device Advisor fail a test? If so, this should be true
did_at_least_one_test_fail=False
# load test config
f=open('deviceadvisor/script/DATestConfig.json')
DATestConfig=json.load(f)
f.close()
# create an temporary certificate/key file path
certificate_path=os.path.join(os.getcwd(), 'certificate.pem.crt')
key_path=os.path.join(os.getcwd(), 'private.pem.key')
# load environment variables requried for testing
shadowProperty=os.environ['DA_SHADOW_PROPERTY']
shadowDefault=os.environ['DA_SHADOW_VALUE_DEFAULT']
# test result
test_result= {}
##############################################
# Run device advisor
fortest_nameinDATestConfig['tests']:
##############################################
# create a test thing
thing_name="DATest_"+str(uuid.uuid4())
try:
# create_thing_response:
# {
# 'thingName': 'string',
# 'thingArn': 'string',
# 'thingId': 'string'
# }
print("[Device Advisor] Info: Started to create thing...")
create_thing_response=client.create_thing(
thingName=thing_name
)
os.environ["DA_THING_NAME"] =thing_name
exceptExceptionase:
print("[Device Advisor] Error: Failed to create thing: "+thing_name)
exit(-1)
##############################################
# create certificate and keys used for testing
try:
print("[Device Advisor] Info: Started to create certificate...")
# create_cert_response:
# {
# 'certificateArn': 'string',
# 'certificateId': 'string',
# 'certificatePem': 'string',
# 'keyPair':
# {
# 'PublicKey': 'string',
# 'PrivateKey': 'string'
# }
# }
create_cert_response=client.create_keys_and_certificate(
setAsActive=True
)
# write certificate to file
f=open(certificate_path, "w")
f.write(create_cert_response['certificatePem'])
f.close()
# write private key to file
f=open(key_path, "w")
f.write(create_cert_response['keyPair']['PrivateKey'])
f.close()
# setup environment variable
os.environ["DA_CERTI"] =certificate_path
os.environ["DA_KEY"] =key_path
exceptException:
try:
client.delete_thing(thingName=thing_name)
exceptException:
print("[Device Advisor] Error: Could not delete thing.")
print("[Device Advisor] Error: Failed to create certificate.")
exit(-1)
certificate_arn=create_cert_response['certificateArn']
certificate_id=create_cert_response['certificateId']
##############################################
# attach policy to certificate
try:
secrets_client=boto3.client(
"secretsmanager", region_name=os.environ["AWS_DEFAULT_REGION"])
policy_name=secrets_client.get_secret_value(SecretId="ci/DeviceAdvisor/policy_name")["SecretString"]
client.attach_policy (
policyName=policy_name,
target=certificate_arn
)
exceptExceptionasex:
print (ex)
delete_thing_with_certi(thing_name, certificate_id, certificate_arn )
print("[Device Advisor] Error: Failed to attach policy.")
exit(-1)
##############################################
# attach certification to thing
try:
print("[Device Advisor] Info: Attach certificate to test thing...")
# attache the certificate to thing
client.attach_thing_principal(
thingName=thing_name,
principal=certificate_arn
)
exceptException:
delete_thing_with_certi(thing_name, certificate_id ,certificate_arn )
print("[Device Advisor] Error: Failed to attach certificate.")
exit(-1)
try:
######################################
# set default shadow, for shadow update, if the
# shadow does not exists, update will fail
print("[Device Advisor] Info: About to update shadow.")
payload_shadow=json.dumps(
{
"state": {
"desired": {
shadowProperty: shadowDefault
},
"reported": {
shadowProperty: shadowDefault
}
}
})
shadow_response=dataClient.update_thing_shadow(
thingName=thing_name,
payload=payload_shadow)
get_shadow_response=dataClient.get_thing_shadow(thingName=thing_name)
# make sure shadow is created before we go to next step
print("[Device Advisor] Info: About to wait for shadow update.")
while(get_shadow_responseisNone):
get_shadow_response=dataClient.get_thing_shadow(thingName=thing_name)
# start device advisor test
# test_start_response
# {
# 'suiteRunId': 'string',
# 'suiteRunArn': 'string',
# 'createdAt': datetime(2015, 1, 1)
# }
print("[Device Advisor] Info: Start device advisor test: "+test_name)
sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX)
test_start_response=deviceAdvisor.start_suite_run(
suiteDefinitionId=DATestConfig['test_suite_ids'][test_name],
suiteRunConfiguration={
'primaryDevice': {
'thingArn': create_thing_response['thingArn'],
},
'parallelRun': True
})
# get DA endpoint
print("[Device Advisor] Info: Getting Device Advisor endpoint.")
endpoint_response=deviceAdvisor.get_endpoint(
thingArn=create_thing_response['thingArn']
)
os.environ['DA_ENDPOINT'] =endpoint_response['endpoint']
cycle_number=0
whileTrue:
cycle_number+=1
if (cycle_number>=MAXIMUM_CYCLE_COUNT):
print(f"[Device Advisor] Error: {cycle_number} of cycles lasting {BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.")
raiseException(f"ERROR - {cycle_number} of cycles lasting {BACKOFF_BASE} to {BACKOFF_MAX} seconds have passed.")
# Add backoff to avoid TooManyRequestsException
sleep_with_backoff(BACKOFF_BASE, BACKOFF_MAX)
print ("[Device Advisor] Info: About to get Device Advisor suite run.")
test_result_responds=deviceAdvisor.get_suite_run(
suiteDefinitionId=DATestConfig['test_suite_ids'][test_name],
suiteRunId=test_start_response['suiteRunId']
)
# If the status is PENDING or the responds does not loaded, the test suite is still loading
if (test_result_responds['status'] =='PENDING'or
len(test_result_responds['testResult']['groups']) ==0or# test group has not been loaded
len(test_result_responds['testResult']['groups'][0]['tests']) ==0or#test case has not been loaded
test_result_responds['testResult']['groups'][0]['tests'][0]['status'] =='PENDING'):
continue
# Start to run the test sample after the status turns into RUNNING
elif (test_result_responds['status'] =='RUNNING'and
test_result_responds['testResult']['groups'][0]['tests'][0]['status'] =='RUNNING'):
print ("[Device Advisor] Info: About to get start Device Advisor companion test application.")
exe_path=os.path.join("deviceadvisor/tests/",DATestConfig['test_exe_path'][test_name])
result=subprocess.run('python3 '+exe_path, timeout=60*2, shell=True)
# If the test finalizing then store the test result
elif (test_result_responds['status'] !='RUNNING'):
test_result[test_name] =test_result_responds['status']
# If the test failed, upload the logs to S3 before clean up
if(test_result[test_name] !="PASS"):
print ("[Device Advisor] Info: About to upload log to S3.")
log_url=test_result_responds['testResult']['groups'][0]['tests'][0]['logUrl']
group_string=re.search('group=(.*);', log_url)
log_group=group_string.group(1)
stream_string=re.search('stream=(.*)', log_url)
log_stream=stream_string.group(1)
process_logs(log_group, log_stream, thing_name)
delete_thing_with_certi(thing_name, certificate_id ,certificate_arn)
break
exceptException:
delete_thing_with_certi(thing_name, certificate_id ,certificate_arn)
print("[Device Advisor] Error: Failed to test: "+test_name)
did_at_least_one_test_fail=True
exit(-1)
##############################################
# print result and cleanup things
print(test_result)
failed=False
fortestintest_result:
if(test_result[test] !="PASS"and
test_result[test] !="PASS_WITH_WARNINGS"):
print("[Device Advisor] Error: Test \""+test+"\" Failed with status:"+test_result[test])
failed=True
iffailed:
# if the test failed, we dont clean the Thing so that we can track the error
exit(-1)
if (did_at_least_one_test_fail==True):
print("[Device Advisor] Error: At least one test failed!")
exit(-1)
exit(0)