分布式定时任务
一、开源框架1、elastic-job介绍:Elastic-Job是一个分布式计划解决方案,由两个独立的项目Lite和Cloud组成。Elastic-Job-Lite是一种轻量级的分散式解决方案,可提供分布式任务分片服务。Elastic-Job-Cloud是一个Mesos框架,它提供其他资源管理,App分发,进程隔离和任务聚合功能。优点:基于quartz 定时任务框架为基础的,因此具备quartz
一、开源框架
1、elastic-job
介绍:
Elastic-Job是一个分布式计划解决方案,由两个独立的项目Lite和Cloud组成。
Elastic-Job-Lite是一种轻量级的分散式解决方案,可提供分布式任务分片服务。Elastic-Job-Cloud是一个Mesos框架,它提供其他资源管理,App分发,进程隔离和任务聚合功能。
优点:
基于quartz 定时任务框架为基础的,因此具备quartz的大部分功能;
使用zookeeper做协调,调度中心,更加轻量级;
支持任务的分片;
支持弹性扩容 , 可以水平扩展 , 当任务再次运行时,会检查当前的服务器数量,重新分片,分片结束之后才会继续执行任务;
失效转移,容错处理,当一台调度服务器宕机或者跟zookeeper断开连接之后,会立即停止作业,然后再去寻找其他空闲的调度服务器,来运行剩余的任务;
提供运维界面,可以管理作业和注册中心。
2、xxl-job
介绍:
由个人开源的一个轻量级分布式任务调度框架 ,主要分为 调度中心和执行器两部分 ,调度中心在启动初始化的时候,会默认生成执行器的RPC代理,对象(http协议调用),执行器项目启动之后,调度中心在触发定时器之后通过jobHandle来调用执行器项目里面的代码,核心功能和elastic-job差不多;集群环境中Quartz采用API的方式对任务进行管理。
开源地址:http://www.xuxueli.com/xxl-job
3、quartz
介绍:
quartz 的常见集群方案如下,通过在数据库中配置定时器信息, 以数据库悲观锁的方式达到同一个任务始终只有一个节点在运行。
优点:
保证节点高可用 (HA), 如果某一个几点挂了, 其他节点可以顶上;
缺点:
同一个任务只能有一个节点运行,其他节点将不执行任务,性能低,资源浪费;
当碰到大量短任务时,各个节点频繁的竞争数据库锁,节点越多这种情况越严重;
性能会很低下,quartz 的分布式仅解决了集群高可用的问题,并没有解决任务分片的问题,不能实现水平扩展。
开源地址:http://www.quartz-scheduler.org/
4、Saturn
介绍:
Saturn (任务调度系统)是唯品会开源的一个分布式任务调度平台,取代传统的Linux Cron/Spring Batch Job的方式,做到全域统一配置,统一监控,任务高可用以及分片并发处理。
Saturn是在当当开源的Elastic Job基础上,结合各方需求和我们的实践见解改良而成。
优点:
基于时间的作业调度,作业实现不受开发语言所限
简单的作业实现和基于web的作业管理
并行作业分片支持
秒级调度支持
智能的基于负载作业分配算法
异常检测和自动failover
统计数据可视化
全方位监控和简易的trouble shooting
支持多活集群部署
容器友好
经受住生产每日几十亿级别的调度考验
开源地址:https://github.com/vipshop/Saturn
5、opencron
介绍:
一个功能完善真正通用的linux定时任务调度定系统,满足多种场景下各种复杂的定时任务调度,同时集成了linux实时监控,webssh,提供一个方便管理定时任务的平台。
缺点:
仅支持 kill任务, 现场执行,查询任务运行状态 等, 主要功能是着重于任务的修改和查询上。不能动态的添加任务以及任务分片。
开源地址:https://gitee.com/terrytan/opencron
6、antares
介绍:
一个任务仅会被服务器集群中的某个节点调度,调度机制基于成熟的 Quartz,antares内部会重写执行逻辑;
优点:
一个任务仅会被服务器集群中的某个节点调度,调度机制基于成熟的 quartz;
并行执行 , 用户可通过对任务预分片,有效提升任务执行效率;
失效转移;
弹性扩容,在任务运行时,可以动态的加机器;
友好的管理控制台。
缺点:
不能动态的添加任务,仅能在控制台对任务进行触发,暂停,删除等操作;
文档不多,开源社区不够活跃。
开源地址:https://github.com/ihaolin/antares/tree/master/antares-server
7、TBSchedule
介绍:
TBSchedule是一个支持分布式的调度框架,让批量任务或者不断变化的任务能够被动态的分配到多个主机的JVM中,在不同的线程组中并行执行,所有的任务能够被不重复,不遗漏的快速处理。基于ZooKeeper的纯Java实现,由Alibaba开源。
开源地址:https://github.com/taobao/TBSchedule
8、SchedulerX
介绍:
分布式任务调度 SchedulerX 2.0 是阿里巴巴基于 Akka 架构自研的新一代分布式任务调度平台,提供定时调度、调度任务编排和分布式批量处理等功能。
springboot使用:
依赖:
<dependency>
<groupId>com.aliyun.schedulerx</groupId>
<artifactId>schedulerx2-spring-boot-starter</artifactId>
<version>${schedulerx2.version}</version>
</dependency>
配置:
spring.schedulerx2.endpoint=${endpoint}
spring.schedulerx2.namespace=${namespace}
spring.schedulerx2.groupId=${groupId}
spring.schedulerx2.aliyunAccessKey=${aliyunAccessKey}
spring.schedulerx2.aliyunSecretKey=${aliyunSecretKey}
继承使用
package com.aliyun.schedulerx.test.job;
import com.alibaba.schedulerx.worker.domain.JobContext;
import com.alibaba.schedulerx.worker.processor.JavaProcessor;
import com.alibaba.schedulerx.worker.processor.ProcessResult;
@Component
public class MyHelloJob extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
System.out.println("hello schedulerx2.0");
return new ProcessResult(true);
}
}
二、rabbitmq和redis
思路:
1、用单线程进行维持任务判断,判断是否在时间段内部;
2、如果在时间段,发送rabbitmq消息,用rabbitmq工作模式;
3、添加将任务,放在redis中,循环拿redis的数据判断,如果停止或者暂停任务,就删除redis数据;
4、同时维持redis的key状态,生产者发送消息,但是需要控制消息发送一条,采用redis计数法,订阅者可以多个;
5、消费者处理业务逻辑,同时判断redis的状态。
三、线程池方式
package com.xxxx.xclouddesk.outboundcall.utils;
import java.util.HashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* 定时任务工具
*/
public class ScheduleUtil {
private static HashMap<String, ScheduledFuture> map = new HashMap<>();
private static ScheduledExecutorService pool = null;
/**
* 初始化线程池
* */
public static void init() {
if (pool == null) {
pool = Executors.newScheduledThreadPool(10);
}
}
/**
* 提交任务执行
* */
public static void execute(Runnable r) {
init();
pool.execute(r);
}
/**
* 提交延迟任务执行,1000毫秒
* */
public static void executeDelay(Runnable r, long delay) {
init();
pool.schedule(r, delay, TimeUnit.MILLISECONDS);
}
/**
* 关闭线程池
* */
public static void unInit() {
if (pool == null || pool.isShutdown()) {
return;
}
map.clear();
pool.shutdownNow();
pool = null;
}
/**
* 该接口定义了线程的名字,用于管理,
* 如判断是否存活,是否停止该线程等等
**/
public interface SRunnable extends Runnable {
String getName();
}
/**
* @param sr 需要执行线程,该线程必须继承SRunnable
* @param delay 延迟执行时间
* @param period 执行周期时间
* @param unit 时间单位 比如TimeUnit.SECONDS
*/
public static void stard(SRunnable sr, long delay, long period, TimeUnit unit) {
if (sr.getName() == null || map.get(sr.getName()) != null) {
throw new UnsupportedOperationException("线程名不能为空或者线程名不能重复!");
}
if (pool == null || pool.isShutdown()) {
init();
}
ScheduledFuture scheduledFuture = pool.scheduleAtFixedRate(sr, delay, period, unit);
map.put(sr.getName(), scheduledFuture);
}
/**
* @param sr 停止当前正在执行的线程,该线程必须是继承SRunnable
*/
public static void stop(SRunnable sr) {
if (sr.getName() == null) {
throw new UnsupportedOperationException("停止线程时,线程名不能为空!");
}
//服务未启动
if (pool == null || pool.isShutdown()) {
return;
}
if (map.size() > 0 && map.get(sr.getName()) != null) {
map.get(sr.getName()).cancel(true);
map.remove(sr.getName());
}
if (map.size() <= 0) {
shutdown();
}
}
/**
* 停止所有线程服务
*/
public static void shutdown() {
map.clear();
pool.shutdown();
}
/**
* 判断该线程是否还存活着,还在运行
* @param sr
* @return
*/
public static boolean isAlive(SRunnable sr) {
if (map.size() > 0 && map.get(sr.getName()) != null) {
return !map.get(sr.getName()).isDone();
}
return false;
}
}
更多推荐

所有评论(0)