XXL-JOB分布式任务调度 (从0-1项目实战)
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
目录
📢 本博客转载自【黑马学成在线】!
🚀 主题:XXL-JOB分布式调度框架
📖 内容涵盖:
✅ 从0到1部署 —— 手把手教你搭建XXL-JOB调度中心
✅ 源码级分析 —— 深入理解调度机制与执行原理
✅ 真实项目实战 —— 结合【学成在线】媒资业务场景,落地分布式任务调度💡 适合人群:
🔹 想学习分布式任务调度的开发者
🔹 需要解决定时任务高可用、分片问题的团队
🔹 对XXL-JOB底层实现感兴趣的技术控👇 下面阅读原文,开启分布式调度之旅吧~
一.什么是分布式任务调度?
通常任务调度的程序是集成在应用中的,比如:优惠卷服务中包括了定时发放优惠卷的的调度程序,结算服务中包括了定期生成报表的任务调度程序,由于采用分布式架构,一个服务往往会部署多个冗余实例来运行我们的业务,在这种分布式系统环境下运行任务调度,我们称之为分布式任务调度,如下图:
分布式调度要实现的目标:
不管是任务调度程序集成在应用程序中,还是单独构建的任务调度系统,如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建,这样就可以具有分布式系统的特点,并且提高任务的调度处理能力:
1、并行任务调度
并行任务调度实现靠多线程,如果有大量任务需要调度,此时光靠多线程就会有瓶颈了,因为一台计算机CPU的处理能力是有限的。
如果将任务调度程序分布式部署,每个结点还可以部署为集群,这样就可以让多台计算机共同去完成任务调度,我们可以将任务分割为若干个分片,由不同的实例并行执行,来提高任务调度的处理效率。
2、高可用
若某一个实例宕机,不影响其他实例来执行任务。
3、弹性扩容
当集群中增加实例就可以提高并执行任务的处理效率。
4、任务管理与监测
对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况,从而做出快速的应急处理响应。
5、避免任务重复执行
当任务调度以集群方式部署,同一个任务调度可能会执行多次,比如在上面提到的电商系统中到点发优惠券的例子,就会发放多次优惠券,对公司造成很多损失,所以我们需要控制相同的任务在多个运行实例上只执行一次。
而XXL-JOB就可以很好的实现这些目标。
定时任务调度框架:
1.单机:
-
Timer:这是 java 自带的 java.util.Timer 类,这个类允许你调度一个 java.util.TimerTask 任务。使用这种方式可以让你的程序按照某一个频度执行,但不能在指定时间运行。一般用的较少。
-
ScheduledExecutorService:也 jdk 自带的一个类;是基于线程池设计的定时任务类,每个调度任务都会分配到线程池中的一个线程去执行,也就是说,任务是并发执行,互不影响。
-
Spring Task:Spring3.0 以后自带的 task,配置简单功能较多,如果系统使用单机的话可以优先考虑spring定时器。
2.分布式:
-
Quartz:Java事实上的定时任务标准。但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行调度的功能。
-
TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重。
-
elastic-job:当当开发的弹性分布式任务调度系统,功能丰富强大,采用zookeeper实现分布式协调,实现任务高可用以及分片,并且可以支持云开发。
-
Saturn:是唯品会自主研发的分布式的定时任务的调度平台,基于当当的elastic-job 版本1开发,并且可以很好的部署到docker容器上。
-
xxl-job: 是大众点评员工徐雪里于2015年发布的分布式任务调度平台,是一个轻量级分布式任务调度框架,其核心设计目标是开发迅速、学习简单、轻量级、易扩展,其在唯品会内部已经发部署350+个节点,每天任务调度4000多万次。同时,管理和统计也是它的亮点。使用案例 大众点评、易信(IM)、京东(电商系统)、360金融(金融系统)、易企秀、随行付(支付系统)、优信二手车。
二.XXL-JOB介绍与部署:
1.什么是XXL-JOB?
XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线,开箱即用。
官网:官网地址
文档:官方文档
XXL-JOB主要有调度中心、执行器、任务:
调度中心:
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码;
主要职责为执行器管理、任务管理、监控运维、日志管理等
任务执行器:
负责接收调度请求并执行任务逻辑;
只要职责是注册服务、任务执行服务(接收到任务后会放入线程池中的任务队列)、执行结果上报、日志服务等
任务:负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下:
执行流程:
1.任务执行器根据配置的调度中心的地址,自动注册到调度中心
2.达到任务触发条件,调度中心下发任务
3.执行器基于线程池执行任务,并把执行结果放入内存队列中、把执行日志写入日志文件中
4.执行器消费内存队列中的执行结果,主动上报给调度中心
5.当用户在调度中心查看任务日志,调度中心请求任务执行器,任务执行器读取任务日志文件并返回日志详情
XXL-JOB的特性:(了解)
2.部署XXL-JOB:
文档地址:
源码仓库地址:
GitHub:https://github.com/xuxueli/xxl-jobhttps://github.com/xuxueli/xxl-job
源码仓库地址 | Release Download |
---|---|
https://github.com/xuxueli/xxl-job | Download |
http://gitee.com/xuxueli0323/xxl-job | Download |
中央仓库地址
当前项目使用版本:2.4.1-SNAPSHOT
注:为了统一版本,已统一下载,在资料中获取:xxl-job-master.zip
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${最新稳定版本}</version>
</dependency>
使用IDEA打开解压后的目录:
xxl-job-admin:调度中心
xxl-job-core:公共依赖
xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用)
:xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式;
:xxl-job-executor-sample-frameless:无框架版本;
doc :文档资料,包含数据库脚本。
xxl-job-master: xxl-job-admin:调度中心 xxl-job-core:公共依赖 xxl-job-executor-samples:执行器Sample示例(选择合适的版本执行器,可直接使用,也可以参考其并将现有项目改造成执行器) xxl-job-executor-sample-springboot:Springboot版本,通过Springboot管理执行器,推荐这种方式; xxl-job-executor-sample-frameless:无框架版本;
我们导入xxl-job数据库(/xxl-job-master/doc/db/tables_xxl_job.sql):
并在 xxl-job-admin 修改数据库连接后, 访问:http://192.168.101.65:8088/xxl-job-admin/
账号和密码:admin/123456
如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
调度中心集群部署(可选):
调度中心支持集群部署,提升调度系统容灾和可用性。
调度中心集群部署时,几点要求和建议:
-
DB配置保持一致;
-
集群机器时钟保持一致(单机集群忽视);
-
建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。
3.XXL-JOB的使用:
XXL-JOB一般分为调度中心与执行器,调度中心就是我们打开的网页:
而执行器是需要我们进行配置:
执行器负责与调度中心通信接收调度中心发起的任务调度请求。
首先进入调度中心的执行器管理:
上面我们启动了xxl-job-executor-sample-springboot 执行器项目,当前已注册上来,我们执行使用改执行器。
执行器属性说明:
AppName: 是每个执行器集群的唯一标示AppName, 执行器会周期性以AppName为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;
名称: 执行器的名称, 因为AppName限制字母数字等组成,可读性不强, 名称为了提高执行器的可读性;排序: 执行器的排序, 系统中需要执行器的地方,如任务新增, 将会按照该排序读取可用的执行器列表;
注册方式:调度中心获取执行器地址的方式;
- 自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址。
- 手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用。
机器地址:"注册方式"为"手动录入"时有效,支持人工维护执行器的地址信息;
点击新增,填写执行器信息,appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
添加成功:
配置部署“执行器项目”:
“执行器”项目:xxl-job-executor-sample-springboot (提供多种版本执行器供选择,现以 springboot 版本为例,可直接使用,也可以参考其并将现有项目改造成执行器)
作用:负责接收“调度中心”的调度并执行;可直接部署执行器,也可以将执行器集成到现有业务项目中。
随后在service工程添加依赖:
<!-- xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.1-SNAPSHOT</version>
</dependency>
随后在配置中心配置相关文件:
xxl:
job:
# 调度中心部署根地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
admin:
addresses: http://127.0.0.1:8088/xxl-job-admin
executor:
# 执行器名称(AppName) [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: media-process-service
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
# 执行器通讯TOKEN [选填]:非空时启用;
accessToken: default_token
注意配置中的appname这是执行器的应用名,port是执行器启动的端口,如果本地启动多个执行器,注意端口不能重复。
执行器组件,配置内容说明:
package com.xxl.job.executor.core.config;
import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* xxl-job config
*
* @author xuxueli 2017-04-28
*/
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
/**
* 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
*
* 1、引入依赖:
* <dependency>
* <groupId>org.springframework.cloud</groupId>
* <artifactId>spring-cloud-commons</artifactId>
* <version>${version}</version>
* </dependency>
*
* 2、配置文件,或者容器启动变量
* spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
*
* 3、获取IP
* String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
*/
}
随后将 xxl-job 示例工程下配置类拷贝到媒资管理的service工程下:
/**
* @description 测试执行器
* @version 1.0
*/
@Component
@Slf4j
public class SampleJob {
@XxlJob("testJob")
public void testJob() throws Exception {
log.info("开始执行.....");
}
}
到此完成媒资管理模块service工程配置xxl-job执行器,在xxl-job调度中心添加执行器,下边准备测试执行器与调度中心是否正常通信,因为接口工程依赖了service工程,所以启动媒资管理模块的接口工程。
启动后观察日志,出现下边的日志表示执行器在调度中心注册成功:
同时观察调度中心中的执行器界面:
在线机器地址处已显示1个执行器。
随后我们给执行器添加任务:
任务管理 ==》 新增
点击新增,填写任务信息:
调度类型:固定速度指按固定的间隔定时调度。
Cron,通过Cron表达式实现更丰富的定时调度策略。
Cron表达式是一个字符串,通过它可以定义调度策略,格式如下:
{秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)}
运行模式有BEAN和GLUE,bean模式较常用就是在项目工程中编写执行器的任务代码,GLUE是将任务代码编写在调度中心。
JobHandler即任务方法名,填写任务方法上边@XxlJob注解中的名称。
路由策略:当执行器集群部署时,调度中心向哪个执行器下发任务,这里选择第一个表示只向第一个执行器下发任务,路由策略的其它选项稍后在分片广播章节详细解释。
高级配置的其它配置项稍后下面实战详细解释。
我们这里选择的是BEAN,稍后在 JobHandler 的执行方法上通过 @XxlJob("testJob") 注解即可关联。
添加成功,启动任务:
通过调度日志查看任务执行情况:
任务跑一段时间注意清理日志:
执行器集群(可选):
执行器支持集群部署,提升调度系统可用性,同时提升任务处理能力。
执行器集群部署时,几点要求和建议:
-
执行器回调地址(xxl.job.admin.addresses)需要保持一致;执行器根据该配置进行执行器自动注册等操作。
-
同一个执行器集群内AppName(xxl.job.executor.appname)需要保持一致;调度中心根据该配置动态发现不同集群的在线执行器列表。
三.XXL-JOB实战例子解析:
1.高级配置:
掌握了xxl-job的基本使用,下边思考如何进行分布式任务处理呢?如下图,我们会启动多个执行器组成一个集群,去执行任务。
执行器在集群部署下调度中心有哪些路由策略呢?
查看xxl-job官方文档,阅读高级配置相关的内容:
高级配置:
- 路由策略:当执行器集群部署时,提供丰富的路由策略,包括;
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
- 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度,通过子任务可以实现一个任务执行完成去执行另一个任务。
- 调度过期策略:
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
- 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;
- 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;
下边要重点说的是分片广播策略,分片是指是调度中心以执行器为维度进行分片,将集群中的执行器标上序号:0,1,2,3...,广播是指每次调度会向集群中的所有执行器发送任务调度,请求中携带分片参数。
每个执行器收到调度请求同时接收分片参数。
xxl-job支持动态扩容执行器集群从而动态增加分片数量,当有任务量增加可以部署更多的执行器到集群中,调度中心会动态修改分片的数量。
作业分片适用哪些场景呢?
- 分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
- 广播任务场景:广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。
所以,广播分片方式不仅可以充分发挥每个执行器的能力,并且根据分片参数可以控制任务是否执行,最终灵活控制了执行器集群分布式处理任务。
"分片广播" 和普通任务开发流程一致,不同之处在于可以获取分片参数进行分片业务处理。
BEAN、GLUE模式(Java),可参考Sample示例执行器中的示例任务"ShardingJobHandler":
/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// 分片序号,从0开始
int shardIndex = XxlJobHelper.getShardIndex();
// 分片总数
int shardTotal = XxlJobHelper.getShardTotal();
log.info("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
log.info("开始执行第" + shardIndex + "批任务");
// ....
}
添加成功:
下边启动两个执行器实例,观察每个实例的执行情况
首先在nacos中配置media-service的本地优先配置:
# 配置本地优先
spring:
cloud:
config:
override-none: true
将media-service启动两个实例
两个实例的在启动时注意端口不能冲突:
实例1 在VM options处添加:-Dserver.port=63051 -Dxxl.job.executor.port=9998
实例2 在VM options处添加:-Dserver.port=63050 -Dxxl.job.executor.port=9999
启动两个实例,观察任务调度中心,稍等片刻执行器有两个:
如果其中一个执行器挂掉,只剩下一个执行器在工作,稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。
到此作业分片任务调试完成,此时我们可以思考:
当一次分片广播到来,各执行器如何根据分片参数去分布式执行任务,保证执行器之间执行的任务不重复呢?
2.分片方案:
掌握了xxl-job的分片广播调度方式,下边思考如何分布式去执行学成在线平台中的视频处理任务。
任务添加成功后,对于要处理的任务会添加到待处理任务表中,现在启动多个执行器实例去查询这些待处理任务,此时如何保证多个执行器不会查询到重复的任务呢?
XXL-JOB并不直接提供数据处理的功能,它只会给执行器分配好分片序号,在向执行器任务调度的同时下发分片总数以及分片序号等参数,执行器收到这些参数根据自己的业务需求去利用这些参数。
下图表示了多个执行器获取视频处理任务的结构:
每个执行器收到广播任务有两个参数:分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数,如果等于分片序号则执行此任务。
上边两个执行器实例那么分片总数为2,序号为0、1,从任务1开始,如下:
1 % 2 = 1 执行器2执行
2 % 2 = 0 执行器1执行
3 % 2 = 1 执行器2执行
以此类推.
通过作业分片方案保证了执行器之间查询到不重复的任务,如果一个执行器在处理一个视频还没有完成,此时调度中心又一次请求调度,为了不重复处理同一个视频该怎么办?
首先配置调度过期策略:
查看文档如下:
- 调度过期策略:调度中心错过调度时间的补偿处理策略,包括:忽略、立即补偿触发一次等;
- 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
- 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
- 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;
这里我们选择忽略,如果立即执行一次就可能重复执行相同的任务。
其次,再看阻塞处理策略,阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度,此时该如何处理。
查看文档如下:
单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
这里如果选择覆盖之前调度则可能重复执行任务,这里选择 丢弃后续调度或单机串行方式来避免任务重复执行。
只做这些配置可以保证任务不会重复执行吗?
做不到,还需要保证任务处理的幂等性,什么是任务的幂等性?任务的幂等性是指:对于数据的操作不论多少次,操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。
什么是幂等性?
它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。
幂等性是为了解决重复提交问题,比如:恶意刷单,重复支付等。
解决幂等性常用的方案:
1)数据库约束,比如:唯一索引,主键。
2)乐观锁,常用于数据库,更新数据时根据乐观锁状态去更新。
3)唯一序列号,操作传递一个唯一序列号,操作时判断与该序列号相等则执行。
基于以上分析,在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性,要有办法去判断该视频是否处理完成,如果正在处理中或处理完则不再处理。
这里我们在数据库视频处理表中添加处理状态字段,视频处理完成更新状态为完成,执行视频处理前判断状态是否完成,如果完成则不再处理。
3.业务流程:
下边梳理整个视频上传及处理的业务流程:
上传视频成功向视频处理待处理表添加记录。
视频处理的详细流程如下:
1、任务调度中心广播作业分片。
2、执行器收到广播作业分片,从数据库读取待处理任务,读取未处理及处理失败的任务。
3、执行器更新任务为处理中,根据任务内容从MinIO下载要处理的文件。
4、执行器启动多线程去处理任务。
5、任务处理完成,上传处理后的视频到MinIO。
6、将更新任务处理结果,如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中,最后将任务完成记录写入历史表。
查询待处理任务只处理未提交及处理失败的任务,任务处理失败后进行重试,最多重试3次。
任务处理成功将待处理记录移动到历史任务表。
下图是待处理任务表:
历史任务表与待处理任务表的结构相同。(读写分离)
上传视频成功向视频处理待处理表添加记录,暂时只添加对avi视频的处理记录。
根据MIME Type去判断是否是avi视频,下边列出部分MIME Type:
avi视频的MIME Type是video/x-msvideo
修改文件信息入库方法,如下:
@Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {
//从数据库查询文件
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileMd5);
if (mediaFiles == null) {
mediaFiles = new MediaFiles();
//拷贝基本信息
BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);
mediaFiles.setId(fileMd5);
mediaFiles.setFileId(fileMd5);
mediaFiles.setCompanyId(companyId);
//媒体类型
mediaFiles.setUrl("/" + bucket + "/" + objectName);
mediaFiles.setBucket(bucket);
mediaFiles.setFilePath(objectName);
mediaFiles.setCreateDate(LocalDateTime.now());
mediaFiles.setAuditStatus("002003");
mediaFiles.setStatus("1");
//保存文件信息到文件表
int insert = mediaFilesMapper.insert(mediaFiles);
if (insert < 0) {
log.error("保存文件信息到数据库失败,{}", mediaFiles.toString());
XueChengPlusException.cast("保存文件信息失败");
}
//添加到待处理任务表
addWaitingTask(mediaFiles);
log.debug("保存文件信息到数据库成功,{}", mediaFiles.toString());
}
return mediaFiles;
}
/**
* 添加待处理任务
* @param mediaFiles 媒资文件信息
*/
private void addWaitingTask(MediaFiles mediaFiles){
//文件名称
String filename = mediaFiles.getFilename();
//文件扩展名
String exension = filename.substring(filename.lastIndexOf("."));
//文件mimeType
String mimeType = getMimeType(exension);
//如果是avi视频添加到视频待处理表
if(mimeType.equals("video/x-msvideo")){
MediaProcess mediaProcess = new MediaProcess();
BeanUtils.copyProperties(mediaFiles,mediaProcess);
mediaProcess.setStatus("1");//未处理
mediaProcess.setFailCount(0);//失败次数默认为0
mediaProcessMapper.insert(mediaProcess);
}
}
如何保证查询到的待处理视频记录不重复?
编写根据分片参数获取待处理任务的DAO方法,定义DAO接口如下:
@Mapper
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* @description 根据分片参数获取待处理任务,保证了执行器之间查询到不重复的任务
* @param shardTotal 分片总数
* @param shardIndex 分片序号
* @param count 任务数
* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
* 多个执行器处理任务:id % 执行器数 = 执行器编号
* status:1.未处理 3.处理失败
* fail_count:三次重试机会
* limit:每次只查2条数据
*/
@Select("select * from media_process t where t.id % #{shardTotal} = #{shardIndex} and (t.status = '1' or t.status = '3') and t.fail_count < 3 limit #{count}")
List<MediaProcess> selectListByShardIndex(@Param("shardTotal") int shardTotal, @Param("shardIndex") int shardIndex, @Param("count") int count);
}
为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决,如下代码:
synchronized(锁对象){
执行任务...
}
而synchronized只能保证同一个虚拟机中多个线程去争抢锁。
如果是多个执行器分布式部署,并不能保证同一个视频只有一个执行器去处理。
现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:
虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务。
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索引、行级锁的特点,多个线程同时去更新相同的记录,谁更新成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
下边基于数据库方式实现分布锁,开始执行任务将任务执行状态更新为4表示任务执行中。
下边的sql语句可以实现更新操作:
update media_process m set m.status='4' where m.id=? |
如果是多个线程去执行该sql都将会执行成功,但需求是只能有一个线程抢到锁,所以此sql无法满足需求。
下边使用乐观锁的方式实现更新操作:
下边使用乐观锁的方式实现更新操作:
update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=? |
多个线程同时执行上边的sql只会有一个线程执行成功。
什么是乐观锁、悲观锁?
synchronized是一种悲观锁,在执行被synchronized包裹的代码时需要首先获取锁,没有拿到锁则无法执行,是总悲观的认为别的线程会去抢,所以要悲观锁。
乐观锁的思想是它不认为会有线程去争抢,尽管去执行,如果没有执行成功就再去重试。
@Mapper
public interface MediaProcessMapper extends BaseMapper<MediaProcess> {
/**
* 开启一个任务
* @param id 任务id
* @return 更新记录数
* 4:处理中
*/
@Update("update media_process m set m.status='4' where (m.status='1' or m.status='3') and m.fail_count<3 and m.id=#{id}")
int startTask(@Param("id") long id);
}
service方法:
/**
* 开启一个任务
* @param id 任务id
* @return true开启任务成功,false开启任务失败
*/
public boolean startTask(long id);
// ServiceImpl实现如下
public boolean startTask(long id) {
int result = mediaProcessMapper.startTask(id);
return result<=0?false:true;
}
任务处理完成需要更新任务处理结果,任务执行成功更新视频的URL、及任务处理结果,将待处理任务记录删除,同时向历史任务表添加记录。
package com.xuecheng.media.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.mapper.MediaProcessHistoryMapper;
import com.xuecheng.media.mapper.MediaProcessMapper;
import com.xuecheng.media.model.po.MediaFiles;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.model.po.MediaProcessHistory;
import com.xuecheng.media.service.MediaFileProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
@Slf4j
@Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {
@Autowired
MediaFilesMapper mediaFilesMapper;
@Autowired
MediaProcessMapper mediaProcessMapper;
@Autowired
MediaProcessHistoryMapper mediaProcessHistoryMapper;
/**
* @description 获取待处理任务
* @param shardIndex 分片序号
* @param shardTotal 分片总数
* @param count 获取记录数
* @return java.util.List<com.xuecheng.media.model.po.MediaProcess>
*/
@Override
public List<MediaProcess> getMediaProcessList(int shardIndex, int shardTotal, int count) {
List<MediaProcess> mediaProcesses = mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);
return mediaProcesses;
}
/**
* 开启一个任务
* @param id 任务id
* @return true开启任务成功,false开启任务失败
*/
@Override
public boolean startTask(long id) {
int result = mediaProcessMapper.startTask(id);
return result <= 0 ? false : true;
}
// 处理保存失败的结果
@Transactional
@Override
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {
// 查出任务,如果不存在则直接返回
MediaProcess mediaProcess = mediaProcessMapper.selectById(taskId);
if(mediaProcess == null){
return ;
}
// 处理失败,更新任务处理结果
LambdaQueryWrapper<MediaProcess> queryWrapperById = new LambdaQueryWrapper<MediaProcess>().eq(MediaProcess::getId, taskId);
// 处理失败
if(status.equals("3")){
MediaProcess mediaProcess_u = new MediaProcess();
mediaProcess_u.setStatus("3");
mediaProcess_u.setErrormsg(errorMsg);
mediaProcess_u.setFailCount(mediaProcess.getFailCount() + 1);
mediaProcessMapper.update(mediaProcess_u,queryWrapperById);
log.debug("更新任务处理状态为失败,任务信息:{}",mediaProcess_u);
return ;
}
// 任务处理成功
MediaFiles mediaFiles = mediaFilesMapper.selectById(fileId);
if(mediaFiles != null){
//更新媒资文件中的访问url
mediaFiles.setUrl(url);
mediaFilesMapper.updateById(mediaFiles);
}
// 处理成功,更新url和状态
mediaProcess.setUrl(url);
mediaProcess.setStatus("2");
mediaProcess.setFinishDate(LocalDateTime.now());
mediaProcessMapper.updateById(mediaProcess);
// 添加到历史记录
MediaProcessHistory mediaProcessHistory = new MediaProcessHistory();
BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);
mediaProcessHistoryMapper.insert(mediaProcessHistory);
// 删除mediaProcess
mediaProcessMapper.deleteById(mediaProcess.getId());
}
}
下面编写执行器代码:
首先根据CPU占有核心数来设置每次在数据库取出多少任务(size个),随后启动size个线程的线程池,而我们只是创建了size个线程,线程创建完成就代表videoJobHandler()方法结束:
ExecutorService threadPool = Executors.newFixedThreadPool(size);
我们想要让线程待执行方法调度完成任务后在结束,所以需要加入计数器,以此确保所有视频处理逻辑执行完毕后再结束方法:
CountDownLatch countDownLatch = new CountDownLatch(size);
以此就可以通过操作计数器控制:
countDownLatch.countDown(); // 计算器-1
// 阻塞主线程,直到所有子线程通过countDown()通知完成,确保所有视频处理逻辑执行完毕后再退出任务。 // 等待,给一个充裕的超时时间,防止无限等待,到达超时时间30min还没有处理完成则结束任务 countDownLatch.await(30, TimeUnit.MINUTES);
countDownLatch.await(30, TimeUnit.MINUTES)
设置的 30分钟超时 是指整个videoJobHandler
方法的 最大执行时间。这30分钟是从
countDownLatch.await()
调用开始计时,如果30分钟内所有子线程未完成(即计数器未减到0),主线程会继续执行,方法退出,未完成的任务会继续在后台线程池中运行,如果XXL-JOB管理界面也配置了任务超时时间(如20分钟),那么XXL-JOB会认为本次调度已超时。
下面是分析线程处理方法:
首先使用方法对每个查询出来的任务进行线程调度:
// 计数器,协调并发线程生命周期,确保所有视频处理逻辑执行完毕后再退出任务 CountDownLatch countDownLatch = new CountDownLatch(size); // 将处理任务加入线程池 mediaProcessList.forEach(mediaProcess -> { threadPool.execute(() -> { try{ // 业务逻辑... } finally { countDownLatch.countDown(); // 计算器-1 } }); // 阻塞主线程,直到所有子线程通过countDown()通知完成,确保所有视频处理逻辑执行完毕后再退出任务。 // 等待,给一个充裕的超时时间,防止无限等待,到达超时时间30min还没有处理完成则结束任务 countDownLatch.await(30, TimeUnit.MINUTES); }
在任务里面就需要抢占任务,随后将要处理的文件下载到服务器上,处理结束的视频文件,随后在将处理完成的mp4上传到MinIO。
package com.xuecheng.media.service.jobhandler;
import com.xuecheng.base.utils.Mp4VideoUtil;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.service.MediaFileProcessService;
import com.xuecheng.media.service.MediaFileService;
import com.xuecheng.media.service.MediaFileTransactionalService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;
@Slf4j
@Component
public class VideoTask {
@Autowired
MediaFileService mediaFileService;
@Autowired
private MediaFileTransactionalService mediaFileTransactionalService;
@Autowired
MediaFileProcessService mediaFileProcessService;
@Value("${videoprocess.ffmpegpath}")
String ffmpegpath;
@XxlJob("videoJobHandler")
public void videoJobHandler() throws Exception {
// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();
List<MediaProcess> mediaProcessList = null;
int size = 0;
try {
// 取出cpu核心数作为一次处理数据的条数
int processors = Runtime.getRuntime().availableProcessors();
// 一次处理视频数量不要超过cpu核心数
mediaProcessList = mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);
size = mediaProcessList.size();
log.debug("取出待处理视频任务{}条", size);
if (size < 0) {
return;
}
} catch (Exception e) {
e.printStackTrace();
return;
}
// 启动size个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(size);
// 计数器,协调并发线程生命周期,确保所有视频处理逻辑执行完毕后再退出任务
// 因为下面代码只是创建了size个线程,线程创建完成就代表videoJobHandler()方法结束,而我们想让方法调度完成任务后在结束,所以需要加入计数器
CountDownLatch countDownLatch = new CountDownLatch(size);
// 将处理任务加入线程池
mediaProcessList.forEach(mediaProcess -> {
threadPool.execute(() -> {
try {
// 任务id
Long taskId = mediaProcess.getId();
// 抢占任务
boolean b = mediaFileProcessService.startTask(taskId);
if (!b) {
return;
}
log.debug("开始执行任务:{}", mediaProcess);
// 下边是处理逻辑
// 桶
String bucket = mediaProcess.getBucket();
// 存储路径
String filePath = mediaProcess.getFilePath();
// 原始视频的md5值
String fileId = mediaProcess.getFileId();
// 原始文件名称
String filename = mediaProcess.getFilename();
// 将要处理的文件下载到服务器上
File originalFile = mediaFileService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath());
if (originalFile == null) {
log.debug("下载待处理文件失败,originalFile:{}", mediaProcess.getBucket().concat(mediaProcess.getFilePath()));
// 保存任务处理失败的结果
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "下载待处理文件失败");
return;
}
// 处理结束的视频文件
File mp4File = null;
try {
mp4File = File.createTempFile("mp4", ".mp4");
} catch (IOException e) {
log.error("创建mp4临时文件失败");
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "创建mp4临时文件失败");
return;
}
// 视频处理结果
String result = "";
try {
// 开始处理视频
Mp4VideoUtil videoUtil = new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());
// 开始视频转换,成功将返回success
result = videoUtil.generateMp4();
} catch (Exception e) {
e.printStackTrace();
log.error("处理视频文件:{},出错:{}", mediaProcess.getFilePath(), e.getMessage());
}
if (!result.equals("success")) {
// 记录错误信息
log.error("处理视频失败,视频地址:{},错误信息:{}", bucket + filePath, result);
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, result);
return;
}
// 将mp4上传至minio
// mp4在minio的存储路径
String objectName = getFilePath(fileId, ".mp4");
// 访问url
String url = "/" + bucket + "/" + objectName;
try {
mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), "video/mp4", bucket, objectName);
// 将url存储至数据,并更新状态为成功,并将待处理视频记录删除存入历史
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "2", fileId, url, null);
} catch (Exception e) {
log.error("上传视频失败或入库失败,视频地址:{},错误信息:{}", bucket + objectName, e.getMessage());
// 最终还是失败了
mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), "3", fileId, null, "处理后视频上传或入库失败");
}
}finally {
countDownLatch.countDown(); // 计算器-1
}
});
});
// 阻塞主线程,直到所有子线程通过countDown()通知完成,确保所有视频处理逻辑执行完毕后再退出任务。
// 等待,给一个充裕的超时时间,防止无限等待,到达超时时间30min还没有处理完成则结束任务
countDownLatch.await(30, TimeUnit.MINUTES);
}
private String getFilePath(String fileMd5,String fileExt){
return fileMd5.substring(0,1) + "/" + fileMd5.substring(1,2) + "/" + fileMd5 + "/" +fileMd5 +fileExt;
}
}
进入xxl-job调度中心添加执行器和视频处理任务
在xxl-job配置任务调度策略:
1)配置阻塞处理策略为:丢弃后续调度。
2)配置视频处理调度时间间隔不用根据视频处理时间去确定,可以配置的小一些,如:5分钟,即使到达调度时间如果视频没有处理完会丢弃调度请求。
配置完成开始测试视频处理:
1、首先上传至少4个视频,非mp4格式。
2、在xxl-job启动视频处理任务
3、观察媒资管理服务后台日志
4.抢占任务测试:
修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”:
在抢占任务代码处打断点并选择支持多线程方式:
在抢占任务代码处的下边两行代码分别打上断点,避免观察时代码继续执行。启动任务。
四.自定义任务操作方法:
在对于一些用户操作就要开启任务调度,指定只能动态创建XXL-JOB任务,因此我们要封装XXL-JOB客户端,通过接口的形式添加并启动任务。
我们可以在xxl-job-admin模块,添加改造后的api接口。
在JobInfoController类末尾添加方法,如下:
定义了添加、更新、删除、停止、开启、添加并开启任务的接口。
注意!@PermissionLimit(limit = false) 注解排除登录校验。
/*------------------自定义方法---------------------- */
@RequestMapping("/addJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addJobInfo(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.add(jobInfo);
}
@RequestMapping("/updateJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> updateJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.update(jobInfo);
}
@RequestMapping("/removeJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> removeJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.remove(jobInfo.getId());
}
@RequestMapping("/stopJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> pauseJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.stop(jobInfo.getId());
}
@RequestMapping("/startJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> startJob(@RequestBody XxlJobInfo jobInfo) {
return xxlJobService.start(jobInfo.getId());
}
@RequestMapping("/addAndStartJob")
@ResponseBody
@PermissionLimit(limit = false)
public ReturnT<String> addAndStartJob(@RequestBody XxlJobInfo jobInfo) {
ReturnT<String> result = xxlJobService.add(jobInfo);
int id = Integer.valueOf(result.getContent());
xxlJobService.start(id);
//立即执行一次
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, jobInfo.getExecutorParam(), "");
return result;
}
/*------------------自定义方法---------------------- */
随后我们项目中 xxl 完整配置需多加 client 的配置。
xxl:
job:
admin:
# 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册
addresses: http://139.198.30.131:8080/xxl-job-admin
# addresses: http://localhost:8080/xxl-job-admin
# 执行器通讯TOKEN [选填]:非空时启用
accessToken:
executor:
# 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
appname: xxl-job-executor-sample
# 执行器注册 [选填]:优先使用该配置作为注册地址,为空时使用内嵌服务 ”IP:PORT“ 作为注册地址。从而更灵活的支持容器类型执行器动态IP和动态映射端口问题。
address:
# 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
ip:
# 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
port: 9999
# 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
logpath: /data/applogs/xxl-job/jobhandler
# 执行器日志文件保存天数 [选填] : 过期日志自动清理, 限制值大于等于3时生效; 否则, 如-1, 关闭自动清理功能;
logretentiondays: 30
client:
jobGroupId: 1
addUrl: ${xxl.job.admin.addresses}/jobinfo/addJob
removeUrl: ${xxl.job.admin.addresses}/jobinfo/removeJob
startJobUrl: ${xxl.job.admin.addresses}/jobinfo/startJob
stopJobUrl: ${xxl.job.admin.addresses}/jobinfo/stopJob
addAndStartUrl: ${xxl.job.admin.addresses}/jobinfo/addAndStartJob
之后创建配置类,读取配置文件里面调用的调度中心任务操作的方法 :
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "xxl.job.client")
public class XxlJobClientConfig {
private Integer jobGroupId;
private String addUrl;
private String removeUrl;
private String startJobUrl;
private String stopJobUrl;
private String addAndStartUrl;
}
随后创建客户端类,编写调用调度中心里面的方法:
import com.alibaba.fastjson.JSONObject;
import com.atguigu.daijia.common.execption.GuiguException;
import com.atguigu.daijia.common.result.ResultCodeEnum;
import com.atguigu.daijia.dispatch.xxl.config.XxlJobClientConfig;
import com.atguigu.daijia.model.entity.dispatch.XxlJobInfo;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
/**
* XXL-JOB客户端操作类
* 提供对XXL-JOB调度系统的操作封装
* 参考文档:https://dandelioncloud.cn/article/details/1598865461087518722
*/
@Slf4j
@Component
public class XxlJobClient {
// 自动注入XXL-JOB客户端配置
@Autowired
private XxlJobClientConfig xxlJobClientConfig;
@Autowired
private RestTemplate restTemplate;
/**
* 添加XXL-JOB任务
* @param executorHandler 执行器Handler名称
* @param param 任务参数
* @param corn CRON表达式
* @param desc 任务描述
* @return 创建成功的任务ID
*/
@SneakyThrows // 自动抛出异常
public Long addJob(String executorHandler, String param, String corn, String desc){
// 构建任务信息对象
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId()); // 设置任务组ID
xxlJobInfo.setJobDesc(desc); // 任务描述
xxlJobInfo.setAuthor("eleven"); // 作者固定为"eleven"
xxlJobInfo.setScheduleType("CRON"); // 调度类型为CRON
xxlJobInfo.setScheduleConf(corn); // CRON表达式
xxlJobInfo.setGlueType("BEAN"); // 任务模式为BEAN
xxlJobInfo.setExecutorHandler(executorHandler); // 执行器Handler名称
xxlJobInfo.setExecutorParam(param); // 任务参数
xxlJobInfo.setExecutorRouteStrategy("FIRST"); // 路由策略:第一个
xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION"); // 阻塞策略:串行
xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW"); // 错过策略:立即执行一次
xxlJobInfo.setExecutorTimeout(0); // 任务超时时间(0为不限制)
xxlJobInfo.setExecutorFailRetryCount(0); // 失败重试次数(0为不重试)
// 构建HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON); // 设置JSON内容类型
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
// 调用XXL-JOB的添加任务接口
String url = xxlJobClientConfig.getAddUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
// 处理响应结果
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("增加xxl执行任务成功,返回信息:{}", response.getBody().toJSONString());
return response.getBody().getLong("content"); // 返回任务ID
}
log.info("调用xxl增加执行任务失败:{}", response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.XXL_JOB_ERROR); // 抛出业务异常
}
/**
* 启动XXL-JOB任务
* @param jobId 任务ID
* @return 是否启动成功
*/
public Boolean startJob(Long jobId) {
// 构建任务信息对象(只需要ID)
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
// 构建HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
// 调用XXL-JOB的启动任务接口
String url = xxlJobClientConfig.getStartJobUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("启动xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("启动xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.XXL_JOB_ERROR);
}
/**
* 停止XXL-JOB任务
* @param jobId 任务ID
* @return 是否停止成功
*/
public Boolean stopJob(Long jobId) {
// 构建任务信息对象(只需要ID)
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
// 构建HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
// 调用XXL-JOB的停止任务接口
String url = xxlJobClientConfig.getStopJobUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("停止xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("停止xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.XXL_JOB_ERROR);
}
/**
* 删除XXL-JOB任务
* @param jobId 任务ID
* @return 是否删除成功
*/
public Boolean removeJob(Long jobId) {
// 构建任务信息对象(只需要ID)
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setId(jobId.intValue());
// 构建HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
// 调用XXL-JOB的删除任务接口
String url = xxlJobClientConfig.getRemoveUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("删除xxl执行任务成功:{},返回信息:{}", jobId, response.getBody().toJSONString());
return true;
}
log.info("删除xxl执行任务失败:{},返回信息:{}", jobId, response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.XXL_JOB_ERROR);
}
/**
* 添加并立即启动XXL-JOB任务
* @param executorHandler 执行器Handler名称
* @param param 任务参数
* @param corn CRON表达式
* @param desc 任务描述
* @return 创建成功的任务ID
*/
public Long addAndStart(String executorHandler, String param, String corn, String desc) {
// 构建任务信息对象(与addJob方法相同)
XxlJobInfo xxlJobInfo = new XxlJobInfo();
xxlJobInfo.setJobGroup(xxlJobClientConfig.getJobGroupId());
xxlJobInfo.setJobDesc(desc);
xxlJobInfo.setAuthor("eleven");
xxlJobInfo.setScheduleType("CRON");
xxlJobInfo.setScheduleConf(corn);
xxlJobInfo.setGlueType("BEAN");
xxlJobInfo.setExecutorHandler(executorHandler);
xxlJobInfo.setExecutorParam(param);
xxlJobInfo.setExecutorRouteStrategy("FIRST");
xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
xxlJobInfo.setMisfireStrategy("FIRE_ONCE_NOW");
xxlJobInfo.setExecutorTimeout(0);
xxlJobInfo.setExecutorFailRetryCount(0);
// 构建HTTP请求
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<XxlJobInfo> request = new HttpEntity<>(xxlJobInfo, headers);
// 调用XXL-JOB的添加并启动任务接口
String url = xxlJobClientConfig.getAddAndStartUrl();
ResponseEntity<JSONObject> response = restTemplate.postForEntity(url, request, JSONObject.class);
if(response.getStatusCode().value() == 200 && response.getBody().getIntValue("code") == 200) {
log.info("增加并开始执行xxl任务成功,返回信息:{}", response.getBody().toJSONString());
return response.getBody().getLong("content"); // 返回任务ID
}
log.info("增加并开始执行xxl任务失败:{}", response.getBody().toJSONString());
throw new GuiguException(ResultCodeEnum.XXL_JOB_ERROR);
}
}
并在启动类配置RestTemplate:
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
创建并启动任务接口:
以乘客唤车为例,在Controller编写创建并启动任务调度的API接口:
@Tag(name = "司机新订单接口管理")
@RestController
@RequestMapping("/dispatch/newOrder")
@SuppressWarnings({"unchecked", "rawtypes"})
public class NewOrderController {
@Autowired
private NewOrderService newOrderService;
//创建并启动任务调度方法
@Operation(summary = "添加并开始新订单任务调度")
@PostMapping("/addAndStartTask")
public Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderTaskVo) {
Long id = newOrderService.addAndStartTask(newOrderTaskVo);
return Result.ok(id);
}
}
下面是Order_job表:
import com.atguigu.daijia.model.entity.base.BaseEntity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
@Data
@Schema(description = "订单任务关联表")
@TableName("order_job")
public class OrderJob extends BaseEntity {
private static final long serialVersionUID = 1L;
@Schema(description = "订单id")
@TableField("order_id")
private Long orderId;
@Schema(description = "任务id")
@TableField("job_id")
private Long jobId;
@Schema(description = "参数")
@TableField("parameter")
private String parameter;
}
随后编写 Service:
步骤解析:
- 判断当前的订单是否已经启动任务调度。
- 若没有启动,首先创建新订单任务调度,随后记录任务调度日志。
@Service
@SuppressWarnings({"unchecked", "rawtypes"}) // 抑制编译器产生的特定类型的警告信息
public class NewOrderServiceImpl implements NewOrderService {
@Autowired
private OrderJobMapper orderJobMapper;
@Autowired
private XxlJobClient xxlJobClient;
//创建并启动任务调度方法
@Override
public Long addAndStartTask(NewOrderTaskVo newOrderTaskVo) {
//1 判断当前订单是否启动任务调度
//根据订单id查询
LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(OrderJob::getOrderId,newOrderTaskVo.getOrderId());
OrderJob orderJob = orderJobMapper.selectOne(wrapper);
//2 没有启动,进行操作
if(orderJob == null) {
//创建并启动任务调度
//String executorHandler 执行任务job方法
// String param
// String corn 执行cron表达式
// String desc 描述信息
Long jobId = xxlJobClient.addAndStart("newOrderTaskHandler", "",
"0 0/1 * * * ?",
"新创建订单任务调度:" + newOrderTaskVo.getOrderId());
//记录任务调度信息
orderJob = new OrderJob();
orderJob.setOrderId(newOrderTaskVo.getOrderId());
orderJob.setJobId(jobId);
orderJob.setParameter(JSONObject.toJSONString(newOrderTaskVo));
orderJobMapper.insert(orderJob);
}
return orderJob.getJobId();
}
}
随后远程调用:
@FeignClient(value = "service-dispatch")
public interface NewOrderFeignClient {
/**
* 添加新订单任务
* @param newOrderDispatchVo
* @return
*/
@PostMapping("/dispatch/newOrder/addAndStartTask")
Result<Long> addAndStartTask(@RequestBody NewOrderTaskVo newOrderDispatchVo);
}
下面编写任务的具体方法JobHandler:
注意!!!注解内名字必须与前面创建的任务名保持一致。
步骤解析:
- 记录任务调度日志。
- newOrderService.executeTask(XxlJobHelper.getJobId())方法搜索附近代驾司机。
- 记录时间。
@Component
public class JobHandler {
@Autowired
private XxlJobLogMapper xxlJobLogMapper;
@Autowired
private NewOrderService newOrderService;
@XxlJob("newOrderTaskHandler")
public void newOrderTaskHandler() {
//记录任务调度日志
XxlJobLog xxlJobLog = new XxlJobLog();
xxlJobLog.setJobId(XxlJobHelper.getJobId());
long startTime = System.currentTimeMillis();
try {
//自定义执行任务:搜索附近代驾司机
newOrderService.executeTask(XxlJobHelper.getJobId());
//成功状态
xxlJobLog.setStatus(1);
} catch (Exception e) {
//失败状态
xxlJobLog.setStatus(0);
xxlJobLog.setError(e.getMessage());
e.printStackTrace();
} finally {
long times = System.currentTimeMillis()- startTime;
//TODO 完善long
xxlJobLog.setTimes((int)times);
xxlJobLogMapper.insert(xxlJobLog);
}
}
}
接下来编写搜索附近代驾司机方法:
步骤解析:
- 根据jobid查询数据库,当前任务是否已经创建。
- 查询订单状态,如果当前订单接单状态,继续执行。如果当前订单不是接单状态,停止任务调度。
//执行任务:搜索附近代驾司机
@Override
public void executeTask(long jobId) {
//1 根据jobid查询数据库,当前任务是否已经创建
//如果没有创建,不往下执行了
LambdaQueryWrapper<OrderJob> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(OrderJob::getJobId,jobId);
OrderJob orderJob = orderJobMapper.selectOne(wrapper);
if(orderJob == null) {
//不往下执行了
return;
}
//2 查询订单状态,如果当前订单接单状态,继续执行。如果当前订单不是接单状态,停止任务调度
//获取OrderJob里面对象
String jsonString = orderJob.getParameter();
NewOrderTaskVo newOrderTaskVo = JSONObject.parseObject(jsonString, NewOrderTaskVo.class);
//获取orderId
Long orderId = newOrderTaskVo.getOrderId();
Integer status = orderInfoFeignClient.getOrderStatus(orderId).getData();
if(status.intValue() != OrderStatus.WAITING_ACCEPT.getStatus().intValue()) {
//停止任务调度
xxlJobClient.stopJob(jobId);
return;
}
//3 远程调用:搜索附近满足条件可以接单司机
//4 远程调用之后,获取满足可以接单司机集合
SearchNearByDriverForm searchNearByDriverForm = new SearchNearByDriverForm();
searchNearByDriverForm.setLongitude(newOrderTaskVo.getStartPointLongitude());
searchNearByDriverForm.setLatitude(newOrderTaskVo.getStartPointLatitude());
searchNearByDriverForm.setMileageDistance(newOrderTaskVo.getExpectDistance());
//远程调用
List<NearByDriverVo> nearByDriverVoList =
locationFeignClient.searchNearByDriver(searchNearByDriverForm).getData();
//5 遍历司机集合,得到每个司机,为每个司机创建临时队列,存储新订单信息
nearByDriverVoList.forEach(driver -> {
//使用Redis的set类型
//根据订单id生成key
String repeatKey =
RedisConstant.DRIVER_ORDER_REPEAT_LIST+newOrderTaskVo.getOrderId();
//记录司机id,防止重复推送
Boolean isMember = redisTemplate.opsForSet().isMember(repeatKey, driver.getDriverId());
if(!isMember) {
//把订单信息推送给满足条件多个司机
redisTemplate.opsForSet().add(repeatKey,driver.getDriverId());
//过期时间:15分钟,超过15分钟没有接单自动取消
redisTemplate.expire(repeatKey,
RedisConstant.DRIVER_ORDER_REPEAT_LIST_EXPIRES_TIME,
TimeUnit.MINUTES);
NewOrderDataVo newOrderDataVo = new NewOrderDataVo();
newOrderDataVo.setOrderId(newOrderTaskVo.getOrderId());
newOrderDataVo.setStartLocation(newOrderTaskVo.getStartLocation());
newOrderDataVo.setEndLocation(newOrderTaskVo.getEndLocation());
newOrderDataVo.setExpectAmount(newOrderTaskVo.getExpectAmount());
newOrderDataVo.setExpectDistance(newOrderTaskVo.getExpectDistance());
newOrderDataVo.setExpectTime(newOrderTaskVo.getExpectTime());
newOrderDataVo.setFavourFee(newOrderTaskVo.getFavourFee());
newOrderDataVo.setDistance(driver.getDistance());
newOrderDataVo.setCreateTime(newOrderTaskVo.getCreateTime());
//新订单保存司机的临时队列,Redis里面List集合
String key = RedisConstant.DRIVER_ORDER_TEMP_LIST+driver.getDriverId();
redisTemplate.opsForList().leftPush(key,JSONObject.toJSONString(newOrderDataVo));
//过期时间:1分钟
redisTemplate.expire(key,RedisConstant.DRIVER_ORDER_TEMP_LIST_EXPIRES_TIME, TimeUnit.MINUTES);
}
});
}
更多推荐
所有评论(0)