Golang RabbitMQ实现的延时队列

news/2024/10/17 15:22:46/

文章目录

  • 前言
  • 一、延时队列与应用场景
  • 二、RabbitMQ如何实现延时队列
    • 实现延时队列的基本要素
    • 整体的实现原理如下
  • 三、Go语言实战
    • 生产者
    • 消费者


前言

之前做秒杀商城项目的时候使用到了延时队列来解决订单超时问题,本博客就总结一下Golang是如何利用RabbitMQ实现的延时队列的。

一、延时队列与应用场景

延迟队列是一种特殊类型的消息队列,用于在一定时间后将消息投递给消费者。它可以用于处理需要延迟执行的任务或者具有定时特性的业务场景。使用延迟队列可以灵活地控制消息的发送和处理时间,适用于很多场景,如订单超时处理、提醒任务等

具体应用场景有如下:

  1. 订单取消:当订单生成时,将订单消息发送到延迟队列中,并设置延迟时间为十分钟。消费者在十分钟后接收到订单消息并进行关闭操作。

  2. 店铺商品提醒:在店铺创建时,将提醒消息发送到延迟队列中,并设置延迟时间为十天。消费者在十天后接收到消息并发送提醒通知。

  3. 用户登录提醒:用户注册成功后,将提醒消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并发送短信提醒。

  4. 退款通知:当用户发起退款时,将通知消息发送到延迟队列中,并设置延迟时间为三天。消费者在三天后接收到消息并通知相关运营人员。

  5. 会议提醒:在会议预定时,将提醒消息发送到延迟队列中,并设置延迟时间为预定时间前十分钟。消费者在指定时间点前十分钟接收到消息并发送会议参加通知。

通过使用延迟队列,可以在指定的时间点触发任务,避免了轮询的低效方式,并且能够满足大量数据和时效性的需求。这种方法提供了更高的性能和实时性,并有效减轻了系统的负载。
下图是订单超时处理的流程图。
在这里插入图片描述

二、RabbitMQ如何实现延时队列

虽然 rabbitmq 没有延时队列的功能,但是稍微变动一下也是可以实现的。
通过设置消息的 TTL 和 DLX 等参数,可以将消息转发到一个指定的队列中,以便在一定的时间后再进行处理。

实现延时队列的基本要素

1、存在一个倒计时机制:Time To Live(TTL)
2、当到达时间点的时候会触发一个发送消息的事件:Dead Letter Exchanges(DLX)

基于第一点,我利用的是消息存在过期时间这一特性, 消息一旦过期就会变成dead letter,可以让单独的消息过期,也可以设置整个队列消息的过期时间 而rabbitmq会有限取两个值的最小
**基于第二点,**是用到了rabbitmq的过期消息处理机制: . x-dead-letter-exchange 将过期的消息发送到指定的 exchange 中 . x-dead-letter-routing-key 将过期的消息发送到自定的 route当中

整体的实现原理如下

发送者将消息发送到延时队列上并设置过期时间,当过期时间到达时,消息会被自动转发到指定的交换机和队列中供接收者消费。
1、建立与 RabbitMQ 服务器的连接并创建通道。
2、发送者通过 ch.Publish 方法将消息发送到延时队列(“test_delay”)上,设置消息的过期时间。
3、延时队列中的消息在到达过期时间后会自动被发送到 “logs” 交换机,由交换机将消息广播给所有绑定的队列。
4、接收者通过监听 “test_logs” 队列接收并处理消息。当有消息到达时,会触发回调函数进行处理。
也就是说要实现延时队列,消费者必须试实现两个队列。
一个是延时队列(“test_delay”),另一个是接收延时消息的队列(“test_logs”)。

这两个队列的作用如下:
延时队列(“test_delay”):这个队列用于接收需要延时发送的消息。发送者通过将消息发送到延时队列,设置消息的过期时间。当消息过期时,RabbitMQ 会自动将消息转发到指定的交换机和队列中。
接收延时消息的队列(“test_logs”):这个队列用于接收延时消息。在示例中,这个队列是通过将 “test_logs” 队列绑定到 “logs” 交换机上来实现的。交换机会将消息广播给所有绑定的队列,因此当延时消息到达过期时间后,会被发送到这个队列中供消费者进行处理。
通过使用两个队列,消息可以被延时发送到指定的队列,并在过期后自动转发到接收队列,实现了延时发送和消费的功能。

三、Go语言实战

生产者

首先建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,通过 ch.Publish 方法将消息发送到延时队列上。这里使用的是空字符串作为交换机(exchange),表示不选择任何交换机,只将消息发送到指定的队列(“test_delay”)。在消息的属性中,设置了消息的过期时间为 5 秒。


func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()body := "hello"// 将消息发送到延时队列上err = ch.Publish("", 				// exchange 这里为空则不选择 exchange"test_delay",     	// routing keyfalse,  			// mandatoryfalse,  			// immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),Expiration: "5000",	// 设置五秒的过期时间})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}

消费者

同样建立与 RabbitMQ 服务器的连接,并创建一个通道。然后,声明了一个名为 “logs” 的交换机,类型为 “fanout”,并且可持久化,表示该交换机会将消息广播给所有绑定的队列。接着,声明了一个常规的队列 “test_logs”,并将其绑定到 “logs” 交换机上。之后,声明了一个延时队列 “test_delay”,并设置了该队列的 x-dead-letter-exchange 参数为 “logs”,即当消息过期时将消息发送到 “logs” 交换机。最后,通过 ch.Consume 方法监听 “test_logs” 队列,并在回调函数中处理接收到的消息。


func main() {// 建立链接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个主要使用的 exchangeerr = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列q, err := ch.QueueDeclare("test_logs",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")/*** 注意,这里是重点!!!!!* 声明一个延时队列, ß我们的延时消息就是要发送到这里*/_, errDelay := ch.QueueDeclare("test_delay",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitamqp.Table{// 当消息过期时把消息发送到 logs 这个 exchange"x-dead-letter-exchange":"logs",},   // arguments)failOnError(errDelay, "Failed to declare a delay_queue")err = ch.QueueBind(q.Name, // queue name, 这里指的是 test_logs"",     // routing key"logs", // exchangefalse,nil)failOnError(err, "Failed to bind a queue")// 这里监听的是 test_logsmsgs, err := ch.Consume(q.Name, // queue name, 这里指的是 test_logs"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

http://www.ppmy.cn/news/1103853.html

相关文章

2.12 PE结构:实现PE字节注入

本章笔者将介绍一种通过Metasploit生成ShellCode并将其注入到特定PE文件内的Shell注入技术。该技术能够劫持原始PE文件的入口地址&#xff0c;在PE程序运行之前执行ShellCode反弹&#xff0c;执行后挂入后台并继续运行原始程序&#xff0c;实现了一种隐蔽的Shell访问。而我把这…

51单片机的智能台灯控制系统仿真( proteus仿真+程序+原理图+报告+讲解视频)

51单片机的红外光敏检测智能台灯控制系统仿真 1.主要功能&#xff1a;2.仿真3. 程序代码4. 原理图5. 设计报告6. 设计资料内容清单&&下载链接 51单片机的红外光敏检测智能台灯控制系统仿真( proteus仿真程序原理图报告讲解视频&#xff09; 仿真图proteus7.8及以上 程…

AI系统论文阅读:SmartMoE

提出稀疏架构是为了打破具有密集架构的DNN模型中模型大小和计算成本之间的连贯关系的——最著名的MoE。 MoE模型将传统训练模型中的layer换成了多个expert sub-networks&#xff0c;对每个输入&#xff0c;都有一层special gating network 来将其分配到最适合它的expert中&…

celery的用法--bind=True

通过将 bindTrue 设置为 app.task 装饰器的参数&#xff0c;Celery 会自动将任务实例绑定到第一个参数&#xff08;通常命名为 self&#xff09;&#xff0c;使得你可以在任务函数内部访问任务实例的属性和方法。 在 Celery 的任务函数中&#xff0c;self 参数代表任务实例本身…

Mysql树形表的两种查询方案(递归与自连接)

你有没有遇到过这样一种情况&#xff1a; 一张表就实现了一对多的关系&#xff0c;并且表中每一行数据都存在“爷爷-父亲-儿子-…”的联系&#xff0c;这也就是所谓的树形结构 对于这样的表很显然想要通过查询来实现价值绝对是不能只靠select * from table 来实现的&#xff0…

Java——》synchronized的使用

推荐链接&#xff1a; 总结——》【Java】 总结——》【Mysql】 总结——》【Redis】 总结——》【Kafka】 总结——》【Spring】 总结——》【SpringBoot】 总结——》【MyBatis、MyBatis-Plus】 总结——》【Linux】 总结——》【MongoD…

1分钟了解音频、语音数据和自然语言处理的关系

机器学习在日常场景中的应用 音频、语音数据和自然语言处理这三者正在不断促进人工智能技术的发展&#xff0c;人机交互也逐渐渗透进生活的每个角落。在各行各业包括零售业、银行、食品配送服务商&#xff09;的多样互动中&#xff0c;我们都能通过与某种形式的AI&#xff08;…

springboot整合log4j

1.log4j文件 <?xml version"1.0" encoding"UTF-8"?> <!--monitorInterval&#xff1a;Log4j2 自动检测修改配置文件和重新配置本身&#xff0c;设置间隔秒数--> <configuration monitorInterval"5"><!--日志级别以及优先…