Idea+maven+springboot项目搭建系列--2 整合Rabbitmq完成客户端服务器端消息收发

news/2025/1/10 21:53:52/

前言:本文通过springBoot -maven 框架,对Rabbitmq 进行整合,完成客户端消息的发送和消费;

1 为什么要使用Rabbitmq:

RabbitMQ 是一个可靠的、灵活的、开源的消息中间件,具有以下优点:

  • 异步通信:RabbitMQ 支持异步通信,使得消息发送者和接收者能够异步处理,提高了系统性能和吞吐量。

  • 解耦合:RabbitMQ 的消息队列机制可以将发送者和接收者解耦合,减少了应用程序之间的耦合度。

  • 可靠性高:RabbitMQ 支持事务和持久化,能够确保消息不会丢失。

  • 高吞吐量:RabbitMQ 支持多种吞吐量调优方法,能够处理高并发的消息通讯。

  • 可扩展性:RabbitMQ 支持集群和分布式部署,可以扩展到大规模的消息通讯场景。

RabbitMQ 提供了易用、高效、灵活、可靠的消息传递机制,可以帮助开发者更快地构建系统并实现各种复杂的业务场景。

2 springboot 整合:

2.1 pom 引入依赖:

 <!-- rabbitmq 自动装配 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 提供web访问 默认端口8080 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency><!-- loomback 用于生成get set 方法 -->
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope>
</dependency>
<!-- 阿里的json 数据转换 -->
<dependency><groupId>com.alibaba.fastjson2</groupId><artifactId>fastjson2</artifactId><version>2.0.31</version>
</dependency>

2.2 连接参数配置:
2.2.1 基础配置:
基础配置后springboot 的自动装载机制会注册一个RabbitTemplate rabbitTemplate 对象用于消息的接收和发送;

############# 基础配置
# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机
spring.rabbitmq.virtual-host=my_vhost

注意: 其中 spring.rabbitmq.virtual-host 为隔离的虚拟机,需要根据自己业务进行配置,如果rabbitmq 有web 端可以在web端创建需要的v_host:
在这里插入图片描述
2.2.2 可扩展的连接参数配置:

############# 连接和管道配置
# When the cache mode is 'CHANNEL', the connection cache size cannot be configured.
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
#spring.rabbitmq.cache.connection.size=3
# 与broker 连接的 模式 channel 或者 connection 默认channel 
spring.rabbitmq.cache.connection.mode=channel
# 与broker 连接的默认时间,默认为 60000即 60 秒,超时会会中断并抛出异常,单位毫秒
spring.rabbitmq.connection-timeout=1000
# 每个连接中可以建立的channel 数量,默认值25
spring.rabbitmq.cache.channel.size=50
# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
# 默认值为 -1,表示不限制等待时间,即一直等待直到获取到可用的 Channel,单位毫秒
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat=60
# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024

默认与rabbitmq 的连接为channel,多个channel 公用一个connection , 每个线程都从缓存池中获取channel ,每个线程中持有的channel 是互相隔离的;

2.3 生产者发送消息:
生产者发送消息主要是通过 引入 RabbitTemplate 模版对象来完成;这里按照发送消息发送的场景分别进行介绍:

2.3.1 交换机和队列的绑定:
因为消息最开始是要发送到交换机上的,然后在通过交换机通过routkey 路由键到匹配的队列中;所以我们需要先在项目中使用的
virtual-host 中去分别创建交换机和队列,然后进行绑定;一帮情况下,我们应该向运维去申请自己的虚拟机,交换机,队列,然后通过后,项目中直接使用即可;当然通过代码也完全可以进行交换机和队列的创建和绑定,这里我们通过web 页面来进行处理:

2.3.1.1交换机的创建:
在这里插入图片描述

  • Virtual host : 对应隔离的虚拟机,所以需要选择项目中 通过spring.rabbitmq.virtual-host 参数连接的虚拟机;

  • Name: 虚拟机的名称,见名知意即可;

  • Type: 虚拟机的类型:比较常用的有直连 direct; 主题topic,广播fanout;
    在这里插入图片描述
    这里对交换机的类型进行简单的介绍:

  • 直连direct的交换机,交换机直接与队列完成绑定,通过发送消息是携带的Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中:

  • Direct Exchange
    Direct Exchange 是最简单的交换机类型,交换机直接与队列完成绑定,它根据消息携带的 Routing Key 和队列与 Exchange 绑定时指定的 Routing Key 精准匹配,然后路由消息到指定队列中。 Direct Exchange 可以理解为一张路由表,交换机通过 Routing Key 在路由表中查找匹配队列,将消息从生产者处发送到匹配队列。

  • Topic Exchange
    Topic Exchange 根据 Routing Key 的匹配规则将消息路由到对应的队列中。Topic Exchange 支持两种匹配规则:* 代表通配符,表示可以匹配一个单词,# 代表通配符,表示可以匹配多个单词。例如,Routing Key 为 com.XXX.# 的消息会被路由到匹配 com.XXX. 开头的所有队列中,Routing Key 为 # ,会匹配到所有的消息;列如 user.* 匹配 user. 后跟一个单词的消息,可以匹配到user.a 但是匹配不到user.a.b 。

  • Fanout Exchange
    Fanout Exchange 会将消息路由到所有绑定到它上面的队列中。Fanout Exchange 的路由方式与路由表无关,会忽略 Routing Key,与 Direct Exchange 和 Topic Exchange 相比,它具有更高的传输效率和更低的消耗。

  • Headers Exchange
    Headers Exchange 根据消息头中的键值对匹配规则将消息路由到对应的队列中。Headers Exchange 的匹配规则相对较复杂,需要在绑定时指定键值对的匹配方式。

  • Durability : 交换机是否持久化到磁盘的属性值设置

  • 如果将 Durability 属性设置为 durable ,表示交换器会被持久化到磁盘上,即使 RabbitMQ 服务器在交换机定义被创建之后终止,交换机定义仍然能够在服务器重新启动时得到恢复,从而保证交换机在重启后仍然存在。

  • 如果将 Durability 属性设置为 transient ,表示交换器不会被持久化到磁盘上,如果 RabbitMQ 服务器重启,则该交换器定义将会丢失。

  • Auto delete 用于指定该交换机是否自动删除。当一个交换机关联的所有队列都被删除时,如果交换机的 Auto Delete 属性为 true,则该交换机也会被自动删除

  • Internal 是否为内部交换机:
    内部交换机的 internal 属性设置为 true,使其只能被通过 AMQP 协议连接到相同 Virtual Host 的客户端使用,不能被直连类型的 Exchange 或 Headers 类型的 Exchange 所使用。
    内部交换机只能用于消费者和生产者在同一个 RabbitMQ 实例中的场景,而不能用于服务器和客户端之间传递消息。
    内部交换机主要用于应用程序之间传递消息,而不是用于服务器和客户端之间传递消息。

  • Arguments:交换机的额外属性,比较常用的属性如alternate-exchange:指定备用交换机。如果一条消息无法被路由到任何队列中,那么它将被发送到备用交换机中;

一般我们创建交换机时只需要选择Virtual host:,填入交换机的名称,选择交互机的类型这3项,其它都默认即可:
在这里插入图片描述

2.3.1.2 队列的创建:
在这里插入图片描述- type 队列的类型:
在 RabbitMQ 中,队列的 type 参数共有三种,分别是 classic、quorum 和 stream。它们的区别可以简单概括如下:
classic 队列:
最早的、经典的队列类型,支持多个消费者竞争消费消息,但是在节点宕机时可能会出现消息丢失的情况。适用于简单的消息处理场景。

quorum 队列:
支持高可用性、多个消费者竞争消费的队列类型。它通过复制机制保证消息的可靠性,可以在节点宕机时自动进行故障转移,避免消息丢失。适用于需要高可用特性的分布式环境中使用,但相对来说,quorum 队列性能较 classic 队列有所下降。

stream 队列:
支持无限缓存的消息流队列,可以通过队列中的缓存来处理各种等待中的问题。传统队列中当消息进入队列时,它就被立即写入了内存中,并等待处理。这样做的问题是,当生产者不断地发送消息时,很容易将内存撑满。 stream 队列则允许队列的缓存区域随着时间和队列大小的增长而扩展,使得待处理的消息可以在缓存区域中有所体现。适用于需要处理海量时间序列数据的场景。

需要注意的是,stream 队列是从 RabbitMQ 3.8 开始引入的新类型,目前还不是很成熟,可能在稳定性和性能方面还需要更多的优化和改进。因此,在选择队列类型时,需要结合具体的业务情况和系统限制,选择采用 classic、quorum 还是 stream 队列,以达到最优的性能和可用性。

  • Name 队列的名称;
  • Durability 队列是否持久化,参数意义同交换机;
  • Auto delete:
    在 RabbitMQ 中,队列的 auto-delete 参数用于控制队列的自动删除行为。如果将 auto-delete 参数设置为 true,则在最后一个消费者断开连接时,队列会自动被删除。
  • Arguments 队列参数的额外选择;

通常创建队列时只需要选择Virtual host,填入队列的名称,其它项默认即可:
在这里插入图片描述

2.3.1.3 交换机和队列的绑定:完成交换机和队列关系的绑定
在这里插入图片描述
2.3.2 发送消息:
2.3.2.2 生产者参数的配置:

########## 生产者配置
spring.rabbitmq.template.exchange=my_exchange
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated# 发送重试是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重试次数,默认值为 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次尝试发布或传递消息之间的间隔,默认值为 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.template.retry.multiplier=1 
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000

2.3.2.3 使用RabbitTemplate 模版发送单条消息,发送多条消息,发送延迟消息,使用自定义的RabbitTemplate 发送事务消息:
1) 定义一个类来封装我们要发送的消息结构:

package com.example.rabbitmqdemo.rabbitmq.msgDto;import lombok.AllArgsConstructor;
import lombok.Data;import java.io.Serializable;@Data
@AllArgsConstructor
public class MsgDto implements Serializable {// 消息类型private String msgType;// 消息体private Object body;
}

2) 对RabbitTemplate 模版对象配置消息确认:
如果消息投递失败,我们需要对此类消息进行记录,方便后续进行数据补偿;

package com.example.rabbitmqdemo.rabbitmq.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Slf4j
@Component("rabbitMqCustomerConfig")
public class BatchConfig {@Value("${env:prod}")private String env;@AutowiredSimpleRabbitListenerContainerFactory containerFactory;@AutowiredRabbitTemplate rabbitTemplate;@PostConstructpublic void simpleListenerBatchInit() {log.info("设置批量-----");containerFactory.setBatchListener(true);if ("prod".equals(env)) {// 依照不同的环境进行开启containerFactory.setAutoStartup(true);}// 设置 ConfirmCallback 回调函数 确认消息是否成功发送到 ExchangrabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {if (null == correlationData) {// 延迟消息 correlationData 为nullreturn;}log.debug("Message sent successfully:{} ", correlationData.getId());} else {if (null == correlationData && null == cause) {// 延迟消息 correlationData 为nullreturn;}log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);}});// ReturnCallback  处理的是未路由的消息返回的情况rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {// 判断是否是延迟消息if (routingKey.indexOf("delay") != -1) {// 是一个延迟消息,忽略这个错误提示return;}log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);});}}

3) 因为发送事务需要关闭消息的确认,所以这里重新定义一个RabbitTemplate 模版用来发送事务消息:

package com.example.rabbitmqdemo.rabbitmq.config;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TxRabbitTemplate {@Value("${spring.rabbitmq.host}")private String host;@Value("${spring.rabbitmq.username}")private String username;@Value("${spring.rabbitmq.password}")private String password;@Value("${spring.rabbitmq.port}")private String port;@Value("${spring.rabbitmq.virtual-host}")private String virtualHost;@Bean(value = "txRabbitTemplat")public RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());rabbitTemplate.setChannelTransacted(true);return rabbitTemplate;}private ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setUsername(username);connectionFactory.setPassword(password);connectionFactory.setVirtualHost(virtualHost);connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);connectionFactory.setChannelCacheSize(10);// 关闭消息的ack 确认connectionFactory.setPublisherConfirms(false);connectionFactory.setPublisherReturns(false);return connectionFactory;}
}

4)使用自动装配的RabbitTemplate 模版来进行 消息发送 :

package com.example.rabbitmqdemo.rabbitmq.producer;import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.nio.charset.Charset;
import java.util.UUID;@Slf4j
@Component
public class MessageProducer {// 这里可以指定一个默认发送使用的交换机@Value("${amqp-binding.exchange-name:my_exchange}")private String exchangeName;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowired@Qualifier("txRabbitTemplat")private RabbitTemplate txRabbitTemplate;/*** 指定的routKey 发送信息** @param message*/public void sendMessage(String routKey, Object message) {this.sendMessage(exchangeName, routKey, JSONObject.toJSONString(message));}/*** 通过交换机,路由key 发送消息** @param exchangeName* @param routKey* @param message*/public void sendMessage(String exchangeName, String routKey, Object message) {// 设置消息的唯一标识符long deliveryTag = System.currentTimeMillis();rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + deliveryTag));return messagePostProcessor;}, new CorrelationData(UUID.randomUUID().toString()));}/*** 指定的routKey 发送批量信息** @param messages*/public void sendMessageBatch(String routKey, MsgDto messages) {this.sendMessageBatch(exchangeName, routKey, JSONObject.toJSONString(messages));}/*** 通过交换机,路由key 发送批量信息** @param exchangeName* @param routKey* @param messages*/public void sendMessageBatch(String exchangeName, String routKey, Object messages) {rabbitTemplate.convertSendAndReceive(exchangeName, routKey, messages, messagePostProcessor -> {messagePostProcessor.getMessageProperties().setMessageId(String.valueOf("messageId_" + 1));return messagePostProcessor;}, new CorrelationData(UUID.randomUUID().toString()));}/*** 指定的routKey 发送信息** @param message*/public void sendDelayMessage(String routKey, Object message, long delayTime) {this.sendDelayMessage(exchangeName, routKey, message, delayTime);}/*** 指定的routKey 发送延迟信息** @param message*/public void sendDelayMessage(String exchangeName, String routKey, Object message, long delayTime) {log.debug("producer send delay message:{}", message);rabbitTemplate.convertAndSend(exchangeName, routKey, message, header -> {header.getMessageProperties().setHeader("x-delay", delayTime);return header;});}/*** 指定的routKey 发送事务信息** @param message*/@SneakyThrowspublic void sendTxMessage(String exchangeName, String routKey, Object message) {log.debug("producer send delay message:{}", message);String messageStr = JSONObject.toJSONString(message);// method 1:
//        sendTransactedMsgByNewChannel(exchangeName,routKey,message);// method2:sendTransactedMsgByNTemplate(exchangeName, routKey, messageStr);}private void sendTransactedMsgByNTemplate(String exchangeName, String routKey, String message) {txRabbitTemplate.execute(channel -> {try {String messageId = UUID.randomUUID().toString() + "_messageId";String correlationId = UUID.randomUUID().toString() + "_correId";// 创建 BasicProperties 对象并设置属性AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().messageId(messageId).correlationId(correlationId).build();channel.txSelect(); // 开启事务channel.basicPublish(exchangeName, routKey, properties, message.getBytes(Charset.forName("UTF-8"))); // 发送消息
//                "124".substring(7);channel.txCommit(); // 提交事务} catch (Exception e) {channel.txRollback(); // 回滚事务}return true;});}@SneakyThrowsprivate void sendTransactedMsgByNewChannel(String exchangeName, String routKey, String message) {// 获取新的channel 对象Channel channel = txRabbitTemplate.getConnectionFactory().createConnection().createChannel(true);// 开启事务channel.txSelect();try {// 消息格式化channel.basicPublish(exchangeName, routKey, null, message.getBytes(Charset.forName("UTF-8")));// 消息提交channel.txCommit();} catch (IOException e) {channel.txRollback();throw e;}}}

5)测试代码:

package com.example.rabbitmqdemo.rabbitmq.controller;import com.example.rabbitmqdemo.rabbitmq.enums.RabbitRoutKeyEnum;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.example.rabbitmqdemo.rabbitmq.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;@RestController
public class TestSendMsgController {@Autowiredprivate MessageProducer messageProducer;@GetMapping("/sendMsg")public boolean sendMsg(@RequestParam String content,@RequestParam String routKey) {List<Object> msgs = new ArrayList<>(10);for (int i = 0; i < 10; i++) {msgs.add(content+"_"+i);}msgs.stream().forEach(e->{MsgDto msgDto = new MsgDto("user",e);messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);});return true;}@GetMapping("/sendBatchMsg")public boolean sendBatchMsg(@RequestParam String content,@RequestParam String routKey) {List<Object> msgs = new ArrayList<>(10);for (int i = 0; i < 10; i++) {msgs.add(content+"_"+i);}MsgDto msgDto = new MsgDto("test",msgs);messageProducer.sendMessageBatch(RabbitRoutKeyEnum.业务_多条消息.getRoutKey(), msgDto);return true;}@GetMapping("/sendDelayMsg")public boolean sendDelayMsg(@RequestParam String content,@RequestParam long delayTime) {List<Object> msgs = new ArrayList<>(10);for (int i = 0; i < 10; i++) {msgs.add(content+"_"+i);}msgs.stream().forEach(e->{messageProducer.sendDelayMessage("my_delay_exchange",RabbitRoutKeyEnum.业务_延迟.getRoutKey(),e,delayTime);});return true;}@GetMapping("/sendTxMsg")public boolean sendTxMsg(@RequestParam String content) {List<Object> msgs = new ArrayList<>(10);for (int i = 0; i < 2; i++) {msgs.add(content+"_"+i);}msgs.stream().forEach(e->{MsgDto msgDto = new MsgDto("tx",e);messageProducer.sendTxMessage("my_tx_exchange",RabbitRoutKeyEnum.业务_事务.getRoutKey(),msgDto);
//            messageProducer.sendMessage(RabbitRoutKeyEnum.业务_单条消息.getRoutKey(),msgDto);});return true;}
}

这里分别测试了单条消息,多条消息,延迟消息,事务消息的发送,将其封装为MsgDto对象,在发送时将其转为json 字符串;基本上满足了大部分的业务场景;需要注意的是rabbitmq 中所谓批量发送的消息实际上会被消息压缩为1条消息进行发送,到达队列是也是1条消息;

6 )routKey 的枚举类:

package com.example.rabbitmqdemo.rabbitmq.enums;import lombok.AllArgsConstructor;
import lombok.Getter;@Getter
@AllArgsConstructor
public enum RabbitRoutKeyEnum {业务_单条消息("my_routKey"),业务_多条消息("my_batch_routKey"),业务_1("my_one_routKey"),业务_延迟("my_delay_routKey"),业务_事务("my_tx_routKey"),;private String routKey;}

至此我们已基本完成生产端消息的发送以及发送结果的监听处理;需要注意的是对于延迟消息,返回的确认消息correlationData 是一个null 值,所以这里对其消息的确认进行了一次特殊的判断;

3 消费者接收消息:

3.1 消费者参数的配置:

########## 消费者配置
# 是否自动启动消息的监听
spring.rabbitmq.listener.simple.auto-startup=false
# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=2
# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制当消息消费失败后,RabbitMQ 是否需要将消息重新入队。该参数的默认值为 true,即消息将被重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true

以上参数,配置了消费端消费消息后的ack 机制为手动提交,并且设定了 批量预取条数 和每次批量消费的条数,以及消费失败的重试机制配置;

3.2 消费消息:
消费者监听某个或者几个队列,然后通过channel 获取要消费的消息:

package com.example.rabbitmqdemo.rabbitmq.consumer;import com.alibaba.fastjson2.JSONObject;
import com.example.rabbitmqdemo.rabbitmq.msgDto.MsgDto;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.List;@Slf4j
@Component
public class MessageConsumer {/*** 逐条/批量 消费** @param messages*/
//    @RabbitListener(queues = "my_queue_one")public void receiveMessage(List<Message> messages, Channel channel) throws IOException {log.debug("逐条消费消息:{}", messages);for (Message message : messages) {try {
//                // 处理消息log.debug("Received message: {}", message);String jsonMessage = new String(message.getBody(), "UTF-8");MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);// 数据处理// 手动发送 ack 消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 发生异常,手动发送 nack 消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 逐条消费--延时消息** @param messages*/@RabbitListener(queues = "my_deay_queue")public void receiveDelayMessage(List<Message> messages, Channel channel) throws IOException {for (Message message : messages) {try {// 处理消息log.debug("Received delay message: {}", message);String jsonMessage = new String(message.getBody(), "UTF-8");MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);// 手动发送 ack 消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 发生异常,手动发送 nack 消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}/*** 逐条消费--事务消息** @param messages*/@RabbitListener(queues = "my_tx_queue")public void receiveTxMessage(List<Message> messages, Channel channel) throws IOException {for (Message message : messages) {try {// 处理消息log.debug("Received delay message: {}", message);String jsonMessage = new String(message.getBody(), "UTF-8");MsgDto body = JSONObject.parseObject(jsonMessage, MsgDto.class);// 手动发送 ack 消息channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} catch (Exception ex) {// 发生异常,手动发送 nack 消息channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);}}}}

这里我们接收到消息后,然后通过"UTF-8"编码(生产者默认按照UTF-8 对数据编码后进行发送)将字节数据转换为字符串,然后通过阿里的json jar 完成java 对象的转换,进行业务处理,最后手动提交消息;

4 总结:

  • Rabbitmq 对于消息的发送依赖于交换机,通过routKey 绑定不同的queue 完成消息的路由工作;
  • Rabbitmq 发送消息可以为其配置ack确认机制,以及发送失败重试机制参数可以配合完成消息的发送;
  • Rabbitmq 发送消息可以进行批量发送,但是本质上会被合并到一条消息进行发送;
  • Rabbitmq 对于消息的消费,依赖于构建channel 管道 ,绑定queue 完成消息的消费;
  • Rabbitmq 消费消息,可以进行手动的ack 确认,并且可以设置消费重试参数,应便于消费失败的场景;

5 扩展:

5.1 rabbitmq 发送事务消息为什么要关闭 消息的确认回调?

在RabbitMQ中,如果发送事务消息,并且开启了确认模式,那么需要特别注意关闭消息的确认回调,以避免一些潜在的问题。
在RabbitMQ中,开启事务模式后,生产者发送消息时,RabbitMQ会将消息缓存在生产者端。在事务提交之前,不会直接将消息发送到队列。如果在事务未提交的情况下,RabbitMQ服务器异常中断或者连接被关闭,那么消息将会丢失。为了避免这种情况的发生,可以采用事务提交确认和确认模式,在确认之后才将消息发送到队列中。

然而,在发送事务消息时,开启确认模式后,需要关闭消息的确认回调。这是因为在事务提交之前,消息并没有发送到队列中,确认回调将在消息发送到队列后才触发。而在事务模式下,消息已经被缓存到生产者端,没有被发送到队列中,所以确认回调不应该被触发。


http://www.ppmy.cn/news/380948.html

相关文章

android camera2 区别,三星Galaxy Camera2/Camera1性能对比异同之处

2012年三星在IFA德国柏林国际消费类电子展发布了一款安卓相机Galaxy Camera&#xff0c;时隔一年多&#xff0c;三星终于为我们带来第二代Galaxy Camera(详情)&#xff0c;我们今天来看看两代Galaxy Camera之间有哪些异同&#xff0c;第二代又进行了哪些升级。 作为一款相机&am…

智能相机(Smart Camera)

智能相机(Smart Camera)并不是一台简单的相机&#xff0c;而是一种高度集成化的微小型机器视觉系统。它将图像的采集、处理与通信功能集成于单一相机内&#xff0c;从而提供了具有多功能、模块化、高可靠性、易于实现的机器视觉解决方案。 一些图像处理功能&#xff1a;如几何边…

用户因GalaxyS20相机玻璃自发开裂起诉三星

本文转载自IT之家 IT之家 4 月 29 日消息 Galaxy S20 用户本周对三星提起集体诉讼&#xff0c;声称该公司故意隐瞒了产品的设计缺陷&#xff0c;导致在不施加外力的情况下&#xff0c;相机镜头玻璃就会自发碎裂。 这起诉讼是由 Hagens Berman 律师事务所提起的&#xff0c;诉讼…

操作系统常识

4.进程同步 1.什么是临界区&#xff1f;什么是临界资源 在计算机系统中&#xff0c;临界资源指的是被多个并发执行的线程或进程共享访问的某个资源&#xff0c;如共享内存区、共享文件等。 临界区指的是访问临界资源的那部分代码片段&#xff0c;它是一段需要保护的代码区域…

三星自定义状态栏_详解三星GALAXY Camera很实用的编辑功能

1无需回家 拍完就编辑 在以往的相机中&#xff0c;如果我们想要对拍摄的照片进行美化&#xff0c;那么必须要先回家&#xff0c;将存储卡中的照片复制到电脑上&#xff0c;再在电脑上打开图片编辑软件&#xff0c;最后才可以进行编辑操作&#xff0c;这样做不仅费时还很麻烦。而…

三星android图片,三星首款安卓相机GALAXY Camera高清图赏

三星首款安卓相机GALAXY Camera高清图赏 http://www.sinaimg.cn/dy/slidenews/5_img/2012_35/20550_584789_477013.jpg http://www.sinaimg.cn/dy/slidenews/5_t160/2012_35/20550_584789_477013.jpg http://www.sinaimg.cn/dy/slidenews/5_t50/2012_35/20550_584789_477013.jp…

Android 相机开发 三星拍照崩溃修改解决 详细

android 相机调用问题 三星系统4.0以上&#xff0c; 调用系统相机屏幕会强制切换为横屏 拍照完会切换为竖屏 点击保存回Activity 程序会直接崩溃 我理解的是&#xff1a;android系统在手机切换横竖屏的时候会把当前的activity销毁掉 然后又重新执行Oncreat方法 activi…

android 三星手机拍照旋转90度,解决三星拍照上传照片被旋转90度,和三星相机崩溃...

刚刚想起来前几天面试的时候遇到的一个问题&#xff0c; 问题大概是这样的做拍照上传图片功能的时候&#xff0c;在三星手机上拍出的照片是旋转了90度的&#xff0c;应该如何解决这个问题。因为之前没有遇到过这种问题&#xff0c;当时我回答的是给图片做一个ObjectAnimator的r…