JAVA实现基于多线程的分布式任务调度系统
JAVA实现基于多线程的分布式任务调度系统
功能描述:
本功能实现了一个基于多线程的分布式任务调度系统。该系统允许用户提交任务,并自动将任务分配给多个工作线程进行处理。系统还支持任务优先级、任务依赖关系、任务失败重试等功能。通过分布式设计,系统可以在多台机器上运行,实现任务的负载均衡和高可用性。
代码实现:
import java.util.concurrent.*;
import java.util.*;
public class DistributedTaskScheduler {
private final ExecutorService executorService;
private final Map<String, Task> taskMap;
private final BlockingQueue<Task> taskQueue;
private final List<Worker> workers;
public DistributedTaskScheduler(int numThreads) {
this.executorService = Executors.newFixedThreadPool(numThreads);
this.taskMap = new ConcurrentHashMap<>();
this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingInt(Task::getPriority));
this.workers = new ArrayList<>();
for (int i = 0; i < numThreads; i++) {
Worker worker = new Worker(taskQueue, taskMap);
workers.add(worker);
executorService.submit(worker);
}
}
public void submitTask(Task task) {
taskMap.put(task.getId(), task);
taskQueue.offer(task);
}
public void shutdown() {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
}
}
public static void main(String[] args) {
DistributedTaskScheduler scheduler = new DistributedTaskScheduler(4);
Task task1 = new Task("task1", 1, () -> {
System.out.println("Executing task1");
return "task1 completed";
});
Task task2 = new Task("task2", 2, () -> {
System.out.println("Executing task2");
return "task2 completed";
});
scheduler.submitTask(task1);
scheduler.submitTask(task2);
scheduler.shutdown();
}
}
class Task {
private final String id;
private final int priority;
private final Callable<String> action;
public Task(String id, int priority, Callable<String> action) {
this.id = id;
this.priority = priority;
this.action = action;
}
public String getId() {
return id;
}
public int getPriority() {
return priority;
}
public Callable<String> getAction() {
return action;
}
}
class Worker implements Runnable {
private final BlockingQueue<Task> taskQueue;
private final Map<String, Task> taskMap;
public Worker(BlockingQueue<Task> taskQueue, Map<String, Task> taskMap) {
this.taskQueue = taskQueue;
this.taskMap = taskMap;
}
@Override
public void run() {
while (true) {
try {
Task task = taskQueue.take();
System.out.println("Worker " + Thread.currentThread().getId() + " is processing " + task.getId());
String result = task.getAction().call();
System.out.println("Task " + task.getId() + " completed with result: " + result);
taskMap.remove(task.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
使用说明:
-
初始化调度器:创建一个
DistributedTaskScheduler
实例,指定工作线程的数量。例如,DistributedTaskScheduler scheduler = new DistributedTaskScheduler(4);
表示创建一个有4个工作线程的调度器。 -
提交任务:使用
submitTask
方法提交任务。每个任务需要指定一个唯一的ID、优先级和要执行的操作。例如:Task task1 = new Task("task1", 1, () -> { System.out.println("Executing task1"); return "task1 completed"; }); scheduler.submitTask(task1);
-
关闭调度器:在任务提交完成后,调用
shutdown
方法关闭调度器。调度器会等待所有任务完成后再关闭。 -
任务执行:任务会被自动分配给工作线程执行,执行结果会打印到控制台。
-
扩展性:该系统可以轻松扩展为分布式系统,只需在多台机器上运行多个调度器实例,并通过网络通信实现任务分配和结果收集。
功能特点:
• 多线程处理:支持多线程并发处理任务,提高任务执行效率。
• 任务优先级:任务可以设置优先级,优先级高的任务会优先执行。
• 任务依赖:可以通过扩展代码实现任务之间的依赖关系,确保某些任务在特定任务完成后执行。
• 任务重试:可以扩展代码实现任务失败后的重试机制,提高任务执行的可靠性。
• 分布式支持:通过扩展代码,可以实现任务的分布式调度和执行,适用于大规模任务处理场景。
适用场景:
• 需要处理大量并发任务的场景,如数据处理、批量计算等。
• 需要任务优先级和依赖关系的场景,如工作流管理系统。
• 需要高可用性和负载均衡的分布式任务处理场景。
扩展建议:
• 可以引入Zookeeper或Redis等分布式协调服务,实现任务的分布式调度和负载均衡。
• 可以引入任务结果存储和查询功能,方便用户查看任务执行结果。
• 可以引入任务监控和告警功能,实时监控任务执行状态并发送告警。
更多推荐
所有评论(0)