【MQTT协议】使用Mosquitto实现mqtt协议(二):编写视频帧的发布/订阅服务

news/2025/2/12 1:24:44/

目录

  • 一、Mosquitto中的QoS定义
    • QoS1和3区别
  • 二、安装base64库
  • 三、cjson简介
  • 四、主程序代码
  • 五、调用Mosquitto库使用的cmakelist

更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)

一、Mosquitto中的QoS定义

MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。
MQTT协议中定义了三个不同等级的QoS:

QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不进行确认,也不关心消息是否到达订阅者。这种QoS等级的消息传输效率最高,但可靠性最低。

QoS 1:最少一次(At least once)传输。消息发布者会发送消息,并等待确认。如果消息没有被确认,会再次发送,直到收到确认为止。这种QoS等级的消息传输可靠性较高,但效率稍低。

QoS 2:恰好一次(Exactly once)传输。消息发布者发送消息,并等待确认。如果没有收到确认,会再次发送,直到收到确认为止。订阅者接收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。这种QoS等级的消息传输可靠性最高,但效率最低。

QoS1和3区别

在MQTT协议中,客户端和服务端可以通过协商来确定消息传输的QoS等级,以满足消息传输效率和可靠性的要求。
MQTT协议中的QoS 1和QoS 2都保证了消息的可靠性,但它们之间有一些区别。
QoS 1(最少一次)保证了消息至少会被传输一次,但不保证消息一定会被准确地传输一次。在QoS 1级别下,发布者发送消息并等待一个确认,如果没有收到确认,会再次发送消息,直到收到确认为止。但是,如果确认消息丢失或延迟,可能会导致发布者不断地发送相同的消息。因此,QoS 1级别下可能会出现重复的消息。
QoS 2(恰好一次)保证了消息会被恰好传输一次,不会出现重复的消息。在QoS 2级别下,发布者发送消息并等待一个确认。订阅者收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。如果订阅者收到重复的消息,可以通过消息中的标识符进行去重,以保证只处理一次。
综上所述,QoS 2级别下的消息传输可靠性更高,但是会带来更大的网络开销和延迟。在选择QoS等级时,需要根据具体的应用需求来确定。

二、安装base64库

base64库用于完成图片与base64之间的转换工作。
git clone https://github.com/ReneNyffenegger/cpp-base64.git
得到base64.h及base64.cpp,直接在工程中进行引用即可。

三、cjson简介

由于Mosquitto传递字节消息,需使用CJSON完成对象的序列化:
如果需要将C语言结构体转化为JSON字符串,需要手动遍历结构体中的成员,并使用CJSON提供的接口将它们转化为JSON对象。
cjson库在上一篇博文中已安装。这里使用cjson获取字符串实现mqtt节点间的信息传递

四、主程序代码

视频在拉流后,视频帧到不同算法间的关系是发布/订阅关系,
下面代码进行了2节点简单模拟,文中对于Mosquitto原始库做了一个简易封装,了解逻辑即可

#include "mqtt-client.hpp"
#include <opencv2/opencv.hpp>
#include <base64.h>
#include "cJSON.h"
#include <fstream>
using namespace std;
string matToBase64(const cv::Mat& image) {// 将图像编码为base64字符串vector<uchar> data;cv::imencode(".jpg", image, data);string base64_img = base64_encode(data.data(), data.size());return base64_img;
}
cv::Mat base64ToMat(const std::string& base64_str) {// 解码base64字符串std::string decoded_str = base64_decode(base64_str);// 将解码后的字符串转换为cv::Matcv::Mat image = cv::imdecode(std::vector<uchar>(decoded_str.begin(), decoded_str.end()), cv::IMREAD_COLOR);return image;
}
int main(){  typedef struct {int id;std::string pic;} Student;// 读取PNG图片为Matcv::Mat image = cv::imread("/home/trtlearn/mqtt-client/workspace/pic.jpg");string base64_img = matToBase64(image);// 创建一个JSON对象cJSON *root = cJSON_CreateObject();// 将结构体成员转化为JSON对象cJSON_AddNumberToObject(root, "num", 0.75);cJSON_AddStringToObject(root, "pic", base64_img.c_str());// 将JSON对象转换为JSON字符串char* json_str = cJSON_Print(root);cJSON_Delete(root);//新建两个客户端,其中一个发布消息接收数值,另一个接收图片。其中第一个节点即是发布者又是订阅者。string strPic(json_str);MQTTInitializer _;MQTTConnectCredential credential;credential.client_id = "wish";credential.subscribe_topic = "cc";credential.server_address = "127.0.0.1";credential.port = 9999;MQTTConnectCredential credential2;credential2.client_id = "wish1";credential2.subscribe_topic = "cc";credential2.server_address = "127.0.0.1";credential2.port = 9999;auto client = create_mqtt_client(credential);auto client2 = create_mqtt_client(credential2);if(client == nullptr){printf("Failed to create client.\n");return -1;}if(client2 == nullptr){printf("Failed to create client2.\n");return -1;}printf("Create client success.\n");//设置订阅后,收到消息时的回调函数client->set_message_callback([](MQTTClient* client, std::string topic, std::string message){cJSON *root = cJSON_Parse(message.c_str());cJSON *num = cJSON_GetObjectItem(root, "num");printf("客户端1收到 [%s] 数值: %f\n", topic.c_str(), (float)(num->valuedouble));cJSON_Delete(root);});client2->set_message_callback([&](MQTTClient* client, std::string topic, std::string message){cJSON *root = cJSON_Parse(message.c_str());cJSON *pics = cJSON_GetObjectItem(root, "pic");cv::Mat image=base64ToMat(string(pics->valuestring));if (!image.empty()) {cv::imwrite("receiver.jpg",image);}printf("客户端2收到 [%s] 图片\n", topic.c_str());cJSON_Delete(root);});while(true){char c = getchar();if(c == 'q'){break;}if(c == 's'){if(client->publish("cc", strPic)){printf("Publish success\n");}else{printf("Publish failed\n");}}}printf("Done.\n");return 0;
}

运行程序,输入s发布数值与图片信息,打印订阅输出结果:
在这里插入图片描述

五、调用Mosquitto库使用的cmakelist

cmake_minimum_required(VERSION 3.0)
project(pro)
add_definitions(-std=c++11)option(CUDA_USE_STATIC_CUDA_RUNTIME OFF)
set(CMAKE_CXX_STANDARD 11)
#set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/workspace)set(OpenCV_DIR   "/usr/local/opencv/lib64/cmake/opencv4")
set(mosquitto_Include   "/usr/local/mosquitto/include/")
set(CJSON_INCLUDE   "/usr/local/include/cjson")
set(mosquitto_DIR   "/usr/local/mosquitto")
set(mosquitto_LIBS mosquitto mosquittopp)
set(CJSON_LIBS cjson)find_package(OpenCV)include_directories(${PROJECT_SOURCE_DIR}/src${OpenCV_INCLUDE_DIRS}${mosquitto_Include}${CJSON_INCLUDE}
)link_directories(${TENSORRT_DIR}/lib${CUDA_DIR}/lib64${CUDNN_DIR}/lib${mosquitto_DIR}/lib
)set(CMAKE_CXX_FLAGS  "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -O0 -Wfatal-errors -pthread -w -g")#递归地添加的相关文件
file(GLOB_RECURSE cpp_srcs ${PROJECT_SOURCE_DIR}/src/*.cpp)
file(GLOB_RECURSE c_srcs ${PROJECT_SOURCE_DIR}/src/*.c)add_executable(pro ${cpp_srcs} ${c_srcs})target_link_libraries(pro ${OpenCV_LIBS} ${mosquitto_LIBS} ${CJSON_LIBS})add_custom_target(runDEPENDS proWORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/workspaceCOMMAND ./pro
)

http://www.ppmy.cn/news/43994.html

相关文章

「SQL面试题库」 No_36 树节点

&#x1f345; 1、专栏介绍 「SQL面试题库」是由 不是西红柿 发起&#xff0c;全员免费参与的SQL学习活动。我每天发布1道SQL面试真题&#xff0c;从简单到困难&#xff0c;涵盖所有SQL知识点&#xff0c;我敢保证只要做完这100道题&#xff0c;不仅能轻松搞定面试&#xff0…

Tableau-创建环状图:使用2个饼图

步骤 1&#xff1a;创建饼图 在“标记”下面&#xff0c;选择“饼图”标记类型。将分类拖到颜色。将任务总数拖到角度。再拖动一次任务总数&#xff0c;放到标签。根据需要调整饼图大小。 步骤 2&#xff1a;切换到双轴图表 右键点击任意一个字段&#xff0c;创建-->计算…

MySQL索引及SQL优化

先对索引做个大概回顾,然后我们详细探讨SQL优化 索引 索引的分类 主键索引 设定为主键后数据库会自动建立索引&#xff0c;innodb为聚簇索引 单值索引 即一个索引只包含单个列&#xff0c;一个表可以有多个单列索引【建议不要超过3】 唯一索引 索引列的值必须唯一&#xff0…

基于典型相关分析的故障检测和过程监控算法研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。⛳座右铭&#…

SpringSecurity+OAUTH2集成多种登录方式

一、目前OAUTH2的提供了四种授权类型 Authorization Code&#xff08;授权码模式&#xff09;&#xff1a;授权码模式&#xff0c; 通过授权码获取token进行资源访问。 Implicit&#xff08;简化模式&#xff09;&#xff1a;用于移动应用程序或 Web 应用程序&#xff0c;这种…

Python基础-09 高阶函数

高阶函数 一个函数作为另一个的返回值 def demo():print(我是test函数)return hellodef demo():print(我是demo函数)return testdef bar():print(我是bar函数)return test()x test() # 我是test函数 print(x) # helloy demo() # 我是demo函数 print(y) # <function …

数字电源专用IC,国产C2000, QX320F280049

一、特性参数 1、独立双核&#xff0c;32位CPU&#xff0c;单核主频400MHz 2、IEEE 754 单精度浮点单元 &#xff08;FPU&#xff09; 3、三角函数单元 &#xff08;TMU&#xff09; 4、1MB 的 FLASH &#xff08;ECC保护&#xff09; 5、1MB 的 SRAM &#xff08;ECC保护&…

【华为OD机试真题 C++】1001 - 在字符串中找出连续最长的数字串含-号 | 机试题+算法思路+考点+代码解析

文章目录 一、题目🔸题目描述🔸输入输出二、代码参考作者:KJ.JK🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🌈 🍂个人博客首页: KJ.JK 💖系列专栏:华为OD机试真题(C++) 🍂专栏介绍: 华为OD机试真题汇总,使用C++来进行解析解答实现,帮…