功能描述:

本功能实现了一个基于多线程的分布式任务调度系统。该系统允许用户提交任务,并自动将任务分配给多个工作线程进行处理。系统还支持任务优先级、任务依赖关系、任务失败重试等功能。通过分布式设计,系统可以在多台机器上运行,实现任务的负载均衡和高可用性。

代码实现:
import java.util.concurrent.*;
import java.util.*;

public class DistributedTaskScheduler {

    private final ExecutorService executorService;
    private final Map<String, Task> taskMap;
    private final BlockingQueue<Task> taskQueue;
    private final List<Worker> workers;

    public DistributedTaskScheduler(int numThreads) {
        this.executorService = Executors.newFixedThreadPool(numThreads);
        this.taskMap = new ConcurrentHashMap<>();
        this.taskQueue = new PriorityBlockingQueue<>(10, Comparator.comparingInt(Task::getPriority));
        this.workers = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            Worker worker = new Worker(taskQueue, taskMap);
            workers.add(worker);
            executorService.submit(worker);
        }
    }

    public void submitTask(Task task) {
        taskMap.put(task.getId(), task);
        taskQueue.offer(task);
    }

    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }

    public static void main(String[] args) {
        DistributedTaskScheduler scheduler = new DistributedTaskScheduler(4);

        Task task1 = new Task("task1", 1, () -> {
            System.out.println("Executing task1");
            return "task1 completed";
        });

        Task task2 = new Task("task2", 2, () -> {
            System.out.println("Executing task2");
            return "task2 completed";
        });

        scheduler.submitTask(task1);
        scheduler.submitTask(task2);

        scheduler.shutdown();
    }
}

class Task {
    private final String id;
    private final int priority;
    private final Callable<String> action;

    public Task(String id, int priority, Callable<String> action) {
        this.id = id;
        this.priority = priority;
        this.action = action;
    }

    public String getId() {
        return id;
    }

    public int getPriority() {
        return priority;
    }

    public Callable<String> getAction() {
        return action;
    }
}

class Worker implements Runnable {
    private final BlockingQueue<Task> taskQueue;
    private final Map<String, Task> taskMap;

    public Worker(BlockingQueue<Task> taskQueue, Map<String, Task> taskMap) {
        this.taskQueue = taskQueue;
        this.taskMap = taskMap;
    }

    @Override
    public void run() {
        while (true) {
            try {
                Task task = taskQueue.take();
                System.out.println("Worker " + Thread.currentThread().getId() + " is processing " + task.getId());
                String result = task.getAction().call();
                System.out.println("Task " + task.getId() + " completed with result: " + result);
                taskMap.remove(task.getId());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
使用说明:
  1. 初始化调度器:创建一个DistributedTaskScheduler实例,指定工作线程的数量。例如,DistributedTaskScheduler scheduler = new DistributedTaskScheduler(4); 表示创建一个有4个工作线程的调度器。

  2. 提交任务:使用submitTask方法提交任务。每个任务需要指定一个唯一的ID、优先级和要执行的操作。例如:

    Task task1 = new Task("task1", 1, () -> {
        System.out.println("Executing task1");
        return "task1 completed";
    });
    scheduler.submitTask(task1);
    
  3. 关闭调度器:在任务提交完成后,调用shutdown方法关闭调度器。调度器会等待所有任务完成后再关闭。

  4. 任务执行:任务会被自动分配给工作线程执行,执行结果会打印到控制台。

  5. 扩展性:该系统可以轻松扩展为分布式系统,只需在多台机器上运行多个调度器实例,并通过网络通信实现任务分配和结果收集。

功能特点:

多线程处理:支持多线程并发处理任务,提高任务执行效率。
任务优先级:任务可以设置优先级,优先级高的任务会优先执行。
任务依赖:可以通过扩展代码实现任务之间的依赖关系,确保某些任务在特定任务完成后执行。
任务重试:可以扩展代码实现任务失败后的重试机制,提高任务执行的可靠性。
分布式支持:通过扩展代码,可以实现任务的分布式调度和执行,适用于大规模任务处理场景。

适用场景:

• 需要处理大量并发任务的场景,如数据处理、批量计算等。
• 需要任务优先级和依赖关系的场景,如工作流管理系统。
• 需要高可用性和负载均衡的分布式任务处理场景。

扩展建议:

• 可以引入Zookeeper或Redis等分布式协调服务,实现任务的分布式调度和负载均衡。
• 可以引入任务结果存储和查询功能,方便用户查看任务执行结果。
• 可以引入任务监控和告警功能,实时监控任务执行状态并发送告警。

Logo

更多推荐