首页 > 基础资料 博客日记
AI开发-python-LangGraph框架(3-31-LangGraph 「合并式状态管理」的原理与实践)
2026-04-23 11:00:02基础资料围观1次
LangGraph 实战:并行结果安全合并与合并式状态管理深度解析
一、场景痛点:并行任务的状态灾难
- 结果覆盖:多个任务同时写入同一状态字段,后完成的任务会覆盖先完成的结果;
- 数据丢失:嵌套结构的结果无法自动融合,只能整体替换;
- 状态不可控:多节点协作时,无法保证状态的完整性和一致性。
二、核心技术:合并式状态(Annotated 自定义合并)
1. 核心机制
Annotated 注解绑定自定义合并函数,让状态在更新时不再直接替换旧值,而是执行自定义的合并逻辑。旧字典 + 新字典 = 全新融合字典,既保留历史数据,又新增最新数据,从根源杜绝覆盖问题。2. 核心优势
- ✅ 安全无覆盖:并行任务的结果独立存储,互不干扰;
- ✅ 嵌套结构兼容:完美支持字典嵌套结构的合并;
- ✅ 状态可追溯:保留所有任务的执行结果,便于后续聚合;
- ✅ 低侵入性:仅需在状态定义时配置,无需修改任务逻辑。
三、并行结果安全合并:完整实现逻辑
1. 状态定义:定制合并规则
2. 并行任务节点:独立执行,互不干扰
3. 自动扇入聚合:等待所有任务完成
4. 图结构构建:可视化并行流程
四、方案亮点与价值
1. 极致的并行效率
2. 绝对的数据安全
3. 高度可扩展性
4. 企业级健壮性
五、适用场景全覆盖
- 多智能体(Agent)协作决策;
- 并行数据采集与结果聚合;
- 多接口并发调用与数据融合;
- 复杂业务流程的并行分支处理;
- 实时多维度信息分析与报告生成。
六、总结
"""
📌 模式3: 并行结果安全合并
核心机制: 自定义合并函数防止结果覆盖
适用场景: 多Agent协作、并行任务聚合、需要安全合并嵌套结构
"""
import asyncio
from typing import TypedDict, Dict, Annotated, Any
from langgraph.graph import StateGraph, START, END
# ===== 1. 状态定义(关键:自定义合并函数)=====
class ParallelState(TypedDict):
"""
并行状态设计:
- user_query: 原始查询(保留)
- results: 使用自定义合并函数 → 安全合并嵌套字典
- final_answer: 最终聚合结果
"""
user_query: str
# ⚠️ 核心:自定义合并函数实现 {**old, **new}
results: Annotated[Dict[str, Any], lambda old, new: {**old, **new}]
final_answer: str
# ===== 2. 并行任务节点 =====
async def task_weather(state: ParallelState) -> dict:
"""
天气任务: 返回嵌套结果 {"weather": {...}}
⚠️ 必须使用任务名作为键,避免覆盖
"""
await asyncio.sleep(0.4) # 模拟耗时操作
print("[🌤️ 天气任务] 完成 (0.4s)")
return {
"results": {
"weather": { # 关键:以任务名为键
"temperature": 25,
"condition": "sunny"
}
}
}
async def task_time(state: ParallelState) -> dict:
"""
时间任务: 返回嵌套结果 {"time": {...}}
与天气任务互不干扰
"""
await asyncio.sleep(0.3) # 模拟耗时操作
print("[⏰ 时间任务] 完成 (0.3s)")
return {
"results": {
"time": { # 关键:以任务名为键
"hour": 15,
"period": "afternoon"
}
}
}
def aggregator_node(state: ParallelState) -> dict:
"""
聚合节点: 综合所有并行结果
自动等待所有触发的并行节点完成
"""
print("\n[✅ 聚合节点] 所有并行任务完成!")
print(f" 已收集结果: {list(state['results'].keys())}")
# 安全访问结果(处理可能缺失的任务)
weather = state["results"].get("weather", {})
time = state["results"].get("time", {})
# 生成综合决策
decision = (
f"📍 综合分析报告\n"
f"{'─' * 30}\n"
f"🌤️ 天气: {weather.get('temperature', 'N/A')}°C, {weather.get('condition', 'unknown')}\n"
f"⏰ 时间: {time.get('hour', 'N/A')}点 ({time.get('period', 'unknown')})\n"
f"{'─' * 30}\n"
f"💡 建议: 下午时段天气晴朗,适合户外活动"
)
return {"final_answer": decision}
# ===== 3. 构建并行图 =====
def build_parallel_graph():
builder = StateGraph(ParallelState)
# 添加节点
builder.add_node("weather_task", task_weather)
builder.add_node("time_task", task_time)
builder.add_node("aggregator", aggregator_node)
# 并行扇出(从START同时触发两个任务)
builder.add_edge(START, "weather_task")
builder.add_edge(START, "time_task")
# 扇入聚合(两个任务都完成后触发)
builder.add_edge("weather_task", "aggregator")
builder.add_edge("time_task", "aggregator")
builder.add_edge("aggregator", END)
return builder.compile()
# ===== 4. 执行演示 =====
async def main():
print("=" * 60)
print("🧠 模式3: 并行结果安全合并(自定义合并函数)")
print("=" * 60)
graph = build_parallel_graph()
# 画图
print(graph.get_graph().draw_ascii())
# 初始状态(关键:results 必须初始化为空字典!)
initial_state = {
"user_query": "北京现在适合户外活动吗?",
"results": {}, # ⚠️ 必须初始化,否则首次合并失败
"final_answer": ""
}
# 记录时间验证并行
start = asyncio.get_event_loop().time()
final_state = await graph.ainvoke(initial_state)
elapsed = asyncio.get_event_loop().time() - start
print(f"\n【执行结果】")
print(f" 总耗时: {elapsed:.2f}秒 ")
print(f"\n{final_state['final_answer']}")
if __name__ == "__main__":
asyncio.run(main())
结果输出:
============================================================
🧠 模式3: 并行结果安全合并(自定义合并函数)
============================================================
+-----------+
| __start__ |
+-----------+
*** **
* **
** **
+-----------+ +--------------+
| time_task | | weather_task |
+-----------+ +--------------+
*** **
* **
** **
+------------+
| aggregator |
+------------+
*
*
*
+---------+
| __end__ |
+---------+
[⏰ 时间任务] 完成 (0.3s)
[🌤️ 天气任务] 完成 (0.4s)
[✅ 聚合节点] 所有并行任务完成!
已收集结果: ['weather', 'time']
【执行结果】
总耗时: 0.42秒
📍 综合分析报告
──────────────────────────────
🌤️ 天气: 25°C, sunny
⏰ 时间: 15点 (afternoon)
──────────────────────────────
💡 建议: 下午时段天气晴朗,适合户外活动
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
相关文章
最新发布
- RAG 是什么?16 种 RAG 方案一次讲清!AI 应用开发必学 | 万字干货
- 使用 Java 提取 HTML 文件中的纯文本内容
- Python批量图片拼接脚本:支持行列布局、最后一行居中、自然排序
- HackTheBox Cap 靶机:从 IDOR 到 PCAP 凭据提取再到 Capabilities 提权
- AI开发-python-LangGraph框架(3-31-LangGraph 「合并式状态管理」的原理与实践)
- SeaTunnel + AI:一句“我要做什么”,能不能直接变成一份能跑的配置?
- keycloak~实现OAuth 2.0 Token Exchange
- 本体论的启示:从零开始,如何让AI“学会”使用计算器
- Tomcat组件管理源码详解
- 【译】Visual Studio 三月更新 —— 打造专属自定义 Agent

