# 新一代分布式任务调度工具PowerJob

# 概述

PowerJob是新一代分布式任务调度与计算框架,支持CRON、API、固定频率、固定延迟等调度策略,
提供工作流来编排任务解决依赖关系,使用简单,功能强大,文档齐全,开箱即用!

# 为什么选择PowerJob

当前市面上流行的作业调度框架有老牌的Quartz、基于Quartz的elastic-job和原先基于Quartz后面移除依赖的xxl-job,这里分别谈一些这些框架现存的缺点。
Quartz可以视为第一代调度框架,基本上是现有所有分布式调度框架的“祖宗”。由于历史原因,它不提供Web界面,只能通过API完成任务的配置,使用起来不够方便和灵活,同时它仅支持单机执行,无法有效利用整个集群的计算能力。
xxl-job可以视为第二代调度框架,在一定程度上解决了Quartz的不足,在过去几年中是个非常优秀的调度框架,不过放到今天来看,还是存在着一些不足的,具体如下:
数据库支持单一:仅支持MySQL,使用其他DB需要自己魔改代码
有限的分布式计算能力:仅支持静态分片,无法很好的完成复杂任务的计算
不支持工作流:无法配置各个任务之间的依赖关系,不适用于有DAG需求的场景
正所谓长江后浪推前浪,在如今这个数据量日益增长、业务越来越复杂的年代,急需一款更为强大的任务调度框架来解决上诉问题,而PowerJob就是这个“救世主”。
PowerJob可以视为第三代调度框架,在任务调度的基础上,额外提供分布式计算和工作流功能,其主要特性如下:
使用简单:提供前端Web界面,允许开发者可视化地完成调度任务的管理(增、删、改、查)、任务运行状态监控和运行日志查看等功能。
定时策略完善:支持CRON表达式、固定频率、固定延迟和API四种定时调度策略。
执行模式丰富:支持单机、广播、Map、MapReduce四种执行模式,其中Map/MapReduce处理器能使开发者寥寥数行代码便获得集群分布式计算的能力。
DAG工作流支持:支持在线配置任务依赖关系,可视化得对任务进行编排,同时还支持上下游任务间的数据传递
执行器支持广泛:支持Spring Bean、内置/外置Java类、Shell、Python等处理器,应用范围广。
运维便捷:支持在线日志功能,执行器产生的日志可以在前端控制台页面实时显示,降低debug成本,极大地提高开发效率。
依赖精简:最小仅依赖关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...),同时支持所有Spring Data JPA所支持的关系型数据库。
高可用&高性能:调度服务器经过精心设计,一改其他调度框架基于数据库锁的策略,实现了无锁化调度。部署多个调度服务器可以同时实现高可用和性能的提升(支持无限的水平扩展)。
故障转移与恢复:任务执行失败后,可根据配置的重试策略完成重试,只要执行器集群有足够的计算节点,任务就能顺利完成。

# PowerJob与各个框架的对比如下:

# 适用场景

有定时执行需求的业务场景:如每天凌晨全量同步数据、生成业务报表等。
有需要全部机器一同执行的业务场景:如使用广播执行模式清理集群日志。
有需要分布式处理的业务场景:比如需要更新一大批数据,单机执行耗时非常长,可以使用Map/MapReduce处理器完成任务的分发,调动整个集群加速计算。

# 整体架构

# 通用部署

# 环境要求

JDK 8 或 JDK 11 或 JDK17(三个 LTS 版本)
任意 Spring Data Jpa 支持的关系型数据库(MySQL/PostgreSQL/Oracle/MS SQLServer...)

# 调度中心(powerjob-server)

1 初始化数据库:CREATE DATABASE IF NOT EXISTS powerjob-product DEFAULT CHARSET utf8mb4 创建数据库即可
2 下载源码 修改数据库连接 启动 powerjob-server-starter服务
3 访问 http://ip:${server.port} 检验是否部署成功

开放端口:7700 + 10086 + 10010

# 执行器(powerjob-worker)

1 添加Pom依赖

<dependency>
    <groupId>tech.powerjob</groupId>
    <artifactId>powerjob-worker-spring-boot-starter</artifactId>
    <version>${latest.powerjob.version}</version>
</dependency>
1
2
3
4
5

2 在 SpringBoot 配置文件(application.yml/properties)中添加相关的配置项。

# akka 工作端口,可选,默认 27777
powerjob.worker.akka-port=27777
# 接入应用名称,用于分组隔离,推荐填写 本 Java 项目名称
powerjob.worker.app-name=my-powerjob-worker
# 调度服务器地址,IP:Port 或 域名,多值逗号分隔
powerjob.worker.server-address=127.0.0.1:7700,127.0.0.1:7701
# 通讯协议,4.3.0 开始支持 HTTPAKKA 两种协议,官方推荐使用 HTTP 协议(注意 server 和 worker 都要开放相应端口)
powerjob.worker.protocol=http
# 持久化方式,可选,默认 disk
powerjob.worker.store-strategy=disk
# 任务返回结果信息的最大长度,超过这个长度的信息会被截断,默认值 8192
powerjob.worker.max-result-length=4096
# 单个任务追加的工作流上下文最大长度,超过这个长度的会被直接丢弃,默认值 8192
powerjob.worker.max-appended-wf-context-length=4096
# 同时运行的轻量级任务数量上限
powerjob.worker.max-lightweight-task-num=1024
# 同时运行的重量级任务数量上限
powerjob.worker.max-heavy-task-num=64时滚动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 处理器Processor - 举例单机和广播

1 单机处理器:BasicProcessor
单机执行的策略下,server 会在所有可用 worker 中选取健康度最佳的机器进行执行。单机执行任务需要实现接口 BasicProcessor,代码示例如下:

// 支持 SpringBean 的形式
@Component
public class BasicProcessorDemo implements BasicProcessor {
    @Resource
    private MysteryService mysteryService;
    @Override
    public ProcessResult process(TaskContext context) throws Exception {
        // 在线日志功能,可以直接在控制台查看任务日志,非常便捷
        OmsLogger omsLogger = context.getOmsLogger();
        omsLogger.info("BasicProcessorDemo start to process, current JobParams is {}.", context.getJobParams());        
        // TaskContext为任务的上下文信息,包含了在控制台录入的任务元数据,常用字段为
        // jobParams(任务参数,在控制台录入),instanceParams(任务实例参数,通过 OpenAPI 触发的任务实例才可能存在该参数)
        // 进行实际处理...
        mysteryService.hasaki();
        // 返回结果,该结果会被持久化到数据库,在前端页面直接查看,极为方便
        return new ProcessResult(true, "result is xxx");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

2 广播处理器:BroadcastProcessor
广播执行的策略下,所有机器都会被调度执行该任务。为了便于资源的准备和释放,广播处理器在BasicProcessor 的基础上额外增加了 preProcess 和 postProcess 方法,分别在整个集群开始之前/结束之后选一台机器执行相关方法。代码示例如下:

@Component
public class BroadcastProcessorDemo implements BroadcastProcessor {
    @Override
    public ProcessResult preProcess(TaskContext taskContext) throws Exception {
        // 预执行,会在所有 worker 执行 process 方法前调用
        return new ProcessResult(true, "init success");
    }
    public ProcessResult process(TaskContext context) throws Exception {
        // 撰写整个worker集群都会执行的代码逻辑
        return new ProcessResult(true, "release resource success");
    }
    @Override
    public ProcessResult postProcess(TaskContext taskContext, List<TaskResult> taskResults) throws Exception {
        // taskResults 存储了所有worker执行的结果(包括preProcess)
        // 收尾,会在所有 worker 执行完毕 process 方法后调用,该结果将作为最终的执行结果
        return new ProcessResult(true, "process success");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 云药店适配

# 服务器部署

服务已经修改为Oracle版本
[服务和脚本] [点击下载](http://10.9.48.91:7777/ftp/zhangxu-neu/powerjob/)
约定统一放置于 /home/neusoft/monitor/ 目录下
访问 [http://ip:7700/](http://172.22.111.248:7700/) 新建应用--登录

# pom.xml 增加如下配置

		<dependency>
  		   <groupId>tech.powerjob</groupId>
    	   <artifactId>powerjob-worker-spring-boot-starter</artifactId>
    	    <version>4.3.6</version>
		</dependency>
		 <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.14.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
            <version>2.14.1</version>
        </dependency>
       <dependency>
	    	 <groupId>com.google.guava</groupId>
    	    <artifactId>guava</artifactId>
    		<version>30.1.1-jre</version>
		</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# application.yml 增加如下配置

powerjob:
    worker:
      port: 27777
      app-name: jamesapp
      server-address: 127.0.0.1:7700
      protocol: http
      store-strategy: disk
      max-result-length: 4096
      max-appended-wf-context-length: 4096
      max-lightweight-task-num: 1024
1
2
3
4
5
6
7
8
9
10

# 工程添加Processor处理器

参照上面处理器代码

# 项目实践截图

# 首页

# 任务管理

# 任务实例

# 工作流管理

# 工作流实例

# 工作流编辑-库存

# 工作流编辑-销售