之前分析的内容都是和客户端集成的相关源码内容。包括2. xxl-job源码分析-核心XxlJobExecutor和3. xxl-job源码分析-executor-server详解。本篇讲解的内容是和中心调度平台的核心类XxlJobScheduler
。因为这个类将调度任务、注册调度器、失败处理等内容结合在一起。这个类就像洋葱的外壳,里面包着一层又一层的核心内容。现在我带着大家将其一层层的剥开,看看它的里面到底是什么。
创建并启动XxlJobScheduler
首先在xxl-job-admin
工程中找到XxlJobAdminConfig
类。它实现了spring
中的InitializingBean
接口,可以在spring
容器完成属性赋值后进行调用。
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
// 创建并初始化
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
代码也非常简单,没有太多花里花哨的内容,直接new一个对象,然后调用初始化方法。
初始化方法
public void init() throws Exception {
// 初始化国际化内容
initI18n();
// 任务触发帮助类启动
JobTriggerPoolHelper.toStart();
// 注册帮助类启动
JobRegistryHelper.getInstance().start();
// 失败监控帮助类启动
JobFailMonitorHelper.getInstance().start();
// 丢失监控帮助类启动
JobCompleteHelper.getInstance().start();
// 日志报表帮助类启动
JobLogReportHelper.getInstance().start();
// 调度帮助类启动
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
初始化代码的逻辑十分清晰,包含了7大步骤,分别是初始化国际化内容,任务触发帮助类启动,注册帮助类启动,失败监控帮助类启动,丢失监控帮助类启动,日志报表帮助类启动,调度帮助类启动。下面我们一个个分析,看看中心调度节点是如何运行的。
初始化国际化内容
private void initI18n(){
for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
}
}
这段代码很简单,就是对ExecutorBlockStrategyEnum
各个枚举值按本地语言环境设置对应的值。
任务触发帮助类启动
public static void toStart() {
helper.start();
}
public void start(){
// 创建一个快触发的线程池
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
// 创建一个慢触发的线程池
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
在任务触发帮助类启动的时候,创建了两个线程池,一个命名为快触发,一个命名为慢触发,至于为啥分开两个线程池,又为什么一个快,一个慢呢?这部分等到后续代码分析的时候再进行讲解。
注册帮助类启动
public void start(){
// 注册移除的线程池
registryOrRemoveThreadPool = new ThreadPoolExecutor(
2,
10,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
}
});
// 创建了一个注册监控的线程
registryMonitorThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// 查询配置为自动注册的管理器
List groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {
// 找到90秒都没有更新心跳的节点id
List ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
if (ids!=null && ids.size()>0) {
// 如果存在这样的节点,就删除
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
}
// 刷新各个执行器组的在线地址,key是组名,value是地址
HashMap appAddressMap = new HashMap();
// 查找所有正常的注册节点,时间在90s内有过更新的
List list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
if (list != null) {
// 存在则放入到对应名称的list中
for (XxlJobRegistry item: list) {
if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
String appname = item.getRegistryKey();
List registryList = appAddressMap.get(appname);
if (registryList == null) {
registryList = new ArrayList();
}
if (!registryList.contains(item.getRegistryValue())) {
registryList.add(item.getRegistryValue());
}
appAddressMap.put(appname, registryList);
}
}
}
// 刷新执行器组内的地址,多个地址用逗号拼接
for (XxlJobGroup group: groupList) {
List registryList = appAddressMap.get(group.getAppname());
String addressListStr = null;
if (registryList!=null && !registryList.isEmpty()) {
Collections.sort(registryList);
StringBuilder addressListSB = new StringBuilder();
for (String item:registryList) {
addressListSB.append(item).append(",");
}
addressListStr = addressListSB.toString();
addressListStr = addressListStr.substring(0, addressListStr.length()-1);
}
group.setAddressList(addressListStr);
group.setUpdateTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
try {
// 等待30s
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
}
});
// 设置为后台线程进行启动
registryMonitorThread.setDaemon(true);
registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
registryMonitorThread.start();
}
注册帮助的启动类中创建了一个registryOrRemoveThreadPool
,从命名上可以才到它就是在注册和移除的时候用的一个异步的处理的线程池。关键的代码还是在registryMonitorThread
之中。这个线程一直在后台运行,每隔30s去检查下注册的执行器组,看是否有新的执行器注册了,又需要看看哪个执行器掉线了,需要将其移除。移除的标准是3倍的检测时间,也就是90s内如果没有发送心跳检测了,就认为那个执行器掉线了,主动进行移除。对于已经注册地址的执行器,则每次需要更新到数据库表中。
失败监控帮助类启动
public void start(){
// 创建一个监控线程
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
// 寻找执行失败的jobId
List failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// 更新告警标志为当前锁定处理状态
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = " >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +">>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
// 处理完一批等待10s再处理第二批
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
失败监控的主要处理逻辑有两个,一个是判断是否可以失败重试,根据配置的executorFailRetryCount
进行判断,二是对失败的任务进行邮件告警动作。该线程也是一个后台线程,监控这失败任务。在失败告警这块,如果有需要定制化的告警的,比如说短信提醒,钉钉机器人告警,都可以在这块进行定制化。也就在boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
这里进行的触发。
// 从spring 容器中获取所有实现了JobAlarm接口的实现类
private List jobAlarmList;
public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {
boolean result = false;
if (jobAlarmList!=null && jobAlarmList.size()>0) {
result = true; // success means all-success
// 逐个执行告警方法
for (JobAlarm alarm: jobAlarmList) {
boolean resultItem = false;
try {
resultItem = alarm.doAlarm(info, jobLog);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (!resultItem) {
result = false;
}
}
}
return result;
}
这里有个点要注意一下,如果自行进行了扩展,对result
要根据自己的需要定制,因为现在目前源码只有一个邮件告警,所以不存在覆盖,但是自定义实现了多个情况就需要小心了。
丢失监控帮助类启动
public void start(){
// 回调线程池
callbackThreadPool = new ThreadPoolExecutor(
2,
20,
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue(3000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
}
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
}
});
// 监控线程
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// wait for JobTriggerPoolHelper-init
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
// monitor
while (!toStop) {
try {
// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
for (Long logId: losedJobIds) {
XxlJobLog jobLog = new XxlJobLog();
jobLog.setId(logId);
jobLog.setHandleTime(new Date());
jobLog.setHandleCode(ReturnT.FAIL_CODE);
jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );
XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
}
}
try {
TimeUnit.SECONDS.sleep(60);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
monitorThread.start();
}
代码主要是针对一些异常情况下,处理超时或者调度器断线的时候,监控更新状态的一些处理。
日志报表帮助类启动
public void start(){
// 日志线程
logrThread = new Thread(new Runnable() {
@Override
public void run() {
// 上次清理日志时间
long lastCleanLogTime = 0;
while (!toStop) {
// 1、日志报告刷新:3 天内刷新日志报告
try {
for (int i = 0; i 0) {
int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;
xxlJobLogReport.setRunningCount(triggerDayCountRunning);
xxlJobLogReport.setSucCount(triggerDayCountSuc);
xxlJobLogReport.setFailCount(triggerDayCountFail);
}
// s刷新
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret >>>>>>>>>> xxl-job, job log report thread error:{}", e);
}
}
// 2、日志清理:每天一次
if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
&& System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {
// expire-time
Calendar expiredDay = Calendar.getInstance();
expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
expiredDay.set(Calendar.HOUR_OF_DAY, 0);
expiredDay.set(Calendar.MINUTE, 0);
expiredDay.set(Calendar.SECOND, 0);
expiredDay.set(Calendar.MILLISECOND, 0);
Date clearBeforeTime = expiredDay.getTime();
// 清理过期的日志
List logIds = null;
do {
logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
if (logIds!=null && logIds.size()>0) {
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
}
} while (logIds!=null && logIds.size()>0);
// 更新清理时间
lastCleanLogTime = System.currentTimeMillis();
}
try {
// 一分钟检查一次
TimeUnit.MINUTES.sleep(1);
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");
}
});
logrThread.setDaemon(true);
logrThread.setName("xxl-job, admin JobLogReportHelper");
logrThread.start();
}
日志线程处理2件事,一个是更新3天内的汇总数据,一个是清理大于1天的日志。这里也提供了一个报表的思路。有时候看到的实时报表并不是真正的实时报表,因为如果是真正的实时报表,往往需要耗费巨大的资源去实时查询,如果查询的量非常大,耗时,慢不说,还容易拖垮数据库。而平时我们可能并不是特别关心实时的数据,因为上一秒的数据可能就和下一秒的数据不一样,也不可能做到一定是实时的,所以我们需要的是一个准实时的报表,也就说可以容忍一定的时间误差,比如1分钟,5分钟,10分钟的误差范围。这样就给了我们一个处理的时间。在后台去定时更新报表内容。当查询的时候,就不需要点击查询的时候实时计算了。而且直接查询总的报表数据。当我们建立好索引和规则,查询自然会快很多。当然查询快了,也是有线程在后台负重前行的。我们不仅要看到前端的精美展示,也能看到后台默默无闻的线程计算。
总结
通过上面的分析,我们大体上了解了调度中心后台在做的一些不为人知的事情。不过这里还留下了两个悬念,一个是快触发和慢触发的处理,一个是调度帮助类启动的讲解。因为调度帮助类启动涉及的内容很多,也是核心中的核心,等下一篇进行单独讲解。希望读者能自行先去看一看这部分的内容,肯定会受益匪浅的。