首页 > 基础资料 博客日记

FastAPI + Celery 实战:异步任务里调用 Redis 和数据库的全解析,及生产级组织方案

2026-04-02 13:30:04基础资料围观1

极客资料网推荐FastAPI + Celery 实战:异步任务里调用 Redis 和数据库的全解析,及生产级组织方案这篇文章给大家,欢迎收藏极客资料网享受知识的乐趣

前面的文章里使用的BackgroundTasks进行的任务调用,这里使用Celery对整个方案重新构建下,核心逻辑同理!!!

Celery 的任务是在独立的工作进程(worker)中执行的,和 FastAPI 的请求进程是分离的。如果你在 API 路由里通过依赖项注入创建了一个数据库会话,然后把这个会话对象作为参数传给 Celery 任务,会发生什么?

首先,数据库会话(比如 SQLAlchemy 的 Session)通常不是可序列化的,根本传不到 worker 那边。就算你用了某些技巧把它序列化了,worker 那边拿到后,这个会话对应的底层数据库连接可能早已在原始进程中关闭,或者根本不存在。结果就是各种稀奇古怪的报错。

Redis 连接同理,如果你把连接池里“借”出来的连接直接传给 Celery 任务,序列化后到了 worker 端,完全无法使用。

我们需要在 Celery 任务函数内部,重新创建所需的资源(比如新的数据库 Session,新的 Redis 连接),并在任务执行完毕后,确保这些资源被正确关闭或归还

🛠️ 实战:生产级别的组织方案

好,理论前面已经说完了,更多的细节可以看上篇FastAPI里玩转Redis和数据库的正确姿势,别让异步任务把你坑哭了!,这里就直接上重构的代码。

我会展示一个我目前在用的、相对成熟的方案,它基于 Celery 和 SQLAlchemy,并配合 Redis 做状态缓存。

📁 第一步:目录结构

project/
├── app/
│   ├── api/              # 路由层
│   ├── core/             # 核心配置
│   │   ├── database.py   # 数据库引擎、SessionLocal 工厂
│   │   ├── redis_client.py # Redis 连接池
│   │   └── celery_app.py   # Celery 应用实例
│   ├── models/           # 数据库模型
│   ├── schemas/          # Pydantic 模型
│   └── tasks/            # Celery 任务模块 ✨
│       ├── __init__.py
│       └── user_tasks.py
└── ...

💾 第二步:Celery 应用配置

core/celery_app.py 中创建 Celery 实例,并配置 broker 和 backend(通常用 Redis)。

# core/celery_app.py
from celery import Celery
import os

celery_app = Celery(
    "worker",
    broker=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    backend=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"),
    include=["app.tasks.user_tasks"]  # 自动发现任务模块
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,      # 任务超时时间
    task_soft_time_limit=25 * 60,
)
  

🔌 第三步:资源管理上下文管理器(重点!)

core/database.pycore/redis_client.py 中定义好工厂和上下文管理器,供 Celery 任务独立使用。

# core/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session
from contextlib import contextmanager

engine = create_engine(
    "mysql+pymysql://user:pass@localhost/db",
    pool_size=10,          # 连接池大小
    max_overflow=20,       # 超出 pool_size 最大连接数
    pool_pre_ping=True,    # 自动重连检测
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

@contextmanager
def get_db_session() -> Session:
    """每个 Celery 任务独立创建一个数据库会话,用完即关"""
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
  
# core/redis_client.py
import redis
from contextlib import contextmanager

redis_pool = redis.ConnectionPool(
    host="localhost",
    port=6379,
    db=0,
    max_connections=20,
    decode_responses=True,
)

@contextmanager
def get_redis_conn():
    """每个 Celery 任务独立获取一个 Redis 连接,用完归还"""
    conn = redis.Redis(connection_pool=redis_pool)
    try:
        yield conn
    finally:
        # 如果使用了连接池,关闭连接只是归还给池子
        conn.close()
  

看到没?这里的关键就是 get_db_sessionget_redis_conn 这两个上下文管理器。它们确保了每一个独立的 Celery 任务,都拥有一个属于自己的、生命周期完整的资源。

🚀 第四步:编写具体的 Celery 任务

现在,我们可以在 tasks/user_tasks.py 里编写具体的业务逻辑了。使用 @celery_app.task 装饰器定义任务。

# tasks/user_tasks.py
from app.core.celery_app import celery_app
from app.core.database import get_db_session
from app.core.redis_client import get_redis_conn
from app.models.user import User
import time

@celery_app.task(name="process_avatar")
def process_avatar(user_id: int, image_path: str):
    """
    处理用户头像的异步任务
    """
    # 1. 更新数据库状态:processing
    with get_db_session() as db:
        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise ValueError(f"User {user_id} not found")
        user.avatar_status = "processing"
        db.commit()
    
    # 2. 模拟耗时的图片处理(可替换为实际图片处理库)
    time.sleep(3)
    # 假设处理成功,生成缩略图路径
    thumbnail_path = f"/thumbnails/{user_id}.jpg"
    
    # 3. 更新 Redis 状态
    with get_redis_conn() as redis:
        redis.set(f"user:{user_id}:avatar:status", "completed")
        redis.set(f"user:{user_id}:avatar:thumbnail", thumbnail_path)
    
    # 4. 再次更新数据库最终状态
    with get_db_session() as db:
        user.avatar_status = "completed"
        user.avatar_thumbnail = thumbnail_path
        db.commit()
    
    return {"status": "success", "user_id": user_id, "thumbnail": thumbnail_path}
  

📞 第五步:在 FastAPI 路由中调用 Celery 任务

# api/user.py
from fastapi import APIRouter, UploadFile, File
from app.tasks.user_tasks import process_avatar
import uuid

router = APIRouter()

@router.post("/upload_avatar")
async def upload_avatar(user_id: int, file: UploadFile = File(...)):
    # 保存文件到临时目录(省略)
    temp_path = f"/tmp/{uuid.uuid4()}.jpg"
    # 将文件写入磁盘...
    
    # 调用 Celery 任务,只传递必要的业务数据
    task = process_avatar.delay(user_id, temp_path)
    
    return {"task_id": task.id, "status": "queued"}
  

💡 进阶思考与踩坑预警

- 关于错误重试:

Celery 自带重试机制。你可以在任务上配置 @celery_app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True, max_retries=3),让任务在遇到异常时自动重试,这对处理数据库暂时性连接问题很有帮助。

- 关于任务幂等性:

设计任务时尽量保证幂等,因为重试可能导致同一个任务被执行多次。比如更新数据库状态时,可以用 UPDATE ... WHERE status = 'processing' 这样的条件来避免重复执行。


好了,今天的内容就到这里。同上篇文件一样, Celery 任务里的资源管理,核心就是一个“职责分离”的原则。

老规矩,觉得有用的话,点赞、关注、转发走一波,让更多小伙伴看到。有任何想法和问题,评论区留言,一起交流,一起进步!!😉


文章来源:https://www.cnblogs.com/ymtianyu/p/19806828
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云