你的FastAPI后台任务是不是还在“裸奔”?

先说事实案例:有个促销活动需要定时上线。结果呢?依赖的云函数服务突然抖动,那个“简单可靠”的crontab脚本愣是没触发。凌晨三点,运营的电话直接把你的美梦干碎。😫 事后复盘,才意识到:把定时任务寄生于操作系统或者外部黑盒服务,在微服务架构里,就是给自己埋雷。

痛定思痛,最后把定时任务“请”回了应用内部,用APScheduler在FastAPI里搞了个自治的小闹钟。今天,咱们就来聊聊这套实战经验,连同那些半夜爬起来填的坑……


🎯 本文你能得到什么

1. 为什么说FastAPI自带的BackgroundTasks不适合做定时任务。

2. APScheduler的核心概念,用“闹钟”和“餐厅”的比喻让你秒懂。

3. 手把手集成,提供可直接复制粘贴的代码块。

4. 最重要的:多进程部署(比如用Uvicorn workers)时,定时任务重复执行的“鬼故事”与解决之道。

🔧 第一部分:问题与背景 —— 为什么另起炉灶?

FastAPI 的 BackgroundTasks 是个好同志,但它只是个“跑腿小哥”。你API请求来了,它帮你异步处理些杂事,比如发邮件、写日志。但它有个硬伤:它没有记忆,也不会看表。 服务一重启,所有计划内的“跑腿”任务全忘光光。

定时任务呢?它需要的是“忠诚的管家”。不管服务是否重启,都要记得每天上午10点要发报表,每周一凌晨要清缓存。这需要持久化和时间调度能力,这正是 APScheduler 的绝活。

你可能会问,用Celery行不行?行,但杀鸡用牛刀了。APScheduler更轻量,与你FastAPI应用同生共死,管理起来简单直接,特别适合业务逻辑清晰、不需要分布式协调的定时场景。

⚙️ 第二部分:核心原理 —— APScheduler的三板斧

别被它的名字吓到,把它想象成一个高度可定制的智能闹钟系统。它主要由三部分组成:

📅 触发器 (Trigger): 决定“什么时候响”。是每天固定时间(date),还是间隔固定时间(interval),或者是像crontab那样的复杂周期(cron)?

📝 作业存储器 (Job Store): 记住“有哪些闹钟要响”。默认存在内存里,重启就忘。我们可以让它记在数据库里(比如SQLite、PostgreSQL),实现持久化。

👨‍💼 执行器 (Executor): 负责“闹钟响了以后具体做什么”。是用线程池还是进程池来执行我们的任务函数?

调度器 (Scheduler) 就是总控台,把上面三个部件组装起来,并启动这个闹钟系统。

🚀 第三部分:实战演示 —— 手把手集成到FastAPI

好,咱们先来安装。这步最简单:

pip install apscheduler

接下来重点来了,初始化并集成到FastAPI的生命周期。这里有个关键技巧:一定要把scheduler的启动和关闭挂在FastAPI的应用事件上,保证应用启动时它启动,应用优雅关闭时它也停下。千万别学我当初,直接在模块层面scheduler.start(),导致测试时脚本跑完不退出。

# 项目结构建议
# app/
#   __init__.py
#   main.py       # FastAPI 应用创建和事件处理
#   scheduler.py  # 调度器配置和任务定义
#   tasks.py      # 具体的任务函数

# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor

# 1. 配置组件
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # 使用SQLite持久化
}
executors = {
    'default': ThreadPoolExecutor(20)  # 线程池执行
}
job_defaults = {
    'coalesce': False,  # 错过的任务是否合并执行(一般False)
    'max_instances': 3  # 同一个任务同时运行的最大实例数
}

# 2. 创建调度器实例
scheduler = AsyncIOScheduler(
    jobstores=jobstores,
    executors=executors,
    job_defaults=job_defaults,
    timezone="Asia/Shanghai"  # 时区!时区!时区!重要的事说三遍
)

# 3. 定义任务函数 (可以放在同文件,也可从其他模块导入)
def my_sync_job():
    print("同步任务执行了!")

async def my_async_job():
    print("异步任务执行了!")
    # 这里可以愉快地调用其他async函数

# 4. 添加任务的函数 (通常在应用启动时调用)
def add_jobs():
    # 间隔任务:每30秒执行一次
    scheduler.add_job(my_sync_job, 'interval', seconds=30, id='sync_interval_job')
    # Cron任务:每分钟的第30秒执行
    scheduler.add_job(my_async_job, 'cron', second=30, id='async_cron_job')
    # 单次任务:2023年10月1日执行
    # scheduler.add_job(xxx, 'date', run_date='2023-10-01 00:00:00')    

然后在你的main.py里,把它和FastAPI绑在一起:

# app/main.py
from fastapi import FastAPI
from .scheduler import scheduler, add_jobs

app = FastAPI(title="定时任务演示")

@app.on_event("startup")
async def startup_event():
    # 应用启动时,添加任务并启动调度器
    if not scheduler.running:
        add_jobs()
        scheduler.start()
        print("APScheduler 已启动")

@app.on_event("shutdown")
async def shutdown_event():
    # 应用关闭时,优雅地关闭调度器
    if scheduler.running:
        scheduler.shutdown()
        print("APScheduler 已关闭")

#from contextlib import asynccontextmanager
#@asynccontextmanager
#async def lifespan(app: FastAPI):
#    # 应用启动时,添加任务并启动调度器
#    if not scheduler.running:
#        add_jobs()
#        scheduler.start()
#        print("APScheduler 已启动")
#    yield
#    # 应用关闭时,优雅地关闭调度器
#    if scheduler.running:
#        scheduler.shutdown()
#        print("APScheduler 已关闭")
#app = FastAPI(title="定时任务演示", lifespan=lifespan)

@app.get("/")
async def root():
    return {"message": "Hello World"}

# 可选:提供一个API来手动触发或查看任务状态
@app.get("/jobs")
async def list_jobs():
    jobs = scheduler.get_jobs()
    return {"jobs": [{"id": j.id, "next_run": str(j.next_run_time)} for j in jobs]}    

这里保留了旧式的on_event生命周期管理函数,方便理解scheduler的开启与关闭逻辑,开发时改为lifespan进行更优雅的生命周期管理。

跑起来试试吧!你会看到控制台每隔30秒和每分钟的第30秒都有输出。到数据库里看看,jobs.sqlite里已经存下了我们的任务配置,重启应用任务也不会丢失。🎉

💥 第四部分:天坑预警 —— 多进程部署与重复执行

是不是以为这样就万事大吉了?最大的坑才刚刚浮出水面。

当你用生产模式启动FastAPI,比如:

uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4

--workers 4意味着启动了4个独立的进程。那么,app.on_event("startup")会在这4个进程里各执行一次!结果就是,你的定时任务被添加了4次,会被重复执行4次!想象一下,每小时发一次的报表邮件,突然变成了每小时发四封,老板和用户都会疯掉。

🔐 解决方案:文件锁与领导者选举

核心思路很简单:确保在多个进程中,只有一个进程能真正启动和添加定时任务。 这里分享两种我们线上在用的方法。

方案一:简单粗暴的文件锁(适合大部分场景)

利用fcntl(Linux)或msvcrt(Windows)给一个文件加锁,只有拿到锁的进程才能初始化调度器。

# 在 scheduler.py 或 startup 事件中
import os
import sys

def try_acquire_lock(lock_file):
    try:
        import fcntl
        f = open(lock_file, 'w')
        # 尝试获取非阻塞的独占锁
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        return f  # 返回文件对象,保持打开状态以持有锁
    except (BlockingIOError, ImportError):
        # 获取失败(其他进程已持有锁)或不支持的系统
        return None

lock_file = "/tmp/fastapi_scheduler.lock"
lock_fd = try_acquire_lock(lock_file)

@app.on_event("startup")
async def startup_event():
    if lock_fd is not None:
        # 只有拿到锁的进程才启动调度器
        if not scheduler.running:
            add_jobs()
            scheduler.start()
            print(f"进程 {os.getpid()} 成功启动 APScheduler")
    else:
        print(f"进程 {os.getpid()} 未获得锁,跳过调度器启动")        

方案二:利用数据库原子操作(更分布式)

在数据库里建一张表,用原子性的“插入或竞争”操作来选举一个“领导者”进程。

# 假设使用SQLAlchemy ORM
from sqlalchemy.ext.asyncio import AsyncSession
from your_app.models import SchedulerLock
import datetime

async def acquire_db_lock(session: AsyncSession, timeout_minutes=10):
    try:
        # 尝试插入一条锁记录,host和pid标识当前进程
        lock = SchedulerLock(
            id=1,  # 固定ID
            host="my_host",
            pid=os.getpid(),
            last_heartbeat=datetime.datetime.utcnow()
        )
        session.add(lock)
        await session.commit()
        return True  # 插入成功,获得锁
    except IntegrityError:  # 唯一约束冲突,记录已存在
        await session.rollback()
        # 检查已有的锁是否已过期
        existing_lock = await session.get(SchedulerLock, 1)
        if existing_lock and (datetime.datetime.utcnow() - existing_lock.last_heartbeat).seconds > timeout_minutes * 60:
            # 锁已过期,更新为当前进程
            existing_lock.host = "my_host"
            existing_lock.pid = os.getpid()
            existing_lock.last_heartbeat = datetime.datetime.utcnow()
            await session.commit()
            return True
        return False  # 未能获得锁

# 在 startup 事件中调用 acquire_db_lock 判断        

记住,多进程部署下定时任务初始化,不加锁等于制造线上事故。 我个人更推荐方案一,足够简单可靠,除非你已经是跨机器的分布式部署了。

✨ 最后啰嗦一句

定时任务看似是小功能,但把它做可靠却需要处处留心。从选择APScheduler,到正确集成到应用生命周期,再到最后用文件锁避开多进程的坑,每一步都是我们踩过的雷。

技术栈没有银弹,但有了这套组合拳,你的FastAPI后台定时任务,基本可以高枕无忧了。至少,能让你睡个安稳觉,不用再担心凌晨三点的电话。


希望这篇“踩坑日记”对你有用。如果你在实践过程中又发现了新的“坑点”,或者有更优雅的解决方案,一定要在评论区告诉我啊!独乐乐不如众乐乐,啊不,是独坑坑不如众填填。😄

收藏点赞关注,你的支持是我分享更多实战干货的最大动力。下期可能聊聊FastAPI如何优雅地做分布式日志追踪,我们不见不散!


原文地址: https://www.cveoy.top/t/topic/qFS3 著作权归作者所有。请勿转载和采集!

免费AI点我,无需注册和登录