PHP 实现分布式任务调度系统(基于 Redis 和 Worker 模式)
PHP 实现分布式任务调度系统(基于 Redis 和 Worker 模式)
在实际的开发中,很多系统需要处理大量的异步任务。例如,发送通知、图片处理、数据同步等。这些任务常常需要在后台进行,而不应影响主流程的运行。因此,使用分布式任务调度系统来处理这些任务就显得尤为重要。一个常见的需求是将任务分发到不同的处理节点,并且能合理地调度任务、负载均衡、任务失败重试等。
本文将介绍如何使用 PHP 构建一个 分布式任务调度系统,该系统使用 Redis 作为任务队列,配合 Worker 模式 来处理任务。每个 Worker 负责从 Redis 中获取任务并处理,同时支持任务的重试机制和任务状态管理。
功能特点
1.任务队列管理:通过 Redis 实现任务的分发与管理,确保任务能够持久化并且支持高并发。
2.Worker 模式:多个 Worker 进程并发工作,自动从任务队列中取任务进行处理,支持任务的并行执行。
3.任务状态管理:任务可以标记为“待处理”、“处理中”、“成功”、“失败”等状态,确保任务的可靠性。
4.任务重试机制:对于执行失败的任务,系统可以自动重试。
5.负载均衡:多个 Worker 进程间共享任务队列,保证任务的均匀分配。
代码实现
1. 任务调度系统的核心代码
<?php
// 任务调度系统
class TaskScheduler
{
private $redis;
private $queueName;
public function __construct($redis, $queueName = 'task_queue')
{
$this->redis = $redis;
$this->queueName = $queueName;
}
// 向任务队列中添加任务
public function addTask($taskData)
{
$taskData['status'] = 'waiting'; // 任务初始状态为等待中
$taskData['created_at'] = time(); // 记录任务创建时间
$this->redis->rpush($this->queueName, json_encode($taskData)); // 将任务加入队列
}
// 获取队列中的任务
public function getTask()
{
$taskData = $this->redis->lpop($this->queueName); // 从队列头部获取任务
return $taskData ? json_decode($taskData, true) : null;
}
// 获取当前队列中的任务数
public function getQueueLength()
{
return $this->redis->llen($this->queueName);
}
}
?>
2. Worker 模式的任务处理代码
<?php
// Worker 进程
class TaskWorker
{
private $redis;
private $scheduler;
public function __construct($redis, $scheduler)
{
$this->redis = $redis;
$this->scheduler = $scheduler;
}
// 任务处理入口
public function start()
{
echo "Worker started...\n";
while (true) {
// 获取一个任务
$task = $this->scheduler->getTask();
if ($task) {
echo "Processing task: " . $task['task_id'] . "\n";
// 模拟任务处理
$success = $this->processTask($task);
if ($success) {
$this->updateTaskStatus($task['task_id'], 'completed');
echo "Task " . $task['task_id'] . " completed successfully.\n";
} else {
$this->updateTaskStatus($task['task_id'], 'failed');
echo "Task " . $task['task_id'] . " failed, will retry...\n";
$this->retryTask($task);
}
} else {
echo "No task available, waiting...\n";
sleep(1); // 如果队列为空,等待 1 秒再检查
}
}
}
// 处理任务的核心逻辑
private function processTask($task)
{
// 模拟任务处理(实际可以是图片处理、数据同步等)
$randomSuccess = rand(0, 1);
return $randomSuccess == 1; // 随机决定任务是否成功
}
// 更新任务状态
private function updateTaskStatus($taskId, $status)
{
// 在实际项目中,可以将任务状态保存到数据库
echo "Task $taskId status updated to $status.\n";
}
// 重试任务
private function retryTask($task)
{
if ($task['retries'] < 3) { // 限制最大重试次数
$task['retries'] += 1;
$this->scheduler->addTask($task); // 重新将任务加入队列
} else {
echo "Task " . $task['task_id'] . " reached max retry attempts.\n";
}
}
}
?>
3. 启动任务调度与 Worker
<?php
// 连接 Redis
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 创建任务调度器
$scheduler = new TaskScheduler($redis);
// 添加任务到任务队列
for ($i = 1; $i <= 10; $i++) {
$taskData = [
'task_id' => 'task_' . $i,
'task_type' => 'process_image',
'payload' => 'image_data_' . $i,
'retries' => 0
];
$scheduler->addTask($taskData);
echo "Added task: task_$i\n";
}
// 启动 Worker 进程,处理任务
$worker = new TaskWorker($redis, $scheduler);
$worker->start();
?>
4. Redis 安装
确保已经安装并配置好 Redis。可以通过以下命令安装 Redis:
# 使用 apt 安装 Redis(Ubuntu/Debian 系统)
sudo apt update
sudo apt install redis-server
# 启动 Redis 服务
sudo service redis-server start
使用说明
6.配置 Redis:
7.确保你的系统上已安装 Redis,并且 PHP 已经安装了 Redis 扩展。可以通过 php-redis 扩展来连接 Redis。
8.在本例中,Redis 通过 127.0.0.1:6379 来进行连接,可以根据需要更改为其他 Redis 服务器地址。
9.任务调度器:
10.TaskScheduler 类负责将任务添加到 Redis 队列,并能够从队列中获取任务进行处理。
11.使用 addTask($taskData) 方法可以将一个任务加入到队列,任务数据是一个关联数组,包含任务 ID、任务类型、任务数据等信息。
12.Worker 进程:
13.TaskWorker 类负责从 Redis 队列中获取任务并进行处理。任务处理成功或失败后,会更新任务状态。如果任务处理失败,则会根据重试机制自动重新添加任务到队列中进行重试,最多重试 3 次。
14.start() 方法会让 Worker 持续监听队列并处理任务。
15.任务重试机制:
16.如果任务执行失败,Worker 会根据重试次数进行重试,最多重试 3 次。如果超过 3 次,任务会被标记为“失败”并停止重试。
17.负载均衡:
18.多个 Worker 可以并行工作,共享同一个 Redis 队列,这样就能实现任务的负载均衡。每个 Worker 会从队列中获取不同的任务进行处理,确保任务得到高效执行。
小结
这个 分布式任务调度系统 可以帮助我们处理大规模的异步任务。通过 Redis 实现任务队列管理,并利用 Worker 模式来并行处理任务,任务失败时有重试机制,保证任务的可靠性。这个系统不仅适用于批量数据处理任务,还可以用于各种异步任务场景,如邮件推送、图片处理、日志分析等。
在实际的应用中,您可以根据需要扩展任务处理的复杂度,比如添加任务优先级、任务超时等功能,提升系统的健壮性和可扩展性。
更多推荐
所有评论(0)