# 新一代分布式任务调度工具PowerJob
# 概述
PowerJob是新一代分布式任务调度与计算框架,支持CRON、API、固定频率、固定延迟等调度策略,
提供工作流来编排任务解决依赖关系,使用简单,功能强大,文档齐全,开箱即用!
提供工作流来编排任务解决依赖关系,使用简单,功能强大,文档齐全,开箱即用!
# 为什么选择PowerJob
当前市面上流行的作业调度框架有老牌的Quartz、基于Quartz的elastic-job和原先基于Quartz后面移除依赖的xxl-job,这里分别谈一些这些框架现存的缺点。
Quartz可以视为第一代调度框架,基本上是现有所有分布式调度框架的“祖宗”。由于历史原因,它不提供Web界面,只能通过API完成任务的配置,使用起来不够方便和灵活,同时它仅支持单机执行,无法有效利用整个集群的计算能力。
xxl-job可以视为第二代调度框架,在一定程度上解决了Quartz的不足,在过去几年中是个非常优秀的调度框架,不过放到今天来看,还是存在着一些不足的,具体如下:
数据库支持单一:仅支持MySQL,使用其他DB需要自己魔改代码
有限的分布式计算能力:仅支持静态分片,无法很好的完成复杂任务的计算
不支持工作流:无法配置各个任务之间的依赖关系,不适用于有DAG需求的场景
Quartz可以视为第一代调度框架,基本上是现有所有分布式调度框架的“祖宗”。由于历史原因,它不提供Web界面,只能通过API完成任务的配置,使用起来不够方便和灵活,同时它仅支持单机执行,无法有效利用整个集群的计算能力。
xxl-job可以视为第二代调度框架,在一定程度上解决了Quartz的不足,在过去几年中是个非常优秀的调度框架,不过放到今天来看,还是存在着一些不足的,具体如下:
数据库支持单一:仅支持MySQL,使用其他DB需要自己魔改代码
有限的分布式计算能力:仅支持静态分片,无法很好的完成复杂任务的计算
不支持工作流:无法配置各个任务之间的依赖关系,不适用于有DAG需求的场景
正所谓长江后浪推前浪,在如今这个数据量日益增长、业务越来越复杂的年代,急需一款更为强大的任务调度框架来解决上诉问题,而PowerJob就是这个“救世主”。
PowerJob可以视为第三代调度框架,在任务调度的基础上,额外提供分布式计算和工作流功能,其主要特性如下:
使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。
执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递
执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。
运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。
依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...),同时支持所有Spring Data JPA所支持的关系型数据库。
高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
PowerJob可以视为第三代调度框架,在任务调度的基础上,额外提供分布式计算和工作流功能,其主要特性如下:
使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。
执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递
执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。
运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。
依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...),同时支持所有Spring Data JPA所支持的关系型数据库。
高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。
# PowerJob与各个框架的对比如下:
# 适用场景
有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。
有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。
# 整体架构
# 通用部署
# 环境要求
JDK 8 或 JDK 11 或 JDK17(三个 LTS 版本)
任意 Spring Data Jpa 支持的关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...)
任意 Spring Data Jpa 支持的关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...)
# 调度中心(powerjob-server)
1 初始化数据库:CREATE DATABASE IF NOT EXISTS powerjob-product
DEFAULT CHARSET utf8mb4 创建数据库即可
2 下载源码 修改数据库连接 启动 powerjob-server-starter服务
3 访问 http://ip:${server.port} 检验是否部署成功
开放端口:7700 + 10086 + 10010
# 执行器(powerjob-worker)
1 添加Pom依赖
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>${latest.powerjob.version}</version>
</dependency>
1
2
3
4
5
2
3
4
5
2 在 SpringBoot 配置文件(application.yml/properties)中添加相关的配置项。
# akka 工作端口,可选,默认 27777
powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.worker.app-name=my-powerjob-worker
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 通讯协议,4.3.0 开始支持 HTTP 和 AKKA 两种协议,官方推荐使用 HTTP 协议(注意 server 和 worker 都要开放相应端口)
powerjob.worker.protocol=http
# 持久化方式,可选,默认 disk
powerjob.worker.store-strategy=disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192
powerjob.worker.max-result-length=4096
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192
powerjob.worker.max-appended-wf-context-length=4096
# 同时运行的轻量级任务数量上限
powerjob.worker.max-lightweight-task-num=1024
# 同时运行的重量级任务数量上限
powerjob.worker.max-heavy-task-num=64时滚动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 处理器Processor - 举例单机和广播
1 单机处理器:BasicProcessor
单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:
// 支持 SpringBean 的形式
@Component
public class BasicProcessorDemo implements BasicProcessor {
@Resource
private MysteryService mysteryService;
@Override
public ProcessResult process(TaskContext context) throws Exception {
// 在线日志功能,可以直接在控制台查看任务日志,非常便捷
OmsLogger omsLogger = context.getOmsLogger();
omsLogger.info("BasicProcessorDemo start to process, current JobParams is {}.", context.getJobParams());
// TaskContext为任务的上下文信息,包含了在控制台录入的任务元数据,常用字段为
// jobParams(任务参数,在控制台录入),instanceParams(任务实例参数,通过 OpenAPI 触发的任务实例才可能存在该参数)
// 进行实际处理...
mysteryService.hasaki();
// 返回结果,该结果会被持久化到数据库,在前端页面直接查看,极为方便
return new ProcessResult(true, "result is xxx");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2 广播处理器:BroadcastProcessor
广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:
@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext taskContext) throws Exception {
// 预执行,会在所有 worker 执行 process 方法前调用
return new ProcessResult(true, "init success");
}
public ProcessResult process(TaskContext context) throws Exception {
// 撰写整个worker集群都会执行的代码逻辑
return new ProcessResult(true, "release resource success");
}
@Override
public ProcessResult postProcess(TaskContext taskContext, List<TaskResult> taskResults) throws Exception {
// taskResults 存储了所有worker执行的结果(包括preProcess)
// 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
return new ProcessResult(true, "process success");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 云药店适配
# 服务器部署
服务已经修改为Oracle版本
[服务和脚本] [点击下载](http://10.9.48.91:7777/ftp/zhangxu-neu/powerjob/)约定统一放置于 /home/neusoft/monitor/ 目录下
访问 [http://ip:7700/](http://172.22.111.248:7700/) 新建应用--登录
# pom.xml 增加如下配置
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-worker-spring-boot-starter</artifactId>
<version>4.3.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.14.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1.1-jre</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# application.yml 增加如下配置
powerjob:
worker:
port: 27777
app-name: jamesapp
server-address: 127.0.0.1:7700
protocol: http
store-strategy: disk
max-result-length: 4096
max-appended-wf-context-length: 4096
max-lightweight-task-num: 1024
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 工程添加Processor处理器
参照上面处理器代码