【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(下)

news/2024/11/28 19:45:36/

承接上文

承接上一篇文章【算法数据结构专题】「延时队列算法」史上手把手教你针对层级时间轮(TimingWheel)实现延时队列的开发实战落地(上)】我们基本上对层级时间轮算法的基本原理有了一定的认识,本章节就从落地的角度进行分析和介绍如何通过Java进行实现一个属于我们自己的时间轮服务组件,最后,在告诉大家一下,其实时间轮的技术是来源于生活中的时钟。

时间轮演示结构总览

无序列表时间轮

【无序列表时间轮】主要是由LinkedList链表和启动线程、终止线程实现。

遍历定时器中所有节点,将剩余时间为 0s 的任务进行过期处理,在执行一个周期。

  • 无序链表:每一个延时任务都存储在该链表当中(无序存储)。
  • 启动线程: 直接在链表后面push ,时间复杂度 O(1)。
  • 终止线程: 直接在链表中删除节点,时间复杂度 O(1) 。

遍历周期:需要遍历链表中所有节点,时间复杂度 O(n),所以伴随着链表中的元素越来越多,速度也会越来越慢!

无序列表时间轮的长度限制了其适用场景,这里对此进行优化。因此引入了有序列表时间轮。

有序列表时间轮

与无序列表时间轮一样,同样使用链表进行实现和设计,但存储的是绝对延时时间点

  • 启动线程有序插入,比较时间按照时间大小有序插入,时间复杂度O(n),主要耗时在插入操作
  • 终止线程链表中查找任务,删除节点,时间复杂度O(n),主要耗时在插入操作

找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1),从上面的描述「有序列表定时器」的性能瓶颈在于插入时的任务排序,但是换来的就是缩短了遍历周期。

所以我们如果要提高性,就必须要提升一下插入和删除以及检索的性能,因此引入了「树形有序列表时间轮」在「有序列表定时器」的基础上进行优化,以有序树的形式进行任务存储。

树形有序列表时间轮

  • 启动定时器: 有序插入,比较时间按照时间大小有序插入,时间复杂度 O(logn)
  • 终止定时器: 在链表中查找任务,删除节点,时间复杂度 O(logn)
  • 周期清算: 找到执行最后一个过期任务即可,无需遍历整个链表,时间复杂度 O(1)

层级时间轮

整体流程架构图,如下所示。

对应的原理,在这里就不进行赘述了,之前本人已经有两篇文章对层级式时间轮进行了较为详细的介绍了,有需要的小伙伴,可以直接去前几篇文章去学习,接下来我们进行相关的实现。

时间轮数据模型

时间轮(TimingWheel)是一个存储定时任务的环形队列,数组中的每个元素可以存放一个定时任务列表,其中存放了真正的定时任务,如下图所示。

时间轮的最基本逻辑模型,由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs),所以我们先来设计和定义开发对应的时间轮的轮盘模型。命名为Roulette类。

轮盘抽象类-Roulette

之所以定义这个抽象类

public abstract class Roulette {// 链表数据-主要用于存储每个延时任务节点List<TimewheelTask> tasks = null;// 游标指针索引protected int index;// 时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格protected int capacity;// 时间轮轮盘的层级,如果是一级,它的上级就是二级protected Integer level;private AtomicInteger num = new AtomicInteger(0);// 构造器public Roulette(int capacity, Integer level) {this.capacity = capacity;this.level = level;this.tasks = new ArrayList<>(capacity);this.index = 0;}// 获取当前下表的索引对应的时间轮的任务public TimewheelTask getTask() {return tasks.get(index);}// init初始化操作机制public List<TimewheelTask> init() {long interval = MathTool.power((capacity + 1), level);long add = 0;TimewheelTask delayTask = null;for (int i = 0; i < capacity; i++) {add += interval;if (level == 0) {delayTask = new DefaultDelayTask(level);} else {delayTask = new SplitDelayTask(level);}//已经转换为最小的时间间隔delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());tasks.add(delayTask);}return tasks;}// 索引下标移动public void indexAdd() {this.index++;if (this.index >= capacity) {this.index = 0;}}// 添加对应的任务到对应的队列里面public void addTask(TimewheelTask task) {tasks.add(task);}// 给子类提供的方法进行实现对应的任务添加功能public abstract void addTask(int interval, MyTask task);
}
时间轮盘的熟悉信息介绍

链表数据-主要用于存储每个延时任务节点。

List<TimewheelTask> tasks = null;

tasks也可以改成双向链表 + 数组的结构:即节点存贮的对象中有指针,组成环形,可以通过数组的下标灵活访问每个节点,类似 LinkedHashMap。

游标指针索引

protected int index;

时间轮轮盘的容量大小,如果是分钟级别,一般就是60个格

protected int capacity;

时间轮轮盘的层级,如果是一级,它的上级就是二级

protected Integer level;

init初始化时间轮轮盘对象模型,主要用于分配分配每一个轮盘上面元素的TimewheelTask,用于延时队列的执行任务线程,已经分配对应的每一个节点的延时时间节点数据。

 public List<TimewheelTask> init() {//  那么整个时间轮的总体时间跨度(interval)long interval = MathTool.power((capacity + 1), level);long add = 0;TimewheelTask delayTask = null;for (int i = 0; i < capacity; i++) {add += interval;if (level == 0) {delayTask = new ExecuteTimewheelTask(level);} else {delayTask = new MoveTimewheelTask(level);}//已经转换为最小的时间间隔delayTask.setDelay(add, TimeUnitProvider.getTimeUnit());tasks.add(delayTask);}return tasks;
}
  • 整数a的n次幂:interval,计算跨度,主要是各级别之间属于平方倍数

例如,第一层:20 ,第二层:20^2 …

    //例如 n=7  二进制 0   1                 1          1//a的n次幂 = a的2次幂×a的2次幂  ×   a的1次幂×a的1次幂  ×apublic static long power(long a, int n) {int rtn = 1;while (n >= 1) {if((n & 1) == 1){rtn *= a;}a *= a;n = n >> 1;}return rtn;}
TimeUnitProvider工具类

主要用于计算时间单位操作的转换

public class TimeUnitProvider {private static TimeUnit unit = TimeUnit.SECONDS;public static TimeUnit getTimeUnit() {return unit;}
}

代码简介:

  • interval:代表着初始化的延时时间数据值,主要用于不同的层次的出发时间数据
  • for (int i = 0; i < capacity; i++) :代表着进行for循环进行添加对应的延时队列任务到集合中
  • add += interval,主要用于添加对应的延时队列的延时数据值!并且分配给当前轮盘得到所有数据节点。

获取当前下标的索引对应的时间轮的任务节点

public TimewheelTask getTask() {return tasks.get(index);
}
层级时间轮的Bucket数据桶

在这里我们建立了一个TimewheelBucket类实现了Roulette轮盘模型,从而进行建立对应的我们的层级时间轮的数据模型,并且覆盖了addTask方法。

public class TimewheelBucket extends Roulette {public TimewheelBucket(int capacity, Integer level) {super(capacity, level);}public synchronized void addTask(int interval, MyTask task) {interval -= 1;int curIndex = interval + this.index;if (curIndex >= capacity) {curIndex = curIndex - capacity;}tasks.get(curIndex).addTask(task);}
}

添加addTask方法,进行获取计算对应的下标,并且此方法add操作才是对外开发调用的,在这里,我们主要实现了根据层级计算出对应的下标进行获取对应的任务执行调度点,将我们外界BizTask,真正的业务操作封装到这个BizTask模型,交由我们的系统框架进行执行。

     public synchronized void addTask(int interval, BizTask task) {interval -= 1;int curIndex = interval + this.index;if (curIndex >= capacity) {curIndex = curIndex - capacity;}tasks.get(curIndex).addTask(task);}
时间轮轮盘上的任务点

我们针对于时间轮轮盘的任务点进行设计和定义对应的调度执行任务模型。一个调度任务点,可以帮到关系到多个BizTask,也就是用户提交上来的业务任务线程对象,为了方便采用延时队列的延时处理模式,再次实现了Delayed这个接口,对应的实现代码如下所示:

Delayed接口
public interface Delayed extends Comparable<Delayed> {/*** Returns the remaining delay associated with this object, in the* given time unit.** @param unit the time unit* @return the remaining delay; zero or negative values indicate* that the delay has already elapsed*/long getDelay(TimeUnit unit);
}
TimewheelTask时间轮刻度点
@Getter
public abstract class TimewheelTask implements Delayed {private List<BizTask> tasks = new ArrayList<BizTask>();private int level;private Long delay;private long calDelay;private TimeUnit calUnit;public TimewheelTask(int level) {this.level = level;}public void setDelay(Long delay, TimeUnit unit) {this.calDelay=delay;this.calUnit=unit;}public void calDelay() {this.delay = TimeUnit.NANOSECONDS.convert(this.calDelay, this.calUnit) + System.nanoTime(); }public long getDelay(TimeUnit unit) {return this.delay - System.nanoTime();}public int compareTo(Delayed o) {long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));return (d == 0) ? 0 : ((d < 0) ? -1 : 1);}public void addTask(BizTask task) {synchronized (this) {tasks.add(task);}}public void clear() {tasks.clear();}public abstract void run();
}
  • 业务任务集合:private List<BizTask> tasks = new ArrayList<BizTask>();

    • 层级
    private int level;
    
    • 延时时间
      private Long delay;
    
    • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列
      private long calDelay;
      
    • 实际用于延时计算的时间,就是底层是统一化所有的延时时间到对应的延时队列(用于统一化的时间单位)
      private TimeUnit calUnit;
      
添加对应的业务延时任务到轮盘刻度点
    public void addTask(BizTask task) {synchronized (this) {tasks.add(task);}}
刻度点的实现类

因为对应的任务可能会需要将下游的业务任务进行升级或者降级,所以我们会针对于执行任务点分为,执行任务刻度点和跃迁任务刻度点两种类型。

  • 执行任务延时队列刻度点
public class ExecuteTimewheelTask extends TimewheelTask {public ExecuteTimewheelTask(int level) {super(level);}//到时间执行所有的任务public void run() {List<BizTask> tasks = getTasks();if (CollectionUtils.isNotEmpty(tasks)) {tasks.forEach(task -> ThreadPool.submit(task));}}
}

再次我们就定义执行这些任务的线程池为:

    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 100, 3, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(10000),new MyThreadFactory("executor"), new ThreadPoolExecutor.CallerRunsPolicy());
  • 跃迁任务延时队列刻度点
public class MoveTimewheelTask extends TimewheelTask {public MoveTimewheelTask(int level) {super(level);}//跃迁到其他轮盘,将对应的任务public void run() {List<BizTask> tasks = getTasks();if (CollectionUtils.isNotEmpty(tasks)) {tasks.forEach(task -> {long delay = task.getDelay();TimerWheel.adddTask(task,delay, TimeUnitProvider.getTimeUnit());});}}
}

致辞整个时间轮轮盘的数据模型就定义的差不多了,接下来我们需要定义运行在时间轮盘上面的任务模型,BizTask基础模型。

BizTask基础模型
public abstract class BizTask implements Runnable {protected long interval;protected int index;protected long executeTime;public BizTask(long interval, TimeUnit unit, int index) {this.interval  = interval;this.index = index;this.executeTime= TimeUnitProvider.getTimeUnit().convert(interval,unit)+TimeUnitProvider.getTimeUnit().convert(System.nanoTime(),TimeUnit.NANOSECONDS);}public long getDelay() {return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);}
}

主要针对于任务执行,需要交给线程池去执行,故此,实现了Runnable接口。

  • protected long interval;:跨度操作
  • protected int index;:索引下表,在整个队列里面的下表处理
  • protected long executeTime;:对应的执行时间

其中最重要的便是获取延时时间的操作,主要提供给框架的Delayed接口进行判断是否到执行时间了。

     public long getDelay() {return this.executeTime - TimeUnitProvider.getTimeUnit().convert(System.nanoTime(), TimeUnit.NANOSECONDS);}
层级时间轮的门面TimerWheel

最后我们要进行定义和设计开发对应的整体的时间轮层级模型。

public class TimerWheel {private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();//一个轮表示三十秒private static int interval = 30;private static wheelThread wheelThread;public static void adddTask(BizTask task, Long time, TimeUnit unit) {if(task == null){return;}long intervalTime = TimeUnitProvider.getTimeUnit().convert(time, unit);if(intervalTime < 1){ThreadPool.submit(task);return;}Integer[] wheel = getWheel(intervalTime,interval);TimewheelBucket taskList = cache.get(wheel[0]);if (taskList != null) {taskList.addTask(wheel[1], task);} else {synchronized (cache) {if (cache.get(wheel[0]) == null) {taskList = new TimewheelBucket(interval-1, wheel[0]);wheelThread.add(taskList.init());cache.putIfAbsent(wheel[0],taskList);}}taskList.addTask(wheel[1], task);}}static{interval = 30;wheelThread = new wheelThread();wheelThread.setDaemon(false);wheelThread.start();}private static Integer[] getWheel(long intervalTime,long baseInterval) {//转换后的延时时间if (intervalTime < baseInterval) {return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};} else {return getWheel(intervalTime,baseInterval,baseInterval, 1);}}private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {long nextInterval = baseInterval * interval;if (intervalTime < nextInterval) {return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};} else {return getWheel(intervalTime,baseInterval,nextInterval, (p+1));}}static class wheelThread extends Thread {DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();public DelayQueue<TimewheelTask> getQueue() {return queue;}public void add(List<TimewheelTask> tasks) {if (CollectionUtils.isNotEmpty(tasks)) {tasks.forEach(task -> add(task));}}public void add(TimewheelTask task) {task.calDelay();queue.add(task);}@Overridepublic void run() {while (true) {try {TimewheelTask task = queue.take();int p = task.getLevel();long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));TimewheelBucket timewheelBucket = cache.get(p);synchronized (timewheelBucket) {timewheelBucket.indexAdd();task.run();task.clear();}task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());task.calDelay();queue.add(task);} catch (InterruptedException e) {}}}}
}
TimerWheel的模型定义
private static Map<Integer, TimewheelBucket> cache = new ConcurrentHashMap<>();

一个轮表示30秒的整体跨度。

private static int interval = 30;

创建整体驱动的执行线程

private static wheelThread wheelThread;static{interval = 30;wheelThread = new wheelThread();wheelThread.setDaemon(false);wheelThread.start();
}static class wheelThread extends Thread {DelayQueue<TimewheelTask> queue = new DelayQueue<TimewheelTask>();public DelayQueue<TimewheelTask> getQueue() {return queue;}public void add(List<TimewheelTask> tasks) {if (CollectionUtils.isNotEmpty(tasks)) {tasks.forEach(task -> add(task));}}public void add(TimewheelTask task) {task.calDelay();queue.add(task);}@Overridepublic void run() {while (true) {try {TimewheelTask task = queue.take();int p = task.getLevel();long nextInterval = MathTool.power(interval, Integer.valueOf(String.valueOf(MathTool.power(2, p))));TimewheelBucket timewheelBucket = cache.get(p);synchronized (timewheelBucket) {timewheelBucket.indexAdd();task.run();task.clear();}task.setDelay(nextInterval, TimeUnitProvider.getTimeUnit());task.calDelay();queue.add(task);} catch (InterruptedException e) {}}}
获取对应的时间轮轮盘模型体系
    private static Integer[] getWheel(long intervalTime,long baseInterval) {//转换后的延时时间if (intervalTime < baseInterval) {return new Integer[]{0, Integer.valueOf(String.valueOf((intervalTime % 30)))};} else {return getWheel(intervalTime,baseInterval,baseInterval, 1);}}private static Integer[] getWheel(long intervalTime,long baseInterval,long interval, int p) {long nextInterval = baseInterval * interval;if (intervalTime < nextInterval) {return new Integer[]{p, Integer.valueOf(String.valueOf(intervalTime / interval))};} else {return getWheel(intervalTime,baseInterval,nextInterval, (p+1));}}

到这里相信大家,基本上应该是了解了如何去实现对应的时间轮盘的技术实现过程,有兴趣希望整个完整源码的,可以联系我哦。谢谢大家!


http://www.ppmy.cn/news/41758.html

相关文章

RecvByteBufAllocator内存分配计算

虽然了解了整个内存池管理的细节&#xff0c;包括它的内存分配的具体逻辑&#xff0c;但是每次从NioSocketChannel中读取数据时&#xff0c;应该分配多少内存去读呢&#xff1f; 例如&#xff0c;客户端发送的数据为1KB , 应该分配多少内存去读呢&#xff1f; 例如&#xff1a;…

PCB如何消除电源噪声?六个技巧教你轻松搞定

在高频PCB设计中&#xff0c;电源噪声无疑是最常见的电磁干扰现象&#xff0c;也是许多小白工程师最怕遇见的设计难点&#xff0c;如果在PCB设计时没有很好处理电源噪声&#xff0c;很容易对后续高频信号造成很大的影响&#xff0c;甚至电路无法正常运行&#xff0c;不仅浪费时…

Leetcode.2507 使用质因数之和替换后可以取到的最小值

题目链接 Leetcode.2507 使用质因数之和替换后可以取到的最小值 Rating &#xff1a; 1500 题目描述 给你一个正整数 n 。 请你将 n 的值替换为 n 的 质因数 之和&#xff0c;重复这一过程。 注意&#xff0c;如果 n 能够被某个质因数多次整除&#xff0c;则在求和时&#x…

【Docker】从 Docker 镜像中下载内容到本地

使用 docker run 命令启动镜像并进入容器。 docker run -it --name my-container my-image:tag /bin/bash其中 my-container 为你给容器取的名字。 在容器中进行所需的操作&#xff0c;例如下载文件到容器中。 使用 docker cp 命令将容器中的文件复制到本地。 docker cp my…

fl studio插件在哪个文件夹里 fl studio插件怎么用

fl studio是一个全能数字音乐工作台&#xff0c;集编曲、剪辑、录音和混音为一体&#xff0c;致力于把电脑变为全功能音乐工作室。fl studio具有专业的调音台&#xff0c;提供有复杂作品所需的所有功能&#xff0c;另外fl studio的Pattern和Song模式可以更加快速的制作Hip-hop、…

Platform虚拟总线(设备驱动分离详解)

目录 1.驱动结构体​编辑 C语言语法&#xff1a; (struct platform_device *) platform驱动编写 2.设备结构体 1.驱动结构体 led驱动配置 platform_device是device的子类&#xff0c;设备数据类型为device&#xff0c;通过device_register向内核注册设备&#xff0c…

【刷题】小技巧

好久没更了 写天梯模拟L1都有题不能AC&#xff0c;是什么品种的蒟蒻 L1-7 谷歌的招聘 题目详情 - L1-7 谷歌的招聘 (pintia.cn) 自己写半天都是Segmentation Fault&#xff0c; 学习一下几个函数叭// 1.substr&#xff08;&#xff09;函数 获取子串 #include<bits/st…

Spring —— Spring Boot 创建和使用

JavaEE传送门JavaEE Spring —— Spring简单的读取和存储对象 Ⅱ Spring —— Bean 作用域和生命周期 目录Spring Boot 创建和使用Spring BootSpring Boot 项目创建使用 IDEA 创建网页版创建Spring Boot 目录介绍运行 Spring Boothello world约定大于配置Spring Boot 创建和使…