FastAPI 定时任务配置全攻略

FastAPI 教程:详解 FastAPI 中设置定时任务

在现代化Web服务开发中,定时任务已成为不可或缺的功能模块。FastAPI作为基于Python的高性能异步框架,虽然本身不包含定时任务功能,但通过与APScheduler、Celery等库的集成,可以轻松实现复杂的定时任务调度。本文将系统讲解两种主流实现方案,并提供生产环境可用的完整代码示例。

一、APScheduler方案详解

APScheduler(Advanced Python Scheduler)是一个功能强大的Python定时任务库,支持多种调度方式。在FastAPI中集成APScheduler具有部署简单、资源占用低的优点。

1.1 基础环境配置

首先需要安装必要的依赖包:

  1. pip install apscheduler fastapi uvicorn

对于生产环境,建议使用后台任务模式:

  1. from fastapi import FastAPI
  2. from apscheduler.schedulers.background import BackgroundScheduler
  3. import logging
  4. app = FastAPI()
  5. scheduler = BackgroundScheduler()
  6. # 配置日志记录
  7. logging.basicConfig(
  8. level=logging.INFO,
  9. format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  10. )
  11. logger = logging.getLogger(__name__)

1.2 定时任务实现

定义一个简单的定时任务函数:

  1. def periodic_task():
  2. logger.info("执行定时任务:当前时间 %s", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  3. # 这里添加实际业务逻辑

在应用启动时添加任务:

  1. @app.on_event("startup")
  2. async def startup_event():
  3. scheduler.add_job(
  4. periodic_task,
  5. "interval", # 间隔触发
  6. minutes=1, # 每分钟执行一次
  7. id="task_1", # 任务唯一ID
  8. name="每分钟任务",
  9. replace_existing=True
  10. )
  11. scheduler.start()
  12. logger.info("定时任务调度器已启动")

1.3 高级配置选项

APScheduler支持多种触发器类型:

  • date: 特定日期执行一次
  • interval: 固定间隔执行
  • cron: 类cron表达式执行
  1. # 每周一上午10点执行
  2. scheduler.add_job(
  3. weekly_report,
  4. "cron",
  5. day_of_week="mon",
  6. hour=10,
  7. timezone="Asia/Shanghai"
  8. )
  9. # 特定日期执行
  10. scheduler.add_job(
  11. special_event,
  12. "date",
  13. run_date=datetime(2024, 12, 31, 23, 59, 59)
  14. )

1.4 持久化存储方案

对于需要持久化的场景,可以使用SQLAlchemyJobStore:

  1. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  2. jobstores = {
  3. 'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
  4. }
  5. scheduler = BackgroundScheduler(jobstores=jobstores)

二、Celery分布式方案

当任务需要分布式执行或处理耗时较长时,Celery是更合适的选择。

2.1 环境搭建

安装必要组件:

  1. pip install celery redis fastapi uvicorn

创建Celery实例:

  1. from celery import Celery
  2. celery_app = Celery(
  3. 'tasks',
  4. broker='redis://localhost:6379/0',
  5. backend='redis://localhost:6379/1'
  6. )

2.2 定时任务配置

使用Celery Beat实现定时调度:

  1. from celery.schedules import crontab
  2. celery_app.conf.beat_schedule = {
  3. 'daily-report': {
  4. 'task': 'tasks.daily_report',
  5. 'schedule': crontab(hour=8, minute=30), # 每天8:30执行
  6. 'args': ()
  7. },
  8. }

2.3 FastAPI集成

创建定时任务端点:

  1. from fastapi import APIRouter
  2. router = APIRouter()
  3. @router.post("/trigger-task")
  4. async def trigger_task():
  5. from tasks import long_running_task
  6. result = long_running_task.delay() # 异步触发
  7. return {"task_id": result.id}

2.4 生产环境优化

建议配置:

  1. 使用Supervisor管理进程
  2. 配置多个worker处理并发
  3. 设置任务结果过期时间
  4. 实现任务失败重试机制
  1. celery_app.conf.task_routes = {
  2. 'tasks.*': {'queue': 'priority'}
  3. }
  4. celery_app.conf.task_acks_late = True # 确保任务至少执行一次
  5. celery_app.conf.worker_max_tasks_per_child = 100 # 防止内存泄漏

三、最佳实践与注意事项

3.1 资源管理建议

  1. 定时任务不宜过于频繁(建议>10秒间隔)
  2. 长时间运行任务应拆分为小批次
  3. 重要任务实现幂等性设计
  4. 监控任务执行时间和成功率

3.2 错误处理机制

  1. def periodic_task():
  2. try:
  3. # 业务逻辑
  4. except Exception as e:
  5. logger.error("任务执行失败: %s", str(e))
  6. # 可选:发送告警通知

3.3 部署方案对比

方案 适用场景 资源消耗 复杂度
APScheduler 单机轻量级任务
Celery 分布式、耗时任务
混合方案 部分任务需要分布式执行

四、完整示例项目结构

  1. project/
  2. ├── app/
  3. ├── main.py # FastAPI入口
  4. ├── scheduler.py # APScheduler实现
  5. └── tasks.py # Celery任务定义
  6. ├── requirements.txt
  7. └── docker-compose.yml # 可选容器化配置

五、常见问题解决方案

  1. 任务重复执行:确保任务ID唯一,使用replace_existing=True
  2. 时区问题:明确设置timezone="Asia/Shanghai"
  3. 内存泄漏:定期重启worker,设置max_instances限制
  4. 任务丢失:使用持久化存储,实现任务确认机制

六、扩展功能建议

  1. 集成Prometheus监控任务指标
  2. 实现动态任务管理API
  3. 添加任务优先级控制
  4. 开发可视化任务控制台

通过合理选择调度方案和遵循最佳实践,可以在FastAPI应用中构建稳定高效的定时任务系统。实际开发中应根据业务需求、系统规模和运维能力综合评估,选择最适合的方案。对于中小型项目,APScheduler的简单性具有明显优势;而对于大型分布式系统,Celery的扩展性和可靠性更为重要。