6. xxljob源码分析调度触发原理(二)

2023年 7月 25日 27.3k 0

上篇介绍了xxl-job的调度触发原理,也就是分析了触发是做了一件什么事情。本篇重点讲解调度任务是在什么时机下进行调度的。它是怎么知道每个时间点应该触发什么任务,又是通过什么方式触发的。带着疑问让我们开始代码分析吧。

这部分的有关代码也就是4. xxl-job源码分析-XxlJobScheduler分析中没有分析完的JobScheduleHelper

scheduleThread

// 调度线程
scheduleThread = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            // 随机等待4s-5s
            TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
        } catch (InterruptedException e) {
            if (!scheduleThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }
        logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

        // 预读的任务数量,两个线程池的最大数量总和*20,这个20是依据于每个触发花费50ms,1s中可以触发20次来进行计算的
        int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
        while (!scheduleThreadToStop) {
            long start = System.currentTimeMillis();
            Connection conn = null;
            Boolean connAutoCommit = null;
            PreparedStatement preparedStatement = null;
            boolean preReadSuc = true;
            try {
                // 获取数据库连接
                conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                connAutoCommit = conn.getAutoCommit();
                // 设置非自动提交,开启事务
                conn.setAutoCommit(false);
                // 对分布式的任务调度中心进行加锁,保证只有一个调度执行
                preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                preparedStatement.execute();
                // 1、预读任务
                long nowTime = System.currentTimeMillis();
                // 从数据库中获取下一个5s内的会触发的任务
                List scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                // 如果任务列表不为空
                if (scheduleList!=null && scheduleList.size()>0) {
                    // 2、推送到时间环进行处理
                    for (XxlJobInfo jobInfo: scheduleList) {
                        // 时间环跳跃了,跳过了上一次的触发时机
                        if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                            // 2.1、触发时间超过了5秒的处理并设置下一次触发时间
                            logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
                            // 1、错过处理,是直接丢弃还是立即触发
                            MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                            if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                //  如果是立即触发,那就直接触发
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                            }
                            // 2、刷新下次触发时间
                            refreshNextValidTime(jobInfo, new Date());
                        } else if (nowTime > jobInfo.getTriggerNextTime()) {
                            // 2.2、触发过期时间小于5s:直接触发并且确定下次触发时间
                            // 1、直接触发
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                            // 2、刷新下次触发时间
                            refreshNextValidTime(jobInfo, new Date());
                            // 下次触发时间在5s内,再次预读
                            if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                // 1、确定下次触发时机,随机1s的60刻度之内
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                // 2、推送到时间环
                                pushTimeRing(ringSecond, jobInfo.getId());
                                // 3、刷新下次触发时间
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                            }
                        } else {
                            // 2.3、触发器预读:时间环形触发&&使下一个触发时间
                            // 1、确定下次触发时机,随机1s的60刻度之内
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                            // 2、推送到时间环
                            pushTimeRing(ringSecond, jobInfo.getId());
                            // 3、刷新下次触发时间
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                        }
                    }
                    // 3、更新触发信息
                    for (XxlJobInfo jobInfo: scheduleList) {
                       XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                    }
                } else {
                    preReadSuc = false;
                }
                // tx stop
            } catch (Exception e) {
                if (!scheduleThreadToStop) {
                    logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                }
            } finally {
                // 提交事务,关闭连接
                if (conn != null) {
                    try {
                        conn.commit();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        conn.setAutoCommit(connAutoCommit);
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        conn.close();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

                // 关闭预处理
                if (null != preparedStatement) {
                    try {
                        preparedStatement.close();
                    } catch (SQLException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
            }
            // 计算花费的时间
            long cost = System.currentTimeMillis()-start;


            // 小于1s
            if (cost >>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
    }
});
// 设置后台线程
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

通过上述的代码,整理一下这个流程:

  • 线程启动启动首先等待4s-5s的时间,应该是做一个缓冲,避免服务启动的时候就将线程全部启动,引发大量线程同时并发处理。等待一段时间,也可以让资源进行初始化。
  • 根据配置初始化一个预读数量。
  • 开始事务,对处理进行加锁,保证同一时刻只有一个调度中心在处理。这个是以数据库表锁的形式实现分布式锁。
  • 根据预读的数量获取接下来5s内会触发的任务
  • 如果预读的任务列表不为空,就需要推送到时间环进行触发处理。
  • 针对推送给时间环处理又可以分为是否已经错过了当前分段的触发时间还是没有错过。如果错过了当前时段的触发时间,可以选择立即触发一次还是直接放弃。而对于没有错过的,直接触发,再计算下次触发的时机。

    我们再看下刷新下次时间和推送到时间环在代码中是如何处理的。

    // 刷新时间
    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        // 根据任务信息和开始时间获取下一次触发的时间
        Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
        if (nextValidTime != null) {
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
        } else {
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
            logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",
                        jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
        }
    }
    
    public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        // 根据不同的策略算时间
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
        if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {
            // cron表达式计算
            Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
            return nextValidTime;
        } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
            // 固定时间周期计算
            return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
        }
        return null;
    }
    
    // key为秒,value是触发的任务id
    private volatile static Map ringData = new ConcurrentHashMap();
    private void pushTimeRing(int ringSecond, int jobId){
        // 将任务放入至ringData,
        List ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);
    
        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }
    

    这部分的代码也不复杂,下次时间的计算是根据不同的策略来计算的。支持的是两种策略,一是根据Cron表达式计算,二是根据固定时间周期计算。而推送到时间环的任务,是维护了一个ConcurrentHashMap,其key为在几秒后触发,value是那一秒需要触发的jobId

    ringThread

    // 时间环线程
    ringThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!ringThreadToStop) {
                // 随机等待0-1s
                try {
                    TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                } catch (InterruptedException e) {
                    if (!ringThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
    
                try {
                    // 获取1s内需要触发的任务
                    List ringItemData = new ArrayList();
                    int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                    for (int i = 0; i >>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                    if (ringItemData.size() > 0) {
                        // 拿到这1s的所有数据,依次触发
                        for (int jobId: ringItemData) {
                            // 执行触发动作
                            JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                        }
                        // 触发完成后清理数据
                        ringItemData.clear();
                    }
                } catch (Exception e) {
                    if (!ringThreadToStop) {
                        logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                    }
                }
            }
            logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
        }
    });
    ringThread.setDaemon(true);
    ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
    ringThread.start();
    

    时间环的代码就比较简单了。因为scheduleThread已经把需要调度的任务推送到了ringData,时间环只需要每秒从里面取出数据调用JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);触发即可。

    总结

    在没有详细介绍调度任务的时候,你是否会觉得只需要根据调度表达式配置了调度任务,到点了就能准确触发就很神奇。但是当你当你看到源码后。你就会明白,世间那有什么岁月静好,都是有人替你负重前行了。这些并不是什么魔法,而是有两个线程,一个是scheduleThread线程,每隔一定时间(如果任务密集的话,是0-1s,非任务密集是4-5s)就会进行一次任务的预读。任务的预读完成后,还需要刷新下次任务触发的时间,这样在到时间点的时候就可以被查询到进行触发。另一个是ringThread,它每一秒都会去检测当前秒需要触发的任务,如果当前秒有任务需要触发,它就会调用触发方法,通过线程池去异步触发。这个就和我们的时钟一样,滴答滴答的前行。每滴答一次。就去检测任务触发。所以我们才会看到定时任务每次都很准的在当前的那一秒触发了。虽然没有定位到毫秒级的准确度,但是对于我们工程来说,毫秒级真没有必要(当然计算机是肯定能做到毫秒级的,毕竟现在是主频随随便便都是GHz)。秒级的触发对于计算机来说简直不要太轻松了。在人类世界世界里,秒就是很短的单位,因为1秒钟我们根本完不成设么复杂任务。但是在计算机的世界里。是以毫秒,微秒甚至以纳秒为单位进行的任务处理。换算一下,是不是也就不那么神奇了。

    最后说了那么长一串,就是希望大家能打开思路,站在计算机的视角去考虑问题,这样你的视野会开拓很多,也就不会那么迷惑。当一切变得明朗和清晰的时候,或许你也会感叹一句:哦,也就那么回事啊!

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论