RabbitMQ基本使用以及整合Java项目

server/2024/12/15 18:11:33/

RabbitMQ安装

此步骤可以参考CSDN上其他博文,有写得很详细的,此处不做过多安装问题,主要讲述怎么使用。

项目整合

导入依赖

     <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.0</version></dependency>

配置好yml文件

rabbitmq:host: 192.168.85.xxx  #自己的地址port: 5673   # linux 对外开放端口是5673:5672username: adminpassword: adminvirtual-host: /

基本使用

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ImgApp.class)
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMq(){//队列名称String que="test.queue";//消息String msg="hello,test";rabbitTemplate.convertAndSend(que,msg);}
}
@Component
public class MqListener {@RabbitListener(queues = "test.queue")public void listenerQueue(String msg){System.out.println("消费者收到消息:"+msg);}
}

可以看到跨服务接受到消息

MQ的三种交换机

  1. Direct 交换机
    • Direct 交换机是最简单的一种交换机类型。它将消息路由到绑定键(Binding Key)与消息的路由键(Routing Key)完全匹配的队列中。如果绑定键与消息的路由键相匹配,消息将被路由到对应的队列中。
    • Direct 交换机通常用于一对一的消息传递模式,适合处理特定类型的消息。
  1. Fanout 交换机
    • Fanout 交换机会将接收到的消息广播到所有与之绑定的队列中,忽略消息的路由键。即 Fanout 交换机会将消息发送到所有与其绑定的队列,不管队列的绑定键是什么。
    • Fanout 交换机适用于广播消息给多个消费者的场景,每个消费者都会收到相同的消息。
  1. Topic 交换机
    • Topic 交换机根据消息的路由键和队列的绑定键之间的模式匹配规则,将消息路由到一个或多个队列中。Topic 交换机支持通配符匹配,可以根据路由键的模式进行灵活的匹配。
    • Topic 交换机适用于灵活的消息路由场景,可以根据消息的内容进行多种模式匹配,实现更精细的消息路由控制。
    • * (星号) 用来表示一个单词 (必须出现的)
      # (井号) 用来表示任意数量(零个或多个)单词

配置消息转换器

原因:

引入jackson依赖

<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.5</version>
</dependency>

当映入jackson依赖后其spring的ampq中将会自动使用

在生产者和消费者是启动类上注入Bean

消费者端使用注解方式声明交换机以及队列

//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue"),exchange = @Exchange(name = "xiaomi.topic",type = ExchangeTypes.TOPIC),key = "test.#"
))

生产者确认机制

开启生产者确认机制后,MQ的性能下降明显,一般场景不建议开启。

保证消息可靠性

LazyQueue

即将消息持久化直接写入磁盘,而不是从放在内存中。

消费者确认机制

一般情况下为了保证消息不被丢失都是:nack

失败重试机制

当消费者出现异常后,为了避免消息不断重新入队后又重新发送,造成死循环导致性能压力飙升,所以又消息失败重试机制。

当重试耗尽后的处理方式

创建失败处理的Config

/** MQ的配置* @author 12547* @version 1.0* @Date 2024/3/17 16:28*/
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry",name = "enabled",havingValue = "true")
public class ErrorConfig {@Beanpublic DirectExchange errorExchange(){return new DirectExchange("error.direct");}@Beanpublic Queue errorQueue(){return new Queue("error.queue");}@Beanpublic Binding errorBinding(Queue errorQueue,DirectExchange errorExchange){return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@Beanpublic MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
}

死信交换机

死信交换机如以上案例所示,用作于某些无法处理的消息的兜底方案,还可以用做延迟消息(思路上可以,但不推荐)

延迟消息

官方延迟插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

使用docker logs rabbit 查看MQ版本

找到对应版本并下载

进入 cd /root/目录下

将下载好的插件上传至该目录下

docker cp /root/rabbitmq_delayed_message_exchange-3.9.0.ez rabbit:/plugins

执行后进入docker容器 cd plugins 查看是否安装成功

启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启容器后查看管理页面在选项栏中看到则代表成功

SpringBoot整合延时消息

在注解中的@Exchange中额外加入delayed="true";

发送消息

 生产者测试代码:

//测试延时消息@Testpublic void testSendDelayMapMsg(){//队列名称//消息String msg="hello,delay";rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//添加延时消息属性message.getMessageProperties().setDelay(5000);  //设置延时时间5sreturn message;}});System.out.println("消息发送成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));}}

消费者测试代码:

//基于注解声明队列以及交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenerDelay(String msg){System.out.println("消息接受成功"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));System.out.println("收到延时消息:"+msg);
}

测试延时结果

触发失败消息策略

在消费者端期初由于LocalDateTime类使用错误(用了LocalTime),而Formate类是DateTime,导致无法转换,而生产者端会直接报错。生产者端改正后忘记修改消费者端的这部分代码(复制过去的),生产者测试代码能够正常运行,而消费者端这部分发现报错,但IDEA不会出现报错提示,此处应该是我的代码出现异常,所以导致消费者无法正常消费,走异常处理后进入失败重试,重试结束后走消息失败策略,进入定义的error交换机与队列。

异常后进入error.queue

延时消息经典场景

支付超时取消订单

有关延时消息最常用的一个业务就是支付超时取消,即防止用户下订后一直不支付,通过延时消息来达到取消订单的功能。

以订单30分钟超时时间为例,如果用户创建订单后进入消息队列直接设置30分钟延时消息后,那么这个消息要30分钟后才会给消费者判断,而这30分钟则一直堆积在队列中,但绝大多数情况用户支付基本都会在1分钟内完成,只有极少数情况才会出现超时,所以方案就是将30分钟的超时时间拆分为一个一个分段的延时时间1s,5s,10s,30s......这样子就避免了消息堆积,浪费性能

自定义一个延时消息体类 用于封账需要发送的消息和延迟时间

/**延迟消息实体类* @author 12547* @version 1.0* @Date 2024/3/17 22:02*/
@Data
public class MultiDelayMessage<T> {//消息体private T data;//延时消息数组private List<Long> delayMils;public MultiDelayMessage(T data,List<Long> delayMils){this.data=data;this.delayMils=delayMils;}public static <T> MultiDelayMessage<T> of(T data, Long ...delayMils){return new MultiDelayMessage<>(data, CollUtil.newArrayList(delayMils));}//获取延迟队列的下一个延迟时间并返回public Long removeNextDelay(){return delayMils.remove(0);   //从延迟时间列表中移除并返回下一个延迟时间}}

 

为了避免发送延迟消息时都要创建一个新消息体对象,所以手动创建一个类

自定义一个DelayPostMessage实现接口MessagePostProcessor的postProcessMessage方法,然后在方法中创建一个delay的成员变量,即通过构造函数的方式传入delay。@RequiredArgsConstructor 注解可以用于类上,用于自动生成一个包含所有被 final 修饰的成员变量

自定义DelayPostMessage:

@RequiredArgsConstructor
public class DelayPostMessage implements MessagePostProcessor {private final int delay;@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(delay);return message;}
}

那么使用就改为了

rabbitTemplate.convertAndSend("delay.direct", "delay", msg, new DelayPostMessage(msg.removeNextDelay().intValue()));-


http://www.ppmy.cn/server/150405.html

相关文章

vue前端获取电脑本机的mac和ip地址

vue 前端 使用 node 里的 os 模块,来获取主机mac和IP地址 需要注意的是这篇教程只能获取本地的&#xff0c;打包到服务器上也是服务器的。需要获取不同使用者的客户端mac&#xff0c;看我这篇。获取客户端不同的mac地址 一、根目录创建 getNetworkInfo.js 脚本文件 const os …

Axios结合Typescript 二次封装完整详细场景使用案例

Axios 是一个基于 promise 的 HTTP 客户端&#xff0c;用于浏览器和 node.js。二次封装 Axios 主要是为了统一管理 HTTP 请求&#xff0c;例如设置统一的请求前缀、头部、超时时间&#xff0c;统一处理请求和响应的格式&#xff0c;以及错误处理等。 以下是一个使用 TypeScrip…

体验 Whisper ,本地离线部署自己的 ASR 语音识别服务

需求背景 最近看视频&#xff0c;过几天后经常忘记内容&#xff0c;所以有了把重点内容总结提炼到自己知识库的需求&#xff0c;这涉及到了提取视频中的音频数据、离线语音识别等功能。 提取视频中的音频数据&#xff0c;可以使用格式工厂或 FFmpeg 等工具&#xff0c; FFmpe…

操作系统(7)处理机调度

前言 操作系统中的处理机调度是一个核心概念&#xff0c;它涉及如何从就绪队列中选择进程并将处理机分配给它以运行&#xff0c;从而实现进程的并发执行。 一、调度的层次 高级调度&#xff08;作业调度&#xff09;&#xff1a; 调度对象&#xff1a;作业&#xff08;包含程序…

MR30分布式IO在新能源领域加氢站的应用

导读 氢能被誉为21世纪最具发展潜力的清洁能源&#xff0c;氢能科技创新和产业发展持续得到各国青睐。氢能低碳环保&#xff0c;燃烧的产物只有水&#xff0c;是用能终端实现绿色低碳转型的重要载体。氢能产业链分别为上游制氢、中游储运以及下游用氢。上游制氢工艺目前大部分…

MFC:CFile类的使用

为方便对文件操作&#xff0c;MFC提供了CFile类用于文件的读写操作&#xff0c;具体包括文件数据的读取、写入以及文件内部指针的移动等。打开文件时&#xff0c;会有一个指向该文件的指针&#xff0c;同时还生成一个指向具体数据的内部指针&#xff0c;用于记录读取或写入的位…

linux下socket本地套接字通讯

使用套接字除了可以实现网络间不同主机间的通信外&#xff0c;还可以实现同一主机的不同进程间的通信&#xff0c;且建立的通信是双向的通信。socket进程通信与网络通信使用的是统一套接口&#xff0c;只是地址结构与某些参数不同。 用途 进程间通信&#xff1a;本地套…

Redis 附加功能(一) - 数据库

Redis的数据会被存储到一个名为数据库的容器中。一个Redis服务器可以包含多个数据库&#xff0c;默认情况下&#xff0c;会创建16个数据库。这些数据库用号码进行标志&#xff0c;第一个数据库为0号数据库。 数据库 切换到指定的数据库&#xff1a;SELECT index 获取所有与给定…