在实际的开发中,很多系统需要处理大量的异步任务。例如,发送通知、图片处理、数据同步等。这些任务常常需要在后台进行,而不应影响主流程的运行。因此,使用分布式任务调度系统来处理这些任务就显得尤为重要。一个常见的需求是将任务分发到不同的处理节点,并且能合理地调度任务、负载均衡、任务失败重试等。
本文将介绍如何使用 PHP 构建一个 分布式任务调度系统,该系统使用 Redis 作为任务队列,配合 Worker 模式 来处理任务。每个 Worker 负责从 Redis 中获取任务并处理,同时支持任务的重试机制和任务状态管理。
功能特点

1.任务队列管理:通过 Redis 实现任务的分发与管理,确保任务能够持久化并且支持高并发。
2.Worker 模式:多个 Worker 进程并发工作,自动从任务队列中取任务进行处理,支持任务的并行执行。
3.任务状态管理:任务可以标记为“待处理”、“处理中”、“成功”、“失败”等状态,确保任务的可靠性。
4.任务重试机制:对于执行失败的任务,系统可以自动重试。
5.负载均衡:多个 Worker 进程间共享任务队列,保证任务的均匀分配。

代码实现
1. 任务调度系统的核心代码
<?php

// 任务调度系统
class TaskScheduler
{
    private $redis;
    private $queueName;

    public function __construct($redis, $queueName = 'task_queue')
    {
        $this->redis = $redis;
        $this->queueName = $queueName;
    }

    // 向任务队列中添加任务
    public function addTask($taskData)
    {
        $taskData['status'] = 'waiting'; // 任务初始状态为等待中
        $taskData['created_at'] = time(); // 记录任务创建时间
        $this->redis->rpush($this->queueName, json_encode($taskData)); // 将任务加入队列
    }

    // 获取队列中的任务
    public function getTask()
    {
        $taskData = $this->redis->lpop($this->queueName); // 从队列头部获取任务
        return $taskData ? json_decode($taskData, true) : null;
    }

    // 获取当前队列中的任务数
    public function getQueueLength()
    {
        return $this->redis->llen($this->queueName);
    }
}
?>

2. Worker 模式的任务处理代码
<?php

// Worker 进程
class TaskWorker
{
    private $redis;
    private $scheduler;

    public function __construct($redis, $scheduler)
    {
        $this->redis = $redis;
        $this->scheduler = $scheduler;
    }

    // 任务处理入口
    public function start()
    {
        echo "Worker started...\n";

        while (true) {
            // 获取一个任务
            $task = $this->scheduler->getTask();

            if ($task) {
                echo "Processing task: " . $task['task_id'] . "\n";

                // 模拟任务处理
                $success = $this->processTask($task);

                if ($success) {
                    $this->updateTaskStatus($task['task_id'], 'completed');
                    echo "Task " . $task['task_id'] . " completed successfully.\n";
                } else {
                    $this->updateTaskStatus($task['task_id'], 'failed');
                    echo "Task " . $task['task_id'] . " failed, will retry...\n";
                    $this->retryTask($task);
                }
            } else {
                echo "No task available, waiting...\n";
                sleep(1); // 如果队列为空,等待 1 秒再检查
            }
        }
    }

    // 处理任务的核心逻辑
    private function processTask($task)
    {
        // 模拟任务处理(实际可以是图片处理、数据同步等)
        $randomSuccess = rand(0, 1);
        return $randomSuccess == 1; // 随机决定任务是否成功
    }

    // 更新任务状态
    private function updateTaskStatus($taskId, $status)
    {
        // 在实际项目中,可以将任务状态保存到数据库
        echo "Task $taskId status updated to $status.\n";
    }

    // 重试任务
    private function retryTask($task)
    {
        if ($task['retries'] < 3) { // 限制最大重试次数
            $task['retries'] += 1;
            $this->scheduler->addTask($task); // 重新将任务加入队列
        } else {
            echo "Task " . $task['task_id'] . " reached max retry attempts.\n";
        }
    }
}
?>

3. 启动任务调度与 Worker
<?php

// 连接 Redis
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

// 创建任务调度器
$scheduler = new TaskScheduler($redis);

// 添加任务到任务队列
for ($i = 1; $i <= 10; $i++) {
    $taskData = [
        'task_id' => 'task_' . $i,
        'task_type' => 'process_image',
        'payload' => 'image_data_' . $i,
        'retries' => 0
    ];
    $scheduler->addTask($taskData);
    echo "Added task: task_$i\n";
}

// 启动 Worker 进程,处理任务
$worker = new TaskWorker($redis, $scheduler);
$worker->start();
?>

4. Redis 安装
确保已经安装并配置好 Redis。可以通过以下命令安装 Redis:
# 使用 apt 安装 Redis(Ubuntu/Debian 系统)
sudo apt update
sudo apt install redis-server

# 启动 Redis 服务
sudo service redis-server start

使用说明

6.配置 Redis:


7.确保你的系统上已安装 Redis,并且 PHP 已经安装了 Redis 扩展。可以通过 php-redis 扩展来连接 Redis。
8.在本例中,Redis 通过 127.0.0.1:6379 来进行连接,可以根据需要更改为其他 Redis 服务器地址。


9.任务调度器:


10.TaskScheduler 类负责将任务添加到 Redis 队列,并能够从队列中获取任务进行处理。
11.使用 addTask($taskData) 方法可以将一个任务加入到队列,任务数据是一个关联数组,包含任务 ID、任务类型、任务数据等信息。


12.Worker 进程:


13.TaskWorker 类负责从 Redis 队列中获取任务并进行处理。任务处理成功或失败后,会更新任务状态。如果任务处理失败,则会根据重试机制自动重新添加任务到队列中进行重试,最多重试 3 次。
14.start() 方法会让 Worker 持续监听队列并处理任务。


15.任务重试机制:


16.如果任务执行失败,Worker 会根据重试次数进行重试,最多重试 3 次。如果超过 3 次,任务会被标记为“失败”并停止重试。


17.负载均衡:


18.多个 Worker 可以并行工作,共享同一个 Redis 队列,这样就能实现任务的负载均衡。每个 Worker 会从队列中获取不同的任务进行处理,确保任务得到高效执行。

小结
这个 分布式任务调度系统 可以帮助我们处理大规模的异步任务。通过 Redis 实现任务队列管理,并利用 Worker 模式来并行处理任务,任务失败时有重试机制,保证任务的可靠性。这个系统不仅适用于批量数据处理任务,还可以用于各种异步任务场景,如邮件推送、图片处理、日志分析等。
在实际的应用中,您可以根据需要扩展任务处理的复杂度,比如添加任务优先级、任务超时等功能,提升系统的健壮性和可扩展性。

Logo

更多推荐