【RabbitMQ】rabbitmq广播模式的使用

server/2025/1/19 5:08:38/

前言:

        项目需要同步另一个系统的数据,对方系统采用MQ的发布/订阅模式方便我们同步数据,即当对方系统中的某条数据修改后,会向绑定他们交换机的每一个队列发布消息。消费者(即我们)监听到消息变动,进行信息消费同步至我们库中。

我们需要做的就是:

        1、创建一个新队列绑定到对方系统的交换机

        2、将监听到的消息进行合理解析,取出消息中的请求头:

              请求头信息为:"R"  ,则代表该生为入学操作;

              请求头信息为:"X"  ,则代表该生为休学操作;

              请求头信息为:"T"  ,则代表该生为退学操作;

        3、接下来根据获取到的请求头内容,来对对方系统传来的数据进行对应操作。

上代码,看思路:

    实现1:

java">/*** @Author: 宁兴星* @CreateTime: 2026-01-16  14:05* @Description: TODO*/
@Configuration
public class RabbitMqConfig extends AbstractRabbitMQConfig {/*** 创建广播模式交换机(扇形)*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(EventConstant.STUDENT_EXCHANGE, true, false);}/*** 创建被监听的队列*/@Beanpublic Queue dealerInfoQueue() {return new Queue(EventConstant.STUDENT_QUEUE, true, false, false);}/*** 将队列绑定到扇形交换机上,实现广播模式消息接收** @param dealerInfoQueue* @param fanoutExchange* @return*/@Beanpublic Binding binding(Queue dealerInfoQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(dealerInfoQueue).to(fanoutExchange);}/*** 配置消息监听容器工厂** @param connectionFactory* @return*/@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setPrefetchCount(10);return factory;}}

实现2:

       

java">    /*** MQ监听学生数据变更** @param message 消息体* @param deliveryTag 消息标识* @param channel 通道* @throws IOException IO异常*/@RabbitListener(queues = EventConstant.STUDENT_QUEUE)@Operation(summary = "MQ监听学生数据变更", description = "MQ监听学生数据变更")public void handleMessage(Message message,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,Channel channel) throws IOException {try {log.info("同步学生数据,接收到MQ消息: {}", message);// 解析学生信息StudentInfo studentInfo = parseStudentInfo(message);log.info("解析后的学生数据: {}", studentInfo);// 获取操作类型并处理String action = getActionFromHeaders(message);processStudentAction(action, studentInfo);// 确认消息处理完成channel.basicAck(deliveryTag, false);} catch (Exception e) {log.error("处理学生数据消息异常: ", e);// 消息处理失败,重新入队channel.basicNack(deliveryTag, false, true);}}/*** 解析消息中的学生信息*/private StudentInfo parseStudentInfo(Message message) throws IOException {ObjectMapper objectMapper = new ObjectMapper();return objectMapper.readValue(message.getBody(), StudentInfo.class);}/*** 从消息头获取action*/private String getActionFromHeaders(Message message) {Map<String, Object> headers = message.getMessageProperties().getHeaders();String action = headers.get("action").toString();log.info("操作类型: {}", action);return action;}

实现3:

java">     /*** 根据不同action处理学生数据*/private void processStudentAction(String action, StudentInfo studentInfo) {if (action == null) {return;}switch (action) {case EventConstant.LIGHT_UP:// 编写对应录取方法,此处省略具体信息handleLightUp(studentInfo);break;case EventConstant.OFFLINE:// 编写对应休学方法,此处省略具体信息handleOffline(studentInfo);break;case EventConstant.DELETE:// 编写对应退学方法,此处省略具体信息handleDelete(studentInfo);break;default:log.warn("未知的操作类型: {}", action);}}

结束啦,如有错误,敬请雅正!


http://www.ppmy.cn/server/159547.html

相关文章

机器学习与深度学习在气象中的应用

专题一 Python和科学计算基础1.1 Python入门和安装 1.1.1 Python背景及其在气象中的应用 1.1.2 Anaconda解释和安装以及Jupyter配置 1.1.3 Python基础语法 1.2 科学数据处理基础库 1.2.1 Numpy库 1.2.2 Pandas库 1.2.3 Scipy库 1.2.4 Matplotlib和Cartopy库 1.2.5 常用数据格式…

cmake + vscode + mingw 开发环境配置

1.软件准备 准备如下软件&#xff1a; mingw64&#xff08;安装完成之后检测是否有环境变量&#xff0c;如果没有需要配置&#xff09; cmake&#xff08;安装完成之后检测是否有环境变量&#xff0c;如果没有需要配置&#xff09; vscode&#xff08;安装CMake插件&#xff0…

深度剖析RabbitMQ:从基础组件到管理页面详解

文章目录 一、简介二、Overview2.1 Overview->Totals2.2 Overview->Nodesbroker的属性2.3 Overview->Churn statistics2.4 Overview->Ports and contexts2.5 Overview->Export definitions2.6 Overview->Import definitions 三、Connections连接的属性 四、C…

如何在亚马逊云科技上消除无服务器网页应用冷启动时间(下篇)

背景 我们在云端搭建无服务器&#xff08;serverless&#xff09;开发架构时&#xff0c;经常会被冷启动&#xff08;cold start&#xff09;带来的应用延迟所困扰。冷启动是指当无服务器资源在一段时间内未被调用&#xff0c;或需要扩展以处理新请求时&#xff0c;系统需要初…

阀井可燃气体监测仪,开启地下管网安全新篇章-旭华智能

在城市的脉络中&#xff0c;地下管网犹如隐秘的动脉&#xff0c;支撑着现代生活的运转。而在这庞大网络的关键节点上&#xff0c;阀井扮演着不可或缺的角色。然而&#xff0c;由于其密闭性和复杂性&#xff0c;阀井内部一旦发生可燃气体泄漏&#xff0c;将对公共安全构成严重威…

C 语言的void*到底是什么?

一、void* 的类型任意性 void* 是一种通用指针类型。它可以指向任意类型的数据。例如&#xff0c;它可以指向一个整数&#xff08;int&#xff09;、一个浮点数&#xff08;float&#xff09;、一个字符&#xff08;char&#xff09;或者一个结构体等。在C语言中&#xff0c;当…

求两个矩阵的乘积

求两个矩阵的乘积 分数 15 全屏浏览 切换布局 作者 C课程组-hwr-zy 单位 浙江大学 输入三个正整数m&#xff0c;l&#xff0c;n(0<m&#xff0c;n&#xff0c;l<10)&#xff0c;再输入两个的矩阵a&#xff08;mxl&#xff09;和b&#xff08;lxn&#xff09;。要求把a和…

CES 2025:XEO 展出双眼8K VR头显,售价2000美元

在2025年国际消费类电子产品展览会(CES)上,XEO公司凭借其最新的VR头显设备吸引了众多目光。这款标榜双眼8K分辨率的高端VR头显不仅展示了当前虚拟现实技术的顶尖水平,同时也设定了新的行业标杆。本文将详细介绍这款产品的特点、技术和市场定位。 XEO 双眼8K VR头显:极致视…