XxlJob任务调度平台的执行器管理:上下线机制详解~

2023年 10月 16日 41.3k 0

前言

在实际开发中,我们往往需要某些特定时刻执行一些任务,例如: 早上十点进行push推送凌晨2点刷新一批数据每隔5分钟进行失败补偿等等。

这时候我们就需要用到定时任务来完成上述任务~

业界的定时任务选型有很多,例如👇🏻

单机: jdk自带的ScheduledThreadPoolExecutor、Timer、开源框架

分布式: Quartzelastic-jobxxljob

本文来带大家了解的就是 xxl-job~

简单了解

image-20230907230851590

xxlJob架构分为两大模块,即调度中心执行器

简单理解: 执行器即我们的服务启动后注册到调度中心,调度中心管理着所有执行器、任务,根据任务触发时间点下发到执行器执行。

本文来了解的就是执行器的注册与注销原理

执行器注册

执行器发起注册

xxjob官方springboot案例中,我们可以看到定义了一个XxlJobConfig配置类,同时在这个配置类中创建了XxlJobSpringExecutor这个bean,并且传入了xxljob admin地址、appname(执行器的名称)等信息

image-20230907232148824

我们进入XxlJobSpringExecutor可见,它实现了SmartInitializingSingleton、DisposableBean,并重写了afterSingletonsInstantiated、destroy方法,其中👇🏻

afterSingletonsInstantiated 在所有单例 bean 都初始化完成以后进行调用

destroy在服务注销时会进行调用

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {

  // start
  @Override
  public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // 初始化并注册任务方法
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    try {
      // 开始执行
      super.start();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  // destroy
  @Override
  public void destroy() {
    super.destroy();
  }
}

关于initJobHandlerMethodRepository方法,其源码比较简单,就不深究了

具体逻辑是拿到Spring容器里所有bean,挨个遍历,看看bean中是否有方法是被@XxlJob注解修饰的,如果存在这样的方法,最终将其封装为MethodJobHandler,以jobhandler namekeyMethodJobHandler value添加到ConcurrentMap中进行维护

image-20230907233849093

image-20230909145209760

image-20230907234518027

注册完任务,咱们接着来看start逻辑,咱们重点了解initEmbedServer逻辑

public class XxlJobExecutor  {

  // ---------------------- start + stop ----------------------
  public void start() throws Exception {

    // init logpath
    XxlJobFileAppender.initLogPath(logPath);

    // 初始化xxljob admin地址,可能存在多个节点,根据,分隔
    initAdminBizList(adminAddresses, accessToken);

    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);

    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();

    // 向调度中心发起注册
    initEmbedServer(address, ip, port, appname, accessToken);
  }
}
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

  // fill ip port
  port = port > 0 ? port : NetUtil.findAvailablePort(9999);
  ip = (ip != null && ip.trim().length() > 0) ? ip : IpUtil.getIp();

  // generate address
  if (address == null || address.trim().length() == 0) {
    String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
    address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
  }

  // accessToken
  if (accessToken == null || accessToken.trim().length() == 0) {
    logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
  }

  // start
  embedServer = new EmbedServer();
  embedServer.start(address, port, appname, accessToken);
}

initEmbedServer中先简单做了下参数处理,如果没有指定本机address、port则会进行获取

随后创建了EmbedServer,并进行start

public class EmbedServer {
  private ExecutorBiz executorBiz;
  private Thread thread;

  public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    
    thread = new Thread(new Runnable() {
      @Override
      public void run() {
        // param
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
          0,
          200,
          60L,
          TimeUnit.SECONDS,
          new LinkedBlockingQueue(2000),
          new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
              return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
            }
          },
          new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
              throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
            }
          });
        try {
          
          // 创建netty server
          ServerBootstrap bootstrap = new ServerBootstrap();
          bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer() {
              @Override
              public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                  .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                  .addLast(new HttpServerCodec())
                  .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                  .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
              }
            })
            .childOption(ChannelOption.SO_KEEPALIVE, true);

          // bind
          ChannelFuture future = bootstrap.bind(port).sync();

          logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

          // 发起注册
          startRegistry(appname, address);

          // wait util stop
          future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
          logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
        } catch (Exception e) {
          logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
        } finally {
          // stop
          try {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
          } catch (Exception e) {
            logger.error(e.getMessage(), e);
          }
        }
      }
    });
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
  }
}

大致扫一眼,虽然看着start方法中代码挺多的,但其中无非也就两件事,创建netty serverstart registry

继续看注册,最终也是来到ExecutorRegistryThread中的start方法,其中管理着执行器的注册与注销逻辑

先看注册,注册逻辑很简单,遍历所有admin节点,挨个注册上去

public class ExecutorRegistryThread {
  private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);

  private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
  public static ExecutorRegistryThread getInstance(){
    return instance;
  }

  private Thread registryThread;
  private volatile boolean toStop = false;
  
  public void start(final String appname, final String address){
    // ......
    
    registryThread = new Thread(new Runnable() {
      @Override
      public void run() {

        // 发起注册
        while (!toStop) {
          try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            // todo 前面提到了,admin可能存在多个节点,这里就遍历,注册到每一个节点上去
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
              try {
                // todo 发起注册请求
                ReturnT registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                  registryResult = ReturnT.SUCCESS;
                  logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                  break;
                } else {
                  logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
              } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
              }

            }
          } catch (Exception e) {
            if (!toStop) {
              logger.error(e.getMessage(), e);
            }

          }

          try {
            if (!toStop) {
              TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
            }
          } catch (InterruptedException e) {
            if (!toStop) {
              logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
            }
          }
        }

        // ..... 注销

      }
    });
    registryThread.setDaemon(true);
    registryThread.setName("xxl-job, executor ExecutorRegistryThread");
    registryThread.start();
  }
}

最终发起http post请求,调用registry接口,进行注册。

image-20230908000255406

调度中心处理注册

上面我们了解了执行器发起注册逻辑,最终是发起post请求调用api/registry接口

则来到JobApiController#api

image-20230908000443114

相比于执行器发起注册的逻辑,调度中心处理注册的逻辑就简单很多了

  • 参数校验
  • 异步完成注册
  • 先进行更新操作
  • 如果操作行数 < 1,说明记录不存在,则执行插入操作,写入到xxl_job_registry表中,完成注册
  • 直接返回注册成功响应
  • // com.xxl.job.admin.core.thread.JobRegistryHelper#registry
    public ReturnT registry(RegistryParam registryParam) {
    
      // 参数校验
      if (!StringUtils.hasText(registryParam.getRegistryGroup())
          || !StringUtils.hasText(registryParam.getRegistryKey())
          || !StringUtils.hasText(registryParam.getRegistryValue())) {
        return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument.");
      }
    
      // 异步写入数据库完成注册
      registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
          // 先进行更新操作
          int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
          if (ret < 1) {
            // 操作行数 < 1,说明记录不存在,则执行插入操作,写入到xxl_job_registry表中
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
    
            // fresh
            freshGroupRegistryInfo(registryParam);
          }
        }
      });
    
      // 直接返回成功
      return ReturnT.SUCCESS;
    }
    

    执行器注销

    执行器注销分为主动注销和被动注销

    主动注销很好理解,例如服务发布,会用新节点替换旧节点,那么旧节点需要告诉调度中心我要下线了,请把我注销掉,然后新节点再主动注册到调度中心,这样任务调度就会调度到新节点执行。

    像这种经典的client server通信,那么必不可少的就是探活机制,当探活失败时,调度中心会主动注销掉client,那么对于client来说就是被动注销

    主动注销

    回到最开始,我们了解到XxlJobSpringExecutor是实现了DisposableBean接口的,当服务下线时,会回调destroy方法

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
    
      // destroy
      @Override
      public void destroy() {
        super.destroy();
      }
    }
    
    // com.xxl.job.core.executor.XxlJobExecutor#destroy
    public void destroy(){
      // 注销netty server
      stopEmbedServer();
    
      // 停止所有job线程
      if (jobThreadRepository.size() > 0) {
        for (Map.Entry item: jobThreadRepository.entrySet()) {
          JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
          // 如果存在正在执行的job thread,则等待其执行完毕
          if (oldJobThread != null) {
            try {
              oldJobThread.join();
            } catch (InterruptedException e) {
              logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
            }
          }
        }
        
        // 清空 jobThread
        jobThreadRepository.clear();
      }
    
      // 清空jobhandler
      jobHandlerRepository.clear();
    
    
      // ......
    }
    

    stopEmbedServer最终也还是来到了前面提到过的ExecutorRegistryThread#toStop中,将toStop标识设置为true,打断镔铁同步等待registryThread执行完毕

    registryThread#run方法中,当toStop = true则跳出循环,向所有admin节点发起注销请求

    public class ExecutorRegistryThread {
    
      private volatile boolean toStop = false;
    
      public void toStop() {
        toStop = true;
    
        // interrupt and wait
        if (registryThread != null) {
          registryThread.interrupt();
          try {
            registryThread.join();
          } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
          }
        }
    
      }
    
      public void start(final String appname, final String address){
        registryThread = new Thread(new Runnable() {
          @Override
          public void run() {
    
            while(!toStop) {
              // ..... execute register
            }
    
            // registry remove
            try {
              RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
              // todo 遍历所有admin节点,一个一个发起注销请求
              for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                  // todo 发起注销请求
                  ReturnT registryResult = adminBiz.registryRemove(registryParam);
                  if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                  } else {
                    logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                  }
                } catch (Exception e) {
                  if (!toStop) {
                    logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                  }
    
                }
    
              }
            } catch (Exception e) {
              if (!toStop) {
                logger.error(e.getMessage(), e);
              }
            }
            logger.info(">>>>>>>>>>> xxl-job, executor registry thread destroy.");
    
          }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
      }
    }
    

    同注册一样,只不过这次是请求的registryRemove接口

    image-20230908005109786

    JobApiController处理注销请求,本质上就是从xxl_job_registry表中删除记录

    image-20230908005240423

    被动注销

    调度中心init的时候,会开启一个registryMonitorThread线程,其中每隔30s会查询超过90s没有更新的执行器节点记录(认为探活失败), 查出来后会直接移除掉

    image-20230908010114570

    在执行器启动的时候会向调度中心发起注册

    之后每隔30s会再次发起注册,此时就会去更新节点在xxl_job_registryupdate_time,这样一样就能维持探活,节点就不会被移除

    image-20230908010325616

    总结

    通过本文,我们了解到了xxlJob执行器的注册、注销逻辑。

    spring的环境下,利用spring留下的扩展接口,将执行器的节点信息注册到调度中心, 注销机制同理,同时调度中心执行器之间建立心跳机制,保证任务的正常调度。

    我是 Code皮皮虾 ,会在以后的日子里跟大家一起学习,一起进步!
    觉得文章不错的话,可以在 掘金 关注我,这样就不会错过很多技术干货啦~

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论