FastAPI 定时任务配置全攻略
FastAPI 教程:详解 FastAPI 中设置定时任务
在现代化Web服务开发中,定时任务已成为不可或缺的功能模块。FastAPI作为基于Python的高性能异步框架,虽然本身不包含定时任务功能,但通过与APScheduler、Celery等库的集成,可以轻松实现复杂的定时任务调度。本文将系统讲解两种主流实现方案,并提供生产环境可用的完整代码示例。
一、APScheduler方案详解
APScheduler(Advanced Python Scheduler)是一个功能强大的Python定时任务库,支持多种调度方式。在FastAPI中集成APScheduler具有部署简单、资源占用低的优点。
1.1 基础环境配置
首先需要安装必要的依赖包:
pip install apscheduler fastapi uvicorn
对于生产环境,建议使用后台任务模式:
from fastapi import FastAPI
from apscheduler.schedulers.background import BackgroundScheduler
import logging
app = FastAPI()
scheduler = BackgroundScheduler()
# 配置日志记录
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
1.2 定时任务实现
定义一个简单的定时任务函数:
def periodic_task():
logger.info("执行定时任务:当前时间 %s", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 这里添加实际业务逻辑
在应用启动时添加任务:
@app.on_event("startup")
async def startup_event():
scheduler.add_job(
periodic_task,
"interval", # 间隔触发
minutes=1, # 每分钟执行一次
id="task_1", # 任务唯一ID
name="每分钟任务",
replace_existing=True
)
scheduler.start()
logger.info("定时任务调度器已启动")
1.3 高级配置选项
APScheduler支持多种触发器类型:
date
: 特定日期执行一次interval
: 固定间隔执行cron
: 类cron表达式执行
# 每周一上午10点执行
scheduler.add_job(
weekly_report,
"cron",
day_of_week="mon",
hour=10,
timezone="Asia/Shanghai"
)
# 特定日期执行
scheduler.add_job(
special_event,
"date",
run_date=datetime(2024, 12, 31, 23, 59, 59)
)
1.4 持久化存储方案
对于需要持久化的场景,可以使用SQLAlchemyJobStore:
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.db')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
二、Celery分布式方案
当任务需要分布式执行或处理耗时较长时,Celery是更合适的选择。
2.1 环境搭建
安装必要组件:
pip install celery redis fastapi uvicorn
创建Celery实例:
from celery import Celery
celery_app = Celery(
'tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
2.2 定时任务配置
使用Celery Beat实现定时调度:
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
'daily-report': {
'task': 'tasks.daily_report',
'schedule': crontab(hour=8, minute=30), # 每天8:30执行
'args': ()
},
}
2.3 FastAPI集成
创建定时任务端点:
from fastapi import APIRouter
router = APIRouter()
@router.post("/trigger-task")
async def trigger_task():
from tasks import long_running_task
result = long_running_task.delay() # 异步触发
return {"task_id": result.id}
2.4 生产环境优化
建议配置:
- 使用Supervisor管理进程
- 配置多个worker处理并发
- 设置任务结果过期时间
- 实现任务失败重试机制
celery_app.conf.task_routes = {
'tasks.*': {'queue': 'priority'}
}
celery_app.conf.task_acks_late = True # 确保任务至少执行一次
celery_app.conf.worker_max_tasks_per_child = 100 # 防止内存泄漏
三、最佳实践与注意事项
3.1 资源管理建议
- 定时任务不宜过于频繁(建议>10秒间隔)
- 长时间运行任务应拆分为小批次
- 重要任务实现幂等性设计
- 监控任务执行时间和成功率
3.2 错误处理机制
def periodic_task():
try:
# 业务逻辑
except Exception as e:
logger.error("任务执行失败: %s", str(e))
# 可选:发送告警通知
3.3 部署方案对比
方案 | 适用场景 | 资源消耗 | 复杂度 |
---|---|---|---|
APScheduler | 单机轻量级任务 | 低 | 低 |
Celery | 分布式、耗时任务 | 高 | 中 |
混合方案 | 部分任务需要分布式执行 | 中 | 高 |
四、完整示例项目结构
project/
├── app/
│ ├── main.py # FastAPI入口
│ ├── scheduler.py # APScheduler实现
│ └── tasks.py # Celery任务定义
├── requirements.txt
└── docker-compose.yml # 可选容器化配置
五、常见问题解决方案
- 任务重复执行:确保任务ID唯一,使用
replace_existing=True
- 时区问题:明确设置
timezone="Asia/Shanghai"
- 内存泄漏:定期重启worker,设置
max_instances
限制 - 任务丢失:使用持久化存储,实现任务确认机制
六、扩展功能建议
- 集成Prometheus监控任务指标
- 实现动态任务管理API
- 添加任务优先级控制
- 开发可视化任务控制台
通过合理选择调度方案和遵循最佳实践,可以在FastAPI应用中构建稳定高效的定时任务系统。实际开发中应根据业务需求、系统规模和运维能力综合评估,选择最适合的方案。对于中小型项目,APScheduler的简单性具有明显优势;而对于大型分布式系统,Celery的扩展性和可靠性更为重要。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权请联系我们,一经查实立即删除!