首页 > 基础资料 博客日记
FastAPI订单防超卖实战:从数据库锁到Saga分布式事务,这一篇给你理清了
2026-04-22 14:30:02基础资料围观1次
先说案例:
凌晨三点,运营电话打爆了你的手机,说系统里的库存变成了 -888 。你眯着眼睛看日志,发现就剩最后一件库存的时候,同时进来了仨请求,全下单成功了。
恭喜你,喜提“反向仓库”成就,不仅没赚到钱,还得挨个打电话道歉求退款 🎯。
刚用FastAPI重构完老项目,觉得异步真香, async/await 写得飞起,唯独忘了数据库那块还有个叫“事务”的爹。
当作复盘吧,今天来聊聊怎么把FastAPI里的事务玩明白,从单库防超卖到跨服务的Saga分布式,一篇给你捋直了。
🔒 别再裸奔了:先搞懂事务这层保护膜
咱们写业务代码,最怕的就是“半拉子工程”。扣了钱库存没减,或者减了库存钱没扣着。这时候必须得把 ACID 刻在烟上吸进肺里,尤其是原子性。
简单打个比方:转账就像你左手倒右手,你不能接受左手已经空了,右手还没接到吧?事务就是那个 begin 、 commit 和 rollback 的保镖。
用大白话翻译一下:
- begin:搬个小板凳坐下,我要开始搞事情了,闲人勿扰。
- commit:好了,活儿干完了,完美收工,落子无悔。
- rollback:我去,写了个Bug,刚才那几步全当没发生,桌子擦干净重来。
在FastAPI里用SQLAlchemy异步操作,千万别再用老一套的 session.commit() 了,容易忘关。官方文档虽然那么写,但根据我以往的经验,一定要用 async with session.begin() ,这才是亲妈生的写法。
async def create_order(db: AsyncSession, order_data):
# 只要缩进在这个代码块里,sqlalchemy自动帮你开了事务
async with db.begin():
new_order = Order(**order_data)
db.add(new_order)
# 这里可能还要调别的扣库存方法,放心,一荣俱荣一损俱损
await flush_and_lock_stock(db, order_data.items)
# 出了这个缩进还没抛异常?自动 commit!
# 中间哪行代码崩了?自动 rollback!省心到想哭。
return new_order
这里千万别学我当初偷懒,在循环里写 commit ,不仅慢得像蜗牛,一旦中间挂了,数据就是“脏”的,找都找不回来。
💊 嵌套事务与保存点:给自己留个后悔药
你可能会问:我就想让大事务里的小步骤失败了不影响大局,行不行?
这就得请出 savepoint 了。这就好比打游戏存档,你明知道前面Boss难打,先在门口存个档,万一死了,不用从第一关重来,直接读档再战。
啥时候用?比如下单成功了,你要发个积分。积分系统挂了(比如网络抖动),你总不能因为送积分失败就把整个订单回滚吧?那老板得把你头拧下来。
这时候在扣完库存后设个保存点,发积分挂了就滚回到保存点,订单照样生成,积分人工补发。
async with db.begin() as conn:
# 扣库存(这步绝不能挂)
await conn.execute(update(Stock).where(...).values(count=Stock.count-1))
# 埋个复活点
savepoint = await conn.begin_nested()
try:
# 这步挂了无所谓
await add_points(conn, user_id, 100)
await savepoint.commit()
except Exception:
await savepoint.rollback()
logger.warning("积分加不上,先欠着吧")
🏗️ 分层架构下的事务协调:别把裤子穿反了
稍微正规点的项目都得分 Service 层和 Repository 层对吧?这时候事务放哪?
听句劝:事务边界一定要放在 Service 层(业务逻辑层)。Repository 层就老老实实做单表增删改查,别自作主张 commit 。
看到不少刚入行的小伙伴在 Repo 层每个方法最后都加个 db.commit() ,最后业务逻辑得拼好几个 Repo 方法,结果第一个 Repo 提交了,第二个崩了,神仙难救。
最好的做法是把 db session 对象像接力棒一样从 Service 传到 Repo,在 Service 的入口统一开启 async with db.begin(): 。
🌍 分布式事务 Saga 模式:当微服务把数据库拆得稀碎
上面聊的都是单库操作。现实很骨感:订单库在 MySQL,用户余额在 Redis 或者别的什么独立服务里。扣钱成功但下单失败怎么办?这时候单机事务已经罩不住了。
这就是 Saga 模式的场子了。别被名字唬住,其实就是一个 “正向大管家 + 反向补偿” 的策略。
拿电商订单举个栗子:
1️⃣ 调用库存服务:预扣库存(成功)
2️⃣ 调用支付服务:扣余额(失败!余额不足)
3️⃣ 关键来了:Saga协调器发现支付失败,立马扭头去执行“反向补偿”——调用库存服务的“回滚接口”,把那件预扣的衣服加回去。
在FastAPI里实现Saga,可以用 状态机模式 或者简单点用后台任务队列记录每一步的执行与回滚。
最后啰嗦一句:补偿接口一定要保证 幂等性,别因为网络重试给人家库存加了两次,那就又成羊毛党了。
⚔️ 实战:库存扣减防超卖的终极锁方案
说完理论,来点立马能用的。单纯在代码里 if stock > 0 然后 update 是不行的,并发一来准挂。咱们得让数据库自己锁住那行数据。
# 悲观锁:SELECT ... FOR UPDATE
# 翻译:我要改这行了,你们都起开,等我改完你们再看。
async def deduct_stock_safe(db: AsyncSession, product_id: int, quantity: int):
async with db.begin():
# 用 for_update() 锁住查询结果
stmt = select(Product).where(Product.id == product_id).with_for_update()
result = await db.execute(stmt)
product = result.scalar_one()
if product.stock >= quantity:
product.stock -= quantity
# 不需要手动 add,被 session 追踪着呢
else:
raise ValueError("库存不足,别抢了!")
是不是以为这样就完了?再说个容易翻车的点:
MySQL默认隔离级别下,就算加了行锁,如果查询条件没走索引,它会锁表!直接把整个表给锁了,那你的并发量瞬间归零。所以, product_id 必须是索引或者主键。
好啦,从单机事务的坑到分布式Saga的应对思路,今天算是把压箱底的经验都掏出来了。技术这条路就是这样,很多知识点看文档觉得会了,一上线就翻车。希望今天聊的这些,能让你下次写库存扣减、写支付回调的时候,心里更有底,少加点班 🛠️。
如果觉得有用,别光收藏吃灰,点个赞加关注让我知道,这样我才有动力继续把踩过的坑都抖落出来。👋
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:

