005 延时交换机

ops/2024/10/18 18:14:27/

文章目录

    • 延时交换机插件的安装
    • PluginsDelayConfig
    • Producer.java
    • Consumer.java
    • application.yaml

RabbitMQ中既有延时队列的概念,也有延时交换机的概念,但两者在实现机制上有所不同。以下是关于这两者的详细解释:

延时队列:

延时队列是RabbitMQ中的一种队列类型,其主要特点是消息在队列中会被延迟一段时间后再被消费。
延时队列的实现通常依赖于消息的TTL(Time To Live)和死信队列。当消息在队列中的存活时间超过了设定的TTL后,消息会变成死信,并被发送到预先配置好的死信队列中,从而实现消息的延时处理。
延时队列适用于需要在特定时间后处理消息的场景,如订单超时未支付自动取消等。
延时交换机:

延时交换机是RabbitMQ的一个扩展插件,它提供了一种新的消息交换类型,允许在发送消息时指定一个延迟时间。
当消息达到指定的延迟时间后,才会被放入队列供消费者消费。这种方式提供了更高的延时精度和灵活性。
使用延时交换机需要安装和配置相应的RabbitMQ插件。
综上所述,RabbitMQ中既支持延时队列也支持延时交换机,它们都可以实现消息的延迟处理,但具体实现机制和使用方式有所不同。在实际应用中,可以根据具体需求和场景选择合适的方式来实现消息的延时处理。

延时交换机插件的安装

下载地址:https://www.rabbitmq.com/community-plugins.html
rabbitmq_delayed_message_exchange放到rabbitMQ server plugins中
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

PluginsDelayConfig


package com.example.delay;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class PluginsDelayConfig {//延时交换机@Beanpublic CustomExchange newDelayExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, args);}//延时队列@Beanpublic Queue newDelayQueue() {return new Queue("delayed-queue", true);}//绑定@Beanpublic Binding bindingDelayedQueue() {return BindingBuilder.bind(newDelayQueue()).to(newDelayExchange()).with("key3").noargs();}
}

Producer.java


package com.example.delay;import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;//消费顺序与发送顺序一致,即"消息1","消息3","消息2",很可能是因为这些消息都被发送到了同一个队列,并且该队列中的消息是按照它们到达的顺序进行排列的。RabbitMQ的队列通常遵循FIFO(先进先出)的原则,除非特别配置了其他策略,如优先级队列等。
//
//当调用/delayedMsg、/delayedMsg3、/delayedMsg2接口时,生产者会依次将消息发送到配置了延迟功能的交换机(假设已经安装并配置了RabbitMQ的延迟消息插件)。尽管每条消息都设置了不同的延迟时间,但这些延迟时间是在消息从交换机路由到队列之前应用的。一旦消息被路由到队列,它们就会按照到达队列的顺序排列。
//
//如果消费者是从这个队列中按照FIFO原则拉取消息,那么消费顺序就会与发送顺序一致,因为消息是按照它们被发送到队列的顺序被排列和消费的。
//
//简而言之,尽管为消息设置了延迟,但这些延迟是在消息被发送到队列之前应用的。一旦消息进入队列,它们就会按照到达的顺序被排列,并由消费者按照这个顺序进行消费。这就是为什么消费顺序与发送顺序一致的原因。//setDelay 方法并不是用来设置TTL的,而是用来设置消息在交换机中的延迟时间,这是RabbitMQ的延迟消息插件提供的功能。这个延迟是指消息在被发送到队列之前,会在交换机中等待指定的时间。一旦延迟时间过去,消息才会被路由到相应的队列中。
//
//现在,关于消息发送顺序问题,实际上与消息的延迟时间无关。消息的发送顺序是由生产者发送消息的时间点决定的。当依次调用 /delayedMsg、/delayedMsg3、/delayedMsg2 接口时,生产者会按照这个顺序发送消息。因此,即使为每条消息设置了不同的延迟时间,消息的发送顺序仍然是 "消息1","消息3","消息2"。
//
//这里的关键是理解“发送顺序”和“消费顺序”的区别。发送顺序是生产者将消息发送到RabbitMQ的顺序,而消费顺序是消费者从队列中获取并处理消息的顺序。如果希望按照消息的延迟时间顺序来消费消息,那么需要在消费者端进行适当的处理,例如使用优先级队列或根据消息的延迟时间进行排序等。
//
//总结一下,虽然为每条消息设置了不同的延迟时间,但这并不影响消息的发送顺序。发送顺序仍然是由生产者发送消息的时间点决定的。而消息的延迟时间只是在消息被路由到队列之前,在交换机中的等待时间。因此,观察到的现象与RabbitMQ的延时队列TTL设置并不矛盾。//如果/delayedMsg、/delayedMsg3和/delayedMsg2请求几乎同时到达交换机,并且分别为这些请求设置了不同的延迟时间,理论上期望的是消息会按照它们各自的延迟时间被路由到队列,进而被消费者按照延迟到期的顺序消费。然而,观察到的是"消息1","消息3","消息2"这样的顺序。
//
//这种情况可能是由于以下几个原因造成的:
//
//消息到达交换机的微小时间差:
//即使认为请求是几乎同时发出的,实际上在网络传输和RabbitMQ服务器处理过程中,仍然可能存在微小的时间差。这些微小的时间差可能导致消息到达交换机的顺序与您发送请求的顺序一致,因此交换机仍然会按照FIFO的原则处理这些消息,即使它们被设置了不同的延迟。
//
//延迟时间的精度:
//RabbitMQ的延迟消息插件在处理延迟时可能存在一定的精度误差。如果延迟时间相差不大,这些误差可能会影响消息的实际路由顺序。
//
//队列行为:
//一旦消息从交换机路由到队列,它们将遵循队列的FIFO原则。如果消息几乎同时到达队列(即在延迟时间结束后几乎同时被路由到队列),那么队列将按照它们到达的顺序来处理这些消息。
//
//系统负载和性能:
//在高负载情况下,RabbitMQ服务器的性能可能会影响消息的处理速度。这可能导致实际延迟时间与预期有所偏差。
//
//为了验证和调试这种情况,可以考虑以下步骤:
//
//增加日志记录:在生产者和消费者端记录消息发送和消费的时间戳,以便更准确地跟踪消息的生命周期。
//调整延迟时间:尝试设置更明显的延迟时间差,以观察消息是否按照预期的延迟顺序被消费。
//检查RabbitMQ配置:确保交换机和队列的配置正确,且延迟插件正常工作。
//性能监控:监控RabbitMQ服务器的性能指标,以确保系统资源不是瓶颈。
//最后,如果确实需要严格按照延迟时间的顺序处理消息,可能需要在消费者端实现额外的逻辑来确保这一点,例如通过检查消息中的时间戳或元数据来确定处理顺序。@RestController
public class Producer {@Autowiredprivate AmqpTemplate rabbitTemplate;//模拟下订单@GetMapping("delayedMsg")public String sendMsg1( ){String msg1 = "消息1";
//        String msg2 = "消息2";
//        String msg3 = "消息3";rabbitTemplate.convertAndSend("delayed-exchange","key3",msg1,message ->{message.getMessageProperties().setDelay(50000);return  message;});//        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg3,message ->{
//            message.getMessageProperties().setDelay(10000);
//            return  message;
//        });
//
//        rabbitTemplate.convertAndSend("delayed-exchange","key3",msg2,message ->{
//            message.getMessageProperties().setDelay(8000);
//            return  message;
//        });return "生产者发送消息成功";}@GetMapping("delayedMsg2")public String sendMsg2( ){String msg2 = "消息2";rabbitTemplate.convertAndSend("delayed-exchange","key3",msg2,message ->{message.getMessageProperties().setDelay(80000);return  message;});return "生产者发送消息成功";}
//
//
//
//
//
//
//
//
//
//@GetMapping("delayedMsg3")public String sendMsg3( ){String msg3 = "消息3";rabbitTemplate.convertAndSend("delayed-exchange","key3",msg3,message ->{message.getMessageProperties().setDelay(100000);return  message;});return "生产者发送消息成功";}}

Consumer.java


package com.example.delay;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
public class Consumer {@RabbitListener(queues = "delayed-queue")public void getMsg(Message message, Channel channel){try {channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);System.out.println("消费者收到的消息是:" + message);} catch (IOException e) {throw new RuntimeException(e);}}
}

application.yaml


server:servlet:context-path: /app
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)listener:simple:acknowledge-mode: manual # 手动消息确认concurrency: 1 #消费者数量max-concurrency: 1  #消费者最大数量prefetch: 1  #消费者每次从队列中取几个消息

http://www.ppmy.cn/ops/20303.html

相关文章

视频通话实时换脸:支持训练面部模型 | 开源日报 No.235

iperov/DeepFaceLive Stars: 19.7k License: GPL-3.0 DeepFaceLive 是一个用于 PC 实时流媒体或视频通话的人脸换装工具。 可以使用训练好的人脸模型从网络摄像头或视频中交换面部。提供多个公共面部模型&#xff0c;包括 Keanu Reeves、Mr. Bean 等。支持自己训练面部模型以…

web server apache tomcat11-24-Virtual Hosting and Tomcat

前言 整理这个官方翻译的系列&#xff0c;原因是网上大部分的 tomcat 版本比较旧&#xff0c;此版本为 v11 最新的版本。 开源项目 从零手写实现 tomcat minicat 别称【嗅虎】心有猛虎&#xff0c;轻嗅蔷薇。 系列文章 web server apache tomcat11-01-官方文档入门介绍 web…

【GNS3 GraduProj】路由器Ansible脚本测试(文件备份)

R1DhcpPoolReception.yml &#xff08;测试成功&#xff09; --- - name: Routers Configurationhosts: R1gather_facts: falseconnection: network_clitasks:- name: DHCP Configios_config:parents: "ip dhcp pool Reception"lines:- network 192.168.10.0 255.2…

责任链模式的应用

设计模式责任链模式 责任链模式介绍 概述 责任链模式是一种行为型模式。责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求…

目标检测(任务概述、机器学习时代的方法、深度学习时代的目标检测模型)

文章目录 目标检测任务概述机器学习时代的目标检测方法深度学习时代的目标检测模型基于提议的目标检测模型R-CNN 模型Fast RCNN 模型Faster RCNN 模型SSD 模型YOLO 模型 基于分割的目标检测模型FCN 模型U-Net 模型 目标检测任务概述 目标检测任务的概念&#xff1a;尝试从一张…

mysql添加普通索引(简单使用)

前言&#xff1a;以订单表&#xff08;oms_order&#xff09;为例&#xff0c;添加用户id&#xff08;user_id&#xff09;为普通索引 mysql添加普通索引&#xff08;简单使用&#xff09; 1.查看表已经存在的索引情况a.语法b.使用c.结果&#xff08;这里还没有添加所以&#…

vue的axios使用!

什么是axios? 1.axios是一个基于 promise 的 HTTP 库&#xff0c;可以用在浏览器和 node.js 中, 也是 vue 官方推荐使用的 http 库&#xff1b;封装axios&#xff0c;一方面为了以后维护方便&#xff0c;另一方面也可以对请求进行自定义处理。 如何安装&#xff1f; npm in…

Windows 11 轻量简单的美化方案

Windows 11 美化最终往往是回到最初的默认配置 每次更新版本、重装系统都要维护无疑是麻烦事 这里存一下我的简易轻量级美化&#xff0c;多数云端同步&#xff0c;开箱即用 壁纸 Wallpaper engine 直接 steam 同步即可 HDR 在 post-processing 开启 多屏支持也比较好 类…