一 RabbitMQ下载
RabbitMQ 官网最新版下载:
RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:
https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm
二 RabbitMQ安装
1 安装erlang环境
安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的
执行安装指令如下:
rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm
执行后如下图:
验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:
2 安装RabbitMQ
执行安装RabbitMQ指令如下:
rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm
执行安装中,如下图:
注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图:
如果出现上图的错误,请参考上一步重新安装erlang环境即可。
安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?
只需一条指令可查询当前状态信息:
rabbitmq-diagnostics status
执行后如下图:
从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:
- 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
- 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log
上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!
3 配置RabbitMQ(可选项)
安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:
vi /etc/rabbitmq/rabbitmq.config
配置文件内容:
[{rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},{rabbitmq_management, [{listener, [{port,59876}, {ssl, false}]}]}
].
通过配置配置文件实现变更:
- 客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
- 管理端口 59876 用于RabbitMQ的Web管理。
再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:
上图可以看到刚刚创建的配置文件已被引用状态。
4 RabbitMQ 启动与关闭
RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:
#启动
systemctl start rabbitmq-server#停止关闭
systemctl stop rabbitmq-server#重启
systemctl restart rabbitmq-server#开机启动
systemctl enable rabbitmq-server#查看状态
systemctl status rabbitmq-server
操作如下图:
5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启)
RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:
rabbitmq-plugins enable rabbitmq_management
我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。
新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.
#新增人员
rabbitmqctl add_user hua abc123uuPP#设置权限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"#设置为管理员
rabbitmqctl set_user_tags hua administrator
*
表示授予该用户对该虚拟主机上所有队列和交换机的 configure
、write
和 read
权限。
- 第一个
".*"
表示用户可以配置任意队列和交换机。 - 第二个
".*"
表示用户可以向任意队列和交换机发送消息。 - 第三个
".*"
表示用户可以从任意队列中消费消息。
执行过程如图:
执行上面命令增加一个Web管理员:
- 用户名称:hua
- 密码:abc123uuPP
- 权限 :管理员
如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:
- 默认用户:guest
- 默认密码:guest
三 RabbitMQ Web 管理
1 RabbitMQ Web 登陆
进入RabbitMQ Web 登陆页面如下:
首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:
使用上面新建的账号hua登陆,登陆成功如下图:
2 用户管理
用户管理,用户增加操作简单,如下图:
用户管理,用户权限设置操作简单,如下图:
用户操作界面非常人性化,可以很方便设置权限,修改用户资料。
3 虚拟主机(重要)
虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,彼此之间相互隔离,不能共享资源。
- 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
- 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
- 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。
虚拟主机提供了一种隔离和权限管理的方式,适用于以下场景:
- 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
- 开发与生产环境隔离:你可以为开发环境和生产环境创建不同的虚拟主机,避免资源冲突和干扰。
- 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。
默认虚拟主机
RabbitMQ 默认创建一个虚拟主机 /
,这是一个特殊的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。
虚拟主机操作也非常简单,如下图:
在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:
功能强大,非常好用。
四 java代码接入
方式一 java通用:
1 引入mvn依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>
JAVA 连接RabbitMQ生产消息与接收消费测试代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-21 18:01*/
public class TestRabbitMQ {private final static String QUEUE_NAME = "hello";public static void main1(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_producer");factory.setPassword("java_producer");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");} catch (TimeoutException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_consumer");factory.setPassword("java_consumer");// 连接到 RabbitMQ 服务器try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(确保队列存在)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义回调函数,当有消息送达时执行DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}
测试运行发送消息,发送成功。如下图:
测试运行接收消息,消费成功。如下图:
上面测试通过后,改成服务类方便生产环境使用来发送消息代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-22*/
@Service
public class RabbitMqServiceImpl {private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);private static final String QUEUE_NAME = "test";private Connection connection;private Channel channel;public RabbitMqServiceImpl() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_producer");factory.setPassword("java_producer");//如果不指定虚拟机默认会使用/factory.setVirtualHost("test");try {this.connection = factory.newConnection();this.channel = connection.createChannel();this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);logger.info("RabbitMqServiceImpl initialized successfully.");} catch (IOException | TimeoutException e) {e.printStackTrace();logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);}}public void sendMessage(String message) {try {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");} catch (IOException e) {e.printStackTrace();logger.error("Failed to send message: {}", e.getMessage());}}public void close() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}} catch (IOException | TimeoutException e) {e.printStackTrace();}}
}
上面的代码存在问题,未确认发送成功,有丢失风险,再改善如下:
import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
import com.rabbitmq.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-22*/
@Service
public class RabbitMqServiceImpl {private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);private static final String QUEUE_NAME = "hex_kyc";private Connection connection;private Channel channel;//存放所有消息,确认时删除,没确认的保存到数据库private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();@AutowiredDbHexFailLogServiceImpl dbHexFailLogService;@PostConstructpublic void init() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(xxx);factory.setUsername("java_producer");factory.setPassword("java_producer");factory.setVirtualHost("xxxx");factory.setConnectionTimeout(3000);try {this.connection = factory.newConnection();this.channel = connection.createChannel();this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 启用发布者确认模式this.channel.confirmSelect();setupConfirmListener();logger.info("RabbitMqServiceImpl initialized successfully.");} catch (IOException | TimeoutException e) {logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);}}public void sendMessage(String message) {try {long nextSeqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(nextSeqNo, message);channel.basicPublish("", QUEUE_NAME, null, message.getBytes());logger.info(" [x] Sent '{}'", message);} catch (Exception e) {logger.error("Failed to send message: {}", e.getMessage(), e);saveFailedMessageToDatabase(message,"CF");}}// 设置接收监听器,记录未确认的消息private void setupConfirmListener() {ConfirmCallback ackCallback = (deliveryTag, multiple) -> {if (multiple) {outstandingConfirms.headMap(deliveryTag + 1).clear();} else {outstandingConfirms.remove(deliveryTag);}System.out.println("Message confirmed ok deliveryTag="+deliveryTag);};ConfirmCallback nackCallback = (deliveryTag, multiple) -> {if (multiple) {// 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);List<String> FailList= new ArrayList<>();for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {String failedMessage = entry.getValue();logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);FailList.add(failedMessage);}saveFailedMessageToDatabaseBy(FailList); // 批量保存到数据库unconfirmedMessages.clear(); // 清除这些未确认的消息} else {String failedMessage = outstandingConfirms.get(deliveryTag);logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);saveFailedMessageToDatabase(failedMessage,"SF");outstandingConfirms.remove(deliveryTag); // 移除单条未确认的消息}};channel.addConfirmListener(ackCallback, nackCallback);}private void saveFailedMessageToDatabaseBy(List<String> failList) {List<DbHexFailLog> list=new ArrayList<>(failList.size());LocalDateTime now = LocalDateTime.now();for (String message : failList) {DbHexFailLog f=new DbHexFailLog();f.setInHexStr(message);f.setCtime(now);f.setFlag("SF");list.add(f);}dbHexFailLogService.saveBatch(list,list.size());failList.clear();}private void saveFailedMessageToDatabase(String message,String flag) {DbHexFailLog f=new DbHexFailLog();f.setInHexStr(message);f.setCtime(LocalDateTime.now());f.setFlag(flag);dbHexFailLogService.save(f);}@PreDestroypublic void close() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMqServiceImpl resources closed successfully.");} catch (IOException | TimeoutException e) {logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);}}
}
上面的代码优化后,主要增加了三项如下:
1 Publisher Confirms 机制:
- 启用
channel.confirmSelect()
来激活发布者确认模式。 - 使用
ConfirmCallback
和NackCallback
来处理消息的确认与未确认逻辑。 - 未确认的消息会被保存到数据库中。
2 保存失败的消息到数据库。
3 在 @PreDestroy
方法中关闭 Channel
和 Connection
,确保服务销毁时正确关闭资源。
方式二 SpringBoot框架使用
mvn依赖包:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
spring配置文件:
Spring: rabbitmq:host: xx.xx.xx.xxport: 51091username: java_consumerpassword: java_consumervirtual-host: hellowconnection-timeout: 6000
JAVA代码:
发送消息java代码:
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @author hua* @date 2024-08-22*/
@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate Queue queue;public void sendMessage(String message) {rabbitTemplate.convertAndSend(queue.getName(), message);System.out.println(" [x] Sent '" + message + "'");}
}
接收消息java代码:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author hua* @date 2024-08-22*/
@Component
public class RabbitListener {private static final Logger logger = LogManager.getLogger(RabbitListener.class);@RabbitListener(queues = "test")public void receiveMessage(String message) {try {System.out.println("rabbit rev <- "+message);//具体业务} catch (Exception e) {e.printStackTrace();logger.error("rabbit err= ", e);}}
}
上面代码在生产发送消息时通过编码方式更灵活,接收直接使用注解更简单。