目录
- 一、Mosquitto中的QoS定义
- QoS1和3区别
- 二、安装base64库
- 三、cjson简介
- 四、主程序代码
- 五、调用Mosquitto库使用的cmakelist
更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)
一、Mosquitto中的QoS定义
MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。
MQTT协议中定义了三个不同等级的QoS:
QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不进行确认,也不关心消息是否到达订阅者。这种QoS等级的消息传输效率最高,但可靠性最低。
QoS 1:最少一次(At least once)传输。消息发布者会发送消息,并等待确认。如果消息没有被确认,会再次发送,直到收到确认为止。这种QoS等级的消息传输可靠性较高,但效率稍低。
QoS 2:恰好一次(Exactly once)传输。消息发布者发送消息,并等待确认。如果没有收到确认,会再次发送,直到收到确认为止。订阅者接收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。这种QoS等级的消息传输可靠性最高,但效率最低。
QoS1和3区别
在MQTT协议中,客户端和服务端可以通过协商来确定消息传输的QoS等级,以满足消息传输效率和可靠性的要求。
MQTT协议中的QoS 1和QoS 2都保证了消息的可靠性,但它们之间有一些区别。
QoS 1(最少一次)保证了消息至少会被传输一次,但不保证消息一定会被准确地传输一次。在QoS 1级别下,发布者发送消息并等待一个确认,如果没有收到确认,会再次发送消息,直到收到确认为止。但是,如果确认消息丢失或延迟,可能会导致发布者不断地发送相同的消息。因此,QoS 1级别下可能会出现重复的消息。
QoS 2(恰好一次)保证了消息会被恰好传输一次,不会出现重复的消息。在QoS 2级别下,发布者发送消息并等待一个确认。订阅者收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。如果订阅者收到重复的消息,可以通过消息中的标识符进行去重,以保证只处理一次。
综上所述,QoS 2级别下的消息传输可靠性更高,但是会带来更大的网络开销和延迟。在选择QoS等级时,需要根据具体的应用需求来确定。
二、安装base64库
base64库用于完成图片与base64之间的转换工作。
git clone https://github.com/ReneNyffenegger/cpp-base64.git
得到base64.h及base64.cpp,直接在工程中进行引用即可。
三、cjson简介
由于Mosquitto传递字节消息,需使用CJSON完成对象的序列化:
如果需要将C语言结构体转化为JSON字符串,需要手动遍历结构体中的成员,并使用CJSON提供的接口将它们转化为JSON对象。
cjson库在上一篇博文中已安装。这里使用cjson获取字符串实现mqtt节点间的信息传递
四、主程序代码
视频在拉流后,视频帧到不同算法间的关系是发布/订阅关系,
下面代码进行了2节点简单模拟,文中对于Mosquitto原始库做了一个简易封装,了解逻辑即可
#include "mqtt-client.hpp"
#include <opencv2/opencv.hpp>
#include <base64.h>
#include "cJSON.h"
#include <fstream>
using namespace std;
string matToBase64(const cv::Mat& image) {// 将图像编码为base64字符串vector<uchar> data;cv::imencode(".jpg", image, data);string base64_img = base64_encode(data.data(), data.size());return base64_img;
}
cv::Mat base64ToMat(const std::string& base64_str) {// 解码base64字符串std::string decoded_str = base64_decode(base64_str);// 将解码后的字符串转换为cv::Matcv::Mat image = cv::imdecode(std::vector<uchar>(decoded_str.begin(), decoded_str.end()), cv::IMREAD_COLOR);return image;
}
int main(){ typedef struct {int id;std::string pic;} Student;// 读取PNG图片为Matcv::Mat image = cv::imread("/home/trtlearn/mqtt-client/workspace/pic.jpg");string base64_img = matToBase64(image);// 创建一个JSON对象cJSON *root = cJSON_CreateObject();// 将结构体成员转化为JSON对象cJSON_AddNumberToObject(root, "num", 0.75);cJSON_AddStringToObject(root, "pic", base64_img.c_str());// 将JSON对象转换为JSON字符串char* json_str = cJSON_Print(root);cJSON_Delete(root);//新建两个客户端,其中一个发布消息接收数值,另一个接收图片。其中第一个节点即是发布者又是订阅者。string strPic(json_str);MQTTInitializer _;MQTTConnectCredential credential;credential.client_id = "wish";credential.subscribe_topic = "cc";credential.server_address = "127.0.0.1";credential.port = 9999;MQTTConnectCredential credential2;credential2.client_id = "wish1";credential2.subscribe_topic = "cc";credential2.server_address = "127.0.0.1";credential2.port = 9999;auto client = create_mqtt_client(credential);auto client2 = create_mqtt_client(credential2);if(client == nullptr){printf("Failed to create client.\n");return -1;}if(client2 == nullptr){printf("Failed to create client2.\n");return -1;}printf("Create client success.\n");//设置订阅后,收到消息时的回调函数client->set_message_callback([](MQTTClient* client, std::string topic, std::string message){cJSON *root = cJSON_Parse(message.c_str());cJSON *num = cJSON_GetObjectItem(root, "num");printf("客户端1收到 [%s] 数值: %f\n", topic.c_str(), (float)(num->valuedouble));cJSON_Delete(root);});client2->set_message_callback([&](MQTTClient* client, std::string topic, std::string message){cJSON *root = cJSON_Parse(message.c_str());cJSON *pics = cJSON_GetObjectItem(root, "pic");cv::Mat image=base64ToMat(string(pics->valuestring));if (!image.empty()) {cv::imwrite("receiver.jpg",image);}printf("客户端2收到 [%s] 图片\n", topic.c_str());cJSON_Delete(root);});while(true){char c = getchar();if(c == 'q'){break;}if(c == 's'){if(client->publish("cc", strPic)){printf("Publish success\n");}else{printf("Publish failed\n");}}}printf("Done.\n");return 0;
}
运行程序,输入s发布数值与图片信息,打印订阅输出结果:
五、调用Mosquitto库使用的cmakelist
cmake_minimum_required(VERSION 3.0)
project(pro)
add_definitions(-std=c++11)option(CUDA_USE_STATIC_CUDA_RUNTIME OFF)
set(CMAKE_CXX_STANDARD 11)
#set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/workspace)set(OpenCV_DIR "/usr/local/opencv/lib64/cmake/opencv4")
set(mosquitto_Include "/usr/local/mosquitto/include/")
set(CJSON_INCLUDE "/usr/local/include/cjson")
set(mosquitto_DIR "/usr/local/mosquitto")
set(mosquitto_LIBS mosquitto mosquittopp)
set(CJSON_LIBS cjson)find_package(OpenCV)include_directories(${PROJECT_SOURCE_DIR}/src${OpenCV_INCLUDE_DIRS}${mosquitto_Include}${CJSON_INCLUDE}
)link_directories(${TENSORRT_DIR}/lib${CUDA_DIR}/lib64${CUDNN_DIR}/lib${mosquitto_DIR}/lib
)set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -O0 -Wfatal-errors -pthread -w -g")#递归地添加的相关文件
file(GLOB_RECURSE cpp_srcs ${PROJECT_SOURCE_DIR}/src/*.cpp)
file(GLOB_RECURSE c_srcs ${PROJECT_SOURCE_DIR}/src/*.c)add_executable(pro ${cpp_srcs} ${c_srcs})target_link_libraries(pro ${OpenCV_LIBS} ${mosquitto_LIBS} ${CJSON_LIBS})add_custom_target(runDEPENDS proWORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/workspaceCOMMAND ./pro
)