关于PowerJob
PowerJob(原OhMyScheduler)是全新一代分布式任务调度与计算框架,其主要功能特性如下:
- 使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
- 定时策略完善:支持 CRON 表达式、固定频率、固定延迟和API四种定时调度策略。
- 执行模式丰富:支持单机、广播、Map、MapReduce 四种执行模式,其中 Map/MapReduce 处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
- 工作流支持:支持在线配置任务依赖关系(DAG),以可视化的方式对任务进行编排,同时还支持上下游任务间的数据传递,以及多种节点类型(判断节点 & 嵌套工作流节点)。
- 执行器支持广泛:支持 Spring Bean、内置/外置 Java 类,另外可以通过引入官方提供的依赖包,一键集成 Shell、Python、HTTP、SQL 等处理器,应用范围广。
- 运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低 debug 成本,极大地提高开发效率。
- 依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...)
- 高可用 & 高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
- 故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
适用场景
- 有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表、未支付订单超时取消等。
- 有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
- 有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce 处理器完成任务的分发,调动整个集群加速计算。
- 有需要延迟执行某些任务的业务场景:比如订单过期处理等。
同类产品对比
基本概念
分组概念
- appName:应用名称,建议与用户实际接入 PowerJob 的应用名称保持一致,用于业务分组与隔离。一个 appName 等于一个业务集群,也就是实际的一个 Java 项目。
核心概念
- 任务(Job):描述了需要被 PowerJob 调度的任务信息,包括任务名称、调度时间、处理器信息等。
- 任务实例( JobInstance,简称 Instance):任务(Job)被调度执行后会生成任务实例(Instance),任务实例记录了任务的运行时信息(任务与任务实例的关系类似于类与对象的关系)。
- 作业(Task):任务实例的执行单元,一个 JobInstance 存在至少一个 Task,具体规则如下:
- 工作流(Workflow):由 DAG(有向无环图)描述的一组任务(Job),用于任务编排。
- 工作流实例(WorkflowInstance):工作流被调度执行后会生成工作流实例,记录了工作流的运行时信息。
扩展概念
- JVM 容器:以 Maven 工程项目的维度组织一堆 Java 文件(开发者开发的众多 Java 处理器),可以通过前端网页动态发布并被执行器加载,具有极强的扩展能力和灵活性。
- OpenAPI:允许开发者通过接口来完成手工的操作,让系统整体变得更加灵活。开发者可以基于 API 便捷地扩展 PowerJob 原有的功能。
定时任务类型
- API:该任务只会由 powerjob-client 中提供的 OpenAPI 接口触发,server 不会主动调度。
- CRON:该任务的调度时间由 CRON 表达式指定。
- 固定频率:秒级任务,每隔多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate 相同。
- 固定延迟:秒级任务,延迟多少毫秒运行一次,功能与 java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay 相同。
- 工作流:该任务只会由其所属的工作流调度执行,server 不会主动调度该任务。如果该任务不属于任何一个工作流,该任务就不会被调度。
备注:固定延迟和固定频率任务统称秒级任务,这两种任务无法被停止,只有任务被关闭或删除时才能真正停止任务。
搭建PowerJob环境
本地启动
初始化项目
git clone https://github.com/PowerJob/PowerJob.git
导入 IDE,源码结构如下,我们需要启动调度服务器(powerjob-server),同时在 samples 工程中编写自己的处理器代码
启动调度服务器
powerjob-server 日常环境配置文件:application-daily.properties
oms.env=DAILY
logging.cnotallow=classpath:logback-dev.xml
####### 外部数据库配置(需要用户更改为自己的数据库配置) #######
spring.datasource.core.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.core.jdbc-url=jdbc:mysql://localhost:3306/powerjob-daily?useUnicode=true&characterEncoding=UTF-8&serverTimeznotallow=Asia/Shanghai
spring.datasource.core.username=root
spring.datasource.core.password=No1Bug2Please3!
spring.datasource.core.hikari.maximum-pool-size=20
spring.datasource.core.hikari.minimum-idle=5
####### mongoDB配置,非核心依赖,通过配置 oms.mongodb.enable=false 来关闭 #######
oms.mongodb.enable=true
spring.data.mongodb.uri=mongodb://localhost:27017/powerjob-daily
####### 邮件配置(不需要邮件报警可以删除以下配置来避免报错) #######
spring.mail.host=smtp.163.com
spring.mail.username=zqq@163.com
spring.mail.password=GOFZPNARMVKCGONV
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
####### 资源清理配置 #######
oms.instanceinfo.retentinotallow=1
oms.container.retention.local=1
oms.container.retention.remote=-1
####### 缓存配置 #######
oms.instance.metadata.cache.size=1024
图片
docker-compose启动
环境要求
本地需要安装docker和docker-compose
下载代码
git clone --depth=1 https://github.com/PowerJob/PowerJob.git
运行
cd PowerJob
docker-compose up
docker-compose up -d
刚开始启动时,powerjob-worker-samples会启动失败,等powerjob-server启动成功后,powerjob-worker-samples才会启动成功。这大概需要几分钟。
运行成功后,浏览器访问 http://127.0.0.1:7700/
应用名称:powerjob-worker-samples
密码:powerjob123
停止
docker-compose down
Stopping powerjob-worker-samples ... done
Stopping powerjob-server ... done
Stopping powerjob-mysql ... done
Removing powerjob-worker-samples ... done
Removing powerjob-server ... done
Removing powerjob-mysql ... done
cd PowerJob
rm -rf powerjob-data
SpringBoot集成PowerJob
添加相关maven依赖
tech.powerjob
powerjob-worker-spring-boot-starter
${latest.powerjob.version}
配置文件配置
powerjob:
worker:
# akka 工作端口,可选,默认 27777
akka-port: 27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
app-name: ${spring.application.name}
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
server-address: 81.70.117.188:7700
# 持久化方式,可选,默认 disk
store-strategy: disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192
max-result-length: 8192
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192
max-appended-wf-context-length: 8192
处理器(Processor)开发
处理器概述
基本概念
PowerJob 支持 Python、Shell、HTTP、SQL 等众多通用任务的处理,开发者只需要引入依赖,在控制台配置好相关参数即可,关于这部分详见 官方处理器 ,此处不再赘述。本章将重点阐述 Java 处理器开发方法与使用技巧。
- Java 处理器可根据代码所处位置划分为内置 Java 处理器和外置 Java 处理器,前者直接集成在宿主应用(也就是接入本系统的业务应用)中,一般用来处理业务需求;后者可以在一个独立的轻量级的 Java 工程中开发,通过 JVM 容器技术(详见容器章节)被 worker 集群热加载,提供 Java 的“脚本能力”,一般用于处理灵活多变的需求。
- Java 处理器可根据对象创建者划分为 SpringBean 处理器和普通 Java 对象处理器,前者由 Spring IOC 容器完成处理器的创建和初始化,后者则由 PowerJob 维护其生命周期。如果宿主应用支持 Spring,强烈建议使用 SpringBean 处理器,开发者仅需要将 Processor 注册进 Spring IOC 容器(一个 @Component 注解或一句 bean 配置)即可享受 Spring 带来的便捷之处。
- Java处理器可根据功能划分为单机处理器、广播处理器、Map 处理器和 MapReduce 处理器。
单机处理器(BasicProcessor)对应了单机任务,即某个任务的某次运行只会有某一台机器的某一个线程参与运算。
广播处理器(BroadcastProcessor)对应了广播任务,即某个任务的某次运行会调动集群内所有机器参与运算。
Map处理器(MapProcessor)对应了Map任务,即某个任务在运行过程中,允许产生子任务并分发到其他机器进行运算。
MapReduce 处理器(MapReduceProcessor)对应了 MapReduce 任务,在 Map 任务的基础上,增加了所有任务结束后的汇总统计。
入参 TaskContext
TaskContext 包含了本次任务的上下文信息,具体信息如下
属性列表(红色标注的为常用属性) |
|
属性名称 |
意义/用法 |
jobId |
任务 ID,开发者一般无需关心此参数 |
instanceId |
任务实例 ID,全局唯一,开发者一般无需关心此参数 |
subInstanceId |
子任务实例 ID,秒级任务使用,开发者一般无需关心此参数 |
taskId |
采用链式命名法的 ID,在某个任务实例内唯一,开发者一般无需关心此参数 |
taskName |
task 名称,Map/MapReduce 任务的子任务的值为开发者指定,否则为系统默认值,开发者一般无需关心此参数 |
jobParams |
任务参数 对于非工作流中的任务其值等同于控制台录入的任务参数;如果该任务为工作流中的任务且有配置节点参数信息,那么接收到的是节点配置的参数信息 |
instanceParams |
任务实例参数 对于非工作流中的任务 其值 等同于 OpenAPI 传递的实例参数,非 OpenAPI 触发的任务则一定为空。如果该任务为工作流中的任务那么这里实际接收到的是工作流上下文信息,建议使用 getWorkflowContext 方法获取上下文信息 |
maxRetryTimes |
Task 的最大重试次数 |
currentRetryTimes |
Task 的当前重试次数,和 maxRetryTimes 联合起来可以判断当前是否为该 Task 的最后一次运行机会 |
subTask |
子 Task,Map/MapReduce 处理器专属,开发者调用map方法时传递的子任务列表中的某一个 |
omsLogger |
在线日志,用法同 Slf4J,记录的日志可以直接通过控制台查看,非常便捷和强大!不过使用过程中需要注意频率,滥用在线日志会对 Server 造成巨大的压力 |
userContext |
用户在 PowerJobWorkerConfig 中设置的自定义上下文 |
workflowContext |
工作流上下文,更多信息见下方说明 |
工作流上下文( WorkflowContext )
该属性是 v4.0.0 版本的重大变更之一,移除了原来的参数传递机制,提供了 API 让开发者可以更加灵活便捷地在工作流中实现信息的传递。
属性列表 |
|
属性名称 |
意义/用法 |
wfInstanceId |
工作流实例 ID |
data |
工作流上下文数据,键值对 |
appendedContextData |
当前任务向工作流上下文中追加的数据。在任务执行完成后 ProcessorTracker 会将其上报给 TaskTracker,TaskTracker 在当前任务执行完成后会将这个信息上报给 server ,追加到当前的工作流上下文中,供下游任务消费 |
上游任务通过 WorkflowContext#appendData2WfContext(String key,Object value) 方法向工作流上下文中追加数据,下游任务便可以通过 WorkflowContext#fetchWorkflowContext() 方法获取到相应的数据进行消费。注意,当追加的上下文信息的 key 已经存在于当前的上下文中时,新的 value 会覆盖之前的值。另外,每次任务实例追加的上下文数据大小也会受到 worker 的配置项 powerjob.worker.max-appended-wf-context-length 的限制,超过这个长度的会被直接丢弃。
返回值 ProcessResult
方法的返回值为 ProcessResult,代表了本次 Task 执行的结果,包含 success 和 msg 两个属性,分别用于传递 Task 是否执行成功和 Task 需要返回的信息。
处理器开发示例
单机处理器:BasicProcessor
单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:
/**
* @Author iron.guo
* @Date 2023/1/7
* @Description
*/
@Component
@Slf4j
public class StandaloneProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("处理器启动成功,context 是 {}.", context);
log.info("单机处理器正在处理");
log.info(context.getJobParams());
omsLogger.info("处理器执行结束");
boolean success = true;
return new ProcessResult(success, context + ": " + success);
}
}
执行结果
图片
广播处理器:BroadcastProcessor
广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:
@Slf4j
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("预执行,会在所有 worker 执行 process 方法前调用");
log.info("预执行,会在所有 worker 执行 process 方法前调用");
// 预执行,会在所有 worker 执行 process 方法前调用
return new ProcessResult(true, "init success");
}
@Override
public ProcessResult process(TaskContext context) throws Exception {
OmsLogger omsLogger = context.getOmsLogger();
// 撰写整个worker集群都会执行的代码逻辑
omsLogger.info("撰写整个worker集群都会执行的代码逻辑");
log.info("撰写整个worker集群都会执行的代码逻辑");
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext context, List taskResults) throws Exception {
// taskResults 存储了所有worker执行的结果(包括preProcess)
// 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
log.info("收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果");
return new ProcessResult(true, "process success");
}
}
执行结果
图片
并行处理器:MapReduceProcessor
MapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分,将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的不二之选!实现 MapReduce 处理器需要继承 MapReduceProcessor类,具体用法如下示例代码所示:
@Slf4j
@Component
public class MapReduceProcessorDemo implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) throws Exception {
final OmsLogger omsLogger = context.getOmsLogger();
// 判断是否为根任务
if (isRootTask()) {
// 构造子任务
List subTaskList = Lists.newLinkedList();
SubTask subTask=new SubTask();
subTask.setSiteId(1L);
subTask.setName("iron.guo");
subTaskList.add(subTask);
/*
* 子任务的构造由开发者自己定义
* eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下:
* 1. 根任务(RootTask)读取文件,流式拉取100W个ID,并按1000个一批的大小组装成子任务进行派发
* 2. 非根任务获取子任务,完成业务逻辑的处理
*/
// 调用 map 方法,派发子任务(map 可能会失败并抛出异常,做好业务操作)
map(subTaskList, "DATA_PROCESS_TASK");
omsLogger.info("执行根任务-派发子任务");
return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");
}
// 非子任务,可根据 subTask 的类型 或 TaskName 来判断分支
if (context.getSubTask() instanceof SubTask) {
omsLogger.info("执行子任务开始");
omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());
// 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器
return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");
}
return new ProcessResult(false, "UNKNOWN_BUG");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List taskResults) {
// 所有 Task 执行结束后,reduce 将会被执行
// taskResults 保存了所有子任务的执行结果
// 用法举例,统计执行结果
AtomicLong successCnt = new AtomicLong(0);
taskResults.forEach(tr -> {
if (tr.isSuccess()) {
successCnt.incrementAndGet();
}
});
// 该结果将作为任务最终的执行结果
return new ProcessResult(true, "success task num:" + successCnt.get());
}
// 自定义的子任务
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
private static class SubTask {
private Long siteId;
private String name;
}
}
执行结果
图片
注:Map 处理器相当于 MapReduce 处理器的阉割版本(阉割了 reduce 方法),此处不再单独举例。
工作流
图片
点击右上角按钮 新建工作流,即可录入新的工作流,具体界面和说明如下所示。
- 工作流名称:名称,无实际业务用途,请尽量精简字段
- 工作流描述:描述,无实际业务用途,请尽量精简字段
- 定时信息:该工作流的触发方式的触发方式,包含时间表达式类型选择框和时间表达式输入框
CRON -> 填写 CRON 表达式(在线生成网站)
API -> 不需要填写任何参数,表明该任务由 OpenAPI 触发
- 生命周期:定时策略生效的时间段
- 最大实例:该工作流同时执行的数量
- 任务依赖关系:提供编辑界面可视化操作,绘制 DAG(有向无环图),配置工作流内各个任务的依赖关系
DAG 操作指南
编辑依赖关系
v4.0.0 以后支持节点的自由拖拉拽,不用再点点点了,哈哈哈 ~
- 添加节点:点击 DAG 编辑框左上方的 “导入任务”,导入当前存在的任务(需要提前在 任务管理界面 录入任务),生成 DAG 的节点
- 连接节点:点击起始节点的任意一个锚点摁住不放,拖动鼠标连接到另一个节点的任意一个锚点即可
- 删除节点:选中需要删除的节点,按退格键( 注意:windows 下使用退格键 [Backspace],macOS 下使用删除键 [delete] )
- 删除边:选中需要删除的边,按退格键( 注意:windows 下使用退格键 [Backspace],macOS 下使用删除键 [delete] )
导入任务节点
任务为之前创建的任务,可用工作流形式串联起来执行。
图片
编辑节点信息
点击需要编辑的节点,在右侧会弹出一个编辑框,如下图所示
图片
- 任务名称:当前节点引用的任务名称,点击可编辑(支持输入名称进行模糊搜索)
- 节点名称:节点的名称,无实际业务用途,在能明确表示节点背后的业务逻辑的情况下请尽量精简字段
- 节点参数:节点的参数配置,当这个信息不为空的时候使用这个参数覆盖当前节点所引用的任务所配置的参数信息
- 是否启用:未启用的节点将会直接跳过
- 失败跳过:当这个节点执行失败的时候不会打断整个工作流的执行
特殊节点说明
判断节点
图片
判断节点 不允许失败跳过以及禁用,节点参数中存储的是 Groovy 代码(执行 Groovy 代码时会将当前工作流上下文作为 context 变量注入到代码执行的上下文中),其执行结果仅能返回 "true" 或者 "false",同时判断节点仅有且必须有两条“输出”路径。会根据该代码的执行结果决定下游需要执行的节点。这里处理的原则是, 仅 cancel 那些只能通过被 disable 掉的边可达的节点
举个两个栗子,灰色代表相应的边 或者 节点被 disable 或 cancel,菱形代表判断节点,假定执行结果为 true
图片
case 3 以及 case 4 中的节点 3 都会被 cancel ,因为它只能通过节点 1 -> 节点 3 的边可达(该边的属性为 false),但对于节点 5 而言,在 case 4 中因为判断节点 2 的执行结果为 true ,那么其可以通过节点 2 -> 节点 5 的边可达,所以不会被 disable 。
备注:如果需要根据上游节点的执行结果决定下游节点,可以将上游节点的执行结果注入上下文中,再在判断节点中做相应的判断。
工作流嵌套节点
图片
该节点代表对某个工作流的引用,节点的 jobId 属性存储的是工作流 id,其他属性和普通的任务节点一致。不允许出现循环引用以及多级嵌套的情况,即嵌套节点中指向的工作流一定是一个不含嵌套节点的工作流。
执行到该节点时,如果该节点处于启用状态,那么将启动该节点所引用工作流的一个新实例,待该实例执行完成后再同步更新该节点的状态。
注意,创建子工作流时,会透传当前的上下文作为工作流的实例参数,在子工作流执行完成时会合并子工作流的上下文至父工作流的上下文中。
重试子工作流不会联动重试父工作流,但失败的子工作流会随着父工作流的重试而原地重试(不会生成新的实例)