基于EMQX+MQTT+ESP32+Openharmony的开发实例

news/2025/1/21 5:31:27/

EMQX介绍

EMQ X 是基于 Erlang/OTP 平台开发的 MQTT 消息服务器,是开源社区中最流行的 MQTT 消息服务器。EMQX 是开源百万级分布式 MQTT 消息服务器(MQTT Messaging Broker),用于支持各种接入标准 MQTT 协议的设备,实现从设备端到服务器端的消息传递,以及从服务器端到设备端的设备控制消息转发。从而实现物联网设备的数据采集,和对设备的操作和控制。

以下是EMQX的官方资料,这里为大家已经收集好了

官方文档: 产品概览 | EMQX 4.3 文档

安装包: https://packages.emqx.net/MQTTX/v1.11.1/MQTTX-Setup-1.11.1-ia32.exe

安装的客户端软件为MQTTX

配置MQTTX

将MQTXX安装到PC端后,需要对其进行一系列的配置:

修改客户端语言

新建MQTT服务器

点击左侧工具栏新建一个MQTT服务器

MQTT服务器配置

新建页中带*号的是必配项

  1. 这里的名称只是在MQTTX中用于区分不同mqtt连接的名字,可以任意命名
  2. 服务器地址是支持MQTT服务的服务器地址这里默认是EMQX官方提供的一个免费服务器
  3. mqtt服务的默认端口为1883,无需修改
  4. 点击右上角 connect即可连接上Broker

MQTT主题订阅

MQTT采用发布/订阅消息模型,在这个模型中

发布者将消息发送到Broker,Broker根据订阅者订阅的主题,将消息分发给相应的订阅者。

点击右侧消息框“添加订阅”即可实现主题订阅。

我们这里订阅主题为“wangziyvle”

订阅成功在右侧出现已订阅的主题

MQTT主题发布

修改右下角框出来的主题名为刚刚订阅的主题表示我们将下面文本框的内容发布到该主题中

配置成功的效果应该是实现了消息回显功能

为什么会消息回显?

实际上MQTT并不限制一个客户端只能是发布者或只能是订阅者 - 一个客户端可以订阅自己所发布的主题(如上所示)

这意味着同一个客户端可以在不同的上下文中既是信息的生产者也是消费者。这意味着同一个客户端可以在不同的上下文中既是信息的生产者也是消费者。

编程实现

虽然实现了串口回显,但上述的一切都只是在应用软件上的自娱自乐,我们该如何通过编程实现MQTT通信?

做过相关尝试的大家应该都知道,难度不在于MQTT客户端编程,最难的点在于移植MQTT库

虽然目前我也不会,但是通过神秘力量获得了一位大佬移植适配好的MQTT库

这里我们基于前面的LwIP连接TCP服务器的代码进行修改后成功实现了MQTT服务

mqtt_test.c
#include <stdio.h>
#include <unistd.h>
#include "ohos_init.h"
#include "cmsis_os2.h"#include "lwip/ip_addr.h"
#include "lwip/netifapi.h"
#include "lwip/sockets.h"#include "MQTTClient.h"static MQTTClient mq_client;unsigned char *onenet_mqtt_buf;
unsigned char *onenet_mqtt_readbuf;
int buf_size;Network n;
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;// 消息回调函数
void mqtt_callback(MessageData *msg_data)
{size_t res_len = 0;uint8_t *response_buf = NULL;char topicname[45] = {"$crsp/"};printf("topic %.*s receive a message\r\n", msg_data->topicName->lenstring.len, msg_data->topicName->lenstring.data);printf("message is %.*s\r\n", msg_data->message->payloadlen, msg_data->message->payload);
}int mqtt_connect(void)
{int rc = 0;NetworkInit(&n);NetworkConnect(&n, "44.232.241.40", 1883); /*Broker: broker.emqx.io TCP 端口:1883*/buf_size = 4096 + 1024;onenet_mqtt_buf = (unsigned char *)malloc(buf_size);onenet_mqtt_readbuf = (unsigned char *)malloc(buf_size);if (!(onenet_mqtt_buf && onenet_mqtt_readbuf)){printf("No memory for MQTT client buffer!");return -2;}MQTTClientInit(&mq_client, &n, 1000, onenet_mqtt_buf, buf_size, onenet_mqtt_readbuf, buf_size);MQTTStartTask(&mq_client);data.keepAliveInterval = 30;data.cleansession = 1;data.clientID.cstring = "ohos_hi3861";data.username.cstring = "123456";data.password.cstring = "222222";data.cleansession = 1;mq_client.defaultMessageHandler = mqtt_callback;// 连接服务器rc = MQTTConnect(&mq_client, &data);// 订阅消息,并设置回调函数MQTTSubscribe(&mq_client, "wangzilvle", 0, mqtt_callback);while (1){MQTTMessage message;message.qos = QOS1;message.retained = 0;message.payload = (void *)"openharmony";message.payloadlen = strlen("openharmony");// 发送消息if (MQTTPublish(&mq_client, "wangzilvle", &message) < 0){printf("MQTTPublish faild !\r\n");}sleep(5);}return 0;
}void mqtt_test(void)
{mqtt_connect();
}
mqtt_test.h
/** Copyright (c) 2020 HiSilicon (Shanghai) Technologies CO., LIMITED.* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**     http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/#ifndef __MQTT_TEST_H__
#define __MQTT_TEST_H__void mqtt_test(void);#endif /* __MQTT_TEST_H__ */
wifi_connect.c
/** Copyright (c) 2022 Hunan OpenValley Digital Industry Development Co., Ltd.* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "securec.h"
#include "cmsis_os2.h"
#include "ohos_init.h"
#include "wifi_device.h"
#include "lwip/ip4_addr.h"
#include "lwip/netif.h"
#include "lwip/netifapi.h"
#include "wifi_error_code.h"#define DEF_TIMEOUT 15
#define ONE_SECOND 1
#define SELECT_WLAN_PORT "wlan0"
#define SELECT_WIFI_SECURITYTYPE WIFI_SEC_TYPE_OPEN
#define STD_TIMEZONE_OFFSET (+8)
#define OD_DELAY_100 100
#define OD_DELAY_200 200static int g_staScanSuccess = 0;
static int g_ConnectSuccess = 0;
static int ssid_count = 0;
static WifiErrorCode wifi_error;
static WifiEvent g_wifiEventHandler = {0};
static int wifi_sta_init_state = 0;
int sock_fd;
int addr_length;
const int timeZone = 8;static void WiFiInit(void);
static void WaitScanResult(void);
static int WaitConnectResult(void);
static void OnWifiScanStateChangedHandler(int state, int size);
static void OnWifiConnectionChangedHandler(int state, WifiLinkedInfo *info);
static void OnHotspotStaJoinHandler(StationInfo *info);
static void OnHotspotStateChangedHandler(int state);
static void OnHotspotStaLeaveHandler(StationInfo *info);void DisableWIFI(void)
{DisableWifi();
}static void OnHotspotStaJoinHandler(StationInfo *info)
{(void)info;printf("STA join AP\n");return;
}static void OnHotspotStaLeaveHandler(StationInfo *info)
{(void)info;printf("HotspotStaLeave:info is null.\n");return;
}static void OnHotspotStateChangedHandler(int state)
{printf("HotspotStateChanged:state is %d.\n", state);return;
}static void WiFiInit(void)
{printf("<--Wifi Init-->\r\n");g_wifiEventHandler.OnWifiScanStateChanged = OnWifiScanStateChangedHandler;g_wifiEventHandler.OnWifiConnectionChanged = OnWifiConnectionChangedHandler;g_wifiEventHandler.OnHotspotStaJoin = OnHotspotStaJoinHandler;g_wifiEventHandler.OnHotspotStaLeave = OnHotspotStaLeaveHandler;g_wifiEventHandler.OnHotspotStateChanged = OnHotspotStateChangedHandler;wifi_error = RegisterWifiEvent(&g_wifiEventHandler);if (wifi_error != WIFI_SUCCESS) {printf("register wifi event fail!\r\n");} else {printf("register wifi event succeed!\r\n");}
}static void OnWifiScanStateChangedHandler(int state, int size)
{(void)state;if (size > 0) {ssid_count = size;g_staScanSuccess = 1;}return;
}static int result;
int WifiConnect(const char *ssid, const char *psk)
{WifiScanInfo *info = NULL;unsigned int size = WIFI_SCAN_HOTSPOT_LIMIT;static struct netif *g_lwip_netif = NULL;WifiDeviceConfig select_ap_config = {0};osDelay(OD_DELAY_200);printf("<--System Init-->\r\n");WiFiInit();if (EnableWifi() != WIFI_SUCCESS) {printf("EnableWifi failed, wifi_error = %d\n", wifi_error);return -1;}if (IsWifiActive() == 0) {printf("Wifi station is not actived.\n");return -1;}info = malloc(sizeof(WifiScanInfo) * WIFI_SCAN_HOTSPOT_LIMIT);if (info == NULL) {printf("faild to create wifiscanInfo.\n");return -1;}do {ssid_count = 0;g_staScanSuccess = 0;Scan();WaitScanResult();wifi_error = GetScanInfoList(info, &size);} while (g_staScanSuccess != 1);strcpy_s(select_ap_config.ssid, sizeof(select_ap_config.ssid), ssid);printf("[%s][%s] \r\n", select_ap_config.ssid, select_ap_config.preSharedKey);select_ap_config.securityType = SELECT_WIFI_SECURITYTYPE;if (AddDeviceConfig(&select_ap_config, &result) == WIFI_SUCCESS) {if (ConnectTo(result) == WIFI_SUCCESS && WaitConnectResult() == 1) {printf("WiFi connect succeed!\r\n");wifi_sta_init_state = 1;}}osDelay(OD_DELAY_100);return 0;
}static int WaitConnectResult(void)
{int ConnectTimeout = DEF_TIMEOUT;while (ConnectTimeout > 0) {sleep(1);ConnectTimeout--;if (g_ConnectSuccess == 1) {printf("WaitConnectResult:wait success[%d]s\n", (DEF_TIMEOUT - ConnectTimeout));break;}}if (ConnectTimeout <= 0) {printf("WaitConnectResult:timeout!\n");return 0;}return 1;
}static void OnWifiConnectionChangedHandler(int state, WifiLinkedInfo *info)
{(void)info;if (state > 0) {g_ConnectSuccess = 1;printf("callback function for wifi connect\r\n");} else {g_ConnectSuccess = 0;printf("connect wifi_error, please check password, state:%d, try connect again\r\n", state);esp_wifi_connect();}return;
}static void WaitScanResult(void)
{int scanTimeout = DEF_TIMEOUT;while (scanTimeout > 0) {sleep(ONE_SECOND);scanTimeout--;if (g_staScanSuccess == 1) {printf("WaitScanResult:wait success[%d]s\n", (DEF_TIMEOUT - scanTimeout));break;}}if (scanTimeout <= 0) {printf("WaitScanResult:timeout!\n");}
}
wifi_example.c
/** Copyright (c) 2022 Hunan OpenValley Digital Industry Development Co., Ltd.* Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**    http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
#include <stdio.h>
#include "securec.h"
#include "cmsis_os2.h"
#include "ohos_run.h"
#include "lwip/sockets.h"
#include "lwip/ip_addr.h"
#include "wifi_device.h"#define OPEN_WIFI_NAME "test"
#define SERVER_IP "192.168.8.140"
#define SERVER_PORT 8080
#define OD_DELAY_1000 1000
#define OD_DELAY_100 100
#define RECV_LEN 511
#define STACK_SIZE 4096
#define PRIORITY 25osThreadId_t wifi_test_id = NULL;void wifi_test(void)
{int sock = -1;struct sockaddr_in client_addr;char recv_data[512] = {0};int recv_data_len;WifiConnect(OPEN_WIFI_NAME);printf("start wifi_test test\r\n");mqtt_test();
}static void wifi_test_example(void)
{osThreadAttr_t attr;attr.name = "wifi_test";attr.attr_bits = 0U;attr.cb_mem = NULL;attr.cb_size = 0U;attr.stack_mem = NULL;attr.stack_size = STACK_SIZE;attr.priority = PRIORITY;wifi_test_id = osThreadNew((osThreadFunc_t)wifi_test, NULL, &attr);if (wifi_test_id == NULL){printf("Failed to create wifi_test thread!\n");}
}OHOS_APP_RUN(wifi_test_example);
BUILD.gn
# Copyright (c) 2022 Hunan OpenValley Digital Industry Development Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import("//kernel/liteos_m/liteos.gni")module_name = get_path_info(rebase_path("."), "name")
kernel_module(module_name) {sources = ["wifi_example.c","wifi_connect.c","mqtt_test.c","./mqtt-sdk/MQTTClient-C/src/liteOS/MQTTLiteOS.c","./mqtt-sdk/MQTTClient-C/src/MQTTClient.c","./mqtt-sdk/MQTTPacket/src/MQTTConnectClient.c","./mqtt-sdk/MQTTPacket/src/MQTTConnectServer.c","./mqtt-sdk/MQTTPacket/src/MQTTDeserializePublish.c","./mqtt-sdk/MQTTPacket/src/MQTTFormat.c","./mqtt-sdk/MQTTPacket/src/MQTTPacket.c","./mqtt-sdk/MQTTPacket/src/MQTTSerializePublish.c","./mqtt-sdk/MQTTPacket/src/MQTTSubscribeClient.c","./mqtt-sdk/MQTTPacket/src/MQTTSubscribeServer.c","./mqtt-sdk/MQTTPacket/src/MQTTUnsubscribeClient.c","./mqtt-sdk/MQTTPacket/src/MQTTUnsubscribeServer.c",]include_dirs = ["//foundation/communication/wifi_lite/interfaces/wifiservice","//device/board/esp/esp32/liteos_m/hals/driver/wifi_lite","./mqtt-sdk/MQTTPacket/src","./mqtt-sdk/MQTTClient-C/src","./mqtt-sdk/MQTTClient-C/src/liteOS","//kernel/liteos_m/components/cmsis/2.0",]
}

编译并烧录

在源码根目录下使用hb工具对写好的代码进行编译

选择mini级系统

同理 产品选择esp公司下的esp32

选择完毕后在源码根目录下执行hb build -f 进行编译

编译完成后会有如下界面,并且编译后的代码固件位于:out\esp32\esp32

使用烧录软件进行烧录:

点击start,等待烧录完成

实验现象

重启esp32开发板后可以在串口助手看到:

发布功能:ESP32每五秒向主题“wangziyvle”发布一次“openharmony”

订阅功能:通过MQTTX发送“hello openharmony!”至主题“wangziyvle”

串口助手会打印出接收到的数据

注意!!!单纯的复制代码跑是跑不通的!!!需要结合移植的MQTT库才能成功运行

完整的工程目录如下:

 本项目代码已上传至gitee:ESP32_Oh: ESP32移植Openharmony


http://www.ppmy.cn/news/1564858.html

相关文章

Web3 时代,区块链与物联网的融合创新前景

随着Web3时代的到来&#xff0c;区块链技术和物联网&#xff08;IoT&#xff09;的融合成为科技领域的一个热点话题。Web3以去中心化为核心理念&#xff0c;而区块链正是这一理念的技术支撑。物联网则通过智能设备和传感器将现实世界的数据数字化&#xff0c;连接成一个庞大的网…

从零搭建一套远程手机的桌面操控和文件传输的小工具

从零搭建一套远程手机的桌面操控和文件传输的小工具 --ADB连接专题 一、前言 前面的篇章中&#xff0c;我们确定了通过基于TCP连接的ADB控制远程手机的操作思路。本篇中我们将进行实际的ADB桥接的具体链路搭建工作&#xff0c;从原理和实际部署和操作层面上&#xff0c;从零…

四、CSS效果

一、box-shadow box-shadow:在元素的框架上添加阴影效果 /* x 偏移量 | y 偏移量 | 阴影颜色 */ box-shadow: 60px -16px teal; /* x 偏移量 | y 偏移量 | 阴影模糊半径 | 阴影颜色 */ box-shadow: 10px 5px 5px black; /* x 偏移量 | y 偏移量 | 阴影模糊半径 | 阴影扩散半…

【JVM-10】IBM HeapAnalyzer 工具使用指南:深入解析 Java 堆转储分析

在 Java 应用程序开发中&#xff0c;内存泄漏和内存使用问题是非常常见的性能瓶颈。为了诊断这些问题&#xff0c;开发者通常需要分析 Java 堆转储文件&#xff08;Heap Dump&#xff09;。IBM HeapAnalyzer 是一款由 IBM 开发的免费工具&#xff0c;专门用于分析堆转储文件&am…

算法每日双题精讲 —— 二分查找(x 的平方根,搜索插入位置)

&#x1f31f;快来参与讨论&#x1f4ac;&#xff0c;点赞&#x1f44d;、收藏⭐、分享&#x1f4e4;&#xff0c;共创活力社区。 &#x1f31f; 别再犹豫了&#xff01;快来订阅我们的算法每日双题精讲专栏&#xff0c;一起踏上算法学习的精彩之旅吧&#x1f4aa; 在算法的…

第2章 深入理解Thread构造函数(Java高并发编程详解:多线程与系统设计)

1.线程的命名 1.1线程的默认命名 Thread() Thread(Runnable target) Thread(Thread Group group&#xff0c; Runnable target)打开JDK的源码会看到下面的代码&#xff1a; 1.2.命名线程 Thread(Runnable target, String name) Thread(String name) Thread(Thread Group gro…

Android 右键后无Java class创建

Android studio 创建java class &#xff1a; 最近几个月用Android studio 开发&#xff0c;因为电脑设置了一个新的用户使用&#xff0c;原来的android studio,打开之前的正常的项目总是报一些奇奇怪怪的错误&#xff0c;就重新安装了最新的版本 问题描述 但是新的android s…

C语言笔记——第二章 顺序结构程序设计(四)

printf函数附加格式说明符&#xff08;修饰符&#xff09; 1、m&#xff1a;输出数据域宽&#xff08;位长&#xff09; 数据长度<m&#xff0c;左补空格&#xff1b; 数据常数>m&#xff0c;按照实际输出。 2、 .n &#xff08;1&#xff09;实数&#xff1a;指定小…