RabbitMQ---事务及消息分发

news/2025/1/23 23:40:16/

(一)事务

   RabbitMQ是基于AMQP协议实现的,该协议实现了事务机制,所以RabbitMQ也支持事务机制,他的事务允许开发者确保消息的发送和接收时原子性的,要么全部成功,要么全部失败

 我们设置事务有三步,首先就是开启事务

因为我们是针对rabbitTemplate来作设置的,所以会影响此rabbitTemplate的所有消息,这里我们新开了一个

然后我们使用时要加一个注解

 最后一步我们需要加上事务管理

这样我们就成功开启了事务

这三步是必不可少的,缺少一步,都无法成功开启事务

(二)消息分发

1.概念

  RabbitMQ队列有多个消费者时,队列会把消息分给不同的消费者,每条消息只发给一个消费者,默认情况下,RabbitMQ是按照轮询的方式分发的,不管消费者是否已经消费并确认消息,所以这种方式是很有可能造成消息积压的,因为有的消费者处理消息的速度快,有的就慢,这样会导致整体吞吐量下降

  那至于我们要如何处理,我们可以通过channelbasic()这个方法来限制信道上的最大消息数量,RabbitMQ每向该消费者发送消息,就会使消息计数+1,消费者消费消息就会使消息计数-1,当到达了上线,RabbitMQ就不会再发送消息了,知道消费者又消费了一条消息

2.应用场景

 我们可以将消息分发应用在限流.非公平分发(负载均衡中)

1)限流

 我们来看一个例子,我们有一个订单系统,每秒可以处理5000个请求,在正常情况下,是不会有问题的,但是在一些特殊时间比如11.11,请求量就会突增,如果这些请求全部直接发送到消费者(订单系统),那么就会把我们订单系统给弄崩了

  RabbitMQ提供了限流机制,可以控制消费者一次最多拉取多少个请求,通过在配置文件中设置prefetch参数即可 

我们来看代码

因为要限流,限制拉取的请求,所以我们这里要设置手动确认,如果我们设置成自动确认,那我们收到消息就确认,其实跟没有设置限流差别是不大的

还是先看配置代码

spring:rabbitmq:addresses: amqp://student:student@62.234.46.219:5672/testlistener:simple:#        acknowledge-mode: NONE#        acknowledge-mode: AUTOacknowledge-mode: MANUALprefetch: 5retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时⻓为5秒max-attempts: 5
#    publisher-confirm-type: correlated  #消息发送确认

这里我们设置一次最多拉取5条 

然后我们生产者一次发送10条消息

@RestController
@RequestMapping("producer")
public class Qos {@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/qos")public String qosConsumer(){for (int i = 0; i < 10; i++) {rabbitTemplate.convertAndSend(Constant.QOS_EXCHANGE,"qos","qos "+i);}return "发送成功";}
}

然后消费者消费消息,但是这里我们没有进行确认,主要是为了方便观察结果,如果我们确认了,本身我们消息就不多而且代码中没什么复杂逻辑,处理的很快,肉眼上就像一次全部获取了一样

 

@Component
public class QosConsumer {@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQueue(Message message, Channel channel) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("接收到消息: %s, deliveryTag: %d%n",new String(message.getBody(),"UTF-8"), deliveryTag);}
}

之后我们来看结果 

 

接收到了5条消息

然后我们看看控制台,发现确实有5条还没有发送,有5条已经确认了

 

那我们再把刚刚配置文件的限制取消了,再看看结果

 我们发现消息会直接全部发送到消费者

2)负载均衡

我们也可以用这个配置来实现负载均衡

 这里我们说的负载均衡,并不是说两个消费者每个人都处理相同数量的消息,而是两个人谁干的快谁就多干,谁干的慢谁就少干

 如图,我们有两个消费者,一个处理的快,一个处理的满,就会有消费者快的处理完后不知道干什么,而另一个消费者还一直在处理,这是因为RabbitMQ只是在消息进入队列时分配消息,并不考虑消费者未确认消息的数量 

  我们可以设置prefetch为1,这样每个消费者都会只有一条消息,在这条消息处理完之前,就不会有其他消息

   对于消费者来说就是,干的越快的干的越多

  那其实我个人认为限流和负载均衡是很像的,因为负载均衡就是多个消费者一起限流,然后谁干的快,谁就再拿,而且上述例子上的prdfetch为1也不一定,我们也可以是其他值

 接下来我们看代码

这里我们只需要多加一个消费者然后把配置文件prefetch改为1即可

@Component
public class QosConsumer {@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQueue(Message message, Channel channel) throws IOException, InterruptedException {long deliveryTag = message.getMessageProperties().getDeliveryTag();Thread.sleep(1000);System.out.printf("1接收到消息: %s, deliveryTag: %d%n",new String(message.getBody(),"UTF-8"), deliveryTag);channel.basicAck(deliveryTag,false);}@RabbitListener(queues = Constant.QOS_QUEUE)public void listenerQueue2(Message message, Channel channel) throws IOException, InterruptedException {long deliveryTag = message.getMessageProperties().getDeliveryTag();Thread.sleep(2000);System.out.printf("2接收到消息: %s, deliveryTag: %d%n",new String(message.getBody(),"UTF-8"), deliveryTag);channel.basicAck(deliveryTag,false);}
}

然后我们看现象,我们发现只有我们确认了之后,消息才会在再发送给这个消费者,来达到负载均衡的效果 

deliveryTag有重复是因为两个消费者使⽤的是不同的Channel,每个Channel上的 deliveryTag 是独⽴计数的


http://www.ppmy.cn/news/1565613.html

相关文章

记一次为centos7更换yum源成功

按照网络教程下载或编辑新的源&#xff0c;如阿里、清华大学的源等等&#xff1b; 将新的源文件替换目录/etc/yum.repos.d/下的文件CentOS-Base.repo&#xff1b; 按照网络教程&#xff0c;下面的步骤操作出现了问题&#xff1a; yum clean all yum makecache yum makecache命令…

MECD+: 视频推理中事件级因果图推理--VLM长视频因果推理

论文链接&#xff1a;https://arxiv.org/pdf/2501.07227v1 1. 摘要及主要贡献点 摘要&#xff1a; 视频因果推理旨在从因果角度对视频内容进行高层次的理解。然而&#xff0c;目前的研究存在局限性&#xff0c;主要表现为以问答范式执行&#xff0c;关注包含孤立事件和基本因…

mysql数据被误删的恢复方案

文章目录 一、使用备份恢复二、使用二进制日志&#xff08;Binary Log&#xff09;三、使用InnoDB表空间恢复四、使用第三方工具预防措施 数据误删是一个严重的数据库管理问题&#xff0c;但通过合理的备份策略和使用适当的恢复工具&#xff0c;可以有效地减少数据丢失的风险…

Go channel关闭方法

channel关闭原则 1、不能在消费端关闭channel&#xff08;基础原则&#xff0c;单生产者或多生产者均不能在消费端关闭&#xff09;&#xff1b; 2、多个生产者时&#xff0c;不能对channel执行关闭&#xff1b; 3、只有在唯一或最后唯一剩下的生产者协程中关闭channel&…

web前端第六次作业---制作网页页面

制作网页页面 代码: <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><s…

AR智慧点巡检系统探究和技术方案设计

一、项目背景 随着工业生产规模的不断扩大和设备复杂度的提升&#xff0c;传统的人工点巡检方式效率低下、易出错&#xff0c;难以满足现代化企业对设备运行可靠性和安全性的要求。AR&#xff08;增强现实&#xff09;技术的发展为点巡检工作带来了新的解决方案&#xff0c;通…

2025美赛倒计时,数学建模五类模型40+常用算法及算法手册汇总

数学建模美赛倒计时&#xff0c;对于第一次参加竞赛且没有相关基础知识的同学来讲&#xff0c;掌握数学建模常用经典的模型算法知识&#xff0c;并熟练使用相关软件进行建模是关键。本文将介绍一些常用的模型算法&#xff0c;以及软件操作教程。 数学建模常用模型包括&#xf…

5. 马科维茨资产组合模型+政策意图AI金融智能体(Qwen-Max)增强方案(理论+Python实战)

目录 0. 承前1. AI金融智能体1.1 What is AI金融智能体1.2 Why is AI金融智能体1.3 How to AI金融智能体 2. 数据要素&计算流程2.1 参数集设置2.2 数据获取&预处理2.3 收益率计算2.4 因子构建与预期收益率计算2.5 协方差矩阵计算2.6 投资组合优化2.7 持仓筛选2.8 AI金融…