-
Notifications
You must be signed in to change notification settings - Fork 0
/
MQTT-EMQX
459 lines (383 loc) · 18.2 KB
/
MQTT-EMQX
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
MQTT是应用层的通信协议.
ubuntu上安装emq:
https://www.jianshu.com/p/ebbe25d1c4b2?tdsourcetag=s_pctim_aiomsg
首先启动emqx;
在emqx上安装的文件夹内:
/emq...........下:输入命令:./bin/ emqx start
C:\emqx-windows10-v3.1.1\bin>emqx start
在paho或者其他iot客户端上输入:主机ip地址;端口号是:1883
注意要在测试文件中,将ip地址192.168.204.1;修改为:ipconfig出的本地地址;
注意链接iot时候的问题:
1.id要唯一.
使用http://localhost:18083登录控制器台
EMQ的安装
https://blog.csdn.net/qq_26182739/article/details/87437721
实现与MQTT的发布订阅
Eclipse Paho MQTT 工具下载与安装与使用的方法
1.首先安装java开发环境:https://www.cnblogs.com/xttc/p/6475670.html
2.安装paho客户端,充当iot端.https://jingyan.baidu.com/article/25648fc16b8e969191fd00be.html
开启emq:
https://blog.csdn.net/xsfqh/article/details/88981508
mqtt学习
https://www.ibm.com/developerworks/cn/iot/iot-mqtt-why-good-for-iot/index.html
参考:paho mqtt的库学习
坑:
https://www.jianshu.com/p/1717f45394b0
关于will遗嘱消息:
按照paho的库设置后, 还有问题, 需要断开: MQTTClient_disconnect(client, 10000);
client iD:mqtt服务器发生抢占时, 进行修改
keepAliveInterval:mqtt连接关键字, 是进行mqtt的连接
用TLS/SSL保证EMQ的网络传输安全:
https://www.jianshu.com/p/1602f6db1aa1
https://www.cnblogs.com/xiating/p/8672887.html
https://blog.csdn.net/ptonlix/article/details/79866925
https://www.cnblogs.com/shihuc/p/10285836.html
https://blog.csdn.net/ptonlix/article/details/79866925
官方文档:
https://docs.emqx.io/broker/v3/cn/config.html#mqtt-websocket-with-ssl-8084
ubuntu上安装emq.
https://www.jianshu.com/p/ebbe25d1c4b2?tdsourcetag=s_pctim_aiomsg
使用openssl生成服务器证书和客户端证书
1.安装openssl : sudo apt-get install openssl
2.创建CA证书,创建openssl目录 : 在home/ptonlix/work目录下创建openssl目录 mkdir -p /home/ptonlix/work/openssl
3.将原openssl目录下的ssl/misc/CA.sh拷贝到自己创建的测试目录下:cp /usr/lib/ssl/misc/CA.sh /home/ptonlix/work/openssl
4.创建主证书: ./CA.sh -newca
5.生成服务器证书:https://blog.csdn.net/ptonlix/article/details/79866925
1.生成服务器私钥:openssl genrsa -des3 -out server.key 1024
2.生成csr文件:openssl req -new -key server.key -out server.csr
3.签名并生成证书:openssl ca -in server.csr -out server.crt
生成客户端证书:
1.生成客户端私钥:openssl genrsa -des3 -out client.key 1024
2.生成csr文件:openssl req -new -key client.key -out client.csr
3.签名并生成证书:openssl ca -in client.csr -out client.crt
6.https://www.jianshu.com/p/1602f6db1aa1
启用TLS/SSL的过程是比较简单的,只需要修改emq.conf文件中的一些配置就可以了:
重启EMQ后,用mosquittu_sub来验证TLS是否正常启动:mosquitto_sub -t abc -h emq1 -p 8883 -d --cafile ~/test_certs/MyRootCA.pem --insecure
https://www.jianshu.com/p/511cc22fa700
https://www.jianshu.com/p/448f566d3ff0
7.DLLExport int MQTTClient_waitForCompletion (
MQTTClient handle,
MQTTClient_deliveryToken dt,
unsigned long timeout
)
客户端应用程序调用此函数来将主线程的执行与消息的完成发布同步。 被调用时,MQTTClient_waitForCompletion()阻塞执行,
直到消息成功传递或已超过指定的时间。
参数 含义
handle 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充
dt 代表消息的MQTTClient_deliveryToken用来检测是否成功传递。传递token由发布函数MQTTClient_publish () 和 MQTTClient_publishMessage ()所产生。
timeout 等待的最大毫秒数。
返回值:
消息成功传递则返回MQTTCLIENT_SUCCESS(0) ,如果时间已过期或检测token时出问题,则返回错误码。
8.
struct mqtt_subscribe_message
{
std::string topic;
std::string payload;
};
messageArrived()
{
//增加链表消息
struct mqtt_subscribe_message msg;
char *messageContent;
msg.topic = "";
int len = strlen(topicName);
for (int i = 0; i < len; i++)
{
msg.topic += *topicName;
topicName++;
}
printf("this is a test+++++++++++++++++%s\n", msg.topic);
/* translate char message to string */
messageContent = (char *)message->payload;
msg.payload = "";
for (int i = 0; i < message->payloadlen; i++)
{
msg.payload += *messageContent;
messageContent++;
}
pthread_mutex_lock(&mqtt_subscribe_msg_list_lock);
mqtt_subscribe_msg_list.push_back(msg);
pthread_mutex_unlock(&mqtt_subscribe_msg_list_lock);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void ()
{
pthread_mutex_init(&mqtt_subscribe_msg_list_lock, NULL);
mqtt_subscribe_msg_thread_exit = 0;
int err = pthread_create(&mqtt_subscribe_msg_tid, NULL, &mqtt_subscribe_message_thread_entry, NULL);
if (err != 0)
printf("can't create mqtt subscribe message handle thread: %s\n", strerror(err));
}
static void *mqtt_subscribe_message_thread_entry(void *param)
{
struct mqtt_subscribe_message msg;
while (!mqtt_subscribe_msg_thread_exit) {
pthread_mutex_lock(&mqtt_subscribe_msg_list_lock);
if (mqtt_subscribe_msg_list.empty()) {
pthread_mutex_unlock(&mqtt_subscribe_msg_list_lock);
usleep(100000);
continue;
}
struct mqtt_subscribe_message tmp_msg = mqtt_subscribe_msg_list.front();
msg.topic = tmp_msg.topic;
msg.payload = tmp_msg.payload;
mqtt_subscribe_msg_list.pop_front();
pthread_mutex_unlock(&mqtt_subscribe_msg_list_lock);
mqtt_subscribe_message_handle(&msg);
//usleep(10000);
}
pthread_exit(0);
}
static void mqtt_subscribe_message_handle(struct mqtt_subscribe_message *msg)
{
printf("+++++++++++++ subscribe message from iot ++++++++++++\n");
printf("Topic: %s\n", msg->topic.c_str());
printf("Message: %s\n", msg->payload.c_str());
printf("+++++++++++++++++++++++++++++++++++++++++++++++++++++\n");
printf("Topic: %s\n", msg->topic.c_str());
/* actual business */
if (0 == strcmp(msg->topic.c_str(), CONNECT_DEVICEID_TASK))
{
/* receive message from ""connect/Dev001/task"" topic */
/* parse json contained map or task data , or both */
Json::Reader reader;
Json::Value root;
if (reader.parse(msg->payload, root) && root.isObject())
{
printf("Parse json success.\n");
if (root["MsgID"].isString())
{
/* ack to connect/dispatch/task-ack topic , whether MsgID repeated or not */
// std::string ackContent = CommunicateUtil::deviceDeviceIdTaskAckData(root["MsgID"].asString(),
// 0,
// 1,
// "",
// "15323456890");
// publish(DEVICE_DEVICEID_CONNECT_TASK_ACK, ackContent,QOS_0);
std::string ackTimestamp = std::to_string(ros::Time::now().toNSec()/1000000);
std::string ackContent = CommunicateUtil::deviceDeviceIdTaskAckData(root["MsgID"].asString(),
1,
1,
"",
ackTimestamp);
publish(DEVICE_DEVICEID_CONNECT_TASK_ACK, ackContent, QOS_0);
// printf("Ack for had sent task data to agv_task_control.\n");
printf("Ack for receive an exist MsgID.\n");
/* repeated MsgID will not work for generate map and agv_task_control */
if (lastMessageId != root["MsgID"].asString())
{
lastMessageId = root["MsgID"].asString();
/* if MapData is exist , generate pgm and yaml first */
if (root["MapData"]["PgmData"].isObject())
{
std::string mapJsonString = root["MapData"]["PgmData"].toStyledString();
printf("MapData: \n");
printf("%s\n", mapJsonString.c_str());
/* generate pgm and yaml */
ConverUtil::generatePgmAndYaml(MAP_SAVE_NAME,
MAP_YAML_SAVE_NAME,
mapJsonString.c_str(),
MAP_RESOLUTION);
}
/* get TaskData , interact with agv_task_control */
if (root["TaskData"].isObject())
{
printf("TaskData: \n");
std::string taskJsonString = root["TaskData"].toStyledString();
printf("%s\n", taskJsonString.c_str());
/* send task data to agv_task_control */
std_msgs::String taskData;
taskData.data = taskJsonString;
printf("Send route path data to topic: %s\n", ROS_PUB_TOPIC_ROUTE_DATA);
TaskControlCommunicate::routePathPublish(taskData);
/* ack to connect/dispatch/task-ack topic with task data had sent to agv_task_control */
//ackContent.clear();
}
/*taskInstructionAction值 2:stop task 3:go on task 4:kill task*/
if (root["Action"].isObject())
{
printf("task Action: \n");
std::string taskInstructionActionJsonString = root["Action"].toStyledString();
printf("%s\n", taskInstructionActionJsonString.c_str());
/*send task action to agv_task_control*/
std_msgs::String taskInstructionAction;
taskInstructionAction.data = taskInstructionActionJsonString;
TaskControlCommunicate::taskInstructionPublish(taskInstructionAction);
printf("Ack for had sent task data to agv_task_control.\n");
}
}
else
{
printf("MsgID is equal to last MsgID , message had sent to agv_task_control.\n");
}
}
else
{
printf("MsgID isn't string.\n");
}
}
else
{
printf("Parse json failed.\n");
}
}
// else if (0 == strcmp(topicName, CONNECT_DEVICEID_DEVICE_ACK))
// {
// /* TODO: receive message from "connect/Dev001/device-ack" topic */
// /* ... */
// }
else if (0 == strcmp(msg->topic.c_str(), CONNECT_DEVICEID_CONTROL))
{
/*TODO: receive message from "connect/"DEVICE_ID"/control" topic */
Json::Reader reader;
Json::Value root;
if (reader.parse(msg->payload, root) && root.isObject())
{
printf("Parse json success.\n");
if (root["MsgID"].isString())
{
/* repeated MsgID will not work for generate map and agv_task_control */
if (lastMessageId != root["MsgID"].asString())
{
lastMessageId = root["MsgID"].asString();
/*send TaskInstruction to agv task control*/
if (root["Instruct"].isObject())
{
printf("Control instruct: \n");
std::string controlInstructionJsonString = root["Instruct"].toStyledString();
printf("%s\n", controlInstructionJsonString.c_str());
/*send task instruction to agv_task_control*/
std_msgs::String controlInstruction;
controlInstruction.data = controlInstructionJsonString;
TaskControlCommunicate::controlInstructionPublish(controlInstruction);
//ackContent.clear();
std::string ackTimestamp = std::to_string(ros::Time::now().toNSec()/1000000);
std::string ackContent = CommunicateUtil::deviceDeviceIdControlAckData(root["MsgID"].asString(),
1,
1,
"",
ackTimestamp);
publish(DEVICE_DEVICEID_CONNECT_CONTROL_ACK, ackContent,QOS_0);
printf("Ack for had sent task data to agv_task_control.\n");
}
}
else
{
printf("MsgID is equal to last MsgID , message had sent to agv_task_control.\n");
}
}
else
{
printf("MsgID isn't string.\n");
}
}
else
{
printf("Parse json failed.\n");
}
}
else if (0 == strcmp(msg->topic.c_str(), CONNECT_DEVICEID_CONF))
{
/*TODO: receive message from "connect/"DEVICE_ID"/conf" topic */
/* code */
/* parse json config data , or both */
Json::Reader reader;
Json::Value root;
if (reader.parse(msg->payload, root) && root.isObject())
{
printf("Parse json success.\n");
if (root["MsgID"].isString())
{
/* repeated MsgID will not work for agv_task_control */
if (lastMessageId != root["MsgID"].asString())
{
lastMessageId = root["MsgID"].asString();
/* get configData , interact with agv_task_control */
if (root["configData"].isObject())
{
printf("configData: \n");
std::string confJsonString = root["configData"].toStyledString();
printf("%s\n", confJsonString.c_str());
/* send confData to agv_task_control */
std_msgs::String confData;
confData.data = confJsonString;
printf("Send route path data to topic: %s\n", ROS_PUB_TOPIC_ROUTE_DATA);
TaskControlCommunicate::confPublish(confData);
/* ack to bim with confData had sent to agv_task_control */
//ackContent.clear();
std::string ackTimestamp = std::to_string(ros::Time::now().toNSec()/1000000);
std::string ackContent = CommunicateUtil::deviceDeviceIdConfAckData(root["MsgID"].asString(),
1,
1,
"",
ackTimestamp);
publish(DEVICE_DEVICEID_CONNECT_CONF_ACK, ackContent, QOS_0);
printf("Ack for had sent task data to agv_task_control.\n");
}
}
else
{
printf("MsgID is equal to last MsgID , message had sent to agv_task_control.\n");
}
}
else
{
printf("MsgID isn't string.\n");
}
}
else
{
printf("Parse json failed.\n");
}
}
else if (0 == strcmp(msg->topic.c_str(), CONNECT_DEVICEID_SWITCH))
{
/*TODO: receive message from "connect/"DEVICE_ID"/conf" topic */
/* parse json config data , or both */
Json::Reader reader;
Json::Value root;
if (reader.parse(msg->payload, root) && root.isObject())
{
printf("Parse json success.\n");
if (root["MsgID"].isString())
{
/* repeated MsgID will not work for agv_task_control */
if (lastMessageId != root["MsgID"].asString())
{
lastMessageId = root["MsgID"].asString();
/* get TaskData , interact with agv_task_control */
if (root["Action"].isObject())
{
printf("switch action: \n");
std::string actionJsonString = root["Action"].toStyledString();
printf("%s\n", actionJsonString.c_str());
/* send switch data to agv_task_control ,switchInstructData value 10:关机 11:开机*/
std_msgs::String switchActionData;
switchActionData.data = actionJsonString;
printf("Send route path data to topic: %s\n", ROS_SUB_TOPIC_DEVICE_INFO_SWITCH_STATE);
TaskControlCommunicate::switchInstructDataPublish(switchActionData);
printf("Ack for had sent switch instruct data to agv_task_control.\n");
}
}
else
{
printf("MsgID is equal to last MsgID , message had sent to agv_task_control.\n");
}
}
else
{
printf("MsgID isn't string.\n");
}
}
else
{
printf("Parse json failed.\n");
}
}
else
{
printf("Unsupported topic name.\n");
}
}