20.rabbitmq插件实现延迟队列

news/2024/9/22 23:48:01/

问题

前面谈到基于死信的延迟队列,存在的问题:如果第一个消息延时时间很长,而第二个消息延时时间很短,第二个消息并不会优先得到执行。

下载插件

地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

rabbitmq_delayed_message_exchange-3.8.0.ez

说明:rabbitmq安装后,会生成这个目录

/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins/

拷贝插件到上面这个目录

安装插件

需要重启rabbitmq

监测插件是否安装成功

可以看出不再使用延迟队列,而是使用延迟交换机。

代码

配置代码

java">package com.xkj.org.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class DelayExchangeConfig {//队列public static final String DELAYED_QUEUE_NAME = "delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";//RoutingKeypublic static final String DELYAED_ROUTING_KEY = "delayed.routingkey";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");//第一个参数交换机的名称//第二个参数交换机的类型//第三个参数是否持久化//第四个参数是否删除//第五个参数其他参数return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,arguments);}@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE_NAME);}@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedExchange")CustomExchange delayedExchange,@Qualifier("delayedQueue")Queue delayedQueue) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELYAED_ROUTING_KEY).noargs();}}

生产者

java">@ApiOperation("基于插件的延迟消息")@GetMapping("/sendDelayedMsg/{msg}/{delayedTime}")public void sendDelayedMsg(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message,@ApiParam(value = "延迟时间", required = true)@PathVariable("delayedTime")Integer delayedTime) {log.info("当前时间{},发送一条消息给延迟交换机:{},delayedTime={}", new Date().toString(), message, delayedTime);rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message, msg -> {msg.getMessageProperties().setDelay(delayedTime);return msg;});}

消费者

java">package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.UnsupportedEncodingException;
import java.util.Date;/*** 基于插件的延迟消息队列监听*/
@Slf4j
@Component
public class DelayedQueueConsumer {@RabbitListener(queues = "delayed.queue")public void receiver(Message message, Channel channel) throws UnsupportedEncodingException {String msg = new String(message.getBody(), "UTF-8");log.info("当前时间:{},收到延迟队列的消息:{}", new Date().toString(), msg);}}

 总结

延迟队列可以保证消息可靠发送,消息可靠投递,死信队列保证消息至少被消费一次,已经未被处理的消息不会被丢弃。


http://www.ppmy.cn/news/1501220.html

相关文章

【ffmpeg命令基础】视频选项讲解

文章目录 前言设置输出文件的帧数设置每秒播放的帧数设置输出视频的帧率示例1&#xff1a;更改输出视频的帧率示例2&#xff1a;将图像序列转换为视频 设置输入视频的帧率示例3&#xff1a;处理高帧率视频示例4&#xff1a;处理低帧率视频 同时设置输入和输出帧率示例5&#xf…

【C++】set的使用

&#x1f525;个人主页&#xff1a; Forcible Bug Maker &#x1f525;专栏&#xff1a; STL || C 目录 &#x1f308;前言&#x1f308;关于set&#x1f525;容量函数emptysize &#x1f525;Modifiersinserteraseclear &#x1f525;Operationsfindcountlower_bound和upper_…

笔记分类的烦恼

前言 你是否为笔记的分类而苦恼&#xff0c;是否迷失在市面上纷繁复杂的笔记分类法&#xff1f; 不用再烦恼了&#xff0c;本文将介绍一个适用于个人笔记的终极分类办法&#xff0c;只需三刀&#xff0c;尘埃落定。 &#x1f52a; 第一刀 笔记场景 &#x1f370; 也就是笔记…

Web 安全之 OOB(Out-of-Band)攻击详解

OOB&#xff08;Out-of-Band&#xff09;攻击是指一种网络安全攻击技术&#xff0c;攻击者利用目标系统与外部资源之间的通信来获取信息或检测漏洞是否存在。这种攻击方式不会通过目标系统的直接响应来显示攻击结果&#xff0c;而是通过外部信道&#xff08;如 DNS、HTTP、SMTP…

【Linux】centos7安装php7.4

环境说明 本文档在服务器不能连接互联网的情况下&#xff0c;进行安装php7.4及其扩展。 操作系统&#xff1a;centos7.6 架构&#xff1a;X86_64 一、安装依赖&#xff08;可选&#xff09; 说明&#xff1a;服务器能联网就可以通过 yum install 命令下载对应php需要的依赖…

nodejs - 接口 学习笔记

一、简介 1-1、是什么 接口是 前后端通信的桥梁 简单理解&#xff1a;一个接口就是 服务中的一个路由规则&#xff0c;根据请求响应结果 接口的英文单词是 API (Application Program Interface)&#xff0c;所以有时也称之为 API 接口 这里的接口指的是『数据接口』&#xff…

LINUX操作系统安全

一、概述内容 操作系统负责计算机系统的资产管理&#xff0c;支撑和控制各种应用程序运行&#xff0c;为用户提供管理计算机系统管理接口。操作系统也是构成网络信息系统的核心关键组件&#xff0c;其安全可靠性决定了计算机系统的安全性和可靠性。 操作系统安全是指满足安全…

树与二叉树【数据结构】

前言 之前我们已经学习过了各种线性的数据结构&#xff0c;顺序表、链表、栈、队列&#xff0c;现在我们一起来了解一下一种非线性的结构----树 1.树的结构和概念 1.1树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一…