- Notifications
You must be signed in to change notification settings - Fork 219
/
Copy pathtest_ggv2.py
99 lines (75 loc) · 4.6 KB
/
test_ggv2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
importconcurrent.futures
importthreading
fromunittestimportTestCase
fromunittest.mockimportpatch
importio
importcontextlib
fromawsiot.greengrasscoreipc.clientimportSubscribeToTopicStreamHandler
fromawsiot.greengrasscoreipc.modelimportCreateLocalDeploymentResponse, SubscribeToTopicResponse, \
SubscriptionResponseMessage, BinaryMessage
TIMEOUT=10.0# seconds
classGGV2Test(TestCase):
def_mock_operation(self, mock_op, response):
activate_fut=concurrent.futures.Future()
activate_fut.set_result(None)
mock_op.activate.return_value=activate_fut
response_fut=concurrent.futures.Future()
response_fut.set_result(response)
mock_op.get_response.return_value=response_fut
returnmock_op
@patch('awsiot.greengrasscoreipc.client.GreengrassCoreIPCClient')
@patch('awsiot.greengrasscoreipc.client.CreateLocalDeploymentOperation')
@patch('awsiot.greengrasscoreipc.client.SubscribeToTopicOperation')
deftest_connect(self, mock_client, mock_deployment_op, mock_subscribe_op):
fromawsiot.greengrasscoreipc.clientv2importGreengrassCoreIPCClientV2asClient
c=Client(client=mock_client)
self._mock_operation(mock_deployment_op, CreateLocalDeploymentResponse(deployment_id="deployment"))
mock_client.new_create_local_deployment.return_value=mock_deployment_op
resp=c.create_local_deployment()
self.assertEqual("deployment", resp.deployment_id)
# Verify subscription works and callback is called on the executor thread
self._mock_operation(mock_subscribe_op, SubscribeToTopicResponse(topic_name="abc"))
mock_client.new_subscribe_to_topic.return_value=mock_subscribe_op
subscription_fut=concurrent.futures.Future()
thread_id_fut=concurrent.futures.Future()
defon_stream_event(r):
subscription_fut.set_result(r)
thread_id_fut.set_result(threading.get_ident())
resp, op=c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event)
self.assertEqual("abc", resp.topic_name)
sub_handler=mock_client.new_subscribe_to_topic.call_args[0][0]
sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz")))
self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message)
self.assertNotEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT))
# Verify that when using the stream_handler option, the callback is run in the executor
subscription_fut=concurrent.futures.Future()
thread_id_fut=concurrent.futures.Future()
classhandler(SubscribeToTopicStreamHandler):
defon_stream_event(self, event):
on_stream_event(event)
resp, op=c.subscribe_to_topic(topic="abc", stream_handler=handler())
self.assertEqual("abc", resp.topic_name)
sub_handler=mock_client.new_subscribe_to_topic.call_args[0][0]
sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz")))
self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message)
self.assertNotEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT))
# Remove executor from client to verify that we are not running the callback in a different thread
c=Client(client=mock_client, executor=None)
subscription_fut=concurrent.futures.Future()
thread_id_fut=concurrent.futures.Future()
resp, op=c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event)
self.assertEqual("abc", resp.topic_name)
sub_handler=mock_client.new_subscribe_to_topic.call_args[0][0]
sub_handler.on_stream_event(SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz")))
self.assertEqual("xyz".encode("utf-8"), subscription_fut.result(TIMEOUT).binary_message.message)
self.assertEqual(threading.get_ident(), thread_id_fut.result(TIMEOUT))
# Verify we nicely print errors in user-provided handler methods
defon_stream_event(r):
raiseValueError("Broken!")
c.subscribe_to_topic(topic="abc", on_stream_event=on_stream_event)
sub_handler=mock_client.new_subscribe_to_topic.call_args[0][0]
f=io.StringIO()
withcontextlib.redirect_stderr(f):
self.assertRaises(ValueError, lambda: sub_handler.on_stream_event(
SubscriptionResponseMessage(binary_message=BinaryMessage(message="xyz"))))
self.assertIn("ValueError: Broken!", f.getvalue())