docker安装Emqx并使用自签名证书开启 SSL/TLS 连接
- 一、获取自签名证书
- 1、创建openssl.cnf文件
- 2、生成证书自签名证书
- 二、docker安装EMQX
- 1、初始化目录
- 2、加载镜像文件并挂载相应的文件目录
- 3、启动docker容器
- 4、EMQX加载自签名证书
- 三、客户端MQTTX连接测试
- 四、Springboot整合
详细操作过程可以参考官方说明文档:官方说明文档
客户端TLS接入示例:客户端接入示例
从官方的文档中我们了解到,EMQX支持通过 X.509 证书实现单向和双向客户端/服务器互信认证,这里我们使用单项认证。
认证方式 | 说明 | 验证方式 | 优缺点 |
---|---|---|---|
单向认证 | 客户端验证服务器身份,但服务器不验证客户端的身份 | 客户端通常不需要提供证书,仅需验证服务器的证书是否由受信任的证书颁发机构(CA)签发 | 只能实现通信数据的机密性和完整性,但无法保证通信双方的身份 |
双向认证 | 服务器和客户端彼此验证对方的身份 | 需要为每个设备签发证书,服务器验证客户端的证书以确认其身份的合法性 | 可以确保服务器和客户端之间的互信关系,并防止中间人攻击 |
一、获取自签名证书
sslcnf_11">1、创建openssl.cnf文件
[ req ]
default_bits = 2048
default_keyfile = server-key.pem
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no[ req_distinguished_name ]
C = CN
ST = California
L = San Francisco
O = Example Organization
OU = Example Unit
CN = 192.168.3.51 # 这里设置为服务器的IP地址[ v3_req ]
subjectAltName = @alt_names[ alt_names ]
DNS.1 = 192.168.3.51 # 这里设置为服务器的IP地址
IP.1 = 192.168.3.51 # 这里设置为服务器的IP地址
2、生成证书自签名证书
在openssl.cnf目录下执行如下命令生成自签名证书:
openssl genpkey -algorithm RSA -out server-key.pem
openssl req -new -key server-key.pem -out server.csr -config openssl.cnf
openssl x509 -req -in server.csr -out server-cert.pem -signkey server-keyi.pem -days 3650 -extensions v3_req -extfile openssl.cnf
二、docker安装EMQX
1、初始化目录
mkdir -p /wz_conf/emqx/etc
mkdir -p /wz_conf/emqx/data
mkdir -p /wz_conf/emqx/etc
2、加载镜像文件并挂载相应的文件目录
docker load -i emqx.tar.gz
这里也可以使用docker pull emqx/emqx:latest拉取镜像包
cd /wz_conf/emqx
chmod 777 ./*
docker run -d --privileged=true --name emqx emqx/emqx:latest
# 将容器中的相关配置文件挂在到物理机,方便后续修改,*******则为容器id
docker cp *********:/opt/emqx/etc /wz_conf/emqx
docker stop *******
docker rm *******
3、启动docker容器
docker run -d \
--privileged=true \
--restart=always \
--name emqx \
-p 1883:1883 \
-p 8883:8883 \
-p 8083:8083 \
-p 8084:8084 \
-p 8081:8081 \
-p 18083:18083 \
-v /wz_conf/emqx/etc:/opt/emqx/etc \
-v /wz_conf/emqx/data:/opt/emqx/data \
-v /wz_conf/emqx/log:/opt/emqx/log \
emqx/emqx:latest
这里就不再详细说明了,如果不需要开启SSL/TLS,到这里也就可以了。
4、EMQX加载自签名证书
这里我是用的EMQX是5.8.0版本的,通过修改emqx.conf配置文件实现,也可以通过官方文档说的在dashboard中配置。
将生成的证书文件挂载到容器内/opt/emqx/etc/certs目录下,修改/opt/emqx/etc/emqx.conf配置文件末尾添加如下内容:
listeners.ssl.default {bind = "0.0.0.0:8883"ssl_options {# PEM 格式的文件,包含一个或多个用于验证客户端证书的根 CA 证书# 单向认证时,该文件内容可以为空# cacertfile = "etc/certs/rootCAs.pem"# PEM 格式的服务器证书,如果证书不是直接由根 CA 签发,那么中间 CA 的证书必须加在服务器证书的后面组成一个证书链certfile = "/opt/emqx/etc/certs/server-cert.pem"# PEM 格式的密钥文件keyfile = "/opt/emqx/etc/certs/server-keyi.pem"# 设置成 'verify_peer' 来验证客户端证书是否为 cacertfile 中某个根证书签发。双向认证时,必须设置成 'verify_peer'。# 设置成 'verify_none' 则不验证客户端证书,即单向认证。verify = verify_none# 如果设置成 true,但是客户端在握手时候没有发送证书,服务端会终止握手。双向认证时,必须设置成 true。# 如果设置成 false,那么服务端只有在客户端发送一个非法证书时才会终止握手fail_if_no_peer_cert = false}
}
配置后重启docker容器即可。重启后也可以通过dashboard查看
三、客户端MQTTX连接测试
四、Springboot整合
<!-- mqtt -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
核心代码块
MqttPushCallback.java
import com.webuild.ai.outside.service.OutsideBaseService;
import com.webuild.ai.utils.WelMaxEncryptionUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Collection;@Component
@Slf4j
public class MqttPushCallback implements MqttCallback {@Autowiredprivate MqttConfig mqttConfig;private static MqttClient client;@Overridepublic void connectionLost(Throwable throwable) {log.info("连接断开,可以做重连");// 连接丢失后,一般在这里面进行重连if (client == null || !client.isConnected()) {mqttConfig.getMqttPushClient();}}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {byte[] payload = mqttMessage.getPayload();String mqttMessagePayload = new String(payload);// 接收消息内容(转为字符串格式)// TODO 自己的处理逻辑}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());}
}
MqttPushClient.java
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.cert.X509Certificate;@Component
@Slf4j
public class MqttPushClient {@Autowiredprivate MqttPushCallback mqttPushCallback;private static MqttClient client;public MqttClient getClient() {return client;}private static void setClient(MqttClient client) {MqttPushClient.client = client;}/*** 客户端连接** @param host ip+端口* @param clientID 客户端Id* @param username 用户名* @param password 密码* @param timeout 超时时间* @param keepalive 保留数*/public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {MqttClient client;try {client = new MqttClient(host, clientID, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();if(host.startsWith("ssl")) {// 配置忽略 CN 验证SSLContext sslContext = SSLContext.getInstance("TLSv1.2");sslContext.init(null, new TrustManager[]{new X509TrustManager() {@Overridepublic void checkClientTrusted(X509Certificate[] chain, String authType) {}@Overridepublic void checkServerTrusted(X509Certificate[] chain, String authType) {}@Overridepublic X509Certificate[] getAcceptedIssuers() {return new X509Certificate[0];}}}, new java.security.SecureRandom());options.setSocketFactory(sslContext.getSocketFactory());}options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setConnectionTimeout(timeout);options.setKeepAliveInterval(keepalive);MqttPushClient.setClient(client);try {client.setCallback(mqttPushCallback);client.connect(options);} catch (Exception e) {log.error("设置mqttPushCallback异常. e={}",e);}} catch (Exception e) {log.error("mqtt链接异常. e={}", e);}}
}
MqttClientUtil.java
import com.webuild.ai.config.MqttPushClient;
import com.webuild.ai.config.SpringValueConfig;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class MqttClientUtil {@Autowiredprivate MqttPushClient mqttPushClient;/*** 发布消息** @Description* @Param pushMessage* @Param topic* @Return void* @Author Administrator* @Date 2024/9/2 11:13**/public void publish(String pushMessage, String topic) {try {log.info(">>>>>>>mqtt推送消息>>>>>>>>. pushMessage={}, topic={}", pushMessage, topic);publish(pushMessage, topic, 0, false);} catch (Exception e) {log.error("mqtt发布消息异常. topic={}, pushMessage={}", topic, pushMessage);}}/*** 发布消息* * @Description * @Param pushMessage* @Param topic* @Param qos* QoS 0: 至多一次(At most once) 消息会被尽力而为地传递,但不保证消息会被送达。也就是说,消息可能会丢失,不会有确认消息发送给发布者* 1: 至少一次(At least once) 消息会被至少传递一次。发布者发送消息后会等待接收方的确认(PUBACK)。如果没有收到确认,消息会被重新发送,直到收到确认消息为止。这意味着接收方可能会收到重复的消息。* 2: 只一次(Only once)消息确保只会传递一次。通过四步握手(四个消息交换:PUBLISH、PUBREC、PUBREL、PUBCOMP)来保证消息既不会丢失也不会重复。这是最可靠的传输模式,但也会带来最大的开销* @Param retained* True (保留消息):当retained设为true时,Broker会保留这条消息,当新的客户端订阅这个Topic时,Broker会立即将这条保留消息发送给订阅者。新的保留消息会覆盖之前的保留消息。* False (非保留消息):当retained设为false时,消息不会被Broker保留。只有在消息发布后已有订阅者能收到该消息,新的订阅者不会收到这个消息。* @Return void* @Author Administrator* @Date 2024/9/2 11:07 **/public void publish(String pushMessage, String topic, int qos, boolean retained) {MqttMessage message = new MqttMessage();message.setPayload(pushMessage.getBytes());message.setQos(qos);message.setRetained(retained);MqttTopic mqttTopic = mqttPushClient.getClient().getTopic(topic);if (null == mqttTopic) {log.error("topic is not exist");}MqttDeliveryToken token;//Delivery:配送synchronized (this) {//注意:这里一定要同步,否则,在多线程publish的情况下,线程会发生死锁try {token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件
// token.waitForCompletion(5000L);} catch (Exception e) {log.error("mqtt publish error:={}", e);}}}
}