MQTT基础概念
MQTT 入门介绍 | 菜鸟教程
MQTT使用
在linux下搭建MQTT服务器(Broker)
在linux下执行下面命令安装MQTT服务器
📎mosquitto-1.6.3.tar.gzhttps://www.yuque.com/attachments/yuque/0/2023/gz/35243076/1687955850547-b5941261-e660-4c0a-bb58-04b9c8efb56d.gz
tar xf mosquitto-1.6.3.tar.gz cd mosquitto-1.6.3/ make sudo make install
安装完成后,终端输入mosquitto
命令即可运行。端口默认是1883
$ mosquitto 1639476502: mosquitto version 1.6.3 starting 1639476502: Using default config. 1639476502: Opening ipv4 listen socket on port 1883. 1639476502: Opening ipv6 listen socket on port 1883.
MQTT客户端
PC客户端
PC客户端测试推荐使用MQTT.fx
客户端软件
》1.设置MQTT.fx软件
》2.开启linux下的MQTT服务器
终端输入mosquitto
命令即可运行
》3.链接
链接后linux服务器出现提示
》4.发布者页面
》5.订阅者页面
》6.通过上面介绍可以开启两次软件,一个是发布者一个是订阅者,进行通信测试
客户端库移植
编译安装
官方下载地址
GitHub - eclipse/paho.mqtt.c at v1.3.0
把源码包放到自己家目录任意位置,执行下面的指令
📎paho.mqtt.c-1.3.0.ziphttps://www.yuque.com/attachments/yuque/0/2023/zip/35243076/1687958647466-904614f1-e4a9-47a4-b514-8a5b34d77f03.zip
unzip paho.mqtt.c-1.3.0.zip cd paho.mqtt.c-1.3.0/ cmake -DCMAKE_INSTALL_PREFIX=/usr make sudo make install
执行完上面的命令后,会在/usr目录下生成samples文件
进入目录我们能看到很多代码
这是官方给我们提供的示例
但是在根目录下进行编写执行代码是不妥的,我们一般在家目录下自己的文件夹进行编写。
把/usr目录里的代码拿到自己的工作目录再修改编译
我们主要使用MQTTClient_subscribe.c(订阅)
和MQTTClient_publish.c(发布)
MQTTClient_subscribe.c代码分析(中文版)
/******************************************************************************** 版权所有 (c) 2012, 2017 IBM Corp.** 保留所有权利。此程序及其附带材料* 根据 Eclipse Public License v1.0 和 Eclipse Distribution License v1.0* 提供。许可证附带此发行版。** Eclipse Public License 可在以下网址获取:* http://www.eclipse.org/legal/epl-v10.html* Eclipse Distribution License 可在以下网址获取:* http://www.eclipse.org/org/documents/edl-v10.php。** 贡献者:* Ian Craggs - 初始贡献*******************************************************************************/#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"#define ADDRESS "tcp://192.168.31.240:1883" // MQTT服务器地址
#define CLIENTID "ExampleClientSub" // 客户端ID
#define TOPIC "MQTT Examples" // 订阅的主题
#define PAYLOAD "Hello World!" // 发布的消息内容
#define QOS 1 // 服务质量
#define TIMEOUT 10000Lvolatile MQTTClient_deliveryToken deliveredtoken;void delivered(void *context, MQTTClient_deliveryToken dt)
{printf("消息传递令牌值为 %d 的消息已确认传递\n", dt);deliveredtoken = dt;
}/*** @brief * * @param context * @param topicName 收到来自哪个主题的消息* @param topicLen 主题的长度* @param message 消息体:payload(消息体,字符串) payloadlen:消息的长度* @return int */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{int i;char* payloadptr;printf("收到消息\n");printf(" 主题: %s\n", topicName);printf(" 消息内容: ");
#if 0payloadptr = message->payload;for(i=0; i<message->payloadlen; i++){putchar(*payloadptr++);}
#endifprintf("接收到的消息 = %s\n", (char *)message->payload);MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}void connlost(void *context, char *cause)
{printf("\n连接丢失\n");printf(" 原因: %s\n", cause);
}int main(int argc, char* argv[])
{// 客户端句柄(描述符)MQTTClient client;// 连接参数MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;int rc;int ch;// 创建客户端,并且指定客户端连接的 MQTT 服务器地址和客户端 IDMQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);// 初始化连接参数conn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;// 设置回调接口,只需要关注 msgarrvd:消息到达后,会自动调用这个接口MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);// 连接到 brokerif ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("连接失败,返回代码 %d\n", rc);exit(EXIT_FAILURE);}printf("订阅主题 %s\n客户端: %s\n使用服务质量 QoS%d\n\n""按下 Q<Enter> 退出\n\n", TOPIC, CLIENTID, QOS);// 订阅某个主题,指定订阅主题的名字,可以指定服务质量 QoSMQTTClient_subscribe(client, TOPIC, QOS);// 死循环,直到收到了一个 'Q' 就退出do {ch = getchar();} while(ch!='Q' && ch != 'q');MQTTClient_unsubscribe(client, TOPIC);MQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);return rc;
}
回调函数实现原理
func(msgarrd)
{while(1){recv(data);msgarrd(data);}
}
MQTTClient_setCallbacks(msgarrd)pthread_create(func, msgarrd);
注意:接收回调函数的返回值一定不要改,返回0会导致段错误。
MQTTClient_publish.c代码分析(中文版)
/******************************************************************************** 版权所有 (c) 2012, 2017 IBM Corp.** 保留所有权利。此程序及其附带材料* 根据 Eclipse Public License v1.0 和 Eclipse Distribution License v1.0* 提供。许可证附带此发行版。** Eclipse Public License 可在以下网址获取:* http://www.eclipse.org/legal/epl-v10.html* Eclipse Distribution License 可在以下网址获取:* http://www.eclipse.org/org/documents/edl-v10.php。** 贡献者:* Ian Craggs - 初始贡献*******************************************************************************/#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"#define ADDRESS "tcp://localhost:1883" // MQTT 服务器地址
#define CLIENTID "ExampleClientPub" // 客户端ID
#define TOPIC "MQTT Examples" // 发布的主题
#define PAYLOAD "Hello World!" // 发布的消息内容
#define QOS 1 // 服务质量
#define TIMEOUT 10000L // 发布超时时间int main(int argc, char* argv[])
{MQTTClient client; // MQTT 客户端MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; // 连接选项MQTTClient_message pubmsg = MQTTClient_message_initializer; // 发布的消息MQTTClient_deliveryToken token; // 消息传递令牌int rc; // 返回代码MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL); // 创建 MQTT 客户端conn_opts.keepAliveInterval = 20; // 保持活动状态的时间间隔conn_opts.cleansession = 1; // 清除会话信息// 连接到 MQTT 服务器if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("连接失败,返回代码 %d\n", rc);exit(EXIT_FAILURE);}pubmsg.payload = PAYLOAD; // 设置发布的消息内容pubmsg.payloadlen = (int)strlen(PAYLOAD); // 设置消息内容的长度pubmsg.qos = QOS; // 设置服务质量pubmsg.retained = 0; // 设置是否保留消息MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token); // 发布消息到指定主题printf("等待 %d 秒钟以确保消息 %s\n""发布到主题 %s,客户端ID为: %s\n",(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); // 等待消息发布完成printf("传递令牌为 %d 的消息已发布\n", token);MQTTClient_disconnect(client, 10000); // 断开连接MQTTClient_destroy(&client); // 销毁客户端return rc;
}
编码测试
》1.以MQTTClient_subscribe.c为例,修改代码中IP地址和主题即可完成最简单的通信。
》2.使用如下命令编译代码,注意需要指定需要的库-l paho-mqtt3c
gcc MQTTClient_subscribe.c -l paho-mqtt3c
》3.在执行程序时注意开启linux服务器
终端输入mosquitto
命令即可运行
》4.执行编译生成的程序
》5.这样就可和windows下的MQTT.fx
进行通信测试了
我们可以看到最后一个字符出现了乱码
是因为修改了程序的这部分导致的,但并不影响我们的测试
练习(重点)
利用mqtt.fx软件实现聊天功能,fx订阅"up"
主题,程序订阅"down"
主题。使用如下的json通信协议。
{"name": "zhangsan","age": 16,"msg": "hello world"
}
提示:mqtt的连接类似与TCP的连接,有且仅有一个连接。连接和订阅动作不能放到循环中。通信时不要用中文,我们终端默认是utf-8编码,而fx软件用的是其它编码,会出现乱码现象。
注意: 官方订阅和发布是两个例子,需要整合到一个代码里,最后只启动一个进程 。进程启动后,从终端获取用户输入然后发送给fx,并且能接收来自fx的消息。终端只接收msg,name和age按照上面的例子定死即可。涉及到多文件编译,可以写个Makefile来组织工程。对于同一个Broker地址只需要一路mqtt链接即可。
答案如下:下面是源码使用tar -xvf samples.tar进行解压
📎samples.tarhttps://www.yuque.com/attachments/yuque/0/2023/tar/35243076/1688999287758-23d9df26-fb88-4fda-9238-a012bf0b5a40.tar
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include "cJSON.h"#define ADDRESS "tcp://192.168.19.129:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "down"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000Lvolatile MQTTClient_deliveryToken deliveredtoken;void delivered(void *context, MQTTClient_deliveryToken dt)
{printf("Message with token value %d delivery confirmed\n", dt);deliveredtoken = dt;
}//线程处理接收到的消息
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{int i;char *payloadptr;//下边处理收到的消息//把传入的字符串转成cJSON的结构(反序列化)//最好添加判断,若不是json结构不进行处理,直接提示cJSON *cjson = cJSON_Parse((char *)message->payload);//消息来到了这个数组里if (cjson == NULL){printf("cjson error...\r\n");MQTTClient_freeMessage(&message); //直接释放MQTTClient_free(topicName); return 1;}//开始摘果子char *name = cJSON_GetObjectItem(cjson, "name")->valuestring;printf("%s:", name);int age = cJSON_GetObjectItem(cjson, "age")->valueint;char *msg = cJSON_GetObjectItem(cjson, "msg")->valuestring;printf("%s\n", msg);MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}void connlost(void *context, char *cause)
{printf("\nConnection lost\n");printf(" cause: %s\n", cause);
}
//主函数int main(int argc, char *argv[])
{pthread_t tid;//客户端句柄(描述符)MQTTClient client;//连接参数MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token;int rc;int ch;//创建客户端,并且指定客户端连接的mqtt服务器地址和客户端IDMQTTClient_create(&client, ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);//初始化连接参数conn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;//设置回调接口,只需要关注msgarrvd:消息到达后,会自动调用这个接口MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);//连接到broker(只链接一次即可)if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(EXIT_FAILURE);}printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n""Press Q<Enter> to quit\n\n",TOPIC, CLIENTID, QOS);//订阅某个主题,指定订阅主题的名字,可以指定qos服务质量//订阅down主题MQTTClient_subscribe(client, TOPIC, QOS);//死循环,直到收到了一个q就退出do{char buf[128];fgets(buf, sizeof(buf), stdin);if (buf[strlen(buf) - 1] == '\n')buf[strlen(buf) - 1] = '\0';cJSON *TCP = cJSON_CreateObject();cJSON_AddStringToObject(TCP, "name", "southernbrid");cJSON_AddNumberToObject(TCP, "age", 18);cJSON_AddStringToObject(TCP, "msg", buf);char *json_data = cJSON_Print(TCP);//填充pubmsg结构体pubmsg.payload = json_data;pubmsg.payloadlen = (int)strlen(json_data);pubmsg.qos = QOS;pubmsg.retained = 0;deliveredtoken = 0;//向up发送消息MQTTClient_publishMessage(client, "up", &pubmsg, &token);printf("Waiting for publication of %s\n""on topic %s for client with ClientID: %s\n",PAYLOAD, "up", CLIENTID);//老师友情提醒:注意资源释放//不然等与慢性自杀free(json_data);cJSON_Delete(TCP);} while (1);//回收资源MQTTClient_unsubscribe(client, TOPIC);MQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);return rc;
}
你不得不看的图文并茂的MQTT协议通信过程!!!_你不得不看mqtt__杰杰_的博客-CSDN博客图文并茂讲解MQTT协议通信过程,深入理解MQTT协议工作过程。_你不得不看mqtthttps://jiejie.blog.csdn.net/article/details/106737995?spm=1001.2014.3001.5502
MQTT协议简介及协议原理__杰杰_的博客-CSDN博客带你看看MQTT协议简介及协议原理_mqtt协议https://jiejie.blog.csdn.net/article/details/106732811?spm=1001.2014.3001.5502
可以结合着wireshark抓包看看内部实现原理。