FastAPI定时任务全攻略:从入门到避开多进程的坑
你的FastAPI后台任务是不是还在“裸奔”?
先说事实案例:有个促销活动需要定时上线。结果呢?依赖的云函数服务突然抖动,那个“简单可靠”的crontab脚本愣是没触发。凌晨三点,运营的电话直接把你的美梦干碎。😫 事后复盘,才意识到:把定时任务寄生于操作系统或者外部黑盒服务,在微服务架构里,就是给自己埋雷。
痛定思痛,最后把定时任务“请”回了应用内部,用APScheduler在FastAPI里搞了个自治的小闹钟。今天,咱们就来聊聊这套实战经验,连同那些半夜爬起来填的坑……
🎯 本文你能得到什么
1. 为什么说FastAPI自带的BackgroundTasks不适合做定时任务。
2. APScheduler的核心概念,用“闹钟”和“餐厅”的比喻让你秒懂。
3. 手把手集成,提供可直接复制粘贴的代码块。
4. 最重要的:多进程部署(比如用Uvicorn workers)时,定时任务重复执行的“鬼故事”与解决之道。
🔧 第一部分:问题与背景 —— 为什么另起炉灶?
FastAPI 的 BackgroundTasks 是个好同志,但它只是个“跑腿小哥”。你API请求来了,它帮你异步处理些杂事,比如发邮件、写日志。但它有个硬伤:它没有记忆,也不会看表。 服务一重启,所有计划内的“跑腿”任务全忘光光。
定时任务呢?它需要的是“忠诚的管家”。不管服务是否重启,都要记得每天上午10点要发报表,每周一凌晨要清缓存。这需要持久化和时间调度能力,这正是 APScheduler 的绝活。
你可能会问,用Celery行不行?行,但杀鸡用牛刀了。APScheduler更轻量,与你FastAPI应用同生共死,管理起来简单直接,特别适合业务逻辑清晰、不需要分布式协调的定时场景。
⚙️ 第二部分:核心原理 —— APScheduler的三板斧
别被它的名字吓到,把它想象成一个高度可定制的智能闹钟系统。它主要由三部分组成:
📅 触发器 (Trigger): 决定“什么时候响”。是每天固定时间(date),还是间隔固定时间(interval),或者是像crontab那样的复杂周期(cron)?
📝 作业存储器 (Job Store): 记住“有哪些闹钟要响”。默认存在内存里,重启就忘。我们可以让它记在数据库里(比如SQLite、PostgreSQL),实现持久化。
👨💼 执行器 (Executor): 负责“闹钟响了以后具体做什么”。是用线程池还是进程池来执行我们的任务函数?
而调度器 (Scheduler) 就是总控台,把上面三个部件组装起来,并启动这个闹钟系统。
🚀 第三部分:实战演示 —— 手把手集成到FastAPI
好,咱们先来安装。这步最简单:
pip install apscheduler
接下来重点来了,初始化并集成到FastAPI的生命周期。这里有个关键技巧:一定要把scheduler的启动和关闭挂在FastAPI的应用事件上,保证应用启动时它启动,应用优雅关闭时它也停下。千万别学我当初,直接在模块层面scheduler.start(),导致测试时脚本跑完不退出。
# 项目结构建议
# app/
# __init__.py
# main.py # FastAPI 应用创建和事件处理
# scheduler.py # 调度器配置和任务定义
# tasks.py # 具体的任务函数
# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# 1. 配置组件
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # 使用SQLite持久化
}
executors = {
'default': ThreadPoolExecutor(20) # 线程池执行
}
job_defaults = {
'coalesce': False, # 错过的任务是否合并执行(一般False)
'max_instances': 3 # 同一个任务同时运行的最大实例数
}
# 2. 创建调度器实例
scheduler = AsyncIOScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone="Asia/Shanghai" # 时区!时区!时区!重要的事说三遍
)
# 3. 定义任务函数 (可以放在同文件,也可从其他模块导入)
def my_sync_job():
print("同步任务执行了!")
async def my_async_job():
print("异步任务执行了!")
# 这里可以愉快地调用其他async函数
# 4. 添加任务的函数 (通常在应用启动时调用)
def add_jobs():
# 间隔任务:每30秒执行一次
scheduler.add_job(my_sync_job, 'interval', seconds=30, id='sync_interval_job')
# Cron任务:每分钟的第30秒执行
scheduler.add_job(my_async_job, 'cron', second=30, id='async_cron_job')
# 单次任务:2023年10月1日执行
# scheduler.add_job(xxx, 'date', run_date='2023-10-01 00:00:00')
然后在你的main.py里,把它和FastAPI绑在一起:
# app/main.py
from fastapi import FastAPI
from .scheduler import scheduler, add_jobs
app = FastAPI(title="定时任务演示")
@app.on_event("startup")
async def startup_event():
# 应用启动时,添加任务并启动调度器
if not scheduler.running:
add_jobs()
scheduler.start()
print("APScheduler 已启动")
@app.on_event("shutdown")
async def shutdown_event():
# 应用关闭时,优雅地关闭调度器
if scheduler.running:
scheduler.shutdown()
print("APScheduler 已关闭")
#from contextlib import asynccontextmanager
#@asynccontextmanager
#async def lifespan(app: FastAPI):
# # 应用启动时,添加任务并启动调度器
# if not scheduler.running:
# add_jobs()
# scheduler.start()
# print("APScheduler 已启动")
# yield
# # 应用关闭时,优雅地关闭调度器
# if scheduler.running:
# scheduler.shutdown()
# print("APScheduler 已关闭")
#app = FastAPI(title="定时任务演示", lifespan=lifespan)
@app.get("/")
async def root():
return {"message": "Hello World"}
# 可选:提供一个API来手动触发或查看任务状态
@app.get("/jobs")
async def list_jobs():
jobs = scheduler.get_jobs()
return {"jobs": [{"id": j.id, "next_run": str(j.next_run_time)} for j in jobs]}
这里保留了旧式的on_event生命周期管理函数,方便理解scheduler的开启与关闭逻辑,开发时改为lifespan进行更优雅的生命周期管理。
跑起来试试吧!你会看到控制台每隔30秒和每分钟的第30秒都有输出。到数据库里看看,jobs.sqlite里已经存下了我们的任务配置,重启应用任务也不会丢失。🎉
💥 第四部分:天坑预警 —— 多进程部署与重复执行
是不是以为这样就万事大吉了?最大的坑才刚刚浮出水面。
当你用生产模式启动FastAPI,比如:
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
这--workers 4意味着启动了4个独立的进程。那么,app.on_event("startup")会在这4个进程里各执行一次!结果就是,你的定时任务被添加了4次,会被重复执行4次!想象一下,每小时发一次的报表邮件,突然变成了每小时发四封,老板和用户都会疯掉。
🔐 解决方案:文件锁与领导者选举
核心思路很简单:确保在多个进程中,只有一个进程能真正启动和添加定时任务。 这里分享两种我们线上在用的方法。
方案一:简单粗暴的文件锁(适合大部分场景)
利用fcntl(Linux)或msvcrt(Windows)给一个文件加锁,只有拿到锁的进程才能初始化调度器。
# 在 scheduler.py 或 startup 事件中
import os
import sys
def try_acquire_lock(lock_file):
try:
import fcntl
f = open(lock_file, 'w')
# 尝试获取非阻塞的独占锁
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
return f # 返回文件对象,保持打开状态以持有锁
except (BlockingIOError, ImportError):
# 获取失败(其他进程已持有锁)或不支持的系统
return None
lock_file = "/tmp/fastapi_scheduler.lock"
lock_fd = try_acquire_lock(lock_file)
@app.on_event("startup")
async def startup_event():
if lock_fd is not None:
# 只有拿到锁的进程才启动调度器
if not scheduler.running:
add_jobs()
scheduler.start()
print(f"进程 {os.getpid()} 成功启动 APScheduler")
else:
print(f"进程 {os.getpid()} 未获得锁,跳过调度器启动")
方案二:利用数据库原子操作(更分布式)
在数据库里建一张表,用原子性的“插入或竞争”操作来选举一个“领导者”进程。
# 假设使用SQLAlchemy ORM
from sqlalchemy.ext.asyncio import AsyncSession
from your_app.models import SchedulerLock
import datetime
async def acquire_db_lock(session: AsyncSession, timeout_minutes=10):
try:
# 尝试插入一条锁记录,host和pid标识当前进程
lock = SchedulerLock(
id=1, # 固定ID
host="my_host",
pid=os.getpid(),
last_heartbeat=datetime.datetime.utcnow()
)
session.add(lock)
await session.commit()
return True # 插入成功,获得锁
except IntegrityError: # 唯一约束冲突,记录已存在
await session.rollback()
# 检查已有的锁是否已过期
existing_lock = await session.get(SchedulerLock, 1)
if existing_lock and (datetime.datetime.utcnow() - existing_lock.last_heartbeat).seconds > timeout_minutes * 60:
# 锁已过期,更新为当前进程
existing_lock.host = "my_host"
existing_lock.pid = os.getpid()
existing_lock.last_heartbeat = datetime.datetime.utcnow()
await session.commit()
return True
return False # 未能获得锁
# 在 startup 事件中调用 acquire_db_lock 判断
记住,多进程部署下定时任务初始化,不加锁等于制造线上事故。 我个人更推荐方案一,足够简单可靠,除非你已经是跨机器的分布式部署了。
✨ 最后啰嗦一句
定时任务看似是小功能,但把它做可靠却需要处处留心。从选择APScheduler,到正确集成到应用生命周期,再到最后用文件锁避开多进程的坑,每一步都是我们踩过的雷。
技术栈没有银弹,但有了这套组合拳,你的FastAPI后台定时任务,基本可以高枕无忧了。至少,能让你睡个安稳觉,不用再担心凌晨三点的电话。
希望这篇“踩坑日记”对你有用。如果你在实践过程中又发现了新的“坑点”,或者有更优雅的解决方案,一定要在评论区告诉我啊!独乐乐不如众乐乐,啊不,是独坑坑不如众填填。😄
收藏点赞关注,你的支持是我分享更多实战干货的最大动力。下期可能聊聊FastAPI如何优雅地做分布式日志追踪,我们不见不散!
原文地址: https://www.cveoy.top/t/topic/qFS3 著作权归作者所有。请勿转载和采集!