PC端QT实现mqtt客户端发布和订阅

news/2025/3/13 11:26:25/

在Windows11-64位系统下使用QT开发桌面应用程序,实现mqtt客户端的发布和订阅功能。

需求:

mqtt代理服务器 --mosquitto;

mqtt客户端工具 -- mqtt.fx;

qtcreator开发工具 -- qtcreator6.8.2版本;

过程:

Windows下MQTT服务器搭建

安装mosquitto-1.6.9-install-windows-x64.exe,全部按默认安装即可,路径可以自己修改。

安装后在安装路径会有一个名为mosquitto.conf的配置文件,按照如下方式进行配置,默认端口1883。

修改allow_anonymous为true以允许主机匿名访问。

在mosquitto安装目录下启用终端(cmd),执行命令启动mosquitto服务

windows的mosquitto刚启动时候不会输出任何信息,只要能启动,就代表成功了。

如果想查看调试信息,那么就把log_type的配置项打开。

MQTT客户端工具使用

在客户端发布过程中,可以通过MQTT服务端终端查看相关debug,如下图:

QT开发

由于使用的qt6.8.2版本,使用mqtt驱动库需要重新编译,在安装插件时需要选择source

这边已经安装过,实际安装可能需要几个G的安装内存,安装过程中会出现一些下载错误的情况,不用管,直接重新下载就可以。

安装好后,在qt安装目录下能看到qtmqtt文件夹,那么就说明source源码安装成功。

接下来需要编译MQTT源码,需要CMake和Ninja两个命令。

执行cmake --version和ninja --version查看版本号,能查看说明安装成功。注意需要更新环境变量path。

CSDN上有比较详细的编译MQTT驱动方式,这里简单介绍下。

mkdir build
cd build
cmake -G "Ninja" -DCMAKE_PREFIX_PATH=C:/Qt/6.8.2/msvcxxx_64 ../
ninja
ninja install

有的情况下,可能需要配置环境变量path

一般这样就能完成mqtt编译,打开qt工程,如果出现以下输出,说明工程没有获取到mqtt驱动,再仔细检查下,还有qt工程的.pro中增加mqtt模块。

QT开发UI界面

QT代码
#include "widget.h"
#include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget)
{ui->setupUi(this);tcpSocket = new QTcpSocket(this);connect(tcpSocket,SIGNAL(readyRead()),this,SLOT(readMessage()));//连接信号和相应槽函数connect(tcpSocket,SIGNAL(connected()),this,SLOT(sendMessage()));connect(tcpSocket,SIGNAL(error(QAbstractSocket::SocketError)),this,SLOT(displayError(QAbstractSocket::SocketError)));connect(ui->pushButton_recv,SIGNAL(clicked()),this,SLOT(pushButton_recv_clicked()));connect(ui->pushButton_del,SIGNAL(clicked()),this,SLOT(pushButton_del_clicked()));//连接mqttconnect(&mqtt_client, &QMqttClient::connected, [&]() {ui->pushButton_mqttconnect->setText("mqtt断开");ui->lineEdit_host->setEnabled(false);ui->lineEdit_port->setEnabled(false);ui->pushButton_mqttrecv->setEnabled(true);ui->pushButton_mqttsend->setEnabled(true);});//断开mqttconnect(&mqtt_client, &QMqttClient::disconnected, [&]() {ui->pushButton_mqttconnect->setText("mqtt连接");ui->lineEdit_host->setEnabled(true);ui->lineEdit_port->setEnabled(true);});// 检测连接状态// connect(&mqtt_client, &QMqttClient::stateChanged, [&](QMqttClient::ClientState state) {//     if (state == QMqttClient::Connected) {//         //qDebug() << "Connected to MQTT broker!";//         ui->textEdit_db->append("Connected to MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());//     } else if (state == QMqttClient::Disconnected) {//         //qDebug() << "Disconnected from MQTT broker!";//         ui->textEdit_db->append("Disconnected from MQTT broker!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());//     } else if (state == QMqttClient::Connecting) {//         //qDebug() << "Connecting to MQTT broker...";//         ui->textEdit_db->append("Connecting to MQTT broker...!"+ui->lineEdit_host->text()+":"+ui->lineEdit_port->text());//     }// });// 连接错误时的槽函数connect(&mqtt_client, &QMqttClient::errorChanged, [&](QMqttClient::ClientError error) {//qDebug() << "Error occurred:" << error;ui->textEdit_db->append("Error occurred: "+QString::fromStdString(std::to_string(error)));});//接收消息connect(&mqtt_client, &QMqttClient::messageReceived, [&](const QByteArray &message, const QMqttTopicName &topic) {ui->textEdit_db->append("mqtt_recv: "+topic.name()+":"+message);});connect(ui->pushButton_mqttconnect,SIGNAL(clicked()),this,SLOT(pushButton_mqttconnect_clicked()));connect(ui->pushButton_mqttrecv,SIGNAL(clicked()),this,SLOT(pushButton_mqttrecv_clicked()));connect(ui->pushButton_mqttsend,SIGNAL(clicked()),this,SLOT(pushButton_mqttsend_clicked()));ui->pushButton_mqttrecv->setEnabled(false);ui->pushButton_mqttsend->setEnabled(false);// 设置 QTextEdit 无法选择文本ui->textEdit_db->setTextInteractionFlags(Qt::NoTextInteraction);
}Widget::~Widget()
{delete ui;
}void Widget::connect_db()
{QStringList drivers = QSqlDatabase::drivers();foreach(QString driver, drivers) {ui->textEdit_db->append(driver);}ui->textEdit_db->append("test.");db = QSqlDatabase::addDatabase("QMYSQL");db.setHostName("118.31.2.73");db.setPort(3306); // MySQL的默认端口是3306db.setDatabaseName("test_db");db.setUserName("root");db.setPassword("123456");if (!db.open()) {ui->textEdit_db->append("Error: Unable to connect to the database.");return;}//ui->pushButton_mqttconnect->setText("数据库断连");
}void Widget::select_db()
{/*check_flag = 0;check_value.fill("Default", 10);  // 设置列表大小为 10,并用 "Default" 填充if(ui->checkBox_ar->isChecked()){check_value[check_flag] = ui->checkBox_ar->text();++check_flag;}if(ui->checkBox_hum->isChecked()){check_value[check_flag] = ui->checkBox_hum->text();++check_flag;}if(ui->checkBox_tem->isChecked()){check_value[check_flag] = ui->checkBox_tem->text();++check_flag;}model = new QStandardItemModel(0, 0, this);  // 0 行 0 列switch (check_flag) {case 1:model->setHorizontalHeaderLabels({check_value[0], "date"});break;case 2:model->setHorizontalHeaderLabels({check_value[0], check_value[1], "date"});break;case 3:model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});break;default:model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "date"});break;}QSqlQuery query;query.exec("SELECT * FROM sensor_data");while (query.next()) {QString hum = query.value(0).toString(); // 获取第一列的值QString tem = query.value(1).toString(); // 获取第二列的值QString ar = "";QString created_at = query.value(2).toString(); // 获取第三列的值QList<QStandardItem*> row;switch(check_flag){case 1:if(check_value[0] == "hum")row << new QStandardItem(hum) << new QStandardItem(created_at);if(check_value[0] == "tem")row << new QStandardItem(tem) << new QStandardItem(created_at);if(check_value[0] == "ar")row << new QStandardItem(ar) << new QStandardItem(created_at);break;case 2:if(check_value[0] == "hum" && check_value[1] == "tem")row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);if(check_value[0] == "ar" && check_value[1] == "hum")row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);if(check_value[0] == "ar" && check_value[1] == "tem")row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);break;case 3:row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);break;default:row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);break;}model->appendRow(row);  // 在末尾添加一行}ui->tableView->setModel(model);
*/if (!db.open()) {ui->textEdit_db->append("db disconnect,cannot select db");return;}ui->textEdit_db->append("test select_button");
}void Widget::close_db()
{db.close(); //关闭数据库连接以释放资源
}bool isValidIP(const QString &ip)
{QRegularExpression ipRegex("^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$");return ipRegex.match(ip).hasMatch();
}
bool isValidPort(int port)
{return port > 0 && port <= 65535;
}void Widget::newConnect()
{// 创建数据模型model = new QStandardItemModel(0, 0, this);  // 0 行 0 列QString ip = ui->hostLineEdit->text();if (!isValidIP(ip)) {ui->textEdit_db->append("Invalid IP address!");return;}int port = ui->portLineEdit->text().toInt();if (!isValidPort(port)) {ui->textEdit_db->append("Invalid port number!");return;}check_flag = 0;check_value.fill("Default", 10);  // 设置列表大小为 10,并用 "Default" 填充if(ui->checkBox_ar->isChecked()){check_value[check_flag] = ui->checkBox_ar->text();++check_flag;}if(ui->checkBox_hum->isChecked()){check_value[check_flag] = ui->checkBox_hum->text();++check_flag;}if(ui->checkBox_tem->isChecked()){check_value[check_flag] = ui->checkBox_tem->text();++check_flag;}switch (check_flag) {case 1:model->setHorizontalHeaderLabels({check_value[0], "created_at"});break;case 2:model->setHorizontalHeaderLabels({check_value[0], check_value[1], "created_at"});break;case 3:model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});break;default:model->setHorizontalHeaderLabels({check_value[0], check_value[1], check_value[2], "created_at"});break;}//连接到主机,这里从界面获取主机地址和端口号tcpSocket->connectToHost(ui->hostLineEdit->text(),ui->portLineEdit->text().toInt());// 你可以检查连接状态来确定是否已成功断开,如何已经断开连接,waitForDisconnected会直接返回0if (tcpSocket->waitForDisconnected(3000)) { // 等待最多3000毫秒ui->textEdit_db->append("Disconnected successfully.");} else {ui->textEdit_db->append("Failed to disconnect within the timeout period.");}
}void Widget::readMessage()
{QDataStream in(tcpSocket);in.setVersion(QDataStream::Qt_5_14);if (tcpSocket->bytesAvailable() == 0) return;QByteArray data = tcpSocket->readAll();QString receive_data = QString::fromUtf8(data);QStringList lines = receive_data.split('\n');for(const QString &line : lines){// 跳过空行(可选)if (line.trimmed().isEmpty()) {continue;}ui->textEdit_db->setText(line);int columnCount = model->columnCount();QList<QStandardItem*> row;int i = 0;for(; i < columnCount-1; i++){QVariant columnName = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);// 正则表达式用于匹配数字(整数和浮点数)QRegularExpression regex(QString(R"(%1=(\d+(?:\.\d+)?))").arg(columnName.toString()));QRegularExpressionMatchIterator j = regex.globalMatch(line);while(j.hasNext()){QRegularExpressionMatch match = j.next();QString matchedString = match.captured(1);row << new QStandardItem(matchedString);}}QVariant columnName1 = model->headerData(i, Qt::Horizontal, Qt::DisplayRole);ui->textEdit_db->append(columnName1.toString());// 正则表达式用于匹配日期QRegularExpression regex1(QString(R"(%1=(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}))").arg(columnName1.toString()));QRegularExpressionMatchIterator k = regex1.globalMatch(line);while(k.hasNext()){QRegularExpressionMatch match1 = k.next();QString matchedString1 = match1.captured(1);row << new QStandardItem(matchedString1);}model->appendRow(row);/*//2025-03-03 10:11:52QRegularExpression re("hum=(\\d+),tem=(\\d+\\.\\d+),created_at=(\\d+\\-\\d+\\-\\d+\\ \\d+\\:\\d+\\:\\d+)");QRegularExpressionMatch match = re.match(line);if(match.hasMatch()) {QString hum = match.captured(1);QString tem = match.captured(2);QString ar = "";QString created_at = match.captured(3);//QString displayMessage = QString("Humidity: %1, Temperature: %2,time: %3").arg(hum).arg(tem).arg(created_at);QList<QStandardItem*> row;switch(check_flag){case 1:if(check_value[0] == "hum")row << new QStandardItem(hum) << new QStandardItem(created_at);if(check_value[0] == "tem")row << new QStandardItem(tem) << new QStandardItem(created_at);if(check_value[0] == "ar")row << new QStandardItem(ar) << new QStandardItem(created_at);break;case 2:if(check_value[0] == "hum" && check_value[1] == "tem")row << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);if(check_value[0] == "ar" && check_value[1] == "hum")row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(created_at);if(check_value[0] == "ar" && check_value[1] == "tem")row << new QStandardItem(ar) << new QStandardItem(tem) << new QStandardItem(created_at);break;case 3:row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);break;default:row << new QStandardItem(ar) << new QStandardItem(hum) << new QStandardItem(tem) << new QStandardItem(created_at);break;}model->appendRow(row);  // 在末尾添加一行}
*/}ui->tableView->setModel(model);//tcpSocket->abort(); //立即关闭套接字,而不保证所有数据都已发送或接收。tcpSocket->disconnectFromHost();//取消已有的连接//tcpSocket->abort();
}void Widget::sendMessage()
{//用于暂存我们要发送的数据QByteArray block;//设置数据流QDataStream out(&block,QIODevice::WriteOnly);out.setVersion(QDataStream::Qt_DefaultCompiledVersion);QString message_buf;switch(check_flag){case 1:message_buf = check_value[0];break;case 2:message_buf = check_value[0]+","+check_value[1];break;case 3:message_buf = check_value[0]+","+check_value[1]+","+check_value[2];break;default://message_buf = check_flag;break;}send_data = message_buf.toUtf8();out<<static_cast<quint8>(check_flag);block.append(send_data);tcpSocket->write(block);//发送数据成功后,显示提示ui->textEdit_db->append("send message successful!!!");
}void Widget::displayError(QAbstractSocket::SocketError)
{//ui->textEdit_db->setText("error "+tcpSocket->errorString()); //输出错误信息ui->textEdit_db->append("error "+tcpSocket->errorString());
}
void Widget::pushButton_recv_clicked() //连接按钮
{newConnect(); //请求连接
}
void Widget::pushButton_del_clicked() //连接按钮
{model->clear();  // 清空模型中的数据
}
void Widget::pushButton_mqttconnect_clicked()
{if (ui->pushButton_mqttconnect->text() == "mqtt连接"){// 将QString转换为intbool ok;int value = ui->lineEdit_port->text().toInt(&ok);// 检查转换是否成功if (ok) {ui->textEdit_db->append("hostname:"+ui->lineEdit_host->text());ui->textEdit_db->append("port:"+ui->lineEdit_port->text());} else {ui->textEdit_db->append("port failed");return;}// 设置MQTT代理地址和端口mqtt_client.setHostname(ui->lineEdit_host->text()); // 公共MQTT代理mqtt_client.setPort(value); // MQTT默认端口// 连接MQTT代理mqtt_client.connectToHost();}else if (ui->pushButton_mqttconnect->text() == "mqtt断开"){ui->textEdit_db->append("pushButton_db_clicked!");mqtt_client.disconnectFromHost();ui->pushButton_mqttrecv->setEnabled(false);ui->pushButton_mqttsend->setEnabled(false);}
}
void Widget::pushButton_mqttrecv_clicked()
{// 订阅主题mqtt_client.subscribe(ui->lineEdit_recvtopic->text());ui->textEdit_db->append("recv-topic:"+ui->lineEdit_recvtopic->text());
}void Widget::pushButton_mqttsend_clicked()
{// 发布消息mqtt_client.publish(ui->lineEdit_sendtopic->text(), ui->textEdit_send->toPlainText().toUtf8());
}

UI左侧通过选择控件(checkBox)将需要的数据内容通过tcp发送到云端获取数据,并在表格控件(tableView)中展示,UI右侧通过获取host、port等对应的输入,将这些参数通过mqtt处理连接代理服务器进行发布和订阅。该代码只是实现基本功能,各位可以增加一些前置条件,比如检测输入数据的格式是否正确、检测连接是否正常。在文本控件(textEdit)中输出一些debug打印。


http://www.ppmy.cn/news/1578782.html

相关文章

Java开发者如何接入并使用DeepSeek

目录 一、准备工作 二、添加DeepSeek SDK依赖 三、初始化DeepSeek客户端 四、数据上传与查询 五、数据处理与分析 六、实际应用案例 七、总结 【博主推荐】&#xff1a;最近发现了一个超棒的人工智能学习网站&#xff0c;内容通俗易懂&#xff0c;风格风趣幽默&#xff…

深度学习基础:线性代数本质3——矩阵与线性变换

你对线性代数的一切困惑&#xff0c;根源就在于没有真正理解矩阵到底是什么。 1. 线性变换 变换本质上就是函数。例如&#xff0c;你输入一个向量 &#xff0c;经过某个变换&#xff08;即函数&#xff09;的作用之后&#xff0c;输出另一个向量。 ​ 既然&#xff0c;变换本…

歌词相关实现

歌词相关 歌词数据模型&#xff1a; // Lyric.swift class Lyric: BaseModel {/// 是否是精确到字的歌词var isAccurate:Bool false/// 所有的歌词var datum:Array<LyricLine>! }// LyricLine.swift class LyricLine: BaseModel {/// 整行歌词var data:String!/// 开始…

centos8.0系统部署zabbix6.0监控

centos8.0系统部署zabbix6.0监控 一、部署过程1、确认系统版本2、主机基础环境设置3、安装MySQL 8.0数据库3.1 安装MySQL 8.0仓库3.2 安装软件3.3 设置root用户密码3.4 创建zabbix数据库&#xff0c;授权用户 4、配置zabbix6.0仓库5、安装zabbix服务端软件6、导入zabbix数据表7…

基于MATLAB的冰块变化仿真

如图1所示&#xff0c;边长为5cm的冰块&#xff0c;初始温度为-2℃&#xff0c;放在25℃的环境中自然冷却&#xff0c;对流换热系数为10W/mK&#xff0c;本文将通过matlab编程求解冰块融化的过程&#xff0c;计算其温度场。 图1 案例示意图 02 温度场计算 本文通过matlab分别…

力扣HOT100之双指针:11. 盛最多水的容器

这道题用双指针法很快就做出来了&#xff0c;但是为什么我的双指针法在时间和空间上都不占优啊&#xff1f; 用两个指针分别指向数组的首元素和尾元素&#xff0c;然后取其中的较小值两个位置之间的间隔就得到了这两根垂直线之间所能容纳的水量&#xff0c;例如&#xff0c;对于…

如何检查电脑的硬盘健康状况?

检查硬盘健康状况可以使用多种工具和方法。以下是一些常用的工具和步骤&#xff1a; Windows系统&#xff1a; 使用Windows内置工具&#xff1a; 磁盘检查&#xff1a;可以通过命令提示符&#xff08;cmd&#xff09;使用chkdsk命令来检查硬盘错误。例如&#xff0c;输入chkd…

Windows 上安装配置 Apache Tomcat 及Tomcat 与 JDK 版本对应

Apache Tomcat 是一种广泛使用的 Web 服务器和 Java 容器&#xff0c;对于部署和运行 Java Web 应用程序至关重要。它的可靠性和强大的功能使其成为全球开发人员和组织的首选。 在这篇文章中&#xff0c;我们将介绍在 Windows 机器上安装 Apache Tomcat 的过程&#xff0c;以确…