一、Celery 核心架构解析

发送任务
传递任务
执行任务
生产者
消息中间件
Worker节点
结果后端

三大核心组件

  1. 消息中间件(Broker)
    • 作用:传递任务消息
    • 常用选择:RabbitMQ(推荐), Redis, Amazon SQS
  2. 任务执行单元(Worker)
    • 工作进程:实际执行任务的单元
    • 支持模式:多进程、协程(gevent)、多线程
  3. 结果后端(Backend)
    • 作用:存储任务执行结果
    • 常用选择:Redis, RabbitMQ, Django ORM, SQLAlchemy

二、快速入门指南

1. 安装与基础配置

pip install celery redis  # 推荐组合
# celery_app.py
from celery import Celery

app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    include=['tasks']
)

# 时区设置
app.conf.timezone = 'Asia/Shanghai'
参数 类型 默认值 说明 最佳实践
main str - 应用主名称 通常设为项目模块名如 myproject.tasks
broker str - 消息代理URL RabbitMQ:amqp://user:pass@host:port/vhost Redis:redis://:password@hostname:port/db_number
backend str/class rpc:// 结果存储后端 Redis:redis://localhost:6379/1 Django:django-db SQLAlchemy:db+postgresql://scott:tiger@localhost/mydatabase
include list [] 包含的任务模块 包含所有任务模块路径 ['module1.tasks', 'module2.jobs']
timezone str UTC 时区设置 设置为业务所在时区,如 'Asia/Shanghai'
task_serializer str 'json' 任务序列化格式 安全考虑避免使用 'pickle',推荐 'json''msgpack'
task_default_queue str 'celery' 默认任务队列 可设置为 'default''high_priority'
result_expires int 86400 结果有效期(秒) 根据业务需求设置,避免存储膨胀
worker_max_tasks_per_child int 无限制 工作进程重启前执行任务数 防止内存泄漏,通常设置 100-1000

2. 定义你的第一个任务

# tasks.py
from celery_app import app

@app.task
def add(x, y):
    return x + y

@app.task
def send_email(to):
    # 模拟邮件发送
    import time
    time.sleep(3)
    return f"Email sent to {to}"

3. 启动 Worker

celery -A celery_app worker --loglevel=info

4. 调用任务

# 同步调用(仅在测试环境使用)
result = add.delay(4, 6)  # 返回AsyncResult对象
print(result.get(timeout=10))  # 获取结果

# 异步调用(生产环境推荐)
send_email.delay("user@example.com")

三、Worker 启动参数

# 基础启动命令格式
celery -A proj_name worker <参数选项>

# 详细参数示例
celery -A myproject \
       -n worker1@%h \               # 设置worker唯一名称
       --loglevel=INFO \             # 日志级别
       --concurrency=16 \             # 并发工作进程数
       -P gevent \                   # 并发模型
       -Q high_priority,default \    # 监听队列
       --autoscale=10,4 \            # 弹性工作池
       --max-tasks-per-child=500 \   # 内存泄漏防护
       --heartbeat-interval=10 \     # 心跳设置
       --without-gossip \            # 禁用分布式协调
       --time-limit=300 \            # 硬性任务超时
       --soft-time-limit=180 \       # 软性任务超时
       -f /var/log/celery.log        # 日志文件

🔧 核心参数详解表

参数 缩写 默认值 说明 生产环境推荐
--concurrency/-c -c CPU核心数 工作进程/协程数 -c 16 (需压测决定)
--pool -P prefork 并发模型: prefork (多进程) gevent (协程) solo (单进程) -P gevent (I/O密集任务)
--queues/-Q -Q celery 监听的队列(多队列用逗号分隔) -Q high_priority,default
--loglevel/-l -l WARNING 日志级别: DEBUG/INFO/WARNING/ERROR/CRITICAL -l INFO (生产), -l DEBUG (开发)
--hostname/-n -n 随机生成 worker唯一标识符 -n worker1@%h (%h=主机名)
--autoscale 未启用 弹性工作池: 最大数,最小值 --autoscale=24,6
--max-tasks-per-child 无限制 子进程处理任务数上限 --max-tasks-per-child=500 (防内存泄漏)
--time-limit 未设置 硬性超时(秒),超时强制终止 --time-limit=300
--soft-time-limit 未设置 软性超时(秒),超时抛异常但不强杀 --soft-time-limit=180
--without-gossip 开启 禁用worker间的分布式消息 推荐启用提高稳定性
--heartbeat-interval 5秒 broker心跳检测间隔 --heartbeat-interval=10
--pidfile PID文件路径 --pidfile=/var/run/celery.pid
--logfile -f 控制台 日志文件路径 -f /var/log/celery/%n%I.log
--events -E 不发送 发送task-sent等事件 监控需求时开启

🚀 高级参数与特殊模式

1. 远程控制 worker (运行时动态调整)
# 启动远程控制功能
celery -A proj worker -l INFO --autoreload --statedb=/var/run/celery-state

# 运行时添加新worker
celery control increase_consumers -Q default

# 运行时修改concurrency
celery control pool_grow 2
2. 监控与事件流
# 启用事件监控
celery -A proj worker -E

# 使用Flower实时监控
celery flower -A proj --port=5555
3. 专用模式参数
# 测试任务执行路径
celery worker --task-events --purge

# 性能分析模式
celery worker --profile=perf

📝 参数使用最佳实践

生产环境推荐配置
# 高可用worker配置模板
celery -A ecommerce \
       -n "worker.${HOSTNAME}%d" \          # 含主机名的唯一标识
       --logfile=/var/log/celery/worker-%n.log \
       --loglevel=INFO \
       -P gevent \
       -c 32 \                              # 32个协程
       -Q checkout,notifications \
       --autoscale=64,16 \                  # 自动扩展16-64个协程
       --max-tasks-per-child=2000 \
       --time-limit=7200 \
       --soft-time-limit=7000 \
       --without-mingle \
       --without-gossip
根据任务类型调整配置
任务类型 推荐并发模型 典型并发数 特殊参数
CPU密集型 -P prefork CPU核心数的1.5-2倍 -c 16
I/O密集型 -P gevent 100-1000个协程 --prefetch-multiplier=100
短周期任务 -P threads 每CPU核心50线程 --threads=50
长运行任务 -P solo 单进程 --task-events

⚠️ 重要注意事项

  1. 参数优先级规则

    • 命令行参数 > 配置文件 > 默认值

    • 使用 --help 查看当前参数配置:

      celery worker --help
      
  2. 日志文件路径模板变量

    变量 说明
    %h 主机名
    %n worker名称
    %i 工作进程ID
    %I 工作进程索引

四、高级功能实战

1. 定时任务(周期性任务)

# 在 celery_app.py 中配置
app.conf.beat_schedule = {
    'every-10-seconds': {
        'task': 'tasks.debug_task',
        'schedule': 10.0,  # 每10秒执行
        'args': ()
    },
    'daily-report': {
        'task': 'tasks.generate_report',
        'schedule': crontab(hour=7, minute=30),  # 每天7:30执行
    }
}

启动Beat服务:

celery -A celery_app beat

2. 任务路由与队列

# 定义不同队列
app.conf.task_queues = {
    'high_priority': {
        'exchange': 'high_priority',
        'binding_key': 'high_priority'
    },
    'low_priority': {
        'exchange': 'low_priority',
        'binding_key': 'low_priority'
    }
}

# 任务路由配置
app.conf.task_routes = {
    'tasks.urgent_task': {'queue': 'high_priority'},
    'tasks.background_task': {'queue': 'low_priority'},
}

启动专用Worker:

celery -A celery_app worker -Q high_priority -c 8

3. 错误处理与重试

@app.task(
    bind=True,  # 允许访问self参数
    max_retries=3,  # 最大重试次数
    default_retry_delay=30  # 重试间隔(秒)
)
def process_data(self, data_id):
    try:
        # 数据处理逻辑
        result = DataProcessor().handle(data_id)
    except TemporaryError as exc:
        # 捕获特定异常重试
        raise self.retry(exc=exc)
    return result

五、生产环境最佳实践

1. 部署架构

推送任务
分发任务
结果
日志
收集指标
收集指标
可视化
Web服务器
RabbitMQ集群
Worker节点
Redis集群
集中式日志系统
监控服务
Dashboard

2. 性能优化

# 使用gevent协程提高并发
celery -A celery_app worker -P gevent -c 1000

# 限制任务并发
app.conf.worker_max_tasks_per_child = 100  # 每个子进程执行100个任务后重启
app.conf.task_acks_late = True  # 避免任务丢失

3. 监控与告警

  • 使用 Flower 实时监控:

    pip install flower
    celery -A celery_app flower --port=5555
    
  • 关键监控指标:

    # 在Django Admin中查看任务结果
    INSTALLED_APPS += ['django_celery_results']
    

4. 错误排查工具

# 获取任务状态
result = task.delay()
print(result.status)  # PENDING, STARTED, RETRY, FAILURE, SUCCESS

# 查看任务回溯
result.traceback

六、Celery 工作流示例

1. 链式任务

from celery import chain

# A -> B -> C
chain(task_A.s(1), task_B.s(), task_C.s())().get()

2. 任务组

from celery import group

# 并行执行任务
group([process_data.s(i) for i in range(10)])()

3. 和弦任务

from celery import chord

# 执行前置组后执行回调
chord(
    group(download.s(url) for url in urls),
    compile_results.s()
).delay()

七、常见问题解决方案

  1. 任务卡死:增加 task_time_limit 设置超时
  2. 内存泄露:配置 worker_max_tasks_per_child
  3. 任务丢失:开启 task_acks_late = True
  4. 结果过期:设置 result_expires = 3600(秒)

掌握Celery的分布式任务处理能力,可以轻松构建高并发、可扩展的现代应用系统。通过合理的架构设计和配置优化,Celery能够稳定支撑数百万级的任务处理需求。

Logo

更多推荐