消息可靠投递

server/2024/9/20 13:24:48/ 标签: rabbitmq

消息可靠投递

  • 生产者消息投递到 Broker 时,万一网络断了,生产者以为自己投递成功了,消息队列以为没有生产者投递
  • RabbitMQ 提供了两种方式控制可靠投递,confirm 确认模式,return 退回模式
  • RabbitMQ 提供事务机制,但是性能较差,不做讲解,可自行研究

image-20210816144827884

ConfrmCallBack

消息从生产者投递交换机,交换机给生产者一个响应,生产者收到肯定应答,才能保证消息成功投递到交换机,但是如果没有设置持久化,这时候交换机断电重启,仍然丢失,需要做到以下3个方法才能保证可靠投递到交换机

  1. 队列设置持久化
  2. 消息设置持久化
  3. ConfirmCallBack回调

单个同步确认

生产者投递一个消息,交换机回应,生产者确认之后再发布下一个,吞吐量很低

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//单条同步确认
public class SingleSyncConfirm {/*** 生产者 → 交换机* 生产者投递 1条 消息给交换机* 交换机持久化,保存本地之后,给生产者一个应答* 生产者接收到成功应答之后,再投递下一条消息* 10000条,每条都回执一次 总耗时: 1958 毫秒* @param args*/private static final String SINGLE_SYNC_CONFIRM_EXCHANGE = "singleSyncConfirmExchange";private static final String SINGLE_SYNC_CONFIRM_QUEUE = "singleSyncConfirmQueue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//交换机声明,队列声明,交换机绑定队列channel.exchangeDeclare(SINGLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);channel.queueDeclare(SINGLE_SYNC_CONFIRM_QUEUE,true,false,false,null);channel.queueBind(SINGLE_SYNC_CONFIRM_QUEUE,SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE);/*** 开启 confirm 确认*/channel.confirmSelect();long begin = System.currentTimeMillis();//开始时间for (int i = 0; i < 1000; i++) {String str = i + "";channel.basicPublish(SINGLE_SYNC_CONFIRM_EXCHANGE,SINGLE_SYNC_CONFIRM_QUEUE,null,str.getBytes());//生产者等待确认boolean b = channel.waitForConfirms();if (b) {System.out.println("第 " + i + " 条发送成功");} else {System.out.println("第 " + i + " 条发送失败================");}}long end = System.currentTimeMillis();//结束时间System.out.println("总耗时: " + (end - begin) + " 毫秒");}
}

批量同步确认

发布批量消息之后,等待,当有某一个故障的时候,不知道是哪个消息出问题

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;//多条同步确认
public class MultipleSyncConfirm {private static final String MULTIPLE_SYNC_CONFIRM_EXCHANGE = "multipleSyncConfirmExchange";private static final String MULTIPLE_SYNC_CONFIRM_QUEUE = "multipleSyncConfirmQueue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//交换机声明channel.exchangeDeclare(MULTIPLE_SYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);channel.queueDeclare(MULTIPLE_SYNC_CONFIRM_QUEUE,true,false,false,null);channel.queueBind(MULTIPLE_SYNC_CONFIRM_QUEUE,MULTIPLE_SYNC_CONFIRM_EXCHANGE,MULTIPLE_SYNC_CONFIRM_QUEUE);/*** 开启 confirm 确认*/channel.confirmSelect();long begin = System.currentTimeMillis();//开始时间for (int i = 1; i <= 1000; i++) {String str = i + "";channel.basicPublish(MULTIPLE_SYNC_CONFIRM_EXCHANGE, MULTIPLE_SYNC_CONFIRM_QUEUE, null, str.getBytes());if (i % 100 == 0) {//生产者等待确认boolean b = channel.waitForConfirms();if (b) {System.out.println("第 " + i + " 条发送成功");} else {System.out.println("第 " + i + " 条发送失败================");}}}long end = System.currentTimeMillis();//结束时间System.out.println("总耗时: " + (end - begin) + " 毫秒");}
}

异步批量确认

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;//多条异步确认
public class MultipleAsyncConfirm {private static final String MULTIPLE_ASYNC_CONFIRM_EXCHANGE = "multipleAsyncConfirmExchange";private static final String MULTIPLE_ASYNC_CONFIRM_QUEUE = "multipleAsyncConfirmQueue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();if (true) {factory.setHost("localhost");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");factory.setVirtualHost("/");}Connection connection = factory.newConnection();Channel channel = connection.createChannel();//交换机声明channel.exchangeDeclare(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, BuiltinExchangeType.DIRECT, true);channel.queueDeclare(MULTIPLE_ASYNC_CONFIRM_QUEUE, true, false, false, null);channel.queueBind(MULTIPLE_ASYNC_CONFIRM_QUEUE, MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE);/*** 开启 confirm 确认*/channel.confirmSelect();long begin = System.currentTimeMillis();//开始时间/*** 开启 confirm 回调函数*/ConfirmCallback ackCallBack = new ConfirmCallback() {/*** 成功回调* @param deliveryTag* @param multiple      true表示已确认多条,false表示已确认单条* @throws IOException*/@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("已确认多条: " + deliveryTag);} else {System.out.println("已确认单条: " + deliveryTag);}}};ConfirmCallback nackCallBack = new ConfirmCallback() {/*** 失败回调* @param deliveryTag* @param multiple      true表示未确认多条,false表示未确认单条* @throws IOException*/@Overridepublic void handle(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("未确认多条: " + deliveryTag);} else {System.out.println("未确认多条: " + deliveryTag);}}};channel.addConfirmListener(ackCallBack, nackCallBack);for (int i = 1; i <= 1000; i++) {String str = i + "\t" + new Date().getTime();channel.basicPublish(MULTIPLE_ASYNC_CONFIRM_EXCHANGE, MULTIPLE_ASYNC_CONFIRM_QUEUE, null, str.getBytes());}long end = System.currentTimeMillis();//结束时间System.out.println("总耗时: " + (end - begin) + " 毫秒==========================");}
}

对比

单条确认:简单,吞吐量低

批量确认:简单,吞吐量比单条确认高,当一批消息中有一个出问题,不知道是哪一个

异步批量确认:代码复杂,吞吐量高

ReturnCallBack

交换机投递到队列


SpringBoot 整合

ConfirmCallback

引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /publisher-confirm-type: correlated
  • 较低版本中配置是

    publisher-confirms: true
    
  • publisher-confirm-type:取值有3种

    • none 禁用发布确认模式,默认
    • correlated 消息从生产者投递到交换机成功后触发回调
    • simple 类似 correlated ,支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法
配置类

在 Publish/Subscribe 基础之上整合

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class ConfirmCallBackImpl implements RabbitTemplate.ConfirmCallback {/*** @param correlationData 相关数据,一般存储一个id,用来辨识唯一性* @param ack             确认,如果成功返回 true,如果失败返回 false* @param cause           原因,失败原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("correlationData = " + correlationData);System.out.println("ack = " + ack);System.out.println("cause = " + cause);if (ack) {System.out.println("投递到交换机成功");} else {System.out.println("投递失败,原因是:\t" + cause);System.out.println("将失败的消息 " + correlationData.getId() + " 保存到数据库");}}
}
//设置确认回调
rabbitTemplate.setConfirmCallback(confirmCallBackI);

image-20210816182706901

因为是和之前的一起整合,所以设置 ConfirmCallBack 在注入 RabbitTemplate 时一起设置
也可以使用匿内部类或者Lambda表达式
测试
  • http://localhost:8080/confirm/test/abc 正常投递
  • http://localhost:8080/confirm/test-fail/abc 失败投递
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("confirm")
public class ConfirmController {/*** 注入 RabbitTemplate 模板对象*/@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("test/{name}")public String send(@PathVariable("name") String name) {/*** 发送消息*/String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(Constants.MY_FANOUT_EXCHANGE, "", str, correlationData);try {/*** 睡眠,程序还没运行结束* 确认回调已经执行,说明是异步的*/TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息发送成功!\t" + str);return "success";}/*** 测试失败** @param name* @return*/@RequestMapping("test-fail/{name}")public String sendFail(@PathVariable("name") String name) {String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());/*** 创建一个不存在的交换机名称,用来测试投递失败*/String errorExchangeName = Constants.MY_FANOUT_EXCHANGE + "abc";rabbitTemplate.convertAndSend(errorExchangeName, "", str, correlationData);try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息发送成功!\t" + str);return "success";}
}

ReturnCallBack

引入依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件中开启
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /publisher-returns: true
配置类
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class ReturnsCallBackImpl implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {System.out.println("returnedMessage = " + returnedMessage);System.out.println("returnedMessage.getMessage() = " + returnedMessage.getMessage());System.out.println("returnedMessage.getReplyCode() = " + returnedMessage.getReplyCode());System.out.println("returnedMessage.getReplyText() = " + returnedMessage.getReplyText());System.out.println("returnedMessage.getExchange() = " + returnedMessage.getExchange());System.out.println("returnedMessage.getRoutingKey() = " + returnedMessage.getRoutingKey());}
}
//设置确认回调
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnsCallBackImpl);

image-20210817032048020

测试
  • 在 Direct 模式下,修改路由为一个不存在的路由
import com.example.constants.Constants;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;@RestController
@RequestMapping("return")
public class ReturnController {/*** 注入 RabbitTemplate 模板对象*/@Resourceprivate RabbitTemplate rabbitTemplate;@RequestMapping("test/{name}")public String send(@PathVariable("name") String name) {/*** 发送消息*/String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "sms", str, correlationData);rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, "email", str, correlationData);try {/*** 睡眠,程序还没运行结束* 确认回调已经执行,说明是异步的*/TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息发送成功!\t" + str);return "success";}/*** 测试失败** @param name* @return*/@RequestMapping("test-fail/{name}")public String sendFail(@PathVariable("name") String name) {String str = name + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss_SSS").format(new Date());CorrelationData correlationData = new CorrelationData();correlationData.setId(UUID.randomUUID().toString());/*** 使用错误的路由键,用来测试交换机不能达到队列*/String routingKey = "abc";rabbitTemplate.convertAndSend(Constants.MY_DIRECT_EXCHANGE, routingKey, str, correlationData);try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消息发送成功!\t" + str);return "success";}
}

http://www.ppmy.cn/server/114021.html

相关文章

如何在 PyTorch 中定义一个简单的卷积神经网络?

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发…

以太网--TCP/IP协议(一)

概述 以太网是局域网的一种&#xff0c;其他的比如还有令牌环、FDDI。和局域网对应的就是广域网&#xff0c;如Internet&#xff0c;城域网等。 从网络层次看&#xff0c;局域网协议主要偏重于低层&#xff08;业内一般把物理层、数据链路层归为低层&#xff09;。以太网协议…

uniapp如何监听页面滚动?

在uni-app中&#xff0c;监听页面滚动主要通过在页面的.vue文件中的onPageScroll生命周期函数来实现。onPageScroll函数会在页面滚动时触发&#xff0c;你可以在这个函数中获取到当前页面的滚动位置等信息。 下面是一个简单的示例&#xff0c;展示了如何在uni-app中监听页面滚…

python(进阶2)实现自动化注册和登录

1. 分析需求 后端完成接口以后&#xff0c;工作中可能会涉及到自测通断&#xff0c;a接口和b接口之间可能有关联关系&#xff0c;例如:a接口注册&#xff0c;b接口登录&#xff0c;就需要a接口返回的参数传递到b接口 2. 环境准备 需要这些类包 import requests import rand…

socket 断线重连和心跳机制如何实现?

一、socket 概念 1.套接字&#xff08;socket&#xff09;是网络通信的基石&#xff0c;是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示&#xff0c;包含进行网络通信必须的五种信息&#xff1a;连接使用的协议&#xff0c;本地主机的 IP 地址…

WordPress安装指南:主题、插件和最佳实践

WordPress是世界上最流行的内容管理系统&#xff08;CMS&#xff09;&#xff0c;因其易用性和灵活性而备受欢迎。本文将指导您完成WordPress的安装过程&#xff0c;介绍一些常用的主题和插件&#xff0c;并分享一些重要的注意事项。 1. WordPress安装 步骤1&#xff1a;准备…

【Linux】System V通信

目录 System V共享内存 共享内存数据结构 共享内存函数 共享内存的关联 共享内存的去关联 用共享内存实现serve&client通信 共享内存与管道进行对比 System V共享内存 管道通信本质是基于文件的&#xff0c;也就是说操作系统并没有为此做过多的设计工作&#xff0c;…

设计模式六大原则

设计模式的六大原则&#xff0c;也称为SOLID原则&#xff0c;是面向对象编程中用于指导软件设计的一套基本原则。这些原则旨在帮助开发者创建可维护、可扩展、灵活且易于理解的代码。 1. 单一职责原则&#xff08;Single Responsibility Principle, SRP&#xff09; 含义&…

Anaconda Prompt 安装paddle2.6报错

bug描述 python 3.11.9 通过 pip install paddlepaddle2.6.1 安装后&#xff0c;运行 paddle.utils.run_check() 则出现下面的错误&#xff1a; 解决办法 方法一&#xff1a;使用paddle 3的版本 这里要注意我的python版本 方法二&#xff1a;使用低版本的python python3.9…

基于EPS32C3电脑远程开机模块设计

基于EPS32C3电脑远程开机模块设计 前言 缘起&#xff0c;手头资料太多了&#xff0c;所以想组一台NAS放在家里存储数据。在咸鱼淘了一套J3160主板加机箱&#xff0c;加上几块硬盘组建NAS。 对于NAS&#xff0c;我的需求是不用的时候关机(节省功耗)&#xff0c;要用的时候开机…

Ubuntu16.04操作系统-内核优化

1. 概述 本文所用优化是生产环境中经过长期验证的内核优化策略&#xff0c;针对的服务器与POD主要用于高CPU、高内存、高IO的业务场景。 备注: OS: ubuntu16.04, 内核&#xff1a; 4.15.0-147-generic 主要涵盖以下内容优化&#xff1a; ulimit优化加强tcp参数其他内存参数 …

深度学习(一)-感知机+神经网络+激活函数

深度学习概述 深度学习的特点 优点 性能更好 不需要特征工程 在大数据样本下有更好的性能 能解决某些传统机器学习无法解决的问题 缺点 小数据样本下性能不如机器学习 模型复杂 可解释性弱 深度学习与传统机器学习相同点 深度学习、机器学习是同一问题不同的解决方法 …

.Net Core 笔试1

1、用两个线程一个输出字母一个输出数字&#xff0c;交替输出Chat数组 char[] aI "1234567".ToCharArray();char[] aC "ABCDEFG".ToCharArray();Task task1 null;Task task2 null;task1 Task.Run(() >{foreach (var item in aI){Console.WriteLine…

2024数学建模国赛ABCDE题选题分析及初步思路

高教社杯全国大学生数学建模竞赛&#xff08;以下简称“国赛”&#xff09;是面向全国大学生的一项重要赛事&#xff0c;旨在培养学生的数学建模能力、团队合作能力和科学研究能力。近年来&#xff0c;国赛的参赛人数和比赛难度不断提升&#xff0c;对参赛者的数学建模能力提出…

上海亚商投顾:深成指、创业板指均涨超1%,华为产业链反复活跃

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 大小指数昨日走势分化&#xff0c;沪指全天震荡调整&#xff0c;2800点失而复得&#xff0c;深成指、创业板指…

C++之格式化日期时间为字符串(精确到毫秒)

C11中提供了获取时间的chrono库&#xff0c;但是格式化显示太方便&#xff1b;C11还提供了格式化时间iomanip库&#xff0c;通过其put_time方法可以格式化时间到秒&#xff0c;要显示毫秒&#xff0c;就需要自己处理了。 #include <chrono> #include <string> #in…

LiveQing视频点播流媒体RTMP推流服务功能-支持大疆等无人机RTMP推流支持OBS推流一步一步搭建RTMP视频流媒体服务示例

LiveQing支持大疆等无人机RTMP推流支持OBS推流一步一步搭建RTMP视频流媒体服务示例 1、流媒体服务搭建2、推流工具准备3、创建鉴权直播间4、获取推流地址5、配置OBS推流6、推流及播放7、获取播放地址7.1 页面查看视频源地址7.2 接口查询 8、相关问题8.1、大疆无人机推流花屏 9、…

nginx配置中的服务器名称

通常&#xff0c;在nginx的配置节中&#xff1a; server {listen 80;server_name example.org www.example.org;... } server_name(服务器名称) 指令定义确定哪个服务器块用于给定请求。可以使用确切名称、通配符名称、ip地址或正则表达式来定义它们&#xff1a; se…

Excel 将行和列转置的两种方法

方法一&#xff1a; 方法二&#xff1a;使用transpose公式

.NetCore+vue3上传图片 Multipart body length limit 16384 exceeded.

实现目标。点击图片上传头像 效果图 前端部分图片上传关键代码 <div class"avatar-wrap"><el-imagestyle"width: 154px; height: 154px":src"form.headPic":fit"fit"/></div><div class"upload-box"…