Spring Boot集成RabbitMQ快速入门Demo

ops/2025/1/16 2:56:19/

1.什么是RabbitMQ?

RabbitMQ是一款使用Erlang语言开发的,基于AMQP协议的消息中间件,作为一款优秀的消息系统,RabbitMQ有高并发、可扩展等优势,并适用于大型系统中各个模块之间的通信。

RabbitMQ的特点为:

  • 持久化、传输确认、发布确认等功能保证消息可靠

  • 支持多种消息分发模式,处理更加灵活

  • 提供可视化管理界面,使用方便

  • 支持集群部署,保证服务高可用

2.RabbitMQ环境搭建

version: '3'
services:rabbitmq:image: registry.cn-hangzhou.aliyuncs.com/zhengqing/rabbitmq:3.7.8-management        # 原镜像`rabbitmq:3.7.8-management` 【 注:该版本包含了web控制页面 】container_name: rabbitmq            # 容器名为'rabbitmq'hostname: my-rabbitrestart: unless-stopped                                       # 指定容器退出后的重启策略为始终重启,但是不考虑在Docker守护进程启动时就已经停止了的容器environment:                        # 设置环境变量,相当于docker run命令中的-eTZ: Asia/ShanghaiLANG: en_US.UTF-8RABBITMQ_DEFAULT_VHOST: my_vhost  # 主机名RABBITMQ_DEFAULT_USER: admin      # 登录账号RABBITMQ_DEFAULT_PASS: admin      # 登录密码volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录- "./rabbitmq/data:/var/lib/rabbitmq"ports:                              # 映射端口- "5672:5672"- "15672:15672"

运行

docker-compose -f docker-compose-rabbitmq.yml -p rabbitmq up -d

web管理端:http://127.0.0.1:15672 登录账号密码:admin/admin

27a7e34328fda7e39066d77e99a7d4a8.png

3.代码工程

实验目的:实现通过rabbitmq发送和接收消息

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rabbitmq</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
</project>

application.properties

server.port=8088#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vahost

config

简单使用

package com.et.rabbitmq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {@Beanpublic Queue Queue() {return new Queue("hello");}
}

topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列 首先对topic规则配置,这里使用两个队列来测试

java">package com.et.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicRabbitConfig {public final static String TOPIC_ONE = "topic.one";public final static String TOPIC_TWO = "topic.two";public final static String TOPIC_EXCHANGE = "topicExchange";@Beanpublic Queue queue_one(){return new Queue(TOPIC_ONE);}@Beanpublic Queue queue_two(){return new Queue(TOPIC_TWO);}@BeanTopicExchange exchange(){return new TopicExchange(TOPIC_EXCHANGE);}@BeanBinding bindingExchangeOne(Queue queue_one, TopicExchange exchange){return BindingBuilder.bind(queue_one).to(exchange).with("topic.one");}@BeanBinding bindingExchangeTwo(Queue queue_two, TopicExchange exchange){//# 表示零个或多个词//* 表示一个词return BindingBuilder.bind(queue_two).to(exchange).with("topic.#");}}

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

package com.et.rabbitmq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutRabbitConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}

receiver

package com.et.rabbitmq.receiver;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class HelloReceiver {@RabbitListener(queues = "hello")public void process(String hello) {System.out.println("Receiver  : " + hello);}@RabbitListener(queues = {"topic.one"})public void receiveTopic1(@Payload String fileBody) {log.info("topic1:" + fileBody);}@RabbitListener(queues = {"topic.two"})public void receiveTopic2(@Payload String fileBody) {log.info("topic2:" + fileBody);}@RabbitListener(queues = {"fanout.A"})public void fanoutA(@Payload String fileBody) {log.info("fanoutA:" + fileBody);}@RabbitListener(queues = {"fanout.B"})public void fanoutB(@Payload String fileBody) {log.info("fanoutB:" + fileBody);}@RabbitListener(queues = {"fanout.C"})public void fanoutC(@Payload String fileBody) {log.info("fanoutC:" + fileBody);}
}

sender

package com.et.rabbitmq.sender;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.Date;
@Component
public class HelloSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String context = "hello " + new Date();System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("hello", context);}}
package com.et.rabbitmq.sender;import com.et.rabbitmq.config.TopicRabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;//两个消息接受者都可以收到public void send_one() {String context = "Hi, I am message one";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.one",context);}//只有TopicReceiverTwo都可以收到public void send_two() {String context = "Hi, I am message two";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend(TopicRabbitConfig.TOPIC_EXCHANGE,"topic.two",context);}}

DemoApplication.java

java">package com.et.quartz;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}
}

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • https://github.com/Harries/springboot-demo

4.测试

简单使用

java">@Test
public void hello() throws Exception {helloSender.send();Thread.sleep(50000);
}

Topic Exchange

java">@Test
public void topicOne() throws Exception {topicSender.send_one();Thread.sleep(50000);
}
@Test
public void topicTwo() throws Exception {topicSender.send_two();Thread.sleep(50000);
}

Fanout Exchange

java">@Test
public void sendFanout() throws InterruptedException {String context = "hi, fanout msg ";System.out.println("Sender : " + context);this.rabbitTemplate.convertAndSend("fanoutExchange","", context);Thread.sleep(50000);
}

5.参考连接

  • https://www.rabbitmq.com/

  • https://spring.io/projects/spring-amqp


http://www.ppmy.cn/ops/16447.html

相关文章

电子工艺卡在汽车制造流程中的应用

在当今高度发达的汽车工业中&#xff0c;电子工艺卡作为一种重要的工具&#xff0c;在汽车制造流程中发挥着至关重要的作用。它不仅是汽车生产的指导手册&#xff0c;更是确保汽车质量和性能的关键因素。 汽车制造是一个复杂而精密的过程&#xff0c;涉及众多的零部件和系统。电…

Django与mysqlclient链接不成功

先检查自己的python是什么版本&#xff0c;是64位还是32位&#xff0c;这个自己去网上查。 我的是32位的&#xff0c;因为直接pip下载不了&#xff0c;网上也没有32位的whl&#xff0c;所以卸载重装一个64位的3.9.6的python 网上直接搜mysqlclient&#xff0c;找到对应py39也…

【threejs教程7】threejs聚光灯、摄影机灯和汽车运动效果

【图片完整效果代码位于文章末】 在上一篇文章中我们实现了汽车模型的加载&#xff0c;这篇文章主要讲如何让汽车看起来像在运动。同时列出聚光灯和摄像机灯光的加载方法。 查看上一篇&#x1f449;【threejs教程6】threejs加载glb模型文件&#xff08;小米su7&#xff09;&…

51-45 Diffuser,生成柔性行为的扩散轨迹规划

22年11月&#xff0c;Berkeley 和MIT联合发布Planning with Diffusion for Flexible Behavior Synthesis&#xff0c;作者在文中把轨迹扩散概率模型称为Diffuser。 Diffusion Model 具有更强的建模复杂分布的能力&#xff0c;能够更好地建模表征数据的各种特性&#xff0c;但是…

关于DevOps理解和总结

DevOps是研发领域最近几年最热的一个概念。参加过一些讲座&#xff0c;也看过不少的书籍&#xff0c;经常听到以下说法&#xff1a; DevOps是没有明确定义的&#xff0c;一千个研发心中就有一千个Devops&#xff1b;DevOps是一种文化&#xff0c;每个团队的DevOps实践都不一样…

【ZYNQ】zynq启动模式及程序固化

一、前言 由于zynq含有arm cpu ,其启动模式由ps主导&#xff0c;与纯逻辑的fpga不相同&#xff0c;此处做一个记录。 二、zynq启动模式 关于zynq的启动模式详细内容可以参考官方文档&#xff1a;ug585-Zynq 7000 SoC Technical Reference Manual&#xff0c;第六章。 2.1 启…

AWS制作WordPress在国内外的利弊?

AWS作为全球领先的云计算服务供应商&#xff0c;为WordPress提供了强大且灵活的托管环境&#xff0c;使用AWS来搭建和运行WordPress无疑是个不错的选择。即便如此使用AWS制作还是会有些许利弊&#xff0c;九河云作为AWS的合作伙伴来为读者们仔细探讨AWS在WordPress的利弊。 利&…

python_django农产品物流信息服务系统6m344

Python 中存在众多的 Web 开发框架&#xff1a;Flask、Django、Tornado、Webpy、Web2py、Bottle、Pyramid、Zope2 等。近几年较为流行的&#xff0c;大概也就是 Flask 和 Django 了 Flask 是一个轻量级的 Web 框架&#xff0c;使用 Python 语言编写&#xff0c;较其他同类型框…