尚品汇-MQ模块搭建测试、消息不丢失(重)(四十三)

news/2024/9/19 0:41:06/ 标签: java, 开发语言

目录:

(1)消息不丢失

(2)消息确认

(3)消息确认业务封装

 (4)封装发送端消息确认

(5)封装消息发送

(6)发送确认消息测试

(7)消息发送失败,设置重发机制

(1)消息不丢失

消息的不丢失,在MQ角度考虑,一般有三种途径:

1,生产者不丢数据

2,MQ服务器不丢数据

3,消费者不丢数据

保证消息不丢失有两种实现方式:

  1. 开启事务模式
  2. 消息确认模式

说明:开启事务会大幅降低消息发送及接收效率,使用的相对较少,因此我们生产环境一般都采取消息确认模式,以下我们只是讲解消息确认模式

(2)消息确认

消息持久化

如果希望RabbitMQ重启之后消息不丢失,那么需要对以下3种实体均配置持久化

Exchange

声明exchange时设置持久化(durable = true)并且不自动删除(autoDelete = false)

Queue

声明queue时设置持久化(durable = true)并且不自动删除(autoDelete = false)

message

发送消息时通过设置deliveryMode=2持久化消息

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢,其实也很容易,就下面两步:

1、将queue的持久化标识durable设置为true,则代表是一个持久的队列

2、发送消息的时候将deliveryMode=2

这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

发送确认

有时,业务处理成功,消息也发了,但是我们并不知道消息是否成功到达了rabbitmq,如果由于网络等原因导致业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可以使用rabbitmq的发送确认功能,即要求rabbitmq显式告知我们消息是否已成功发送。

手动消费确认

有时,消息被正确投递到消费方,但是消费方处理失败,那么便会出现消费方的不一致问题。比如:订单已创建的消息发送到用户积分子系统中用于增加用户积分,但是积分消费方处理却都失败了,用户就会问:我购买了东西为什么积分并没有增加呢?

要解决这个问题,需要引入消费方确认,即只有消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack

(3)消息确认业务封装

搭建rabbit-util模块

由于消息队列是公共模块,我们把mq的相关业务封装到该模块,其他service微服务模块都可能使用,因此我们把他封装到一个单独的模块,需要使用mq的模块直接引用该模块即可

搭建方式如:common-util,导入常量类 MqConst

package com.atguigu.gmall.constant;public class MqConst {/*** 消息补偿*/public static final String MQ_KEY_PREFIX = "mq:list";public static final int RETRY_COUNT = 3;/*** 商品上下架*/public static final String EXCHANGE_DIRECT_GOODS = "exchange.direct.goods";public static final String ROUTING_GOODS_UPPER = "goods.upper";public static final String ROUTING_GOODS_LOWER = "goods.lower";//队列public static final String QUEUE_GOODS_UPPER  = "queue.goods.upper";public static final String QUEUE_GOODS_LOWER  = "queue.goods.lower";/*** 取消订单,发送延迟队列*/public static final String EXCHANGE_DIRECT_ORDER_CANCEL = "exchange.direct.order.cancel";//"exchange.direct.order.create" test_exchange;public static final String ROUTING_ORDER_CANCEL = "order.create";//延迟取消订单队列public static final String QUEUE_ORDER_CANCEL  = "queue.order.cancel";//取消订单 延迟时间 单位:秒 真实业务public static final int DELAY_TIME  = 24*60*60;//  测试取消订单// public static final int DELAY_TIME  = 3;/*** 订单支付*/public static final String EXCHANGE_DIRECT_PAYMENT_PAY = "exchange.direct.payment.pay";public static final String ROUTING_PAYMENT_PAY = "payment.pay";//队列public static final String QUEUE_PAYMENT_PAY  = "queue.payment.pay";/*** 减库存*/public static final String EXCHANGE_DIRECT_WARE_STOCK = "exchange.direct.ware.stock";public static final String ROUTING_WARE_STOCK = "ware.stock";//队列public static final String QUEUE_WARE_STOCK  = "queue.ware.stock";/*** 减库存成功,更新订单状态*/public static final String EXCHANGE_DIRECT_WARE_ORDER = "exchange.direct.ware.order";public static final String ROUTING_WARE_ORDER = "ware.order";//队列public static final String QUEUE_WARE_ORDER  = "queue.ware.order";/*** 关闭交易*/public static final String EXCHANGE_DIRECT_PAYMENT_CLOSE = "exchange.direct.payment.close";public static final String ROUTING_PAYMENT_CLOSE = "payment.close";//队列public static final String QUEUE_PAYMENT_CLOSE  = "queue.payment.close";/*** 定时任务*/public static final String EXCHANGE_DIRECT_TASK = "exchange.direct.task";public static final String ROUTING_TASK_1 = "seckill.task.1";//队列public static final String QUEUE_TASK_1  = "queue.task.1";/*** 秒杀*/public static final String EXCHANGE_DIRECT_SECKILL_USER = "exchange.direct.seckill.user";public static final String ROUTING_SECKILL_USER = "seckill.user";//队列public static final String QUEUE_SECKILL_USER  = "queue.seckill.user";/*** 定时任务*/public static final String ROUTING_TASK_18 = "seckill.task.18";//队列public static final String QUEUE_TASK_18  = "queue.task.18";}

pom.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>common</artifactId><groupId>com.atguigu.gmall</groupId><version>1.0</version></parent><modelVersion>4.0.0</modelVersion><artifactId>rabbit-util</artifactId><dependencies><!--rabbitmq消息队列--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><!--rabbitmq 协议--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-bus-amqp</artifactId></dependency></dependencies></project>

service-mq引入rabbit-util模块依赖

<!--rabbitmq消息--><dependency><groupId>com.atguigu.gmall</groupId><artifactId>rabbit-util</artifactId><version>1.0</version></dependency>

 (4)封装发送端消息确认

在rabbit-util 中添加类

package com.atguigu.gmall.common.config;/*** @Description 消息发送确认* <p>* ConfirmCallback  只确认消息是否正确到达 Exchange 中* ReturnCallback   消息没有正确到达队列时触发回调,如果正确到达队列不执行* <p>* 1. 如果消息没有到exchange,则confirm回调,ack=false* 2. 如果消息到达exchange,则confirm回调,ack=true* 3. exchange到queue成功,则不回调return* 4. exchange到queue失败,则回调return* */
@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;// 修饰一个非静态的void()方法,在服务器加载Servlet的时候运行,并且只会被服务器执行一次在构造函数之后执行,init()方法之前执行。@PostConstructpublic void init() {rabbitTemplate.setConfirmCallback(this);            //指定 ConfirmCallbackrabbitTemplate.setReturnCallback(this);             //指定 ReturnCallback}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if (ack) {log.info("消息发送成功:" + JSON.toJSONString(correlationData));} else {log.info("消息发送失败:" + cause + " 数据:" + JSON.toJSONString(correlationData));}}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {// 反序列化对象输出System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + replyCode);System.out.println("描述:" + replyText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);}}

(5)封装消息发送

在rabbit-util 中添加类RabbitService

package com.atguigu.gmall.common.service;@Service
public class RabbitService {@Autowiredprivate RabbitTemplate rabbitTemplate;/***  发送消息* @param exchange 交换机* @param routingKey 路由键* @param message 消息*/public boolean sendMessage(String exchange, String routingKey, Object message) {rabbitTemplate.convertAndSend(exchange, routingKey, message);return true;}}

(6)发送确认消息测试

在service-mq编写测试代码

消息发送端

package com.atguigu.gmall.mq.controller;@RestController
@RequestMapping("/mq")
public class MqController {@Autowiredprivate RabbitService rabbitService;/*** 消息发送*///http://localhost:8282/mq/sendConfirm@GetMapping("sendConfirm")public Result sendConfirm() {rabbitService.sendMessage("exchange.confirm", "routing.confirm", "来人了,开始接客吧!");return Result.ok();}
}

消息接收端

在service-mq 中编写

这里交换机队列绑定:有两种方式一种是配置,一种是注解

我们这里使用注解进行创建绑定

@SneakyThrows 是Lombk的异常 处理注解

package com.atguigu.gmall.mq.receiver;@Component
public class ConfirmReceiver {@SneakyThrows
@RabbitListener(bindings=@QueueBinding(value = @Queue(value = "queue.confirm",autoDelete = "false"),exchange = @Exchange(value = "exchange.confirm",autoDelete = "false"),key = {"routing.confirm"}))
public void process(Message message, Channel channel){System.out.println("RabbitListener:"+new String(message.getBody()));// 参数一:消息的唯一标识,参数二:是否批量确认 false 确认一个消息,true 批量确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

测试:http://localhost:8282/mq/sendConfirm

 加入了确认机制后,成功后:会有提示

把交换机写错,发送到交换机会失败

 

 

把路由key写错,交换机到队列消息失败:


 

(7)消息发送失败,设置重发机制

实现思路:借助redis来实现重发机制

在rabbit-util 模块中添加依赖

<!-- redis -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId>
</dependency><!-- spring2.X集成redis所需common-pool2-->
<dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId>
</dependency>

自定义一个实体类来接收消息

@Data
public class GmallCorrelationData extends CorrelationData {//  消息主体private Object message;//  交换机private String exchange;//  路由键private String routingKey;//  重试次数private int retryCount = 0;//  消息类型  是否是延迟消息private boolean isDelay = false;//  延迟时间private int delayTime = 10;
}

 

修改发送方法

//  封装一个发送消息的方法
public Boolean sendMsg(String exchange,String routingKey, Object msg){//  将发送的消息 赋值到 自定义的实体类GmallCorrelationData gmallCorrelationData = new GmallCorrelationData();//  声明一个correlationId的变量String correlationId = UUID.randomUUID().toString().replaceAll("-","");gmallCorrelationData.setId(correlationId);gmallCorrelationData.setExchange(exchange);gmallCorrelationData.setRoutingKey(routingKey);gmallCorrelationData.setMessage(msg);//  发送消息的时候,将这个gmallCorrelationData 对象放入缓存。redisTemplate.opsForValue().set(correlationId, JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法
//this.rabbitTemplate.convertAndSend(exchange,routingKey,msg);this.rabbitTemplate.convertAndSend(exchange,routingKey,msg,gmallCorrelationData);//  默认返回truereturn true;
}

发送失败调用重发方法  MQProducerAckConfig 配置类中修改 

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {//  ack = true 说明消息正确发送到了交换机if (ack){System.out.println("哥们你来了.");log.info("消息发送到了交换机");}else {//  消息没有到交换机log.info("消息没发送到交换机");//  调用重试发送方法this.retrySendMsg(correlationData);}
}@Override
public void returnedMessage(Message message, int code, String codeText, String exchange, String routingKey) {System.out.println("消息主体: " + new String(message.getBody()));System.out.println("应答码: " + code);System.out.println("描述:" + codeText);System.out.println("消息使用的交换器 exchange : " + exchange);System.out.println("消息使用的路由键 routing : " + routingKey);//  获取这个CorrelationData对象的Id  spring_returned_message_correlationString correlationDataId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");//  因为在发送消息的时候,已经将数据存储到缓存,通过 correlationDataId 来获取缓存的数据String strJson = (String) this.redisTemplate.opsForValue().get(correlationDataId);//  消息没有到队列的时候,则会调用重试发送方法GmallCorrelationData gmallCorrelationData = JSON.parseObject(strJson,GmallCorrelationData.class);//  调用方法  gmallCorrelationData 这对象中,至少的有,交换机,路由键,消息等内容.this.retrySendMsg(gmallCorrelationData);
}/*** 重试发送方法* @param correlationData   父类对象  它下面还有个子类对象 GmallCorrelationData*/
private void retrySendMsg(CorrelationData correlationData) {//  数据类型转换  统一转换为子类处理GmallCorrelationData gmallCorrelationData = (GmallCorrelationData) correlationData;//  获取到重试次数 初始值 0int retryCount = gmallCorrelationData.getRetryCount();//  判断if (retryCount>=3){//  不需要重试了log.error("重试次数已到,发送消息失败:"+JSON.toJSONString(gmallCorrelationData));} else {//  变量更新retryCount+=1;//  重新赋值重试次数 第一次重试 0->1 1->2 2->3gmallCorrelationData.setRetryCount(retryCount);System.out.println("重试次数:\t"+retryCount);//  更新缓存中的数据this.redisTemplate.opsForValue().set(gmallCorrelationData.getId(),JSON.toJSONString(gmallCorrelationData),10, TimeUnit.MINUTES);//  调用发送消息方法 表示发送普通消息  发送消息的时候,不能调用 new RabbitService().sendMsg() 这个方法this.rabbitTemplate.convertAndSend(gmallCorrelationData.getExchange(),gmallCorrelationData.getRoutingKey(),gmallCorrelationData.getMessage(),gmallCorrelationData);}
}

把路由写错,交换机到队列消息失败:

  把交换机写错:发送到交换机失败

 

总结

为了防止消息不丢失 ,从以下三方面考虑:

1,生产者不丢数据

2,MQ服务器不丢数据

3,消费者不丢数据

   

生产者:

MQ服务器:

RabbitMQ重启之后消息不丢失

消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号

开启确认回调(成功到达交换机发送ack标识,没有达到重发)、失败回调(设置重发机制)

为了防止消费者处理消息丢失,我们开启消费者消息消费确认回调,消息被成功处理之后才告知rabbitmq以ack,否则告知rabbitmq以nack


http://www.ppmy.cn/news/1519797.html

相关文章

鸿蒙(API 12 Beta3版)【媒体资源使用指导】Media Library Kit媒体文件管理服务

应用可以通过photoAccessHelper的接口&#xff0c;对媒体资源&#xff08;图片、视频&#xff09;进行相关操作。 说明 在进行功能开发前&#xff0c;请开发者查阅[开发准备]&#xff0c;了解如何获取相册管理模块实例和如何申请相册管理模块功能开发相关权限。文档中使用到p…

基于深度学习的游客满意度分析与评论分析【情感分析、主题分析】

需要本项目的可以私信博主 目录 1 绪论 1.1 选题背景及研究意义 1.1.1 选题背景 1.1.2 研究意义 1.2 研究内容与方法 1.2.1 研究内容 1.2.2 研究方法 1.3 创新与不足 1.3.1创新点 1.3.2研究局限性 2 文献综述 2.1 相关概念界定 2.1.1 大数据分析 2.1.2 游客满意度 2.2 国内外研…

大数据系列之:查看Centos服务器用户可以创建的最大线程数、查看系统内核支持的最大线程数、查看系统支持的最大进程数、设置最大线程数限制、查看进程使用的线程数

大数据系列之:查看Centos服务器用户可以创建的最大线程数、查看系统内核支持的最大线程数、查看系统支持的最大进程数、设置最大线程数限制、查看进程使用的线程数 显示当前用户的资源限制查看用户可以创建的最大线程指定进程的资源限制查看系统内核支持的最大线程数查看系统支…

React 入门第八天:性能优化与开发者工具的使用

随着对React的逐步深入&#xff0c;我开始关注如何优化React应用的性能&#xff0c;特别是在复杂的组件树和频繁的状态更新中保持应用的高效性。这一天&#xff0c;我集中学习了React中的性能优化策略&#xff0c;并探索了如何使用React开发者工具来调试和优化应用。 1. 组件的…

续:当有数据时添加slave2

【示例】 另启一台虚拟机&#xff0c;作为mysql3. 新的虚拟机没有mysql软件包&#xff0c;如何才能快速部署&#xff1f;通过mysql1. mysql1&#xff1a; [rootmysql1 ~]# rsync -al /usr/local/mysql/ root172.25.254.166:/usr/local/mysql The authenticity of host 172.25…

Java算法之快速排序(Quick Sort)

快速排序&#xff1a;分而治之的高效排序算法 简介 快速排序是一种分而治之的排序算法&#xff0c;由C. A. R. Hoare在1960年提出。它通过选取一个元素作为"基准"&#xff08;pivot&#xff09;&#xff0c;然后重新排列数组&#xff0c;使得所有比基准值小的元素都…

【软考】【多媒体应用设计师】媒体与技术

1. 多媒体技术改变了传统循序式模式&#xff0c;用户可以借助超文本链接等方式&#xff0c;更自由灵活地访问所需的信息&#xff0c;体现了其&#xff08; &#xff09;的特点。 A.控制性 B.非线性 C.集成性 D.实时性 答案解析&#xff1a;本题考查信息多媒体非线性特点。多媒体…

安防监控/软硬一体/视频汇聚网关EasyCVR硬件启动崩溃是什么原因?

安防视频监控EasyCVR安防监控视频系统采用先进的网络传输技术&#xff0c;支持高清视频的接入和传输&#xff0c;能够满足大规模、高并发的远程监控需求。EasyCVR平台支持多种视频流的外部分发&#xff0c;如RTMP、RTSP、HTTP-FLV、WebSocket-FLV、HLS、WebRTC、WS-FMP4、HTTP-…

vue part 5

生命周期 <!DOCTYPE html> <html><head><meta charset"UTF-8" /><title>引出生命周期</title><!-- 引入Vue --><script type"text/javascript" src"https://cdn.jsdelivr.net/npm/vue/dist/vue.js&quo…

进程、线程的区别

进程&#xff08;Process&#xff09;和线程&#xff08;Thread&#xff09;是操作系统中的基本概念&#xff0c;它们在资源管理和任务执行方面有着本质的区别&#xff1a; 定义&#xff1a; 进程&#xff1a;进程是操作系统进行资源分配和调度的一个独立单位。每个进程都有自己…

ArcGIS Pro 3.1下载分享

在使用了很长一段时间ArcGIS Pro 3.0之后&#xff0c;终于迎来了ArcGIS Pro 3.1的更新&#xff0c;这里为你分享一下ArcGIS Pro 3.1的安装步骤。 软件介绍 ArcGIS Pro 3.1 是由Esri发布的地理信息系统 (GIS) 软件的较新版本&#xff0c;作为 ArcGIS 桌面应用程序家族中的核心…

【递归深搜之记忆化搜索算法】

1. 斐波那契数 解法一&#xff1a;递归 class Solution { public:int fib(int n) {return dfs(n);}int dfs(int n){if(n 0 || n 1)return n;return dfs(n - 1) dfs(n - 2);} }; 解法二&#xff1a;记忆化搜索 class Solution {int nums[31]; // 备忘录 public:int fib(int …

使用C++,仿照string类,实现myString

类由结构体演化而来&#xff0c;只需要将struct改成关键字class&#xff0c;就定义了一个类 C中类和结构体的区别&#xff1a;默认的权限不同&#xff0c;结构体中默认权限为public&#xff0c;类中默认权限为private 默认的继承方式不同&#xff0c;结构体的默认继承方式为p…

微型直线导轨高精度运行的工作原理

微型导轨是一种用于高精度定位和运动控制的传动装置&#xff0c;常用于微小化、高精密度化的机械设备中&#xff0c;如IC制造设备、半导体设备、高速移载的设备、精密测量、检测仪器、医疗设备、X-Y table&#xff0c;以及高速皮带驱动的设备等小型化设备。 微型导轨的构成相对…

单窗口IP代理设置指南:轻松搞定

在现代互联网生活中&#xff0c;IP代理已经成为了许多人日常上网的必备工具。单窗口IP代理是一种非常实用的代理方式&#xff0c;它允许你在同一个浏览器中为不同的窗口设置不同的IP地址&#xff0c;从而更好地保护隐私和实现多任务处理。今天&#xff0c;我们就来详细讲解一下…

在 macOS 上升级 Ruby 版本的几种方法

在 macOS 上升级 Ruby 版本通常有几种方法&#xff0c;以下是一些常用的方法&#xff1a; 使用系统自带的 Ruby: macOS 系统自带 Ruby&#xff0c;但通常不是最新版本。可以通过终端使用 softwareupdate 命令来更新系统自带的 Ruby。 使用 Homebrew: Homebrew 是 macOS 的包管…

字符串地指针表示方式

每日诗词&#xff1a; 人生自是有情痴&#xff0c;此恨不关风与月。 ——玉楼春尊前拟把归期说 【宋】欧阳修 目录 数组本身的值和数组储存的值一样吗 char[]和cahr*的区别 1. 类型 2. 内存分配 3. 使用方式 4. 字符串字面量 实例 变式 总结&#xff1a; 下期预告&a…

vue2+countup.js实现大屏数字滚动效果封装

很多大屏、官网或者展示类页面会用到数字跳动更新效果的需求&#xff0c;countup用起来就非常方便 一、官网 CountUp.js 二、效果图 三、安装countup与引入 npm install countup 进行安装依赖 import { CountUp } from countUp.js;//需要用到的页面引入&#xff0c;也可以…

生成式AI:创造性智能的新纪元

引言 随着人工智能技术的飞速发展&#xff0c;生成式AI&#xff08;Generative AI&#xff09;已经成为一个引人注目的领域。它不仅仅是模仿人类行为&#xff0c;而是通过学习大量的数据&#xff0c;创造出全新的内容&#xff0c;如文本、图像、音乐等。本文将探讨生成式AI的基…

同样128个内核,AMD霄龙9755性能翻倍:Zen 5架构下的性能飞跃

近日&#xff0c;AMD在服务器处理器领域再次展示了其强大的技术实力&#xff0c;随着AMD EPYC“Turin”处理器发布日期的临近&#xff0c;其基准测试结果也开始浮出水面。硬件爱好者博主9550pro近期分享了AMD 128核EPYC 9755“Turin”处理器在7zip压缩/解压缩基准测试中的跑分数…