首页 > 基础资料 博客日记

FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!

2026-04-01 14:30:03基础资料围观1

文章FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!分享给大家,欢迎收藏极客资料网,专注分享技术知识

📝 摘要:还在为FastAPI异步任务里管理Redis连接和数据库会话而头疼?本文将从一个真实踩坑案例出发,带你深入浅出地理解如何优雅地在后台任务中操作外部服务。我们会手写一个生产级别的代码组织方案,帮你彻底告别连接泄露和资源耗尽的问题,让你的异步任务跑得又快又稳。

今天咱们聊的这个话题,可以说是每个用FastAPI做生产项目的同学,几乎都会遇到的一道坎——异步任务里怎么安全、高效地调用Redis和数据库?

你可能会问:“这有啥难的?直接丢到 BackgroundTasks 里,然后正常调用不就行了?”哎,如果真这么简单,我这篇文章就不会诞生了。听我给你讲个故事。

🎯 1. 一个让人抓狂的“小问题”

之前有个项目,需要在上传用户头像后,异步生成几种不同尺寸的缩略图,并把处理结果和状态存到MySQL,同时把用户ID和任务ID塞到Redis里做状态追踪。一切看起来都很美好,代码也跑通了。

但上线后,噩梦开始了!应用跑了一两天,就开始随机报错,有的说MySQL连接已关闭,有的说Redis连接数超限

最离谱的是,有时候任务执行到一半,数据库连接突然断开了,导致部分数据写入失败,状态成了“薛定谔的完成”。

后来debug了好几天,才发现问题的根源:我把数据库和Redis的会话(Session/Connection)直接传递到了异步任务函数里,但生命周期完全错乱了!

🔍 2. 问题到底出在哪?

FastAPI的BackgroundTasks虽然用起来简单,但它本质上是在响应返回之后,在同一个进程中“偷偷”执行的一个函数。

问题是,你在请求生命周期内创建的数据库会话(比如通过依赖项注入的db: Session),在请求结束后,通常会被框架自动关闭。但你的后台任务还在用这个已经被关闭的会话,不出错才怪!

Redis连接也是类似,如果你把连接池里“借”出来的连接直接传进去,一旦主请求结束,连接被归还或关闭,后台任务再用的时候,就会直接GG。

这里我要特别强调一点:千万不要在异步任务里,直接复用请求生命周期内的资源对象!这是新手最容易踩的坑,也是我当初血泪教训的核心。

⚙️ 3. 核心原理:各管各的,生命周期要分离

那正确的姿势是什么呢?核心思想就是“谁用谁创建,用完自己关”

异步任务函数内部,不应该依赖外部传递进来的“活”连接,而是应该拥有自己独立的资源管理逻辑。

具体来说,我们需要在异步任务函数内部,重新创建所需的资源(比如新的数据库Session,新的Redis连接),并在任务执行完毕后,确保这些资源被正确关闭或归还

这听起来像是个体力活,但其实我们可以通过一些好的代码组织模式,让它变得优雅且可维护。

🛠️ 4. 实战:生产级别的组织方案

好,理论说完了,咱们直接上代码。我会展示一个我目前在用的、相对成熟的方案,它使用了Celery作为任务队列(当然,你也可以用BackgroundTasks,但原理相通)。

📁 第一步:目录结构

project/
├── app/
│   ├── api/          # 路由层
│   ├── core/         # 核心配置(数据库、Redis等)
│   │   ├── database.py
│   │   └── redis_client.py
│   ├── models/       # 数据库模型
│   ├── schemas/      # Pydantic模型
│   └── tasks/        # 异步任务模块!✨
│       ├── __init__.py
│       ├── user_tasks.py
│       └── worker.py  # 任务入口
└── ...

💾 第二步:核心资源管理(重点!)

tasks/worker.py里,我们定义一个基类或辅助函数,专门负责在每个任务中,初始化和管理这些资源。

# tasks/worker.py
from sqlalchemy.orm import sessionmaker
from app.core.database import engine
from app.core.redis_client import get_redis_client
from contextlib import contextmanager

# 注意:这里是在模块级别创建SessionLocal,它是一个工厂,不是具体的会话
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@contextmanager
def get_db_session():
    """每个任务自己独立创建一个数据库会话,用完即关"""
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()

@contextmanager
def get_redis_conn():
    """每个任务自己独立创建一个Redis连接,用完即关"""
    # 这里假设你的redis_client是连接池,这个函数返回一个连接实例
    redis_client = get_redis_client()  
    try:
        yield redis_client
    finally:
        # 注意:如果用的是连接池,归还连接的操作通常是在你调close()时内部处理的
        # 这里只是示意,具体看你的Redis库实现
        redis_client.close() 
  

看到没?这里的关键就是 get_db_session get_redis_conn 这两个上下文管理器。它们确保了每一个独立的异步任务,都拥有一个属于自己的、生命周期完整的资源。

🚀 第三步:编写具体的异步任务

现在,我们可以在tasks/user_tasks.py里编写具体的业务逻辑了。

# tasks/user_tasks.py
from celery import Task
from app.tasks.worker import get_db_session, get_redis_conn
from app.models.user import User
import asyncio  # 如果任务里需要异步IO

class ProcessAvatarTask(Task):
    name = "process_avatar"
    
    def run(self, user_id: int, image_path: str):
        # 这里我们自己来创建和管理资源
        with get_db_session() as db:
            user = db.query(User).filter(User.id == user_id).first()
            # 1. 更新数据库状态
            user.avatar_status = "processing"
            db.commit()
        
        # 2. 处理图片(耗时操作,可以是同步的,也可以跑在异步线程池)
        # 这里为了简单,用同步模拟
        # ...
        
        # 3. 再次更新状态,并写入Redis
        with get_redis_conn() as redis:
            redis.set(f"user:{user_id}:avatar:status", "completed")
        
        with get_db_session() as db:
            user.avatar_status = "completed"
            db.commit()
        
        return {"status": "success", "user_id": user_id}
  

📞 第四步:在API路由中调用

# api/user.py
from fastapi import APIRouter, BackgroundTasks
from app.tasks.user_tasks import ProcessAvatarTask

router = APIRouter()

@router.post("/upload_avatar")
async def upload_avatar(user_id: int, background_tasks: BackgroundTasks):
    # 这里不要传数据库session或redis连接进去!
    # 只传必要的业务数据,比如user_id和文件路径
    background_tasks.add_task(ProcessAvatarTask().run, user_id, "/tmp/avatar.jpg")
    return {"msg": "任务已添加"}
  

💡 5. 进阶思考与踩坑预警

- 关于连接池大小:

别以为资源独立了就万事大吉。如果任务并发太高,每个任务都独立创建一个数据库连接,很容易把连接池撑爆。

所以,你的数据库连接池(比如SQLAlchemy的pool_size)要设置得合理一些,比如pool_size=20, max_overflow=10,然后根据你的任务并发量去调整。

- 关于Redis连接池:

上面的get_redis_conn最好是返回一个从连接池中获取的连接,而不是每次都新建一个TCP连接。这点很重要!

- 关于错误重试:

如果你的任务依赖外部服务(比如Redis、数据库),一定要考虑网络抖动带来的暂时性失败。推荐使用Celery的autoretry_for机制,或者自己在任务里写重试逻辑。


好了,今天的内容就到这里。其实异步任务里的资源管理,核心就是一个“职责分离”的原则。API层只负责接收请求,投递任务,绝不越俎代庖去管理任务内部的生命周期。而任务层,则要像一个独立的小程序,自己负责所有资源的生杀大权。

希望这篇掏心窝子的分享,能让你在FastAPI的异步之路上少踩几个坑。如果你也有什么独家秘籍,或者被我文章里的某个点戳中了,欢迎在评论区留言,咱们一起交流,一起进步!

老规矩,觉得有用的话,点赞、关注、转发走一波,让更多小伙伴看到。毕竟,大家好,才是真的好嘛!😉


文章来源:https://www.cnblogs.com/ymtianyu/p/19806597
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云