【RabbitMQ】rabbitmq在spring boot中的使用

news/2025/3/14 16:52:30/

rabbitmq官网地址:https://www.rabbitmq.com/tutorials

下面介绍rabbitmq官网中七种使用方式在spring boot中如何使用

下面是基于 Spring Boot 使用 RabbitMQ 实现这七种模式的示例代码。假设已经引入了以下依赖:

Maven 依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 

1. Hello World! 模式

生产者和消费者直接发送和接收消息。

配置类

@Configuration
public class RabbitConfig {public static final String QUEUE_NAME = "hello";@Beanpublic Queue helloQueue() {return new Queue(QUEUE_NAME);}
}

生产者

@RestController
@RequestMapping("/hello")
public class HelloProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/send")public String send() {rabbitTemplate.convertAndSend(RabbitConfig.QUEUE_NAME, "Hello RabbitMQ!");return "Message sent!";}
}

消费者

@Component
public class HelloConsumer {@RabbitListener(queues = RabbitConfig.QUEUE_NAME)public void receive(String message) {System.out.println("Received: " + message);}
}

2. Work Queues(工作队列)

多个消费者从同一个队列中获取任务,进行任务分发。

配置类

@Configuration
public class WorkQueueConfig {public static final String WORK_QUEUE = "work_queue";@Beanpublic Queue workQueue() {return new Queue(WORK_QUEUE);}
}

生产者

@RestController
@RequestMapping("/work")
public class WorkQueueProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/send/{msg}")public String send(@PathVariable String msg) {rabbitTemplate.convertAndSend(WorkQueueConfig.WORK_QUEUE, msg);return "Work message sent!";}
}

消费者(多个)

@Component
public class WorkConsumer {@RabbitListener(queues = WorkQueueConfig.WORK_QUEUE)public void receive(String message) throws InterruptedException {System.out.println("Worker received: " + message);Thread.sleep(1000);  // 模拟任务耗时}
}

3. Publish/Subscribe(发布/订阅)

使用 Fanout Exchange 实现广播。

配置类

@Configuration
public class FanoutConfig {public static final String FANOUT_EXCHANGE = "fanout_exchange";@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE);}@Beanpublic Queue queue1() {return new Queue("fanout.queue1");}@Beanpublic Queue queue2() {return new Queue("fanout.queue2");}@Beanpublic Binding binding1() {return BindingBuilder.bind(queue1()).to(fanoutExchange());}@Beanpublic Binding binding2() {return BindingBuilder.bind(queue2()).to(fanoutExchange());}
}

生产者

@RestController
@RequestMapping("/fanout")
public class FanoutProducer {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/send")public String send() {rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, "", "Fanout message!");return "Fanout message sent!";}
}

4. Routing(路由模式)

使用 Direct Exchange 和路由键实现定向投递。

配置类

@Configuration
public class DirectConfig {public static final String DIRECT_EXCHANGE = "direct_exchange";@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE);}@Beanpublic Queue errorQueue() {return new Queue("direct.error");}@Beanpublic Queue infoQueue() {return new Queue("direct.info");}@Beanpublic Binding errorBinding() {return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");}@Beanpublic Binding infoBinding() {return BindingBuilder.bind(infoQueue()).to(directExchange()).with("info");}
}

5. Topics(主题模式)

使用 Topic Exchange 实现多级路由。

配置类

@Configuration
public class TopicConfig {public static final String TOPIC_EXCHANGE = "topic_exchange";@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE);}@Beanpublic Queue topicQueue1() {return new Queue("topic.queue1");}@Beanpublic Queue topicQueue2() {return new Queue("topic.queue2");}@Beanpublic Binding binding1() {return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("*.error");}@Beanpublic Binding binding2() {return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("log.#");}
}

6. RPC(远程过程调用)

生产者发送请求,消费者处理后返回响应。

RPC 服务端

@Component
public class RpcServer {@RabbitListener(queues = "rpc_queue")public String process(String message) {return "Processed: " + message;}
}

RPC 客户端

@RestController
@RequestMapping("/rpc")
public class RpcClient {@Autowiredprivate AmqpTemplate rabbitTemplate;@GetMapping("/send/{msg}")public String send(@PathVariable String msg) {Object response = rabbitTemplate.convertSendAndReceive("rpc_queue", msg);return "RPC response: " + response;}
}

7. Publisher Confirms(发布者确认)

确保消息成功发送到 RabbitMQ 服务器。

配置类

@Configuration
public class ConfirmConfig {@Beanpublic CachingConnectionFactory connectionFactory() {CachingConnectionFactory factory = new CachingConnectionFactory("localhost");factory.setPublisherConfirms(true);return factory;}@Beanpublic RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message confirmed");} else {System.err.println("Message failed: " + cause);}});return template;}
}


http://www.ppmy.cn/news/1579105.html

相关文章

Centos离线安装openssl-devel

文章目录 Centos离线安装openssl-devel1. openssl-devel是什么&#xff1f;2. openssl-devel下载地址3. openssl-devel安装4. 安装结果验证 Centos离线安装openssl-devel 1. openssl-devel是什么&#xff1f; openssl-devel 是 Linux 系统中与 OpenSSL 加密库相关的开发包&…

【最佳实践】Go 责任链模式实现参数校验

这里我们使用责任链模式来创建一个参数校验的示例。在这个示例中&#xff0c;我们将实现一个简单的责任链来校验不同的参数条件。这种模式允许我们将多个校验步骤串联在一起&#xff0c;以便可以在不同的条件下进行灵活的校验。 设计思路 接口定义 (Validator) 目的&#xff1…

React hook钩子性能优化Hooks的面试常考题目

根据,提到了常用的Hooks有useState、useEffect、useContext、useReducer、useCallback、useMemo、useRef,还有其他如useLayoutEffect和useImperativeHandle。和也提到了类似的Hooks,并且强调了useEffect的重要性。详细解释了useState、useEffect、useMemo和useCallback的区别…

计算机视觉领域开源数据集资源整理

1. 目标检测数据集 1.1 COCO2017 数据集 COCO2017 是 2017 年发布的 COCO 数据集的一个版本&#xff0c;主要用于 COCO 在 2017 年后持有的物体检测任务、关键点检测任务和全景分割任务。 1.2 火焰和烟雾图像数据集 数据集链接&#xff1a;http://m6z.cn/6fzn0f 该数据集由…

UE5 RVT 制作场景交互 - 遮罩

RVT可以通过物体制作场景的RVT的贴图遮罩绘制 首先放一个Runtime Virtual Texture Volume在场景里面 设置一个合理的大小 创建一个RVT 这里有你想要的存储的通道和贴图精度 将才创建的RVT放到Runtime Virtual Texture Volume上去 现在放一个平面到Runtime Virtual Texture Volu…

c++之STL库

STL 基本概念一.容器&#xff08;Containers&#xff09;1.序列容器&#xff08;Sequence Containers&#xff09;&#xff1a;2.关联容器&#xff08;Associative Containers&#xff09;&#xff1a;3.无序容器&#xff08;Unordered Containers&#xff09;&#xff1a;4.迭…

LGA封装 Z3588开发板,8K视频编解码

Z3588 是基于瑞芯微 RK3588 CPU 研发开发板&#xff0c;RK3588 是瑞芯微推出的新一代旗舰级高端处理器&#xff0c;采用 8nm 工艺设计&#xff0c;搭载四核 A76 四核 A55 的八核 CPU 和 Arm 高性能 GPU&#xff0c;内置 6T 算力的 NPU。 LGA&#xff08;Land Grid Array&#x…

【Python】PyQt5在PyCharm的配置与应用

一、安装pycharm与python版本 Download PyCharm: The Python IDE for data science and web development by JetBrains Professional&#xff1a;专业版&#xff0c;收费&#xff0c;功能齐全 Community&#xff1a;社区版&#xff0c;免费&#xff0c;功能阉割 二、升级pip与…