golang实现延迟队列(delay queue)

news/2025/2/12 19:35:05/

golang实现延迟队列

1 延迟队列:邮件提醒、订单自动取消

延迟队列:处理需要在未来某个特定时间执行的任务。这些任务被添加到队列中,并且指定了一个执行时间,只有达到指定的时间点时才能从队列中取出并执行。
应用场景:

  • 邮件提醒
  • 订单自动取消(超过多少时间未支付,就取消订单)
  • 对超时任务的处理等

由于任务的执行是在未来的某个时间点,因此这些任务不会立即执行,而是存储在队列中,直到它的预定执行时间才会被执行。

2 实现

2.1 simple简单版:go自带的time包实现

思路:

  1. 定义Task结构体,包含
  • ExecuteTime time.Time
  • Job func()
  1. 定义DelayQueue
  • TaskQueue []Task
  • func AddTask
  • func RemoveTask
  • ExecuteTask

这种方案存在的问题:

Go程序重启时,存储在slice中的延迟处理任务将全部丢失

完整代码:

package mainimport ("fmt""time"
)/*
基于go实现延迟队列
*/
type Task struct {ExecuteTime time.TimeJob         func()
}type DelayQueue struct {Tasks []*Task
}func (d *DelayQueue) AddTask(t *Task) {d.Tasks = append(d.Tasks, t)
}func (d *DelayQueue) RemoveTask() {//FIFO: remove the first task to enqueued.Tasks = d.Tasks[1:]
}func (d *DelayQueue) ExecuteTask() {for len(d.Tasks) > 0 {//dequeue a taskcurrentTask := d.Tasks[0]if time.Now().Before(currentTask.ExecuteTime) {//if the task execution time is not up, waittime.Sleep(currentTask.ExecuteTime.Sub(time.Now()))}//execute the taskcurrentTask.Job()//remove task who has been executedd.RemoveTask()}}func main() {fmt.Println("start delayQueue")delayQueue := &DelayQueue{}firstTask := &Task{ExecuteTime: time.Now().Add(time.Second * 1),Job: func() {fmt.Println("executed task 1 after delay")},}delayQueue.AddTask(firstTask)secondTask := &Task{ExecuteTime: time.Now().Add(time.Second * 7),Job: func() {fmt.Println("executed task 2 after delay")},}delayQueue.AddTask(secondTask)delayQueue.ExecuteTask()fmt.Println("all tasks have been done!!!")
}

效果:
在这里插入图片描述

2.2 complex持久版:go+redis

为了防止Go重启后存储到delayQueue的数据丢失,我们可以将任务持久化到redis中。

思路:

  1. 初始化redis连接
  2. 延迟队列采用redis的zset(有序集合)实现

前置准备:

# 安装docker
yum install -y yum-utils
yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis

完整代码:

package mainimport ("fmt""github.com/go-redis/redis"log "github.com/ziyifast/log""time"
)/*
基于redis zset实现延迟队列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"func initClient() (err error) {redisdb = redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // not set passwordDB:       0,  //use default db})_, err = redisdb.Ping().Result()if err != nil {log.Errorf("%v", err)return err}return nil
}func main() {err := initClient()if err != nil {log.Errorf("init redis client err: %v", err)return}addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())//执行队列中的任务getAndExecuteTask()
}// executeTime为unix时间戳,作为zset中的score。允许redis按照task应该执行时间来进行排序
func addTaskToQueue(task string, executeTime int64) {err := redisdb.ZAdd(DelayQueueKey, redis.Z{Score:  float64(executeTime),Member: task,}).Err()if err != nil {panic(err)}
}// 从redis中取一个task并执行
func getAndExecuteTask() {for {tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{Min:    "-inf",Max:    fmt.Sprintf("%d", time.Now().Unix()),Offset: 0,Count:  1,}).Result()if err != nil {time.Sleep(time.Second * 1)continue}//处理任务for _, task := range tasks {fmt.Println("Execute task: ", task)//执行完任务之后用 ZREM 移除该任务redisdb.ZRem(DelayQueueKey, task)}time.Sleep(time.Second * 1)}
}

效果:

redis一直从延迟队列中取数据,如果处理完一批则睡眠1s

  • 具体根据大家的业务调整,此处主要介绍思路

在这里插入图片描述


http://www.ppmy.cn/news/1361009.html

相关文章

P8630 [蓝桥杯 2015 国 B] 密文搜索

P8630 [蓝桥杯 2015 国 B] 密文搜索 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn)https://www.luogu.com.cn/problem/P8630 题目分析 基本上是hash的板子,但实际上对于密码串,只要判断主串中任意连续的八个位置是否存在密码串即可;那么我们…

CiteSpace for Mac 最新保姆级教程

CiteSpace需要在Java的环境下运行,上篇文章分享了Java入门操作,安装好Java后,就可以下载安装文献分析神器CiteSpace。​​​​​​​ 注意!!CiteSpace的版本是要和Java版本匹配的。比如:要安装CiteSpace&a…

【Java程序设计】【C00286】基于Springboot的生鲜交易系统(有论文)

基于Springboot的生鲜交易系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的生鲜交易系统 本系统分为系统功能模块、管理员功能模块、用户功能模块以及商家功能模块。 系统功能模块:在系统首页可以…

85、字符串操作的优化

上一节介绍了在模型的推理优化过程中,动态内存申请会带来额外的性能损失。 Python 语言在性能上之所以没有c++高效,有一部分原因就在于Python语言将内存的动态管理过程给封装起来了,我们作为 Python 语言的使用者是看不到这个过程的。 这一点有点类似于 c++ 标准库中的一些…

3_怎么看原理图之协议类接口之I2C笔记

I2C只连接I2CSCL与I2CSDA两根线,即2线制异步串行总线。 I2CSCL与I2CSDA两根线需要上拉电阻,目的是让电平有确定的状态。 发完8bit数据后,第9个电平拉低SDA为低电平。 比如传一个数据A0x410100 0001 IIC总线有多个从机设备的通信&#xff0c…

Atcoder ABC341 A-D题解

比赛链接:ABC341 Problem A: 先签个到。 #include <bits/stdc.h> using namespace std; int main() {int n;cin>>n;for(int i0;i<n;i)cout<<"10"<<endl;cout<<"1"<<endl;return 0; } Problem B: 继续签。 #i…

RocketMQ(七):跟着官网学习敲代码(备份)

接上篇&#xff1a;RocketMQ&#xff08;六&#xff09;&#xff1a;跟着官网学习敲代码 1 定时消息 在/bin目录下执行以下命令&#xff0c;创建定时主题&#xff1a; sh mqadmin updateTopic -c DefaultCluster -t DelayTopic001 -o true -n 127.0.0.1:9876 -a message.type…

2.20 Qt day1

一. 思维导图 二. 消化常用类的使用&#xff0c;以及常用成员函数对应的功能 按钮类QPushButton&#xff1a; mywidget.h&#xff1a; #ifndef MYWIDGET_H #define MYWIDGET_H#include <QWidget> #include<QPushButton>//按钮类 #include<QIcon>class MyW…