Celery 分布式任务队列实战教程:从入门到生产部署
tasks.py@app.task@app.task# 模拟邮件发送# tasks.py from celery_app import app @app . task def add(x , y) : return x + y @app . task def send_email(to) : # 模拟邮件发送 import timeto } "
·
一、Celery 核心架构解析
三大核心组件
- 消息中间件(Broker)
- 作用:传递任务消息
- 常用选择:RabbitMQ(推荐), Redis, Amazon SQS
- 任务执行单元(Worker)
- 工作进程:实际执行任务的单元
- 支持模式:多进程、协程(gevent)、多线程
- 结果后端(Backend)
- 作用:存储任务执行结果
- 常用选择:Redis, RabbitMQ, Django ORM, SQLAlchemy
二、快速入门指南
1. 安装与基础配置
pip install celery redis # 推荐组合
# celery_app.py
from celery import Celery
app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1',
include=['tasks']
)
# 时区设置
app.conf.timezone = 'Asia/Shanghai'
参数 | 类型 | 默认值 | 说明 | 最佳实践 |
---|---|---|---|---|
main |
str |
- | 应用主名称 | 通常设为项目模块名如 myproject.tasks |
broker |
str |
- | 消息代理URL | RabbitMQ:amqp://user:pass@host:port/vhost Redis:redis://:password@hostname:port/db_number |
backend |
str/class |
rpc:// |
结果存储后端 | Redis:redis://localhost:6379/1 Django:django-db SQLAlchemy:db+postgresql://scott:tiger@localhost/mydatabase |
include |
list |
[] |
包含的任务模块 | 包含所有任务模块路径 ['module1.tasks', 'module2.jobs'] |
timezone |
str |
UTC |
时区设置 | 设置为业务所在时区,如 'Asia/Shanghai' |
task_serializer |
str |
'json' |
任务序列化格式 | 安全考虑避免使用 'pickle' ,推荐 'json' 或 'msgpack' |
task_default_queue |
str |
'celery' |
默认任务队列 | 可设置为 'default' 或 'high_priority' |
result_expires |
int |
86400 |
结果有效期(秒) | 根据业务需求设置,避免存储膨胀 |
worker_max_tasks_per_child |
int |
无限制 | 工作进程重启前执行任务数 | 防止内存泄漏,通常设置 100-1000 |
2. 定义你的第一个任务
# tasks.py
from celery_app import app
@app.task
def add(x, y):
return x + y
@app.task
def send_email(to):
# 模拟邮件发送
import time
time.sleep(3)
return f"Email sent to {to}"
3. 启动 Worker
celery -A celery_app worker --loglevel=info
4. 调用任务
# 同步调用(仅在测试环境使用)
result = add.delay(4, 6) # 返回AsyncResult对象
print(result.get(timeout=10)) # 获取结果
# 异步调用(生产环境推荐)
send_email.delay("user@example.com")
三、Worker 启动参数
# 基础启动命令格式
celery -A proj_name worker <参数选项>
# 详细参数示例
celery -A myproject \
-n worker1@%h \ # 设置worker唯一名称
--loglevel=INFO \ # 日志级别
--concurrency=16 \ # 并发工作进程数
-P gevent \ # 并发模型
-Q high_priority,default \ # 监听队列
--autoscale=10,4 \ # 弹性工作池
--max-tasks-per-child=500 \ # 内存泄漏防护
--heartbeat-interval=10 \ # 心跳设置
--without-gossip \ # 禁用分布式协调
--time-limit=300 \ # 硬性任务超时
--soft-time-limit=180 \ # 软性任务超时
-f /var/log/celery.log # 日志文件
🔧 核心参数详解表
参数 | 缩写 | 默认值 | 说明 | 生产环境推荐 |
---|---|---|---|---|
--concurrency/-c |
-c |
CPU核心数 | 工作进程/协程数 | -c 16 (需压测决定) |
--pool |
-P |
prefork |
并发模型: prefork (多进程) gevent (协程) solo (单进程) |
-P gevent (I/O密集任务) |
--queues/-Q |
-Q |
celery |
监听的队列(多队列用逗号分隔) | -Q high_priority,default |
--loglevel/-l |
-l |
WARNING |
日志级别: DEBUG/INFO/WARNING/ERROR/CRITICAL |
-l INFO (生产), -l DEBUG (开发) |
--hostname/-n |
-n |
随机生成 | worker唯一标识符 | -n worker1@%h (%h=主机名) |
--autoscale |
未启用 | 弹性工作池: 最大数,最小值 |
--autoscale=24,6 |
|
--max-tasks-per-child |
无限制 | 子进程处理任务数上限 | --max-tasks-per-child=500 (防内存泄漏) |
|
--time-limit |
未设置 | 硬性超时(秒),超时强制终止 | --time-limit=300 |
|
--soft-time-limit |
未设置 | 软性超时(秒),超时抛异常但不强杀 | --soft-time-limit=180 |
|
--without-gossip |
开启 | 禁用worker间的分布式消息 | 推荐启用提高稳定性 | |
--heartbeat-interval |
5秒 | broker心跳检测间隔 | --heartbeat-interval=10 |
|
--pidfile |
无 | PID文件路径 | --pidfile=/var/run/celery.pid |
|
--logfile |
-f |
控制台 | 日志文件路径 | -f /var/log/celery/%n%I.log |
--events |
-E |
不发送 | 发送task-sent等事件 | 监控需求时开启 |
🚀 高级参数与特殊模式
1. 远程控制 worker (运行时动态调整)
# 启动远程控制功能
celery -A proj worker -l INFO --autoreload --statedb=/var/run/celery-state
# 运行时添加新worker
celery control increase_consumers -Q default
# 运行时修改concurrency
celery control pool_grow 2
2. 监控与事件流
# 启用事件监控
celery -A proj worker -E
# 使用Flower实时监控
celery flower -A proj --port=5555
3. 专用模式参数
# 测试任务执行路径
celery worker --task-events --purge
# 性能分析模式
celery worker --profile=perf
📝 参数使用最佳实践
生产环境推荐配置
# 高可用worker配置模板
celery -A ecommerce \
-n "worker.${HOSTNAME}%d" \ # 含主机名的唯一标识
--logfile=/var/log/celery/worker-%n.log \
--loglevel=INFO \
-P gevent \
-c 32 \ # 32个协程
-Q checkout,notifications \
--autoscale=64,16 \ # 自动扩展16-64个协程
--max-tasks-per-child=2000 \
--time-limit=7200 \
--soft-time-limit=7000 \
--without-mingle \
--without-gossip
根据任务类型调整配置
任务类型 | 推荐并发模型 | 典型并发数 | 特殊参数 |
---|---|---|---|
CPU密集型 | -P prefork |
CPU核心数的1.5-2倍 | -c 16 |
I/O密集型 | -P gevent |
100-1000个协程 | --prefetch-multiplier=100 |
短周期任务 | -P threads |
每CPU核心50线程 | --threads=50 |
长运行任务 | -P solo |
单进程 | --task-events |
⚠️ 重要注意事项
-
参数优先级规则:
-
命令行参数 > 配置文件 > 默认值
-
使用
--help
查看当前参数配置:celery worker --help
-
-
日志文件路径模板变量:
变量 说明 %h
主机名 %n
worker名称 %i
工作进程ID %I
工作进程索引
四、高级功能实战
1. 定时任务(周期性任务)
# 在 celery_app.py 中配置
app.conf.beat_schedule = {
'every-10-seconds': {
'task': 'tasks.debug_task',
'schedule': 10.0, # 每10秒执行
'args': ()
},
'daily-report': {
'task': 'tasks.generate_report',
'schedule': crontab(hour=7, minute=30), # 每天7:30执行
}
}
启动Beat服务:
celery -A celery_app beat
2. 任务路由与队列
# 定义不同队列
app.conf.task_queues = {
'high_priority': {
'exchange': 'high_priority',
'binding_key': 'high_priority'
},
'low_priority': {
'exchange': 'low_priority',
'binding_key': 'low_priority'
}
}
# 任务路由配置
app.conf.task_routes = {
'tasks.urgent_task': {'queue': 'high_priority'},
'tasks.background_task': {'queue': 'low_priority'},
}
启动专用Worker:
celery -A celery_app worker -Q high_priority -c 8
3. 错误处理与重试
@app.task(
bind=True, # 允许访问self参数
max_retries=3, # 最大重试次数
default_retry_delay=30 # 重试间隔(秒)
)
def process_data(self, data_id):
try:
# 数据处理逻辑
result = DataProcessor().handle(data_id)
except TemporaryError as exc:
# 捕获特定异常重试
raise self.retry(exc=exc)
return result
五、生产环境最佳实践
1. 部署架构
2. 性能优化
# 使用gevent协程提高并发
celery -A celery_app worker -P gevent -c 1000
# 限制任务并发
app.conf.worker_max_tasks_per_child = 100 # 每个子进程执行100个任务后重启
app.conf.task_acks_late = True # 避免任务丢失
3. 监控与告警
-
使用 Flower 实时监控:
pip install flower celery -A celery_app flower --port=5555
-
关键监控指标:
# 在Django Admin中查看任务结果 INSTALLED_APPS += ['django_celery_results']
4. 错误排查工具
# 获取任务状态
result = task.delay()
print(result.status) # PENDING, STARTED, RETRY, FAILURE, SUCCESS
# 查看任务回溯
result.traceback
六、Celery 工作流示例
1. 链式任务
from celery import chain
# A -> B -> C
chain(task_A.s(1), task_B.s(), task_C.s())().get()
2. 任务组
from celery import group
# 并行执行任务
group([process_data.s(i) for i in range(10)])()
3. 和弦任务
from celery import chord
# 执行前置组后执行回调
chord(
group(download.s(url) for url in urls),
compile_results.s()
).delay()
七、常见问题解决方案
- 任务卡死:增加
task_time_limit
设置超时 - 内存泄露:配置
worker_max_tasks_per_child
- 任务丢失:开启
task_acks_late = True
- 结果过期:设置
result_expires = 3600
(秒)
掌握Celery的分布式任务处理能力,可以轻松构建高并发、可扩展的现代应用系统。通过合理的架构设计和配置优化,Celery能够稳定支撑数百万级的任务处理需求。
更多推荐
所有评论(0)