xxljob的原理(2)—调度中心管理注册信息

2023年 10月 6日 60.4k 0

一、调度中心管理注册信息

1.JobApiController

执行器调用调度中心的url来实现注册、下线、回调等操作;其主要的实现类是JobApiController,调用/api/registry接口注册执行器信息,调用/api/registryRemove接口下线执行器信息,调用/api/callback接口执行回调操作。

@Controller
@RequestMapping("/api")
public class JobApiController {

    @Resource
    private AdminBiz adminBiz;

    /**
     * Single entry point for executor-to-admin calls under {@code /api/{uri}}.
     *
     * <p>The request is rejected with a {@code FAIL_CODE} result when it is not a POST,
     * when the uri segment is blank, or when an access token is configured and the
     * request header does not carry the same value. Otherwise the JSON body is
     * deserialized and handed to {@link AdminBiz} according to the uri segment
     * ({@code callback}, {@code registry} or {@code registryRemove}).
     *
     * @param request the current HTTP request (method and token header are read from it)
     * @param uri     the path segment after {@code /api/} that selects the operation
     * @param data    the raw JSON request body; may be absent
     * @return the result of the selected operation, or a failure result describing why
     *         the request was rejected
     */
    @RequestMapping("/{uri}")
    @ResponseBody
    @PermissionLimit(limit=false)
    public ReturnT api(HttpServletRequest request, @PathVariable("uri") String uri, @RequestBody(required = false) String data) {

        // only POST is accepted
        boolean isPost = "POST".equalsIgnoreCase(request.getMethod());
        if (!isPost) {
            return new ReturnT(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }

        // the uri segment selects the operation, so it must not be blank
        if (uri == null || uri.trim().isEmpty()) {
            return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }

        // the token is only enforced when one is configured on the admin side
        String expectedToken = XxlJobAdminConfig.getAdminConfig().getAccessToken();
        if (expectedToken != null && !expectedToken.trim().isEmpty()) {
            String presentedToken = request.getHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
            if (!expectedToken.equals(presentedToken)) {
                return new ReturnT(ReturnT.FAIL_CODE, "The access token is wrong.");
            }
        }

        // dispatch to the matching AdminBiz operation
        switch (uri) {
            case "callback": {
                List callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
                return adminBiz.callback(callbackParamList);
            }
            case "registry": {
                RegistryParam toRegister = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registry(toRegister);
            }
            case "registryRemove": {
                RegistryParam toRemove = GsonTool.fromJson(data, RegistryParam.class);
                return adminBiz.registryRemove(toRemove);
            }
            default:
                return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }

    }

}

2.AdminBizImpl

JobApiController中的adminBiz.registry(registryParam)实际调用的是实现类AdminBizImpl;AdminBizImpl本身不包含业务逻辑,而是把注册、下线委托给JobRegistryHelper,把回调委托给JobCompleteHelper来完成。

/**
 * {@link AdminBiz} implementation that contains no logic of its own: every call is
 * forwarded to the corresponding singleton helper.
 */
@Service
public class AdminBizImpl implements AdminBiz {

    /**
     * Forwards executor callbacks to {@code JobCompleteHelper}.
     *
     * @param callbackParamList the callback parameters sent by the executor
     * @return whatever the helper returns
     */
    @Override
    public ReturnT callback(List callbackParamList) {
        JobCompleteHelper completeHelper = JobCompleteHelper.getInstance();
        return completeHelper.callback(callbackParamList);
    }

    /**
     * Forwards an executor registration to {@code JobRegistryHelper}.
     *
     * @param registryParam the registration data sent by the executor
     * @return whatever the helper returns
     */
    @Override
    public ReturnT registry(RegistryParam registryParam) {
        JobRegistryHelper registryHelper = JobRegistryHelper.getInstance();
        return registryHelper.registry(registryParam);
    }

    /**
     * Forwards an executor de-registration to {@code JobRegistryHelper}.
     *
     * @param registryParam the registration data identifying the executor to remove
     * @return whatever the helper returns
     */
    @Override
    public ReturnT registryRemove(RegistryParam registryParam) {
        JobRegistryHelper registryHelper = JobRegistryHelper.getInstance();
        return registryHelper.registryRemove(registryParam);
    }

}

3.JobRegistryHelper

JobRegistryHelper中的实现也不难理解:在初始化方法start()中创建线程池registryOrRemoveThreadPool,registry()把注册任务提交到该线程池中异步执行,注册信息最终写入到数据表xxl_job_registry中。

//JobRegistryHelper.java

	/**
	 * Creates the thread pool that runs registry / registry-remove work asynchronously
	 * and stores it in {@code registryOrRemoveThreadPool}.
	 *
	 * <p>The pool keeps 2 core threads, grows to at most 10, lets idle extra threads
	 * expire after 30 seconds and buffers up to 2000 pending tasks. When a task is
	 * rejected it is executed directly by the rejection handler and a warning is logged.
	 */
	public void start(){

		// for registry or remove
		registryOrRemoveThreadPool = new ThreadPoolExecutor(
				2,                                  // core pool size
				10,                                 // maximum pool size
				30L,                                // keep-alive time for idle non-core threads
				TimeUnit.SECONDS,
				new LinkedBlockingQueue(2000),      // bounded work queue (capacity 2000)
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						// name each worker so it can be identified in thread dumps
						return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
					}
				},
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						// the task is not dropped: run it inline, then report the overload
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
					}
				});
    
    //...省略

/**
 * Validates an executor registration and persists it asynchronously.
 *
 * <p>The request is rejected when {@code registryParam} is {@code null} or when its
 * group, key or value has no text. A {@code null} parameter is possible because the
 * API controller accepts requests without a body; rejecting it here avoids a
 * {@link NullPointerException} during validation.
 *
 * <p>For a valid request the database write is submitted to
 * {@code registryOrRemoveThreadPool} and {@code ReturnT.SUCCESS} is returned without
 * waiting for it. The task first tries to update the existing registry row; when the
 * update reports fewer than one affected row it inserts a new row and refreshes the
 * group registry info.
 *
 * @param registryParam the registration data sent by the executor; may be {@code null}
 * @return {@code ReturnT.SUCCESS} once the write has been submitted, or a
 *         {@code FAIL_CODE} result with "Illegal Argument." for invalid input
 */
public ReturnT registry(RegistryParam registryParam) {

   // valid (null check first: a request without a body yields no RegistryParam)
   if (registryParam == null
         || !StringUtils.hasText(registryParam.getRegistryGroup())
         || !StringUtils.hasText(registryParam.getRegistryKey())
         || !StringUtils.hasText(registryParam.getRegistryValue())) {
      return new ReturnT(ReturnT.FAIL_CODE, "Illegal Argument.");
   }

   // async execute: persist the registration without blocking the caller
   registryOrRemoveThreadPool.execute(new Runnable() {
      @Override
      public void run() {
         // try to refresh an existing row first
         int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
         if (ret < 1) {
            // nothing was updated, so this registration is new: insert it
            XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

            // fresh
            freshGroupRegistryInfo(registryParam);
         }
      }
   });

   return ReturnT.SUCCESS;
}

4.总结

  • 执行器通过restful api形式注册到调度中心来,调度中心JobApiController的api方法根据uri分别处理注册、下线和回调3种请求;
  • 通过AdminBizImpl的adminBiz.registry(registryParam)来实际执行注册方法,实际使用JobRegistryHelper类;
  • JobRegistryHelper类在初始化的时候会创建一个线程池,每次注册执行器的时候会向该线程池提交一个异步任务,将注册信息持久化到数据库;
  • JobApiController_api

    二、调度中心的配置和启动

    1.添加权限控制

    自定义权限注解@PermissionLimit,其实现的逻辑在PermissionInterceptor中:首先判断是否需要鉴权,如果需要则根据cookie中拿到的用户信息查库判断是否已登录;未登录则重定向到登录页面,已登录但缺少所要求的管理员权限则抛出异常提示没有权限。

    /**
     * Method-level marker that tells the permission interceptor how to guard a handler.
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface PermissionLimit {

       /**
        * Whether the login check applies to the annotated method (applies by default).
        *
        * @return {@code true} when a logged-in user is required
        */
       boolean limit() default true;

       /**
        * Whether the annotated method additionally requires administrator rights.
        *
        * @return {@code true} when administrator rights are required
        */
       boolean adminuser() default false;

    }
    
    /**
     * Interceptor that enforces the login / administrator requirements declared through
     * {@link PermissionLimit} on handler methods.
     */
    @Component
    public class PermissionInterceptor implements AsyncHandlerInterceptor {

       @Resource
       private LoginService loginService;

       /**
        * Decides whether the request may reach its handler.
        *
        * <p>Handlers that are not {@link HandlerMethod}s pass through untouched. For
        * handler methods a login is required unless the method carries
        * {@code @PermissionLimit(limit = false)}. A request without a logged-in user is
        * answered with a 302 to {@code <contextPath>/toLogin}. A logged-in user whose role
        * is not 1 (presumably the administrator role, to be confirmed) is refused on
        * methods that demand {@code adminuser}.
        *
        * @param request  the current request; receives the login user as an attribute
        * @param response the current response; used for the redirect
        * @param handler  the handler selected for this request
        * @return {@code true} to continue the chain, {@code false} after redirecting
        * @throws Exception a {@link RuntimeException} when administrator rights are
        *                   required but missing
        */
       @Override
       public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
          // only handler methods can carry the annotation
          if (!(handler instanceof HandlerMethod)) {
             return true;
          }

          PermissionLimit declared = ((HandlerMethod) handler).getMethodAnnotation(PermissionLimit.class);

          // without the annotation: login required, administrator rights not required
          boolean loginRequired = declared == null || declared.limit();
          boolean adminRequired = declared != null && declared.adminuser();

          if (!loginRequired) {
             return true;
          }

          XxlJobUser currentUser = loginService.ifLogin(request, response);
          if (currentUser == null) {
             // not logged in: send the client to the login page and stop the chain
             response.setStatus(302);
             response.setHeader("location", request.getContextPath()+"/toLogin");
             return false;
          }

          if (adminRequired && currentUser.getRole() != 1) {
             throw new RuntimeException(I18nUtil.getString("system_permission_limit"));
          }

          // expose the login user to downstream handlers
          request.setAttribute(LoginService.LOGIN_IDENTITY_KEY, currentUser);
          return true;
       }

    }
    

    将PermissionInterceptor添加到web配置文件中。

    /**
     * MVC configuration that installs the application's interceptors on every path.
     */
    @Configuration
    public class WebMvcConfig implements WebMvcConfigurer {

        @Resource
        private PermissionInterceptor permissionInterceptor;
        @Resource
        private CookieInterceptor cookieInterceptor;

        /**
         * Registers the permission interceptor first and the cookie interceptor second,
         * both for all request paths.
         *
         * @param registry the registry the interceptors are added to
         */
        @Override
        public void addInterceptors(InterceptorRegistry registry) {
            String everyPath = "/**";
            registry.addInterceptor(permissionInterceptor).addPathPatterns(everyPath);
            registry.addInterceptor(cookieInterceptor).addPathPatterns(everyPath);
        }

    }
    

    2.调度中心初始化

    xxljob的初始化和销毁动作在XxlJobAdminConfig中配置完成。

    /**
     * Spring-managed holder of the admin configuration that also drives the lifecycle of
     * the {@link XxlJobScheduler}: it is created and initialised once the bean's
     * properties are set, and destroyed together with the bean.
     */
    @Component
    public class XxlJobAdminConfig implements InitializingBean, DisposableBean {

        /** The bean instance, published statically from {@link #afterPropertiesSet()}. */
        private static XxlJobAdminConfig adminConfig;

        /**
         * @return the published bean instance, or {@code null} before
         *         {@link #afterPropertiesSet()} has run
         */
        public static XxlJobAdminConfig getAdminConfig() {
            return adminConfig;
        }


        // ---------------------- XxlJobScheduler ----------------------

        private XxlJobScheduler xxlJobScheduler;

        /**
         * Publishes this bean through the static accessor, then creates and initialises
         * the scheduler.
         *
         * @throws Exception if scheduler initialisation fails
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            XxlJobAdminConfig.adminConfig = this;

            this.xxlJobScheduler = new XxlJobScheduler();
            this.xxlJobScheduler.init();
        }

        /**
         * Shuts the scheduler down when the bean is destroyed.
         *
         * @throws Exception if scheduler shutdown fails
         */
        @Override
        public void destroy() throws Exception {
            this.xxlJobScheduler.destroy();
        }
    }
    

    具体初始化的操作。

    public class XxlJobScheduler  {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
    
    
        /**
         * Starts the admin-side components one after another.
         *
         * <p>The i18n titles are initialised first, then the trigger pool, the registry
         * helper, the fail monitor, the complete (lose-monitor) helper, the log report
         * helper and finally the schedule helper. The order is significant: per the
         * inline notes, the complete helper and the schedule helper depend on
         * {@code JobTriggerPoolHelper} having been started.
         *
         * @throws Exception declared by the signature; NOTE(review): the statements
         *                   shown here do not reveal which call may throw
         */
        public void init() throws Exception {
            // init i18n
            initI18n();
    
            // admin trigger pool start
            JobTriggerPoolHelper.toStart();
    
            // admin registry monitor run
            JobRegistryHelper.getInstance().start();
    
            // admin fail-monitor run
            JobFailMonitorHelper.getInstance().start();
    
            // admin lose-monitor run ( depend on JobTriggerPoolHelper )
            JobCompleteHelper.getInstance().start();
    
            // admin log report start
            JobLogReportHelper.getInstance().start();
    
            // start-schedule  ( depend on JobTriggerPoolHelper )
            JobScheduleHelper.getInstance().start();
    
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }
    
        
        /**
         * Stops the admin-side components in the reverse of the order in which
         * {@code init()} starts them: schedule helper, log report helper, complete
         * (lose-monitor) helper, fail monitor, registry helper and finally the trigger pool.
         *
         * @throws Exception declared by the signature; NOTE(review): the statements
         *                   shown here do not reveal which call may throw
         */
        public void destroy() throws Exception {
    
            // stop-schedule
            JobScheduleHelper.getInstance().toStop();
    
            // admin log report stop
            JobLogReportHelper.getInstance().toStop();
    
            // admin lose-monitor stop
            JobCompleteHelper.getInstance().toStop();
    
            // admin fail-monitor stop
            JobFailMonitorHelper.getInstance().toStop();
    
            // admin registry stop
            JobRegistryHelper.getInstance().toStop();
    
            // admin trigger pool stop
            JobTriggerPoolHelper.toStop();
    
        }
    
        // ---------------------- I18n ----------------------
    
        /**
         * Sets the title of every {@code ExecutorBlockStrategyEnum} constant to the i18n
         * text stored under the key {@code "jobconf_block_" + <constant name>}.
         */
        private void initI18n(){
            ExecutorBlockStrategyEnum[] strategies = ExecutorBlockStrategyEnum.values();
            for (int idx = 0; idx < strategies.length; idx++) {
                ExecutorBlockStrategyEnum strategy = strategies[idx];
                String titleKey = "jobconf_block_" + strategy.name();
                strategy.setTitle(I18nUtil.getString(titleKey));
            }
        }
    

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论