- Notifications
You must be signed in to change notification settings - Fork 103
/
Copy pathindex.ts
90 lines (78 loc) · 3.4 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
import{mqtt}from'aws-iot-device-sdk-v2';
import{TextDecoder}from'util';
/** Parsed command-line arguments, keyed by option name. */
type Args = Record<string, any>;

const yargs = require('yargs');

// Relative to this source file the helper module is at '../../util/cli_args'. The compiled
// JavaScript is emitted one directory deeper (inside 'dist'), so the path used at runtime
// needs one extra '../'.
const common_args = require('../../../util/cli_args');

// Register the default ('*') command: its builder adds the connection and topic/message
// options supplied by the shared CLI helper, and `main` is the handler that runs once the
// command line has been parsed.
yargs
    .command(
        '*',
        false,
        (parser: any) => {
            common_args.add_direct_connection_establishment_arguments(parser);
            common_args.add_topic_message_arguments(parser);
        },
        main
    )
    .parse();
/**
 * Runs one publish/subscribe exchange on `connection`.
 *
 * Subscribes to `argv.topic`, then schedules `argv.count` publishes of
 * `{ message: argv.message, sequence: 1..argv.count }` to the same topic, one per second
 * (the i-th publish is delayed by `i * 1000` ms, starting at 0).
 *
 * The returned promise resolves once BOTH of these have happened:
 *  - every scheduled publish promise has fulfilled, and
 *  - a message whose `sequence` equals `argv.count` has been received on the subscription.
 *
 * It rejects if subscribing fails or if any publish fails.
 *
 * NOTE: when `argv.count` is 0 or negative no publish is scheduled, so the "published" half
 * is never completed and the promise never resolves.
 *
 * @param connection MQTT connection to use; `main` connects it before calling this function.
 * @param argv parsed command-line arguments; reads `topic`, `message` and `count`.
 * @returns a promise that settles as described above.
 */
async function execute_session(connection: mqtt.MqttClientConnection, argv: Args) {
    return new Promise<void>(async (resolve, reject) => {
        try {
            let published = false;
            let subscribed = false;
            const decoder = new TextDecoder('utf8');

            // Resolve the session only when both the publish side and the receive side are done.
            const resolve_if_done = () => {
                if (subscribed && published) {
                    resolve();
                }
            };

            const on_publish = async (topic: string, payload: ArrayBuffer, dup: boolean, qos: mqtt.QoS, retain: boolean) => {
                const json = decoder.decode(payload);
                console.log(`Publish received. topic:"${topic}" dup:${dup} qos:${qos} retain:${retain}`);
                console.log(`Payload: ${json}`);
                try {
                    const message = JSON.parse(json);
                    // Loose equality kept from the original; presumably so a count that arrives
                    // as a numeric string still matches -- TODO confirm against the CLI helper.
                    if (message.sequence == argv.count) {
                        subscribed = true;
                        resolve_if_done();
                    }
                }
                catch (error) {
                    // Payloads that are not JSON (or not objects) are reported and ignored.
                    console.log("Warning: Could not parse message as JSON...");
                }
            }

            await connection.subscribe(argv.topic, mqtt.QoS.AtLeastOnce, on_publish);

            let published_counts = 0;
            for (let op_idx = 0; op_idx < argv.count; ++op_idx) {
                const publish = async () => {
                    // This runs from a timer, outside the enclosing try/catch, so failures have
                    // to be forwarded to `reject` explicitly; otherwise they would surface as
                    // unhandled rejections and the session promise would never settle.
                    try {
                        const msg = {
                            message: argv.message,
                            sequence: op_idx + 1,
                        };
                        const json = JSON.stringify(msg);
                        await connection.publish(argv.topic, json, mqtt.QoS.AtLeastOnce);
                        ++published_counts;
                        if (published_counts == argv.count) {
                            published = true;
                            resolve_if_done();
                        }
                    }
                    catch (error) {
                        reject(error);
                    }
                }
                // Space the publishes one second apart.
                setTimeout(publish, op_idx * 1000);
            }
        }
        catch (error) {
            reject(error);
        }
    });
}
/**
 * Command handler: builds a connection from the parsed CLI arguments, connects, runs one
 * pub/sub session and disconnects.
 *
 * Cleanup is performed even when a step fails: the connection is disconnected whenever
 * `connect()` succeeded, and the keep-alive timer is always cleared, so a failure propagates
 * to the caller instead of leaving the process running indefinitely.
 *
 * @param argv parsed command-line arguments supplied by yargs.
 */
async function main(argv: Args) {
    common_args.apply_sample_arguments(argv);

    const connection = common_args.build_connection_from_cli_args(argv);

    // Keep node alive while the session runs: pending promises alone do not keep the event
    // loop alive, but an active interval does.
    // ToDo: we can get rid of this but it requires a refactor of the native connection binding that includes
    //    pinning the libuv event loop while the connection is active or potentially active.
    const timer = setInterval(() => { }, 60 * 1000);

    try {
        await connection.connect();
        try {
            await execute_session(connection, argv);
        }
        finally {
            // Disconnect whether the session succeeded or failed.
            await connection.disconnect();
        }
    }
    finally {
        // Allow node to exit; the handle came from setInterval, so cancel it with clearInterval.
        clearInterval(timer);
    }
}