FastAPI 教程:详解 FastAPI 中设置定时任务
在现代化Web服务开发中,定时任务已成为不可或缺的功能模块。FastAPI作为基于Python的高性能异步框架,虽然本身不包含定时任务功能,但通过与APScheduler、Celery等库的集成,可以轻松实现复杂的定时任务调度。本文将系统讲解两种主流实现方案,并提供生产环境可用的完整代码示例。
一、APScheduler方案详解
APScheduler(Advanced Python Scheduler)是一个功能强大的Python定时任务库,支持多种调度方式。在FastAPI中集成APScheduler具有部署简单、资源占用低的优点。
1.1 基础环境配置
首先需要安装必要的依赖包:
pip install apscheduler fastapi uvicorn
对于生产环境,建议使用后台任务模式:
from fastapi import FastAPIfrom apscheduler.schedulers.background import BackgroundSchedulerimport loggingapp = FastAPI()scheduler = BackgroundScheduler()# 配置日志记录logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger(__name__)
1.2 定时任务实现
定义一个简单的定时任务函数:
def periodic_task():logger.info("执行定时任务:当前时间 %s", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))# 这里添加实际业务逻辑
在应用启动时添加任务:
@app.on_event("startup")async def startup_event():scheduler.add_job(periodic_task,"interval", # 间隔触发minutes=1, # 每分钟执行一次id="task_1", # 任务唯一IDname="每分钟任务",replace_existing=True)scheduler.start()logger.info("定时任务调度器已启动")
1.3 高级配置选项
APScheduler支持多种触发器类型:
date: 特定日期执行一次interval: 固定间隔执行cron: 类cron表达式执行
# 每周一上午10点执行scheduler.add_job(weekly_report,"cron",day_of_week="mon",hour=10,timezone="Asia/Shanghai")# 特定日期执行scheduler.add_job(special_event,"date",run_date=datetime(2024, 12, 31, 23, 59, 59))
1.4 持久化存储方案
对于需要持久化的场景,可以使用SQLAlchemyJobStore:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorejobstores = {'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')}scheduler = BackgroundScheduler(jobstores=jobstores)
二、Celery分布式方案
当任务需要分布式执行或处理耗时较长时,Celery是更合适的选择。
2.1 环境搭建
安装必要组件:
pip install celery redis fastapi uvicorn
创建Celery实例:
from celery import Celerycelery_app = Celery('tasks',broker='redis://localhost:6379/0',backend='redis://localhost:6379/1')
2.2 定时任务配置
使用Celery Beat实现定时调度:
from celery.schedules import crontabcelery_app.conf.beat_schedule = {'daily-report': {'task': 'tasks.daily_report','schedule': crontab(hour=8, minute=30), # 每天8:30执行'args': ()},}
2.3 FastAPI集成
创建定时任务端点:
from fastapi import APIRouterrouter = APIRouter()@router.post("/trigger-task")async def trigger_task():from tasks import long_running_taskresult = long_running_task.delay() # 异步触发return {"task_id": result.id}
2.4 生产环境优化
建议配置:
- 使用Supervisor管理进程
- 配置多个worker处理并发
- 设置任务结果过期时间
- 实现任务失败重试机制
celery_app.conf.task_routes = {'tasks.*': {'queue': 'priority'}}celery_app.conf.task_acks_late = True # 确保任务至少执行一次celery_app.conf.worker_max_tasks_per_child = 100 # 防止内存泄漏
三、最佳实践与注意事项
3.1 资源管理建议
- 定时任务不宜过于频繁(建议>10秒间隔)
- 长时间运行任务应拆分为小批次
- 重要任务实现幂等性设计
- 监控任务执行时间和成功率
3.2 错误处理机制
def periodic_task():try:# 业务逻辑except Exception as e:logger.error("任务执行失败: %s", str(e))# 可选:发送告警通知
3.3 部署方案对比
| 方案 | 适用场景 | 资源消耗 | 复杂度 |
|---|---|---|---|
| APScheduler | 单机轻量级任务 | 低 | 低 |
| Celery | 分布式、耗时任务 | 高 | 中 |
| 混合方案 | 部分任务需要分布式执行 | 中 | 高 |
四、完整示例项目结构
project/├── app/│ ├── main.py # FastAPI入口│ ├── scheduler.py # APScheduler实现│ └── tasks.py # Celery任务定义├── requirements.txt└── docker-compose.yml # 可选容器化配置
五、常见问题解决方案
- 任务重复执行:确保任务ID唯一,使用
replace_existing=True - 时区问题:明确设置
timezone="Asia/Shanghai" - 内存泄漏:定期重启worker,设置
max_instances限制 - 任务丢失:使用持久化存储,实现任务确认机制
六、扩展功能建议
- 集成Prometheus监控任务指标
- 实现动态任务管理API
- 添加任务优先级控制
- 开发可视化任务控制台
通过合理选择调度方案和遵循最佳实践,可以在FastAPI应用中构建稳定高效的定时任务系统。实际开发中应根据业务需求、系统规模和运维能力综合评估,选择最适合的方案。对于中小型项目,APScheduler的简单性具有明显优势;而对于大型分布式系统,Celery的扩展性和可靠性更为重要。