任务调度之时间轮实现 | 京东云技术团队

2023年 7月 17日 40.9k 0

前言

在生活中太阳的东升西落,鸟类的南飞北归,四级的轮换,每天的上下班,海水的潮汐,每月的房租车贷等等,如果用程序员的视角看,这就是一个个的定时任务,在日常的开发工作中也有很多的定时任务场景:

  • 数仓系统凌晨进行的数据同步
  • 订单12小时未支付的状态校验
  • rpc调用超时时间的校验
  • 缓存数据失效时间的延长
  • 定时开启的促销活动
  • ……
  • 假如现在有一个任务需要3s后执行,你会如何实现?

    简单点,直接一个线程的休眠,thread.sleep(3000),一行代码就能达到目的,但是性能嘛……,由于每个任务都需要一个单独的线程,当系统中存在大量任务,

    任务调度

    假如,现在有一个任务需要3s后执行,你会如何实现呢?

    简单点,直接一个休眠,让线程sleep 3s,不就达到目的了吗?但是性能嘛……,由于每个任务都需要一个单独的线程,在系统中存在大量任务时,这种方案的消耗是极其巨大的,那么如何实现高效的调度呢?大佬们低头看了一眼手表,一个算法出现了

    时间轮的数据结构

    如图所示,这就是时间轮的一个基础结构,一个存储了定时任务的环形队列,可以理解为一个时间钟,队列的每个节点称为时间槽,每个槽位又使用列表存储着需要执行的定时任务。和生活中的钟表运行机制一样,每隔固定的单位时间,就会从一个槽位跳到下一个槽位,就像秒针跳动了一次,再取出当前槽位的任务进行执行。假如固定单位时间为1S,当前槽位位2,如果需要插入一个3S后的任务,就会在槽位5的的列表里加上当前任务。等指针运行到第五个槽位时,取出任务执行就可以了。

    时间轮的最大优势是在时间复杂度上的优势,一个任务简单的生命周期:

  • 创建任务,插入到数据结构中。
  • 查询任务,找到满足条件的任务
  • 执行任务。
  • 任务归档,从任务调度的列表中删出。
  • 其中第三步的执行时间是固定的,所以1,2,4这三部就的时间复杂度就决定了整个任务调度流程的复杂度,而时间轮是链式存储结构,所以在增删和查询时,时间复杂度都是0(1),其他常见的任务调度算法例如最小堆和红黑树以及跳表。

    最小堆是一颗完全二叉树而且子节点的值总是大于等于父节点的值,所以在插入时候需要判断父节点的关系,它的时间添加操作时间复杂度是O(logn),在任务执行时,只需要判断最顶节点就行,所以它的查询时间复杂度时哦O(1)。

    根据红黑树的特性已经被归纳法证明它的增加的时间复杂度是O(logn),查找最小节点的时间复杂度位O(h)。

    跳表的的本质是实现二分查找法的有序链表,但是他有多个层级,和红黑树的高度值相似,它的时间复杂度也是O(logn)

    高级时间轮

    如上图所示,如果一个刻度代表1S,那么一个周期就是1分钟,但是如果我一个任务是在3分钟后执行呢,如果是在一个12小时后执行呢?

    当然如果是单纯的增加环形链表的长度也是可以的,直接扩大到3600*24,一天一个周期,直接放进来。但是还有更好的办法。

    带轮次标记的任务

    任务执行轮次的计算公式:((任务执行时间-当前时间)/固定单位时间)%槽位数量

    根据槽位计算公式可以算出当前任务需要插入执行的轮次,我在任务上面加一个字段round,当每次执行到该槽位时,就遍历该槽位的任务列表,每个任务的round-1,取出来round=0的任务执行就行。

    for(Task task:taskList){
      int round= task.getRound();
       round=(round-1);
       task.setRound(round);
       if(round==0){
         doTask(task);
       }
    }
    

    如果任务间隔不是很大,看起来也是不错的一种解决方式。

    但是工作中有很多任务,延迟执行的时间是很久以后的,例如延保履约服务成功之后会有一个7天自动完成的定时任务,甚至有一些几年后才会执行的任务,如果都用round来处理的话,那这个round将会变的非常大的一个数字,也会在任务列表中插入很多当前不需要执行的任务,如果每次都执行上面的逻辑,显然会浪费大量的资源。

    多层时间轮

    多层时间轮的核心思想是:

    就上上图的水表,有很多小的表盘,但是每个表盘的刻度其实是不一样,又或者手表里的时分秒或者日历上的年月日。

    针对时间复杂度的问题:不做遍历计算round,只要到了当前槽位,就把任务列表的所有任务拿出来执行。

    针对空间复杂度的问题:分层,每个层级的时间轮刻度不一样,多个时间轮协调工作。

    如上图所示,第一次时间轮,每个刻度是1ms,一轮是20ms,第二个层时间轮的刻度是20ms,一轮就是400ms,第三层的刻度是400ms,一轮就是8000ms,每层的周期就等于 20ms *2的n次方。这要使用多层级时间轮就可以很容易把任务区分开来。每当高层次时间轮到达当前节点,就把任务降级到低层级的时间轮上。对于400ms的时间轮来说,小于1ms和小于399ms的任务都是过期任务,只要不大于400ms,都认为是过期任务。

    代码实现的话,往上也有很多,最近比较火热的POWER-JOB的分布式调度框架就是才有的时间轮算法,粘贴下核心代码大家看下:

    1.首先定义了一个任务接口

    public interface TimerTask extends Runnable {
    }
    

    2.调度中的任务对象

    public interface TimerFuture {
    
        /**
         * 获取实际要执行的任务
         * @return
         */
        TimerTask getTask();
    
        /**
         * 取消任务
         * @return
         */
        boolean cancel();
    
        /**
         * 任务是否取消
         * @return
         */
        boolean isCancelled();
    
        /**
         * 任务是否完成
         * @return
         */
        boolean isDone();
    }
    

    3.调度器接口

    public interface Timer {
    
        /**
         * 调度定时任务
         */
        TimerFuture schedule(TimerTask task, long delay, TimeUnit unit);
    
        /**
         * 停止所有调度任务
         */
        Set stop();
    }
    

    4.时间轮的实现

    public class HashedWheelTimer implements Timer {
    
        private final long tickDuration;
        private final HashedWheelBucket[] wheel;
        private final int mask;
    
        private final Indicator indicator;
    
        private final long startTime;
    
        private final Queue waitingTasks = Queues.newLinkedBlockingQueue();
        private final Queue canceledTasks = Queues.newLinkedBlockingQueue();
    
        private final ExecutorService taskProcessPool;
    
        public HashedWheelTimer(long tickDuration, int ticksPerWheel) {
            this(tickDuration, ticksPerWheel, 0);
        }
    
        /**
         * 新建时间轮定时器
         * @param tickDuration 时间间隔,单位毫秒(ms)
         * @param ticksPerWheel 轮盘个数
         * @param processThreadNum 处理任务的线程个数,0代表不启用新线程(如果定时任务需要耗时操作,请启用线程池)
         */
        public HashedWheelTimer(long tickDuration, int ticksPerWheel, int processThreadNum) {
    
            this.tickDuration = tickDuration;
    
            // 初始化轮盘,大小格式化为2的N次,可以使用 & 代替取余
            int ticksNum = CommonUtils.formatSize(ticksPerWheel);
            wheel = new HashedWheelBucket[ticksNum];
            for (int i = 0; i  {
                    if (timerFuture.status == HashedWheelTimerFuture.WAITING) {
                        tasks.add(timerFuture.timerTask);
                    }
                };
    
                waitingTasks.forEach(consumer);
                for (HashedWheelBucket bucket : wheel) {
                    bucket.forEach(consumer);
                }
                return tasks;
            }
        }
    }
    

    作者:京东保险 陈建华

    来源:京东云开发者社区

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论