[RabbitMQ] Spring Boot整合RabbitMQ

devtools/2024/10/8 22:20:45/

🌸个人主页:https://blog.csdn.net/2301_80050796?spm=1000.2115.3001.5343
🏵️热门专栏:
🧊 Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm=1001.2014.3001.5482
🍕 Collection与数据结构 (92平均质量分)https://blog.csdn.net/2301_80050796/category_12621348.html?spm=1001.2014.3001.5482
🧀线程与网络(96平均质量分) https://blog.csdn.net/2301_80050796/category_12643370.html?spm=1001.2014.3001.5482
🍭MySql数据库(93平均质量分)https://blog.csdn.net/2301_80050796/category_12629890.html?spm=1001.2014.3001.5482
🍬算法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12676091.html?spm=1001.2014.3001.5482
🍃 Spring(97平均质量分)https://blog.csdn.net/2301_80050796/category_12724152.html?spm=1001.2014.3001.5482
🎃Redis(97平均质量分)https://blog.csdn.net/2301_80050796/category_12777129.html?spm=1001.2014.3001.5482
🐰RabbitMQ(97平均质量分) https://blog.csdn.net/2301_80050796/category_12792900.html?spm=1001.2014.3001.5482
感谢点赞与关注~~~
在这里插入图片描述

目录

  • 1. 工作队列模式
  • 2. Publish/Subscribe(发布订阅模式)
  • 3. Routing(路由模式)
  • 4. Topics(通配符模式)
  • 5. 实际案例
    • 5.1 创建项目
    • 5.2 订单系统(生产者)
    • 5.3 物流系统(消费者)

RabbitMQ开发,Spring也提供了一些便利.下面我们就来学习如何使用Spring操作RabbitMQ.

1. 工作队列模式

步骤:

  1. 引入依赖
  2. 编写yml设置,基本信息配置.
  3. 编写生产者代码.
  4. 编写消费者代码.(定义监听类,使用@RabbitListener注解完成队列监听).
  5. 观察运行结果.
  • 引入依赖
    在我们创建Spring项目的依赖的时候,我们可以引入Spring For RabbitMQ依赖.
    在这里插入图片描述
    也可以手动导入xml依赖:
<dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope>
</dependency>
  • 添加配置
    我们需要在yml文件中配置服务器的IP地址,RabbitMQ的服务端口号(默认5672),用户名,密码,以及我们要使用的虚拟机.
spring:application:name: rabbitmq-springrabbitmq:host: 39.105.137.64port: 5672username: jiangruijiapassword: qwe123524virtual-host: /
  • 编写生产者代码
    为了方便测试,我们通过接口来发送信息.
    首先我们需要在Constant类中定义队列的名称.
public static final String WORK_QUEUE = "work_queue";

之后在config中声明队列.

@Configuration
public class RabbitMQConfig {@Beanpublic Queue workQueue(){return QueueBuilder.durable(Constant.WORK_QUEUE).build();//指定队列的名称}
}

durable中写入的是队列的名称,最后方法返回的是创建好的队列.

@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work(){rabbitTemplate.convertAndSend("",Constant.WORK_QUEUE,"发送消息");//指定发送消息的交换机和队列,以及发送的消息return "发送成功";}
}

我们在编写生产者代码的时候,我们需要注入RabbitTemplate对象(类似于Redis中的StringRedisTemplate对象),使用这个对象来操作RabbitMQ.之后我们需要在一个方法中发送信息,convertAndSend创建了生产者并向指定的队列中发送了信息,convertAndSend需要指定交换机,队列名称和需要发送的消息.
我们来运行代码,并访问指定的端口,从RabbitMQ的管理界面中查看消息.
在这里插入图片描述

在这里插入图片描述
我们可以从队列中获取到预期的消息:
在这里插入图片描述

  • 编写消费者代码
@Component
public class WorkListener {//工作模式中,消费者有两个@RabbitListener(queues = Constant.WORK_QUEUE)//监听指定的队列public void workListener1(Message message){System.out.println("workListener1收到消息:"+message);}@RabbitListener(queues = Constant.WORK_QUEUE)public void workListener2(Message message){System.out.println("workListener2收到消息:"+message);}
}

在创建消费者的时候,我们需要使用@Component把消费者类注入到Spring中,之后,我们要在对应的方法上添加@RabbitListener,指定该方法为监听者,在后面的queues属性中指定需要监听的队列.
运行上述代码:在运行之后,我们发现消息队列中的信息减少了.而且在控制台中也打印出了收到的对应的消息.
在这里插入图片描述
在这里插入图片描述
@RabbitListener修饰的方法的参数中,不仅仅可以指定Message类型的参数,还可以指定一下常见的参数:

  1. String: 返回消息的内容
  2. Message: 注意是org.springframework.amqp.core.Message包中的Message,在导包的时候不要导错.返回的是原始的消息体以及消息的属性,如消息ID,内容,队列.
  3. Channel: RabbitMQ中的通道消息对象,可以用于进行更高级的操作,比如手动确认.

2. Publish/Subscribe(发布订阅模式)

在发布/订阅模式中,多了一个交换机的角色.exchange常见的有三种类型,我们在前面介绍过,发别是:fanout广播模式,将消息交给所有绑定交换机的队列.direct定向模式,把消息交给符合指定RoutingKey的队列.topic通配符模式,把消息交给符合routing pattern(路由模式)的队列,只不过这里的RoutingKey中含有通配符.而定向模式中的RoutingKey是写死的.

  • 编写生产者代码
    和简单模式,工作队列模式最大的区别就是需要创建交换机,并绑定交换机和队列.
    首先我们声明队列,交换机的名称.
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";
public static final String FANOUT_EXCHANGE = "fanout_exchange";

之后我们需要在声明交换机和队列,并绑定交换机和队列.之后使用@Bean注解进行注入.

@Bean
public Queue fanoutQueue1(){return QueueBuilder.durable(Constant.FANOUT_QUEUE1).build();
}
@Bean
public Queue fanoutQueue2(){return QueueBuilder.durable(Constant.FANOUT_QUEUE2).build();
}
@Bean
public FanoutExchange fanoutExchange(){return ExchangeBuilder.fanoutExchange(Constant.FANOUT_EXCHANGE).durable(true).build();
}

在创建交换机的时候,我们需要使用 ExchangeBuilder.fanoutExchange方法来指定交换机的名字,之后使用durable指定交换机为持久化.注意在返回交换机的时候,必须返回的是具体类型的交换机.队列的创建方法和前面的创建方法相同.
之后我们对交换机和队列进行绑定.

@Bean
public Binding binding1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding binging2(@Qualifier("fanoutExchange") FanoutExchange exchange,@Qualifier("fanoutQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange);
}

在绑定的时候,我们需要使用@Qualifier注解进行注入,注意注入的交换机类型必须和Spring容器中的交换机类型一致,都是fanout类型的交换机.bind方法中写入的是需要绑定的队列,to中写入的是需要绑定的交换机.
之后我们使用接口发送消息.

@RequestMapping("/fanout")
public String fanout(){rabbitTemplate.convertAndSend(Constant.FANOUT_EXCHANGE,"","fanout交换机发送消息");//指定发送的交换机即可,由于是广播模式,RoutingKey不需要指定return "发送成功";
}
  • 编写消费者代码
    定义监听类,处理收到的消息即可.
@Component
public class FanoutListener {@RabbitListener(queues = Constant.FANOUT_QUEUE1)public void ListenerQueue1(String message){System.out.println("queue1收到消息:"+message);}@RabbitListener(queues = Constant.FANOUT_QUEUE2)public void ListenerQueue2(String message){System.out.println("queue2收到消息:"+message);}
}

运行项目,之后访问Controller对应的接口.
在这里插入图片描述
在这里插入图片描述
rabbbitMQ的管理界面,消息条数发生了波动,控制台成功收到了生产者发送的消息.

3. Routing(路由模式)

交换机类型为direct类型的时候,消息会把消息交给符合指定RoutingKey的队列.队列和交换机的绑定,不是任意绑定了,而是需要指定一个RoutingKey.交换机也不再把消息交给每一个绑定的key,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey和消息的RoutingKey完全一致,才会收到消息.

  • 编写生产者代码
    指定队列和交换机的名称
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE1 = "direct_queue1";
public static final String DIREct_QUEUE2 = "direct_queue2";

接下来声明交换机和队列

@Bean
public Queue directQueue1(){return QueueBuilder.durable(Constant.DIRECT_QUEUE1).build();
}
@Bean
public Queue directQueue2(){return QueueBuilder.durable(Constant.DIREct_QUEUE2).build();
}
@Bean
public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constant.DIRECT_EXCHANGE).durable(true).build();
}

接下来绑定队列和交换机,指定绑定时候的BindingKey.

@Bean
public Binding directBinding1(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("orange");
}
@Bean
public Binding directBinding2(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("green");
}
@Bean
public Binding directBinding3(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("directQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("red");
}

相比广播模式,多了with指定了绑定时候的BindingKey.
使用接口发送消息.

@RequestMapping("/direct")
public String direct(String routingKey){rabbitTemplate.convertAndSend(Constant.DIRECT_EXCHANGE,routingKey,"direct生产者发送消息");return "发送成功";
}

与前面广播不同的是,在发送消息的时候,需要在参数中指定RoutingKey,在给交换机发送消息的时候,需要把传入的RoutingKey传入convertAndSend中.

  • 编写消费者代码
    消费者的编写方式和前面几种方式都差不多,都是在指定的方法上添加@RabbitListener注解,在注解中指定需要监听的队列.
@Component
public class DirectListener {@RabbitListener(queues = Constant.DIRECT_QUEUE1)public void ListenerQueue1(String message){System.out.println("监听队列1:"+message);}@RabbitListener(queues = Constant.DIREct_QUEUE2)public void ListenerQueue2(String message){System.out.println("监听队列2:"+message);}
}

运行项目并访问指定接口.
http://127.0.0.1:8080/producer/direct?routingKey=green
在这里插入图片描述
http://127.0.0.1:8080/producer/direct?routingKey=red
在这里插入图片描述
http://127.0.0.1:8080/producer/direct?routingKey=orange
在这里插入图片描述
在这里插入图片描述

4. Topics(通配符模式)

Topic和Routing模式的区别是:

  1. topics模式使用的交换机类型为topic(Routing模式使用的交换机类型为direct)
  2. topic类型的交换机在匹配规则上进行了扩展,BindingKey支持通配符匹配.
  • 编写生产者代码
    首先还是指定队列和交换机的名称
public static final String TOPICS_QUEUE1 = "topics_queue1";
public static final String TOPICS_QUEUE2 = "topics_queue2";
public static final String TOPICS_EXCHANGE = "topics_exchange";

之后声明交换机和队列.

@Bean
public Queue topicsQueue1(){return QueueBuilder.durable(Constant.TOPICS_QUEUE1).build();
}
@Bean
public Queue topicsQueue2(){return QueueBuilder.durable(Constant.TOPICS_QUEUE2).build();
}
@Bean
public TopicExchange topicExchange(){return ExchangeBuilder.topicExchange(Constant.TOPICS_EXCHANGE).durable(true).build();
}

绑定交换机和队列,并指定BindingKey

@Bean
public Binding topicsBinding1(@Qualifier("topicsExchange") TopicExchange exchange, @Qualifier("topicsQueue1") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("#.error");
}
@Bean
public Binding topicsBinding2(@Qualifier("topicsExchange") TopicExchange exchange,@Qualifier("topicsQueue2") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("#.info");
}

之后我们使用接口来接收信息.

@RequestMapping("/topics")
public String topics(String routingKey){rabbitTemplate.convertAndSend(Constant.TOPICS_EXCHANGE,routingKey,"topics生产者发送消息");return "发送成功";
}
  • 编写消费者代码
@Component
public class TopicsListener {@RabbitListener(queues = Constant.TOPICS_QUEUE1)public void topicsListener1(String message){System.out.println("topics队列1接收到消息"+message);}@RabbitListener(queues = Constant.TOPICS_QUEUE2)public void topicsListener2(String message){System.out.println("topics队列2接收到消息"+message);}
}

运行项目,访问接口,传递指定参数,观察结果
http://127.0.0.1:8080/producer/topics?routingKey=404.error
在这里插入图片描述
http://127.0.0.1:8080/producer/topics?routingKey=200.info
在这里插入图片描述
在这里插入图片描述

5. 实际案例

作为一个消息队列,RabbitMQ也可以用作应用程序之间的通信.上述代码和生产者和消费者代码放在不同的项目模块中即可完成不同的应用程序通信.
比如我们需要实现下面的功能:
用户下单成功之后,通知物流系统进行发货.
在这里插入图片描述
这里我们只做应用通信,我们不对业务逻辑做具体实现.

5.1 创建项目

我们在创建项目的时候,把两个项目放在同一个空项目中.

  1. 创建一个空项目
    在这里插入图片描述
  2. 之后在这个项目中创建一个模块
    在这里插入图片描述
  3. 后续流程和创建SpringBoot项目一样.
    在这里插入图片描述
    在模块创建的时候,我们在每个项目中都会引入如下的依赖:
    在这里插入图片描述
    最终的项目结构如下:
    在这里插入图片描述
    我们在创建项目之后有可能无法加载项目为Maven项目,我们可以在指定的项目的pom文件中右键—>添加为Maven项目即可.

5.2 订单系统(生产者)

  1. 完善配置信息
spring:application:name: logistics-servicerabbitmq:host: 39.105.137.64port: 5672username: jiangruijiapassword: qwe123524virtual-host: /
  1. 声明队列
@Configuration
public class RabbitMQConfig {@Beanpublic Queue workQueue(){return QueueBuilder.durable("order.create").build();}
}
  1. 编写下单接口,下单成功之后,发送订单消息到队列.
@RestController
@RequestMapping("/order")
public class OrderController {@Autowiredprivate RabbitTemplate rabbitTemplate;@RequestMapping("/createOrder")public String createOrder(){String order = UUID.randomUUID().toString();rabbitTemplate.convertAndSend("","order.create","下单成功:"+order);return "下单成功";}
}
  1. 启动服务,访问对应接口,观察结果

    消息条数出现了波动.
    在这里插入图片描述
    从队列中查看消息:
    在这里插入图片描述

5.3 物流系统(消费者)

  1. 完善配置信息
    8080端口号已经订单系统被占领,所以我们需要更改物流系统的端口号为9090.
spring:application:name: logistics-servicerabbitmq:host: 39.105.137.64port: 5672username: jiangruijiapassword: qwe123524virtual-host: /
server:port: 9090
  1. 消费者监听队列
@Component
public class OrderControllerListener {@RabbitListener(queues = "order.create")public void ListenerQueue(String message){System.out.println("接收到订单:"+message);//此处业务逻辑省略}
}

启动服务,观察结果:
在这里插入图片描述
在这里插入图片描述
我们看到了消费者成功接收到了订单id.


http://www.ppmy.cn/devtools/122379.html

相关文章

【postman】本地接口文件

问题描述 win10 升级postman到10.x之后&#xff0c;原来整理的接口不见了&#xff0c;只有在history中按日期排序的无汉字描述的url地址。 原来的接口以及接口文件夹去哪里了呢&#xff1f;怎么导入呢&#xff1f;从哪里导入呢&#xff1f; 解决方法 笔者的安装目录和文件目…

WIFI网速不够是不是光猫的“路由模式”和“桥接模式”配置错了?

光猫&#xff08;光纤调制解调器&#xff09;是一种用于将光纤信号转换为数字信号的设备&#xff0c;通常用于家庭或企业网络中。光猫可以在不同的工作模式下运行&#xff0c;其中最常见的两种模式是“路由模式”和“桥接模式”。以下是这两种模式的详细解释及其优缺点。 一、路…

04.useTitle

在 React 应用中,动态更新页面标题是提升用户体验的一个重要方面。它可以让用户更清楚地知道当前页面的内容或状态,特别是在单页应用(SPA)中。useTitle 钩子提供了一种简单而有效的方式来管理文档标题。以下是如何实现和使用这个自定义钩子: const useTitle = title =>…

顺丰Android面试题集锦及参考答案

TCP 三次握手和四次挥手是什么,挥手过程中主动方的状态是什么? TCP 三次握手是建立连接的过程: 第一次握手:客户端向服务器发送一个 SYN 报文,该报文包含客户端的初始序列号(seq=x)。此时客户端进入 SYN_SENT 状态。第二次握手:服务器收到客户端的 SYN 报文后,向客户端…

05:(寄存器开发)定时器一

定时器 1、系统定时器SysTick1.1、SysTick中断的使用1.2、使用SysTick制作延迟函数 2、基本定时器2.1、基本定时器中断的使用2.2、使用基本定时器制作延时函数 1、系统定时器SysTick 1.1、SysTick中断的使用 ①SysTcik系统滴答定时器和片上外设定时器不同&#xff0c;它在CPU…

Python 读取与处理出入库 Excel 数据实战案例(HTML 网页展示)

有如下数据&#xff0c;需要对数据合并处理&#xff0c;输出到数据库。 数据样例&#xff1a;&#x1f447; excel内容&#xff1a; 出入库统计表河北库.xlsx: 出入库统计表天津库.xlsx: 01实现过程 1、创建test.py文件&#xff0c;然后将下面代码复制到里面&#xff0c;最后…

大厂面试真题-介绍以下Docker的Overlay网络

Overlay网络&#xff0c;又称为叠加网络或覆盖网络&#xff0c;是一种在现有物理网络&#xff08;Underlay网络&#xff09;之上构建的虚拟网络架构。它通过虚拟化技术&#xff0c;在不对基础网络进行大规模修改的条件下&#xff0c;实现应用在网络上的承载&#xff0c;并能与其…

论文阅读笔记-Pre-trained Models for Natural Language Processing: A Survey

前言 预训练模型给下游任务带来的效果不言而喻,有了预训练模型,我们可以使用它来加速解决问题的过程。正如论文中所说的那样,预训练模型(PTMs)的出现将自然语言处理(NLP)带入了一个新时代。本篇论文基于分类从四个角度对现有PTMs进行系统分类,描述如何使PTMs的知识适应…