FastAPI 定时任务全攻略:从入门到实践

FastAPI 定时任务全攻略:从入门到实践

在FastAPI构建的现代化Web服务中,定时任务是数据同步、日志清理、通知推送等场景的核心组件。本文将系统讲解FastAPI中实现定时任务的两种主流方案:基于APScheduler的轻量级方案和Celery的分布式方案,帮助开发者根据业务需求选择最适合的技术路径。

一、APScheduler轻量级方案详解

1.1 基础配置与原理

APScheduler(Advanced Python Scheduler)是Python生态中最成熟的定时任务库之一,支持内存作业存储和多种触发器类型。其核心组件包括:

  • 触发器(Trigger):定义任务执行时机(日期、间隔、Cron表达式)
  • 作业存储(Job Store):持久化任务配置(默认使用内存存储)
  • 执行器(Executor):执行任务的线程/进程池
  • 调度器(Scheduler):协调各组件的核心

在FastAPI中集成时,推荐使用BackgroundScheduler,它不会阻塞主线程且支持异步任务。

1.2 完整实现步骤

步骤1:安装依赖

  1. pip install apscheduler

步骤2:创建调度器实例

  1. from apscheduler.schedulers.background import BackgroundScheduler
  2. from fastapi import FastAPI
  3. app = FastAPI()
  4. scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
  5. scheduler.start()

步骤3:定义定时任务

  1. from datetime import datetime
  2. import asyncio
  3. async def periodic_task():
  4. print(f"Task executed at {datetime.now()}")
  5. @app.on_event("startup")
  6. async def startup_event():
  7. # 添加间隔任务(每5秒执行)
  8. scheduler.add_job(
  9. periodic_task,
  10. "interval",
  11. seconds=5,
  12. id="sample_task"
  13. )
  14. # 添加Cron任务(每天10:30执行)
  15. scheduler.add_job(
  16. periodic_task,
  17. "cron",
  18. hour=10,
  19. minute=30,
  20. id="daily_task"
  21. )

步骤4:优雅关闭处理

  1. @app.on_event("shutdown")
  2. async def shutdown_event():
  3. scheduler.shutdown()

1.3 高级配置技巧

作业持久化:通过SQLAlchemyJobStore实现任务持久化

  1. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
  2. jobstores = {
  3. "default": SQLAlchemyJobStore(url="sqlite:///jobs.db")
  4. }
  5. scheduler = BackgroundScheduler(jobstores=jobstores)

异常处理:为任务添加错误处理器

  1. def job_error_handler(event):
  2. print(f"Job {event.job_id} failed: {event.exception}")
  3. scheduler.add_listener(job_error_handler, apscheduler.events.EVENT_JOB_ERROR)

二、Celery分布式方案深度解析

2.1 适用场景分析

当任务需要:

  • 分布式执行(多worker)
  • 任务结果存储
  • 任务链/组
  • 失败重试机制

Celery是更合适的选择,尤其适合与FastAPI配合构建微服务架构。

2.2 完整集成方案

步骤1:安装必要组件

  1. pip install celery redis # 使用Redis作为broker

步骤2:创建Celery实例

  1. from celery import Celery
  2. celery = Celery(
  3. "tasks",
  4. broker="redis://localhost:6379/0",
  5. backend="redis://localhost:6379/1"
  6. )

步骤3:定义周期性任务

  1. from celery.schedules import crontab
  2. celery.conf.beat_schedule = {
  3. "every-10-seconds": {
  4. "task": "tasks.periodic_task",
  5. "schedule": 10.0, # 每10秒
  6. },
  7. "daily-report": {
  8. "task": "tasks.daily_report",
  9. "schedule": crontab(hour=10, minute=30), # 每天10:30
  10. }
  11. }
  12. @celery.task
  13. def periodic_task():
  14. print("Celery task executed")

步骤4:FastAPI集成

  1. from fastapi import FastAPI
  2. import subprocess
  3. app = FastAPI()
  4. @app.get("/start-worker")
  5. async def start_worker():
  6. # 实际部署时应使用进程管理工具
  7. subprocess.Popen(["celery", "-A", "tasks", "worker", "--loglevel=info"])
  8. return {"status": "worker starting"}

2.3 生产环境优化

Worker配置

  1. celery -A tasks worker --concurrency=4 --autoscale=10,2

结果后端优化

  • 使用Redis/Memcached存储结果
  • 设置结果过期时间:CELERY_RESULT_EXPIRES=3600

监控集成

  • Flower:实时监控工具
  • Prometheus + Grafana:指标可视化

三、方案对比与选型建议

特性 APScheduler Celery
部署复杂度 低(单进程) 高(需broker)
分布式支持
任务持久化 需额外配置 内置支持
适合场景 单机轻量任务 分布式复杂任务
扩展性 有限

选型建议

  • 初创项目/单机部署:APScheduler
  • 微服务架构/高并发:Celery
  • 中等规模项目:可先APScheduler,后期平滑迁移到Celery

四、常见问题解决方案

4.1 定时任务重叠执行

问题:任务执行时间超过间隔周期导致重叠

解决方案

  1. # 使用max_instances限制并发数
  2. scheduler.add_job(
  3. heavy_task,
  4. "interval",
  5. minutes=5,
  6. max_instances=1 # 同一时间只运行一个实例
  7. )

4.2 时区处理不一致

问题:服务器时区与业务时区不一致

解决方案

  1. # 显式设置时区
  2. from pytz import timezone
  3. scheduler = BackgroundScheduler(timezone=timezone("Asia/Shanghai"))

4.3 任务丢失问题

解决方案

  • APScheduler:配置SQLAlchemyJobStore
  • Celery:确保broker高可用(Redis集群)

五、最佳实践总结

  1. 任务隔离:将定时任务逻辑封装为独立模块,避免污染主应用
  2. 日志集中:通过结构化日志记录任务执行情况
  3. 告警机制:对失败任务设置邮件/短信告警
  4. 动态管理:提供API接口动态添加/删除任务
  5. 资源控制:为CPU密集型任务设置专用worker

动态任务管理示例

  1. from fastapi import APIRouter
  2. router = APIRouter()
  3. @router.post("/add-job")
  4. async def add_job(interval: int):
  5. scheduler.add_job(
  6. periodic_task,
  7. "interval",
  8. seconds=interval,
  9. id=f"dynamic_{interval}"
  10. )
  11. return {"status": "job added"}

通过系统掌握这两种方案,开发者可以灵活应对从简单定时任务到复杂分布式调度的各种需求。实际项目中,建议从APScheduler开始,随着系统复杂度提升再平滑迁移到Celery架构。