RabbitMQ 最新版 安装,配置,java接入使用(详细教程)

server/2024/10/25 13:33:12/

一 RabbitMQ下载

RabbitMQ 官网最新版下载:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ依赖erlang-26.2.5.2-1.el7.x86_64.rpm下载:

https://github.com/rabbitmq/erlang-rpm/releases/download/v26.2.5.2/erlang-26.2.5.2-1.el7.x86_64.rpm

二 RabbitMQ安装

   1 安装erlang环境

  安装RabbitMQ前要先安装erlang环境,因为RabbitMQ是用erlang开发的

  执行安装指令如下:

rpm -ivh erlang-26.2.5.2-1.el7.x86_64.rpm

   执行后如下图:

  验证 erlang 安装是否成功,执行erl可以查看版本,说明安装成功如下图:

 

2 安装RabbitMQ

  执行安装RabbitMQ指令如下:

rpm -ivh rabbitmq-server-3.13.6-1.el8.noarch.rpm 

  执行安装中,如下图: 

 注意:如果erlang环境没有安装好,或者版本与当前rabbitMQ不匹配则会报错以下错误,提示需要指定范围的依赖版本,如下图: 

如果出现上图的错误,请参考上一步重新安装erlang环境即可。

 安装结束后,消息队列数据保存在哪?日记在哪?想了解更多的信息?

只需一条指令可查询当前状态信息:

rabbitmq-diagnostics status

执行后如下图:

从上图状态中可以看出目前没有使用任何配置文件,以可以看到以下有用的信息:

  • 数据目录: /var/lib/rabbitmq/mnesia/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b
  • 日记文件:/var/log/rabbitmq/rabbit@server-c868cc62-54b3-4125-80a3-a809f3caff6b.log

 上图信息很详细,可以说开发者开发这个工具非常的细心,对软件有足够了解使用也安心!

3 配置RabbitMQ(可选项

  安装好后RabbitMQ没有使用任何的配置文件(也没有默认配置文件),但会生成一个空目录位置在:/etc/rabbitmq/ ,在这里你可以按照自己的需求参考官方网站配置自己的项目,格式支持有多种,下面我这里要变更默认端口为例创建一个配置文件:

vi /etc/rabbitmq/rabbitmq.config

配置文件内容:

[{rabbit, [{tcp_listeners, [{"0.0.0.0", 51091}]}]},{rabbitmq_management, [{listener, [{port,59876}, {ssl, false}]}]}
].

  通过配置配置文件实现变更:

  1.   客户端 51091 用于消费或生产端连接,IP 0.0.0.0 代表绑定服务器内外网IP。
  2.   管理端口 59876 用于RabbitMQ的Web管理。

再次执行 rabbitmq-diagnostics status 查看新增的配置文件是否被使用,如下图:

 上图可以看到刚刚创建的配置文件已被引用状态。

4 RabbitMQ 启动与关闭

   RabbitMQ安装好后最终是服务状态,可以通过服务管理控制:

#启动
systemctl start rabbitmq-server#停止关闭
systemctl stop rabbitmq-server#重启
systemctl restart rabbitmq-server#开机启动
systemctl enable rabbitmq-server#查看状态
systemctl status rabbitmq-server

  操作如下图:

5 开启RabbitMQ的Web管理界面(可选项,强烈建议开启

RabbitMQ的安装后自带Web管理界面,但是需要执行以下指令开启:

rabbitmq-plugins enable rabbitmq_management

 我们平时只需要一名管员即可,后面要增加用户或设置权限直接在Web操作即可。

   新增一位 RabbitMQ的Web管理员并增加设置管理权限 ,用于管理RabbitMQ.

#新增人员
rabbitmqctl add_user hua abc123uuPP#设置权限
rabbitmqctl set_permissions -p / hua ".*" ".*" ".*"#设置为管理员
rabbitmqctl set_user_tags hua administrator

* 表示授予该用户对该虚拟主机上所有队列和交换机的 configurewriteread 权限。

  • 第一个 ".*" 表示用户可以配置任意队列和交换机。
  • 第二个 ".*" 表示用户可以向任意队列和交换机发送消息。
  • 第三个 ".*" 表示用户可以从任意队列中消费消息。

 执行过程如图:

执行上面命令增加一个Web管理员:

  • 用户名称:hua
  • 密码:abc123uuPP
  • 权限 :管理员

  如果只在本地localhost登陆RabbitWeb管理平台,用默认的账号登陆即可:

  • 默认用户:guest
  • 默认密码:guest

三 RabbitMQ Web 管理

1 RabbitMQ Web 登陆

 进入RabbitMQ Web 登陆页面如下:

首先我们使用默认账号密码尝试登陆,为了安全确实限制本地登陆,如下图:

使用上面新建的账号hua登陆,登陆成功如下图:

2 用户管理

 用户管理,用户增加操作简单,如下图:

  用户管理,用户权限设置操作简单,如下图:

用户操作界面非常人性化,可以很方便设置权限,修改用户资料。 

3 虚拟主机(重要)

虚拟主机(vhost)是 RabbitMQ 中的一种逻辑隔离机制,它相当于一个独立的命名空间。每个虚拟主机内部可以拥有自己独立的队列、交换机、绑定等资源,彼此之间相互隔离,不能共享资源。

  • 命名空间:每个虚拟主机都有自己的队列、交换机、绑定等资源。
  • 资源隔离:不同虚拟主机之间的资源(如队列和交换机)完全隔离,防止不同应用间的资源冲突。
  • 用户权限:不同的用户可以被授予不同虚拟主机的访问权限,确保用户只能访问指定的虚拟主机中的资源。

虚拟主机提供了一种隔离和权限管理的方式,适用于以下场景:

  • 多租户架构:在 SaaS(软件即服务)或多租户应用中,你可以为不同的租户创建不同的虚拟主机,以确保数据隔离。
  • 开发与生产环境隔离:你可以为开发环境和生产环境创建不同的虚拟主机,避免资源冲突和干扰。
  • 权限管理:不同的用户或应用可以通过虚拟主机进行权限分离,确保只有特定用户才能访问某些资源。

默认虚拟主机

RabbitMQ 默认创建一个虚拟主机 /,这是一个特殊的虚拟主机,通常用于测试或默认情况下的资源管理。生产环境中,建议创建和使用新的虚拟主机,以更好地管理资源和权限。 

虚拟主机操作也非常简单,如下图:

 在用户管理界面选择用户绑定指定的虚拟主机,非常方便,如下图:

 

功能强大,非常好用。 

四 java代码接入

  方式一 java通用:

  1 引入mvn依赖

        <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.20.0</version></dependency>

  JAVA 连接RabbitMQ生产消息与接收消费测试代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-21 18:01*/
public class TestRabbitMQ {private final static String QUEUE_NAME = "hello";public static void main1(String[] args) {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_producer");factory.setPassword("java_producer");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");} catch (TimeoutException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();}}public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_consumer");factory.setPassword("java_consumer");// 连接到 RabbitMQ 服务器try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列(确保队列存在)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 定义回调函数,当有消息送达时执行DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};// 消费消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}}
}

 测试运行发送消息,发送成功。如下图:

 

 测试运行接收消息,消费成功。如下图:

 上面测试通过后,改成服务类方便生产环境使用来发送消息代码:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-22*/
@Service
public class RabbitMqServiceImpl {private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);private static final String QUEUE_NAME = "test";private Connection connection;private Channel channel;public RabbitMqServiceImpl() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(51091);factory.setUsername("java_producer");factory.setPassword("java_producer");//如果不指定虚拟机默认会使用/factory.setVirtualHost("test");try {this.connection = factory.newConnection();this.channel = connection.createChannel();this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);logger.info("RabbitMqServiceImpl initialized successfully.");} catch (IOException | TimeoutException e) {e.printStackTrace();logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage());throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);}}public void sendMessage(String message) {try {channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");} catch (IOException e) {e.printStackTrace();logger.error("Failed to send message: {}", e.getMessage());}}public void close() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}} catch (IOException | TimeoutException e) {e.printStackTrace();}}
}

   上面的代码存在问题,未确认发送成功,有丢失风险,再改善如下:

import com.qyhua.common.table.db_hex_fail_log.entity.DbHexFailLog;
import com.qyhua.common.table.db_hex_fail_log.service.impl.DbHexFailLogServiceImpl;
import com.rabbitmq.client.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;/*** @author hua* @date 2024-08-22*/
@Service
public class RabbitMqServiceImpl {private static final Logger logger = LogManager.getLogger(RabbitMqServiceImpl.class);private static final String QUEUE_NAME = "hex_kyc";private Connection connection;private Channel channel;//存放所有消息,确认时删除,没确认的保存到数据库private ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();@AutowiredDbHexFailLogServiceImpl dbHexFailLogService;@PostConstructpublic void init() {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xx.xx.xx.xx");factory.setPort(xxx);factory.setUsername("java_producer");factory.setPassword("java_producer");factory.setVirtualHost("xxxx");factory.setConnectionTimeout(3000);try {this.connection = factory.newConnection();this.channel = connection.createChannel();this.channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 启用发布者确认模式this.channel.confirmSelect();setupConfirmListener();logger.info("RabbitMqServiceImpl initialized successfully.");} catch (IOException | TimeoutException e) {logger.error("Failed to initialize RabbitMqServiceImpl: {}", e.getMessage(), e);throw new RuntimeException("Failed to initialize RabbitMqServiceImpl", e);}}public void sendMessage(String message) {try {long nextSeqNo = channel.getNextPublishSeqNo();outstandingConfirms.put(nextSeqNo, message);channel.basicPublish("", QUEUE_NAME, null, message.getBytes());logger.info(" [x] Sent '{}'", message);} catch (Exception e) {logger.error("Failed to send message: {}", e.getMessage(), e);saveFailedMessageToDatabase(message,"CF");}}// 设置接收监听器,记录未确认的消息private void setupConfirmListener() {ConfirmCallback ackCallback = (deliveryTag, multiple) -> {if (multiple) {outstandingConfirms.headMap(deliveryTag + 1).clear();} else {outstandingConfirms.remove(deliveryTag);}System.out.println("Message confirmed ok deliveryTag="+deliveryTag);};ConfirmCallback nackCallback = (deliveryTag, multiple) -> {if (multiple) {// 获取从起点到 `deliveryTag + 1` 之间的所有未确认的消息ConcurrentNavigableMap<Long, String> unconfirmedMessages = outstandingConfirms.headMap(deliveryTag + 1);List<String> FailList= new ArrayList<>();for (Map.Entry<Long, String> entry : unconfirmedMessages.entrySet()) {String failedMessage = entry.getValue();logger.error("Message not confirmed: deliveryTag={}, message={}", entry.getKey(), failedMessage);FailList.add(failedMessage);}saveFailedMessageToDatabaseBy(FailList);  // 批量保存到数据库unconfirmedMessages.clear();  // 清除这些未确认的消息} else {String failedMessage = outstandingConfirms.get(deliveryTag);logger.error("Message not confirmed: deliveryTag={}, message={}", deliveryTag, failedMessage);saveFailedMessageToDatabase(failedMessage,"SF");outstandingConfirms.remove(deliveryTag);  // 移除单条未确认的消息}};channel.addConfirmListener(ackCallback, nackCallback);}private void saveFailedMessageToDatabaseBy(List<String> failList) {List<DbHexFailLog> list=new ArrayList<>(failList.size());LocalDateTime now = LocalDateTime.now();for (String message : failList) {DbHexFailLog f=new DbHexFailLog();f.setInHexStr(message);f.setCtime(now);f.setFlag("SF");list.add(f);}dbHexFailLogService.saveBatch(list,list.size());failList.clear();}private void saveFailedMessageToDatabase(String message,String flag) {DbHexFailLog f=new DbHexFailLog();f.setInHexStr(message);f.setCtime(LocalDateTime.now());f.setFlag(flag);dbHexFailLogService.save(f);}@PreDestroypublic void close() {try {if (channel != null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}logger.info("RabbitMqServiceImpl resources closed successfully.");} catch (IOException | TimeoutException e) {logger.error("Failed to close RabbitMqServiceImpl resources: {}", e.getMessage(), e);}}
}

上面的代码优化后,主要增加了三项如下:

 1 Publisher Confirms 机制

  • 启用 channel.confirmSelect() 来激活发布者确认模式。
  • 使用 ConfirmCallbackNackCallback 来处理消息的确认与未确认逻辑。
  • 未确认的消息会被保存到数据库中。

2 保存失败的消息到数据库。

3 在 @PreDestroy 方法中关闭 ChannelConnection,确保服务销毁时正确关闭资源。

 

方式二 SpringBoot框架使用

mvn依赖包:

    <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>

spring配置文件:

Spring: rabbitmq:host: xx.xx.xx.xxport: 51091username: java_consumerpassword: java_consumervirtual-host: hellowconnection-timeout: 6000

JAVA代码:

  发送消息java代码:

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @author hua* @date 2024-08-22*/
@Component
public class MessageProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate Queue queue;public void sendMessage(String message) {rabbitTemplate.convertAndSend(queue.getName(), message);System.out.println(" [x] Sent '" + message + "'");}
}

  接收消息java代码:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author hua* @date 2024-08-22*/
@Component
public class RabbitListener {private static final Logger logger = LogManager.getLogger(RabbitListener.class);@RabbitListener(queues = "test")public void receiveMessage(String message) {try {System.out.println("rabbit rev <- "+message);//具体业务} catch (Exception e) {e.printStackTrace();logger.error("rabbit err= ", e);}}
}

   上面代码在生产发送消息时通过编码方式更灵活,接收直接使用注解更简单。


http://www.ppmy.cn/server/106147.html

相关文章

《Cloud Native Data Center Networking》(云原生数据中心网络设计)读书笔记 -- 07数据中心的边缘

本章将帮助你回答以下问题 可以用哪些方式将 Clos 拓扑连接到外部网终?边缘部署路由协议的最佳实践是什么?企业应如何处理混合云中的连接? 连接模型 为什么要连接到外部世界? 数据中心连接到外部世界的原因很多。如果你要对外提供某种服务(例如搜索服务广告推荐系统或内…

【STM32】驱动OLED屏

其实我没买OLED屏哈哈哈&#xff0c;这个只是学习了&#xff0c;没机会实践。 大部分图片来源&#xff1a;正点原子HAL库课程 专栏目录&#xff1a;记录自己的嵌入式学习之路-CSDN博客 目录 1 显示原理 2 读写方式&#xff1a;8080并口 2.1 支持的指令类型 2.2 …

013、架构_配置文件_os.ini

文件说明配置文件 “os.ini” 用于设置 os 链路信息、MDS 模块、PM 模块、CM 模块、GTM 模块、CN 模块、SPEngine 等模块的链路信息等参数。文件路径为 “~/etc”。 配置文件 “os.ini” 是普通的文本文件,具有以下特点: 文件中以 ; 开始的行,表示注释内容(如:;osheartbea…

目标检测多模态大模型实践:貌似是全网唯一Shikra的部署和测试教程,内含各种踩坑以及demo代码

原文&#xff1a; Shikra: Unleashing Multimodal LLM’s Referential Dialogue Magic 代码&#xff1a; https://github.com/shikras/shikra 模型&#xff1a; https://huggingface.co/shikras/shikra-7b-delta-v1 https://huggingface.co/shikras/shikra7b-delta-v1-0708 第一…

Day01-生命周期函数

&#x1f3c6; 个人愚见&#xff0c;没事写写笔记 &#x1f3c6;《博客内容》&#xff1a;Unity3D开发内容 &#x1f3c6;&#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f50e;目标&#xff1a;常用Unity生命周期函数 ☀️实现&#xff1a; 1.Unity游戏帧和时间 1…

ffmpeg6.1集成ffmpeg-gl-transition滤镜

可代安装,有需要可以私信 ffmpeg-gl-transition 是基于 ffmpeg 4.x 进行开发的一个滤镜插件,在高版本上安装会有很多问题,以下是安装步骤,过程中可能会遇到很多报错,每个人的环境不一样,遇到的报错也不一样,但是都有解决办法。以下步骤中如果是在容器中docker 中,如果…

【数据结构】总结二叉树的概念以及存储结构

目录 1. 树的概念及结构 1.1 树的名词定义 1.2 树的表示 2. 二叉树的概念及结构 2.1 二叉树的概念 2.2 特殊的二叉树 2.2.1 满二叉树 2.2.2 完全二叉树 2.3 二叉树的存储结构 2.3.1 顺序存储 2.3.2 链式存储 3. 选择题 1. 树的概念及结构 1.1 树的名词定义 1. 节…

springCloud 网关(gateway)配置跨域访问

如果项目是分布式架构&#xff0c;通过网关进行路由转发的&#xff0c;那么项目中如果存在跨域的访问&#xff0c;在每一个项目中单独配置&#xff0c;显示是错误的&#xff0c;我们只需要在网关处进行处理&#xff0c;其它项目都是由网关进行转发的&#xff0c;他们是不会存在…