上篇分析了XxlJobScheduler
的主要流程和各个流程具体的逻辑内容,但是并没有深入到核心细节原理。本篇接着JobTriggerPoolHelper
类的后半部分接着探索。看一看为什么作者需要分快触发和慢触发。具体触发的逻辑和原理又是什么。
JobTriggerPoolHelper.trigger
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// 选择一个触发的线程池,默认是快触发
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {
// 1分钟内超时了10次就放入到慢触发线程池
triggerPool_ = slowTriggerPool;
}
// 线程池触发调用
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// 触发
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// 1分钟清理一次超时次数的map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// 计算花费时间
long cost = System.currentTimeMillis()-start;
if (cost > 500) {
// 超过500ms了就算是超时一次
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
上述代码可以看到默认使用的线程池是快速触发线程池,而当触发的超时次数超过了1分钟10次的话,就放入另外一个慢触发线程池。这样做的目的应该是怕慢触发的任务影响了快速触发的任务。因为线程池的线程创建的个数是有限的,超过之后,就会放入任务队列等待执行。如果任务量大,执行的速度又慢,实际会影响其他任务的处理。分成两个可以更好的区分任务的执行快慢,分开触发,影响小。
这里判断是否超时的条件是任务的执行时间是否大于了500ms,超过500ms就进行累加次数。超时的次数会存入一个单独的map,其key为jobId
,并且每分钟做一次清理操作。
继续看下XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList)
方法
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// 获取job信息
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
// 设置参数
jobInfo.setExecutorParam(executorParam);
}
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
// 设置地址
if (addressList!=null && addressList.trim().length()>0) {
group.setAddressType(1);
group.setAddressList(addressList.trim());
}
// 设置分片参数
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
// 分片广播的处理,也就是根据注册的执行器,每隔执行器执行自己分片的那一部分,传递了总分片和当前自己处理的分片
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
// 非广播分片直接调用处理
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
在触发的时候,首先是对参数信息进行了处理,然后判断了注册了执行器的数量和配置的策略,如果支持广播分片,还需要进行分片的处理,每个执行器执行自己负责的那个分片,否则直接执行,分片也就是默认的单片。这个方法处理完分片的信息后,就调用了processTrigger
。
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// 获取参数
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
// 1、保存日志id
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// 2、初始化触发参数
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
triggerParam.setExecutorParams(jobInfo.getExecutorParam());
triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
triggerParam.setGlueType(jobInfo.getGlueType());
triggerParam.setGlueSource(jobInfo.getGlueSource());
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setBroadcastIndex(index);
triggerParam.setBroadcastTotal(total);
// 3、初始化调用执行的地址
String address = null;
ReturnT routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index >>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +">>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
在processTrigger
方法中,第一步是初始化了日志的信息,并记录了一条初始化的日志。接着是初始化触发的参数。这些参数信息都是需要传递给执行器进行调用的必要信息,包括最重要的执行Handler
。然后是需要根据分片的信息拿到调用执行器的地址。分片的概念实际是将任务分解,然后进行分布式的任务处理。因为每个执行器执行属于自己的一部分任务,互不干扰,这样就可以实行多台机器并行的执行,加快任务的处理。后续就是触发执行和触发后的日志信息的保存。毕竟不能保证每个任务都能成功。当任务触发失败后,需要详细的记录下日志信息,这样才方便后续问题的排查。
但是上述代码任然没触及到核心具体是怎么触发的。继续深入triggerResult = runExecutor(triggerParam, address);
public static ReturnT runExecutor(TriggerParam triggerParam, String address){
ReturnT runResult = null;
try {
// 获取到触发执行器
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// 触发执行
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
}
StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
runResultSB.append("address:").append(address);
runResultSB.append("code:").append(runResult.getCode());
runResultSB.append("msg:").append(runResult.getMsg());
runResult.setMsg(runResultSB.toString());
return runResult;
}
这里获取到执行器后就可以触发执行了。但是这个执行器有两个实现,具体是哪一个呢?
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
// valid
if (address==null || address.trim().length()==0) {
return null;
}
// load-cache
address = address.trim();
ExecutorBiz executorBiz = executorBizRepository.get(address);
if (executorBiz != null) {
return executorBiz;
}
// set-cache
executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
executorBizRepository.put(address, executorBiz);
return executorBiz;
}
如果根据远程地址获取不到执行器,就需要创建一个,也就是ExecutorBizClient
这个执行器。最后查看下ExecutorBizClient
的run
方法。
public ReturnT run(TriggerParam triggerParam) {
// 发送post请求到各个执行器调度处理
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
分析到这里就很清晰了,run
方法调用注册到调度中心的执行器的一个post
请求,让任务执行器去执行。任务执行器接受到请求后,根据参数就可以找到对应的Handler
进行逻辑处理了。
总结
本篇分析了作者创建快触发和慢触发线程池的理由。也重点分析trigger
方法。这里面并没有很难很深的东西。其实很多源码也并不是太难。跟着作者的思路走,都能理解。关键是在看源码的过程中,要去思考作者的设计。如此设计的好处在哪里。在上面的代码中,作者就考虑了触发任务的执行快慢,对触发任务记录了日志,方便后续的问题排查。还设计了分片的思想,方便我们在多个处理器上进行分布式的执行任务。如果是我们自己设计,自己写这个任务调度是不是能考虑的如此全面呢?说到这里,实际上你就明白差距在哪里了,应该学习什么内容了。
不过本篇没有介绍调度触发的时机,也就是为什么它能够精准的调度,这个我们下一篇继续探讨。