《RabbitMQ篇》消息应答和发布确认

news/2024/12/21 22:27:33/

消息应答

消息应答机制:消费者接收信息并处理完之后,告诉rabbitmq该信息已经处理,rabbitmq可以把该信息删除了.

消息自动重新入队:如果处理某个消息的消费者异常关闭了,没有发送ACK确认,rabbitmq会将其重新入队,分发给其他消费者处理。

自动应答

消息发送后即被认为传送成功。该模式弊端,消费者不断的接收消息,如果处理不及时,导致内存耗尽,系统杀死消费者进程。所以适用于消费者可以高效处理信息的情况下。

手动应答:

Channel.basicAck(long deliveryTag, boolean multiple):

确认一条或多条收到的消息。对于经过确认的消息,RabbitMQ 认为该消息成功的处理,可以将其丢弃了。

在手动应答情况下,如果消费者没有关闭,但就是不发送ACK确认,这条消息就会一直在MQ中显示unacked,不会重新分发给其他消费者。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

multiple该参数用于开启批量应答:比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的还未应答的消息都会被确认收到消息应答。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue):

拒绝一条或多条收到的消息。

deliveryTag:对应消息的标识。

multiple:如果为True,则应用多消息。

requeue:如果为True,拒绝的消息将会重新加入队列

重新入队的消息会分发给其他消费者,不重新入队的消息将会丢失/死信。

Channel.basicReject(long deliveryTag, boolean requeue):

拒绝一条消息。与 Channel.basicNack 相比少一个参数Multiple。

发布确认

生产者把消息发送到MQ后,需要MQ返回一个ACK给生产者,这样生产者才知道自己的消息成功发出去了。

实现过程:生产者将信道设置成confirm模式,所有在该信道的消息都会被指派一个唯一的id,消息到达匹配的队列后,broker发送一个确认给生产者,这样生产者就知道消息已经发布到MQ了

RabbitMQ默认没有开启发布确认模式的,需要使用Channel.confirmSelect() 开启发布确认。

Channel.waitForCinfirms():等待Broker确认或拒绝自上次调用以来发布的所有消息。

单个发布确认

单个确认发布:发布一个确认一个,才能接着发第二个。

优缺点:可以确认哪个消息失败,适用于吞吐量不大的程序。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageIndividually();//发布1000个单独确认消息,耗时:546ms}public static void publishMessageIndividually() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//单个确认boolean flag = channel.waitForConfirms();if(flag){// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}
}

批量发布确认

批量发布确认:以批次发布,一批次可以有很多消息,待批次发布完,才确认。

优缺点:速度比单个快,但是不能确认哪个消息失败。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000; public static void main(String[] args) throws Exception {publishMessageBatch(); //发布1000个批量确认消息,耗时:117ms}public static void publishMessageBatch() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();int batchSize = 10;//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//批量确认if(i % batchSize == 0) {boolean flag = channel.waitForConfirms();if(flag) {// TODO:消息发送成功的处理逻辑} else {// TODO: 消息发送失败的处理逻辑}}}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");}
}

异步发布确认

异步确认发布:每个信息都有自己的标识,一直发布即可,发布成功与否,都会有回调信息。

优缺点:最快,又能确认哪个消息发布失败。

public class ConfirmMessage {//消息的个数public static final int MESSAGE_COUNT = 1000;public static void main(String[] args) throws Exception {publishMessageAsync(); //发布1000个异步确认消息,耗时:61ms}public static void publishMessageAsync() throws Exception {Channel channel = RabbitMQUtil.getChannel();//队列的声明String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,true,false,null);//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//异步确认ConfirmCallback confirmAckCallback = (deliveryTag, multiple) -> {// TODO:消息发送成功的处理逻辑};ConfirmCallback confirmNackCallback = (deliveryTag, multiple) -> {// TODO: 消息发送失败的处理逻辑};channel.addConfirmListener(confirmAckCallback, confirmNackCallback);//批量发消息for (int i = 1; i <= MESSAGE_COUNT; i++) {String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());}//结束时间long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}

消息应答和发布确认的区别

消息应答属于消费者,消费完消息告诉MQ已经消费成功。

发布确认属于生产者,生产消息发送到MQ,MQ告诉生产者已经收到消息。


http://www.ppmy.cn/news/1536680.html

相关文章

抓包工具:Mitmproxy

Mitmproxy 是一组工具,它们为 HTTP/1、 HTTP/2和 WebSocket 提供交互式、支持 SSL/TLS 的拦截代理。 特性 拦截 HTTP 和 HTTPS 请求和响应并动态修改它们。 保存完整的 HTTP 对话,以便以后重放和分析。 重放 HTTP 会话的客户端。 重放以前记录的服务器的 HTTP 响应。 反向代…

信号用wire类型还是reg类型定义

wire类型就是一根线&#xff0c;线有两端&#xff0c;一端发生改变&#xff0c;经过线传递的信号当然也会发生改变&#xff0c;reg类型则不同&#xff0c;可以把reg类型理解为存储数据的寄存器&#xff0c;当满足一定条件时&#xff0c;数值才被激活发生改变。 那么&#xff0…

深入浅出React Hooks:打造高效、灵活的函数式组件

欢迎来到这本专注于React Hooks的小册!在这里,我们将深入探讨React生态系统中最强大、最灵活的特性之一 - Hooks。自2018年React 16.8版本引入以来,Hooks彻底改变了我们构建React应用的方式,为函数式组件注入了新的活力和能力。 本册涵盖了从基础到高级的47个精心挑选的Hooks,涉…

Pikachu-Sql Inject-宽字节注入

基本概念 宽字节是相对于ascII这样单字节而言的&#xff1b;像 GB2312、GBK、GB18030、BIG5、Shift_JIS 等这些都是常说的宽字节&#xff0c;实际上只有两字节 GBK 是一种多字符的编码&#xff0c;通常来说&#xff0c;一个 gbk 编码汉字&#xff0c;占用2个字节。一个…

Perforce演讲回顾(上):从UE项目Project Titan,看Helix Core在大型游戏开发中的版本控制与集成使用策略

日前&#xff0c;Perforce携手合作伙伴龙智一同亮相Unreal Fest 2024上海站&#xff0c;分享Helix Core版本控制系统及其协作套件的强大功能与最新动态&#xff0c;助力游戏创意产业加速前行。 Perforce解决方案工程师Kory Luo在活动主会场&#xff0c;带来《Perforce Helix C…

电子摄像头分割系统源码&数据集分享

电子摄像头分割系统源码&#xff06;数据集分享 [yolov8-seg-C2f-DWR&#xff06;yolov8-seg-C2f-ContextGuided等50全套改进创新点发刊_一键训练教程_Web前端展示] 1.研究背景与意义 项目参考ILSVRC ImageNet Large Scale Visual Recognition Challenge 项目来源AAAI Glob…

Axios 和 Ajax 的区别与联系

在前端开发中&#xff0c;数据的获取和交互是至关重要的环节。Axios 和 Ajax 都是常用的技术手段&#xff0c;用于实现与服务器的数据通信。本文将深入探讨 Axios 和 Ajax 的区别和联系&#xff0c;包括它们的特性、优缺点以及应用场景。 一、Axios 与 Ajax 的特性 Axios 的特…

【Linux】进程间通信——System V消息队列和信号量

一、消息队列 1.1 概念 进程间通信的原理是让不同进程看到同一份资源&#xff0c;资源种类的不同就决定了通信方式的差异。如果用管道通信&#xff0c;则资源是文件缓冲区&#xff1b;如果用共享内存&#xff0c;则资源是内存块 消息队列是由操作系统提供的资源&#xff0c;…