SpringBoot集成多个rabbitmq

ops/2024/11/25 18:15:52/

1、pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.9</version>
</dependency>

2、rabbitmq的连接配置文件

spring:rabbitmq:mq1:host: xxx.xxx.xxx.xxxport: 5672username: xxxxpassword: xxxxxenable: truemq2:host: xxx.xxx.xxx.xxxport: 5672username: xxxxxpassword: xxxxxenable: true

3、mq1的相关代码  MQ1RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack1 returnCallBack1;@Autowiredprivate ConfirmCallBack1 confirmCallBack1;@Bean(name = "mq1ConnectionFactory")//命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此@Primarypublic ConnectionFactory createConnectionFactory() {//消息队列1的连接CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq1RabbitTemplate")//命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此@Primarypublic RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {//消息生产RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack1);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack1);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory1")@Primarypublic SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "subQueue01")public Queue firstQueue() {return new Queue("subQueue01");}@Bean(name = "subQueue02")public Queue secondQueue() {return new Queue("subQueue02");}@Bean(name = "subQueue03")public Queue thirdQueue() {return new Queue("subQueue03", true);}@Bean(name = "subQueue04")public Queue fourQueue() {return new Queue("subQueue04", true);}@Bean(name = "topicExchangeOne")public TopicExchange topicExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new TopicExchange("topicExchangeOne");}@Bean(name = "binding1")public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {//绑定队列1到TopicExchange  routingKey是队列1的队列名return BindingBuilder.bind(queue).to(exchange).with("subQueue01");}@Bean(name = "fanoutExchangeOne")public FanoutExchange fanoutExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new FanoutExchange("fanoutExchangeOne");}@Bean(name = "binding3")public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {//绑定队列3到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}@Bean(name = "binding4")public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {//绑定队列4到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}}

ConfirmCallBack1 .java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack1消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack1消息发送交换机成功");}}
}
ReturnCallBack1.java
package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

4、mq2的相关代码

  MQ2RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") //是否启用
public class MQ2RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack2 returnCallBack2;@Autowiredprivate ConfirmCallBack2 confirmCallBack2;@Bean(name = "mq2ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此public ConnectionFactory createConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq2RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack2);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack2);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory2")public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}}

ConfirmCallBack2.java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack2消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack2消息发送交换机成功");}}
}

ReturnCallBack2.java

package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

5、消息生产者

package com.pojo.prj.controller;import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** <p>* 项目表 前端控制器* </p>** @author zhushangjin* @menu 项目管理* @since 2022-11-14*/
@RestController
@Slf4j
public class ProjectController extends BaseController {@Resource(name = "mq1RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq1RabbitTemplate;@Resource(name = "mq2RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq2RabbitTemplate;/*** 获取项目下拉列表** @return* @status done*/@GetMapping("/prj/project/list")@NoNeedLoginpublic ResponseResult<String> list() {String active = ApplicationContextUtils.getActiveProfile();logger.error(ApplicationContextUtils.getActiveProfile());return ResponseResult.ok("ReturnCallBack2");}@GetMapping("/prj/project/test1")public ResponseResult test1() {//发送到topicExchangeOne类型的交换机,根据routekey去找发送到哪个队列里,// 只有这一个队列才能收到这条消息String str = "test1test1test1test1test1";mq1RabbitTemplate.convertAndSend("topicExchangeOne","subQueue01", str);return buildResponseResult(true);}@GetMapping("/prj/project/test2")public ResponseResult test2() {//发送到direct类型的交换机,根据routekey去找发送到哪个队列里,//只有这一个队列才能收到这条消息mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");return buildResponseResult(true);}@GetMapping("/prj/project/test3")public ResponseResult test3() {//发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,// 故第二个参数routekey无需填写mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");return buildResponseResult(true);}}

6、消息消费者

Receiver1.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue01", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver1 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver1: " + hello);}}

Receiver2.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue02", containerFactory = "simpleRabbitListenerContainerFactory2")
public class Receiver2 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver2: " + hello);}}

Receiver3.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue03", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver3 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver3 : " + hello);}}

Receiver4.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RabbitListener(queues = "subQueue04", containerFactory = "simpleRabbitListenerContainerFactory1")
public class Receiver4 {@RabbitHandler(isDefault = true)public void process(String hello) {System.out.println("Receiver4 : " + hello);}}

创建队列

@Bean(name = "uavTopicQueue")public Queue topicQueue() {Map<String, Object> argsMap = new HashMap<String, Object>();argsMap.put("x-max-priority", 5);Queue queue = new Queue(UAV_QUEUE, true, false, false, argsMap);return queue;}


http://www.ppmy.cn/ops/136634.html

相关文章

鸿蒙网络编程系列50-仓颉版TCP回声服务器示例

1. TCP服务端简介 TCP服务端是基于TCP协议构建的一种网络服务模式&#xff0c;它为HTTP&#xff08;超文本传输协议&#xff09;、SMTP&#xff08;简单邮件传输协议&#xff09;等高层协议的应用程序提供了可靠的底层支持。在TCP服务端中&#xff0c;服务器启动后会监听一个或…

C语言:深入理解指针

一.内存和地址 我们知道计算机上CPU&#xff08;中央处理器&#xff09;在处理数据的时候&#xff0c;需要的数据是在内存中读取的&#xff0c;处理后的数据也会放回内存中&#xff0c;那我们买电脑的时候&#xff0c;电脑上内存是 8GB/16GB/32GB 等&#xff0c;那这些内存空间…

深度学习:神经网络中线性层的使用

深度学习&#xff1a;神经网络中线性层的使用 在神经网络中&#xff0c;线性层&#xff08;也称为全连接层或密集层&#xff09;是基础组件之一&#xff0c;用于执行输入数据的线性变换。通过这种变换&#xff0c;线性层可以重新组合输入数据的特征&#xff0c;并将其映射到新…

Element UI Collapse 折叠面板和表格结合高度闪动问题

好久没写文章了&#xff0c;最近使用了 Element UI 的 Collapse 不是 Plus 版本&#xff0c;遇到一个问题。在折叠面板中使用&#xff0c;会出现打开的时候高度闪动&#xff0c;就是表格的高度计算出现了问题&#xff0c;都是接口返回数据时出现的&#xff0c;在 Github 上的 I…

神经网络的初始化

目录 为什么需要初始化&#xff1f; 初始化的常用方法&#xff1a; 是否必须初始化&#xff1f; 初始化神经网络中的权重和偏置是深度学习模型训练中非常重要的一步&#xff0c;虽然在某些情况下不进行初始化也能训练出模型&#xff0c;但正确的初始化方法能够显著提高训练效…

bash笔记

0 $0 是脚本的名称&#xff0c;$# 是传入的参数数量&#xff0c;$1 是第一个参数&#xff0c;$BOOK_ID 是变量BOOK_ID的内容 1 -echo用于在命令窗口输出信息 -$()&#xff1a;是命令替换的语法。$(...) 会执行括号内的命令&#xff0c;并将其输出捕获为一个字符串&#xff…

jupyter notebook的 markdown相关技巧

目录 1 先选择为markdown类型 2 开关技巧 2.1 运行markdown 2.2 退出markdown显示效果 2.3 注意点&#xff1a;一定要 先选择为markdown类型 3 一些设置技巧 3.1 数学公式 3.2 制表 3.3 目录和列表 3.4 设置各种字体效果&#xff1a;加粗&#xff0c;斜体&#x…

代替Spinnaker 的 POINTGREY工业级相机 FLIR相机 Python编程案例

SpinnakerSDK_FULL_4.0.0.116_x64 是一个用于FLIR相机的SDK&#xff0c;主要用于图像采集和处理。Spinnaker SDK主要提供C接口&#xff0c;无法直接应用在python环境。本文则基于Pycharm2019python3.7的环境下&#xff0c;调用opencv,EasySpin,PySpin,的库实现POINTGREY工业级相…