RabbitMQ深度探索:SpringBoot 整合 RabbitMQ

server/2025/2/6 10:21:58/
  1. 需创建复合项目
  2. 父工程 Maven 依赖:
    <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.0.RELEASE</version></parent><!--  父工程要打成 pom 包--><groupId>com.qcby</groupId><artifactId>springboot-rabbitmq</artifactId><packaging>pom</packaging><modules><module>producer</module><module>sms-consumer</module></modules><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency><!-- springboot-web组件 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- 添加springboot对amqp的支持 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId></dependency><!--fastjson --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.49</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
  3. 生产者工程代码:
    1. 配置类:
      package com.qcby.config;import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.FanoutExchange;
      import org.springframework.amqp.core.Queue;
      import org.springframework.context.annotation.Bean;
      import org.springframework.stereotype.Component;@Component
      public class RabbitMQConfig {//定义交换机private String EXCHANGE_SPRINGBOOT_NAME = "boyatop_ex";//定义短信队列private String FANOUT_SMS_QUEUE = "fanout_sms_queue";//定义邮件队列private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";//配置邮件队列@Beanpublic Queue emailQueue(){return new Queue(FANOUT_EMAIL_QUEUE);}//配置消息队列@Beanpublic Queue smsQueue(){return new Queue(FANOUT_SMS_QUEUE);}//配置交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);}//将队列绑定交换机@Beanpublic Binding bindingSmsFanoutExchange(Queue smsQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(smsQueue).to(fanoutExchange);}@Beanpublic Binding bindingEmailFanoutExchange(Queue emailQueue,FanoutExchange fanoutExchange){return BindingBuilder.bind(emailQueue).to(fanoutExchange);}
      }
    2. 消息实体类:
      package com.qcby.entity;import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;import java.io.Serializable;@Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class MsgEntity implements Serializable {private String MsgId;private String UserId;private String phone;private String email;
      }
    3. 生产者代码:
      package com.qcby.controller;import com.qcby.entity.MsgEntity;
      import org.springframework.amqp.core.AmqpTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;import java.util.UUID;@RestController
      public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 发送消息** @return*/@RequestMapping("/sendMsg")public String sendMsg() {/*** 1.交换机名称* 2.路由key名称* 3.发送内容*/MsgEntity msgEntity = new MsgEntity(UUID.randomUUID().toString(),"22","12345","edddd");amqpTemplate.convertAndSend("boyatop_ex", "", msgEntity);return "success";}
      }
    4. yml 文件:
      spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHost
  4. 消费者工程代码:
    1. 实体类:
      package com.qcby.entity;import lombok.AllArgsConstructor;
      import lombok.Data;
      import lombok.NoArgsConstructor;import java.io.Serializable;@Data
      @AllArgsConstructor
      @NoArgsConstructor
      public class MsgEntity implements Serializable {private String MsgId;private String UserId;private String phone;private String email;
      }
    2. 监听队列:
      package com.qcby.controller;import com.qcby.entity.MsgEntity;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;@Slf4j
      @Component
      @RabbitListener(queues = "fanout_sms_queue")
      public class smsController {@RabbitHandlerpublic void listen(MsgEntity msgEntity){log.info("email:" + msgEntity);System.out.println(msgEntity);}
      }
    3. yml 文件:
      spring:rabbitmq:####连接地址host: 127.0.0.1####端口号port: 5672####账号username: guest####密码password: guest### 地址virtual-host: boyatopVirtualHost
      server:port: 8081

生产者如何获取消费结果:

  1. 根据业务来定:
    1. 消费者消费成功结果:能够在数据库中插入一条数据
  2. Roketmq 自带全局消息 id,能够根据该全局消息获取消费结果
    1. 异步返回一个全局 id,前端使用 ajax 定时主动查询
    2. 在roketmq 中,自带根据消息 id 查询是否消费成功
  3. 原理:
    1. 生产者投递消息 MQ  服务器,NQ 服务器端在这时候返回一个全局消息 id ,当消费者消费该消息之后,消费者会给我们 MQ 服务器发送通知,标识该消息消费成功
    2. 生产者获取到该消息全局 id,每隔 2s 时间调用 MQ 服务器接口查询是否有被消费成功

http://www.ppmy.cn/server/165384.html

相关文章

【鸿蒙HarmonyOS Next实战开发】Web组件H5界面与原生交互-抽奖页面

想必很多人都经历过这样的情况&#xff1a;当我们点击某个应用的页面时&#xff0c;往往会跳转到一个类似于浏览器加载的页面&#xff0c;只有等到加载完成之后&#xff0c;才会呈现出该页面的具体内容。通常情况下&#xff0c;加载和显示网页的任务都是由浏览器来完成的。 而A…

ZZNUOJ(C/C++)基础练习1061——1070(详解版)

目录 1061 : 顺序输出各位数字 C语言版 C版 1062 : 最大公约数 C C 1063 : 最大公约与最小公倍 C C 1064 : 加密字符 C C 1065 : 统计数字字符的个数 C C 1066 : 字符分类统计 C C 1067 : 有问题的里程表 C C 1068 : 进制转换 C C C&#xff08;容器stack…

大模型RAG优化方案_融合bm25和语义检索

1. 写在前面 检索增强生成 (Retrieval-Augmented Generation, RAG) 是一种将检索 (Retrieval) 和生成 (Generation) 相结合的技术,它利用检索到的相关信息来增强大型语言模型 (LLM) 的生成能力。RAG 系统通常包含两个关键组件: 检索器 (Retriever):从知识库中检索与输入查询…

FPGA 时钟多路复用

时钟多路复用 您可以使用并行和级联 BUFGCTRL 的组合构建时钟多路复用器。布局器基于时钟缓存 site 位置可用性查找最佳布局。 如果可能&#xff0c;布局器将 BUFGCTRL 布局在相邻 site 位置中以利用专用级联路径。如无法实现&#xff0c;则布局器将尝试将 BUFGCTRL 从…

C++ Primer 算术运算符

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…

分析用户请求K8S里ingress-nginx提供的ingress流量路径

前言 本文是个人的小小见解&#xff0c;欢迎大佬指出我文章的问题&#xff0c;一起讨论进步~ 我个人的疑问点 进入的流量是如何自动判断进入iptables的四表&#xff1f;k8s nodeport模式的原理&#xff1f; 一 本机环境介绍 节点名节点IPK8S版本CNI插件Master192.168.44.1…

Visual Studio Code应用本地部署的deepseek

1.打开Visual Studio Code&#xff0c;在插件中搜索continue&#xff0c;安装插件。 2.添加新的大语言模型&#xff0c;我们选择ollama. 3.直接点connect&#xff0c;会链接本地下载好的deepseek模型。 参看上篇文章&#xff1a;deepseek本地部署-CSDN博客 4.输入需求生成可用…

从DTFT到DFT:数字信号处理中的关键过渡

摘要 在数字信号处理领域&#xff0c;从离散时间傅里叶变换&#xff08;DTFT&#xff09;过渡到离散傅里叶变换&#xff08;DFT&#xff09;是一个至关重要的发展阶段。本文将深入浅出地阐述这一过渡过程&#xff0c;详细解释为什么需要用DFT来表示实际的信号。首先介绍DTFT的…