5. xxljob源码分析调度触发原理(一)
上篇分析了XxlJobScheduler
的主要流程和各个流程具体的逻辑内容,但是并没有深入到核心细节原理。本篇接着JobTriggerPoolHelper
类的后半部分接着探索。看一看为什么作者需要分快触发和慢触发。具体触发的逻辑和原理又是什么。
JobTriggerPoolHelper.trigger
public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam, final String addressList) { // 选择一个触发的线程池,默认是快触发 ThreadPoolExecutor triggerPool_ = fastTriggerPool; AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // 1分钟内超时了10次就放入到慢触发线程池 triggerPool_ = slowTriggerPool; } // 线程池触发调用 triggerPool_.execute(new Runnable() { @Override public void run() { long start = System.currentTimeMillis(); try { // 触发 XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList); } catch (Exception e) { logger.error(e.getMessage(), e); } finally { // 1分钟清理一次超时次数的map long minTim_now = System.currentTimeMillis()/60000; if (minTim != minTim_now) { minTim = minTim_now; jobTimeoutCountMap.clear(); } // 计算花费时间 long cost = System.currentTimeMillis()-start; if (cost > 500) { // 超过500ms了就算是超时一次 AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1)); if (timeoutCount != null) { timeoutCount.incrementAndGet(); } } } } }); }
上述代码可以看到默认使用的线程池是快速触发线程池,而当触发的超时次数超过了1分钟10次的话,就放入另外一个慢触发线程池。这样做的目的应该是怕慢触发的任务影响了快速触发的任务。因为线程池的线程创建的个数是有限的,超过之后,就会放入任务队列等待执行。如果任务量大,执行的速度又慢,实际会影响其他任务的处理。分成两个可以更好的区分任务的执行快慢,分开触发,影响小。
这里判断是否超时的条件是任务的执行时间是否大于了500ms,超过500ms就进行累加次数。超时的次数会存入一个单独的map,其key为jobId
,并且每分钟做一次清理操作。
继续看下XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList)
方法
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) { // 获取job信息 XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId); if (jobInfo == null) { logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId); return; } if (executorParam != null) { // 设置参数 jobInfo.setExecutorParam(executorParam); } int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount(); XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup()); // 设置地址 if (addressList!=null && addressList.trim().length()>0) { group.setAddressType(1); group.setAddressList(addressList.trim()); } // 设置分片参数 int[] shardingParam = null; if (executorShardingParam!=null){ String[] shardingArr = executorShardingParam.split("/"); if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) { shardingParam = new int[2]; shardingParam[0] = Integer.valueOf(shardingArr[0]); shardingParam[1] = Integer.valueOf(shardingArr[1]); } } if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null) && group.getRegistryList()!=null && !group.getRegistryList().isEmpty() && shardingParam==null) { // 分片广播的处理,也就是根据注册的执行器,每隔执行器执行自己分片的那一部分,传递了总分片和当前自己处理的分片 for (int i = 0; i < group.getRegistryList().size(); i++) { processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size()); } } else { // 非广播分片直接调用处理 if (shardingParam == null) { shardingParam = new int[]{0, 1}; } processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]); } }
在触发的时候,首先是对参数信息进行了处理,然后判断了注册了执行器的数量和配置的策略,如果支持广播分片,还需要进行分片的处理,每个执行器执行自己负责的那个分片,否则直接执行,分片也就是默认的单片。这个方法处理完分片的信息后,就调用了processTrigger
。
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){ // 获取参数 ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null; // 1、保存日志id XxlJobLog jobLog = new XxlJobLog(); jobLog.setJobGroup(jobInfo.getJobGroup()); jobLog.setJobId(jobInfo.getId()); jobLog.setTriggerTime(new Date()); XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog); logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId()); // 2、初始化触发参数 TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total); // 3、初始化调用执行的地址 String address = null; ReturnT routeAddressResult = null; if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) { if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) { if (index >>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +">>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId()); }
在processTrigger
方法中,第一步是初始化了日志的信息,并记录了一条初始化的日志。接着是初始化触发的参数。这些参数信息都是需要传递给执行器进行调用的必要信息,包括最重要的执行Handler
。然后是需要根据分片的信息拿到调用执行器的地址。分片的概念实际是将任务分解,然后进行分布式的任务处理。因为每个执行器执行属于自己的一部分任务,互不干扰,这样就可以实行多台机器并行的执行,加快任务的处理。后续就是触发执行和触发后的日志信息的保存。毕竟不能保证每个任务都能成功。当任务触发失败后,需要详细的记录下日志信息,这样才方便后续问题的排查。
但是上述代码任然没触及到核心具体是怎么触发的。继续深入triggerResult = runExecutor(triggerParam, address);
public static ReturnT runExecutor(TriggerParam triggerParam, String address){ ReturnT runResult = null; try { // 获取到触发执行器 ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address); // 触发执行 runResult = executorBiz.run(triggerParam); } catch (Exception e) { logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e); runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.toString(e)); } StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":"); runResultSB.append("address:").append(address); runResultSB.append("code:").append(runResult.getCode()); runResultSB.append("msg:").append(runResult.getMsg()); runResult.setMsg(runResultSB.toString()); return runResult; }
这里获取到执行器后就可以触发执行了。但是这个执行器有两个实现,具体是哪一个呢?
public static ExecutorBiz getExecutorBiz(String address) throws Exception { // valid if (address==null || address.trim().length()==0) { return null; } // load-cache address = address.trim(); ExecutorBiz executorBiz = executorBizRepository.get(address); if (executorBiz != null) { return executorBiz; } // set-cache executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken()); executorBizRepository.put(address, executorBiz); return executorBiz; }
如果根据远程地址获取不到执行器,就需要创建一个,也就是ExecutorBizClient
这个执行器。最后查看下ExecutorBizClient
的run
方法。
public ReturnT run(TriggerParam triggerParam) { // 发送post请求到各个执行器调度处理 return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class); }
分析到这里就很清晰了,run
方法调用注册到调度中心的执行器的一个post
请求,让任务执行器去执行。任务执行器接受到请求后,根据参数就可以找到对应的Handler
进行逻辑处理了。
总结
本篇分析了作者创建快触发和慢触发线程池的理由。也重点分析trigger
方法。这里面并没有很难很深的东西。其实很多源码也并不是太难。跟着作者的思路走,都能理解。关键是在看源码的过程中,要去思考作者的设计。如此设计的好处在哪里。在上面的代码中,作者就考虑了触发任务的执行快慢,对触发任务记录了日志,方便后续的问题排查。还设计了分片的思想,方便我们在多个处理器上进行分布式的执行任务。如果是我们自己设计,自己写这个任务调度是不是能考虑的如此全面呢?说到这里,实际上你就明白差距在哪里了,应该学习什么内容了。
不过本篇没有介绍调度触发的时机,也就是为什么它能够精准的调度,这个我们下一篇继续探讨。