首页 > 基础资料 博客日记
AI开发-python-LangGraph框架(3-33-LangGraph多分枝实战)
2026-04-27 11:00:03基础资料围观1次
2026-04-27LangGraph多分支实战:用图结构实现专业化任务并行协作
一、先明确核心场景:为什么需要多分支?
二、LangGraph多分支核心特点:分工、并行、汇聚
1. 专业化分工:每个分支只做一件事
2. 并行执行:提升效率,无需相互等待
3. 汇聚决策:多分支结果融合,输出最终答案
三、LangGraph多分支实现方式:4步搭建完整流程
第一步:定义核心节点,明确分支职责
第二步:搭建图结构,连接分支与汇聚节点
第三步:配置分支逻辑,确保数据规范
第四步:实现汇聚逻辑,完成多分支协同决策
四、实战总结:LangGraph多分支的优势与适用场景
五、最后想说的话
import json
import re
from typing import List, Dict, Any, Optional
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import MessageGraph, END
def remove_think_tags(text):
"""
移除字符串中<think></think>标签及其内容
Args:
text: 包含think标签的字符串
Returns:
移除think标签及其内容后的字符串
"""
# 支持多行内容和标签可能有的属性
pattern = r'<think[^>]*>.*?</think>'
result = re.sub(pattern, '', text, flags=re.DOTALL)
# 清理空白字符
result = result.strip()
result = re.sub(r'\n{3,}', '\n\n', result) # 限制连续换行
return result
def extract_json_from_llm_response(text: str) -> Optional[Dict[str, Any]]:
"""
从LLM响应中健壮地提取JSON对象
处理多种格式:
1. 纯JSON: {"key": "value"}
2. Markdown代码块: ```json {...} ```
3. 带前缀/后缀的JSON: "Result: {...}"
"""
text = remove_think_tags(text.strip())
# 尝试1:直接解析整个字符串
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# 尝试2:提取Markdown代码块中的JSON
code_block_patterns = [
r'```(?:json)?\s*({.*?})\s*```', # ```json {...} ```
r'```\s*({.*?})\s*```', # ``` {...} ```
r'({.*})' # 直接匹配花括号内容(最后手段)
]
for pattern in code_block_patterns:
match = re.search(pattern, text, re.DOTALL)
if match:
try:
return json.loads(match.group(1))
except json.JSONDecodeError:
continue
# 尝试3:查找第一个{和最后一个}之间的内容(处理嵌套)
start = text.find('{')
end = text.rfind('}')
if start != -1 and end != -1 and start < end:
candidate = text[start:end + 1]
try:
return json.loads(candidate)
except json.JSONDecodeError:
pass
print(f"⚠️ JSON提取失败,原始响应: {text[:100]}...")
return None
# ==================== 1. LLM 配置 ====================
DEEPSEEK_API_KEY = "123" # 替换为实际的 API Key
llm = ChatOpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="http://192.168.0.100:8087/v1",
model="qwen3.5-35b-gptq",
temperature=0.3,
max_tokens=20000,
)
# ==================== 2. 专业化分支节点(只提供原始数据) ====================
def source_node(state: List[Any]) -> List[Any]:
"""
源节点:触发两个专业化分支
返回原始状态,LangGraph 会自动将消息分发到所有出边(实现并行)
"""
print("\n[source_node] 触发天气和时间两个专业化分支")
return state
def weather_branch(state: List[Any]) -> AIMessage:
"""
天气分支:仅返回原始天气数据(无决策能力)
输出示例: {"temperature": 22, "condition": "sunny", "city": "北京"}
"""
print("\n[weather_branch] 专注提取天气数据(无决策能力)")
last_msg = state[-1].content if state else ""
system_prompt = (
"你是一个专业天气数据提取器。严格遵守:\n"
"1. 只提取天气相关数据,不做任何决策或建议\n"
"2. 输出纯JSON,不要任何额外文本或Markdown代码块\n"
"3. JSON格式: {\"temperature\": 整数, \"condition\": \"sunny/cloudy/rainy/snowy\", \"city\": \"城市名\"}"
)
user_prompt = f"用户问题:{last_msg}\n\n请返回该城市的模拟天气数据:"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
])
# 清理响应并添加分支标识
cleaned = remove_think_tags(response.content.strip())
return AIMessage(content=f"[WEATHER]{cleaned}")
def time_branch(state: List[Any]) -> AIMessage:
"""
时间分支:仅返回原始时间数据(无决策能力)
输出示例: {"hour": 15, "minute": 30, "period": "afternoon", "city": "北京"}
"""
print("\n[time_branch] 专注提取时间数据(无决策能力)")
last_msg = state[-1].content if state else ""
system_prompt = (
"你是一个专业时间数据提取器。严格遵守:\n"
"1. 只提取当前时间数据,不做任何决策或建议\n"
"2. 输出纯JSON,不要任何额外文本或Markdown代码块\n"
"3. JSON格式: {\"hour\": 0-23, \"minute\": 0-59, \"period\": \"morning/afternoon/evening/night\", \"city\": \"城市名\"}"
)
user_prompt = f"用户问题:{last_msg}\n\n请返回该城市的模拟当前时间:"
response = llm.invoke([
SystemMessage(content=system_prompt),
HumanMessage(content=user_prompt)
])
cleaned = remove_think_tags(response.content.strip())
return AIMessage(content=f"[TIME]{cleaned}")
def decision_sink(state: List[Any]) -> AIMessage:
"""
汇聚决策节点:必须结合天气+时间数据才能做出完整回答
单一分支数据无法回答"是否适合户外活动"这类综合问题
"""
print("\n[decision_sink] 综合天气+时间数据进行智能决策")
# 从state中提取带标识的分支结果(兼容并行执行的顺序不确定性)
weather_data = None
time_data = None
# 从最新消息向前查找(确保获取最新分支输出)
for msg in reversed(state):
if isinstance(msg, AIMessage):
content = msg.content
if "[WEATHER]" in content and weather_data is None:
json_str = content.replace("[WEATHER]", "").strip()
weather_data = extract_json_from_llm_response(json_str)
print(f" ✓ 天气数据提取: {weather_data}")
elif "[TIME]" in content and time_data is None:
json_str = content.replace("[TIME]", "").strip()
time_data = extract_json_from_llm_response(json_str)
print(f" ✓ 时间数据提取: {time_data}")
# 验证必要数据
if not weather_data or not time_data:
missing = []
if not weather_data: missing.append("天气数据")
if not time_data: missing.append("时间数据")
return AIMessage(content=f"❌ 决策失败:缺少{'和'.join(missing)},无法进行综合判断")
# === 核心:必须结合两个分支数据才能做出合理决策 ===
decision = make_outdoor_activity_decision(weather_data, time_data)
return AIMessage(content=decision)
def make_outdoor_activity_decision(weather: Dict[str, Any], time: Dict[str, Any]) -> str:
"""
基于天气+时间的综合决策(必须两个数据源)
单独任一数据都无法安全判断户外活动适宜性
"""
city = weather.get("city", time.get("city", "未知城市"))
temp = weather.get("temperature", "N/A")
condition = weather.get("condition", "unknown")
hour = time.get("hour", -1)
period = time.get("period", "unknown")
# 决策逻辑:必须同时考虑天气和时间
factors = []
recommendations = []
suitability_score = 0
# ===== 时间因素分析(单独无法决策)=====
if hour < 6 or hour > 21:
factors.append(f"⚠️ {period}时段({hour}点):光线不足/温度较低")
suitability_score -= 30
elif 6 <= hour <= 9:
factors.append(f"✅ 早晨{hour}点:适合晨练,空气清新")
suitability_score += 20
elif 15 <= hour <= 17:
factors.append(f"✅ 下午{hour}点:阳光适宜,温度舒适")
suitability_score += 25
else:
factors.append(f"⚪ {period}时段({hour}点):时间中性")
# ===== 天气因素分析(单独无法决策)=====
if condition == "rainy":
factors.append("⚠️ 正在下雨:路面湿滑,易感冒")
suitability_score -= 40
elif condition == "snowy":
factors.append("⚠️ 正在下雪:路面积雪,能见度低")
suitability_score -= 45
elif condition == "sunny":
if isinstance(temp, int) and temp > 32:
factors.append(f"⚠️ 晴天但高温({temp}°C):中暑风险高")
suitability_score -= 25
elif isinstance(temp, int) and 20 <= temp <= 28:
factors.append(f"✅ 晴朗舒适({temp}°C):紫外线适中")
suitability_score += 30
else:
factors.append(f"⚪ 晴天({temp}°C):天气良好")
suitability_score += 15
elif condition == "cloudy":
factors.append(f"✅ 多云({temp}°C):紫外线弱,体感舒适")
suitability_score += 25
# ===== 综合决策(必须两个数据)=====
if suitability_score >= 30:
conclusion = "🟢 非常适合户外活动!"
recommendations = [
"建议进行:慢跑、骑行、公园散步",
"注意:及时补充水分"
]
elif suitability_score >= 10:
conclusion = "🟡 适合户外活动,需注意条件"
recommendations = [
"建议进行:短途散步、户外休闲",
"注意:根据天气准备相应装备(雨具/防晒)"
]
elif suitability_score >= -10:
conclusion = "🟠 谨慎进行户外活动"
recommendations = [
"建议:缩短户外时间,选择遮蔽区域",
"避免:剧烈运动、长时间暴露"
]
else:
conclusion = "🔴 不建议户外活动"
recommendations = [
"建议:室内活动替代",
"原因:天气/时间条件不适宜"
]
# 生成综合报告
report = (
f"📍 {city}户外活动适宜性综合评估\n"
f"{'─' * 40}\n"
f"🌤️ 天气数据: {temp}°C, {condition}\n"
f"⏰ 时间数据: {hour}点 ({period})\n"
f"{'─' * 40}\n"
f"💡 决策依据:\n"
)
for factor in factors:
report += f" • {factor}\n"
report += f"{'─' * 40}\n"
report += f"🎯 综合结论: {conclusion}\n"
report += f"📋 建议:\n"
for rec in recommendations:
report += f" • {rec}\n"
return report
# ==================== 3. 构建专业化多分支图 ====================
graph = MessageGraph()
graph.add_node("source", source_node)
graph.add_node("weather_branch", weather_branch)
graph.add_node("time_branch", time_branch)
graph.add_node("decision_sink", decision_sink)
graph.set_entry_point("source")
graph.add_edge("source", "weather_branch")
graph.add_edge("source", "time_branch")
graph.add_edge("weather_branch", "decision_sink")
graph.add_edge("time_branch", "decision_sink")
graph.add_edge("decision_sink", END)
app = graph.compile()
#画图
print(app.get_graph().draw_ascii())
# ==================== 4. 执行测试 ====================
if __name__ == "__main__":
print("=" * 70)
print("✅ 真实多分支协作测试:需要天气+时间数据共同决策")
print("=" * 70)
test_cases = [
# "北京现在适合户外跑步吗?",
# "上海下午四点去公园散步合适吗?",
# "广州晚上九点适合夜跑吗?",
"成都中午十二点适合户外野餐吗?"
]
for i, query in enumerate(test_cases, 1):
print(f"\n{'=' * 70}")
print(f"_TestCase {i}: {query}_")
print('=' * 70)
inputs = [HumanMessage(content=query)]
for event in app.stream(inputs):
node = list(event.keys())[0]
msg = event[node]
# 打印中间节点输出(简化显示)
if node in ["weather_branch", "time_branch"]:
content_preview = msg.content.replace("[WEATHER]", "").replace("[TIME]", "").strip()
prefix = "🌤️" if node == "weather_branch" else "⏰"
print(f"\n[→ {node}] {prefix} {content_preview[:80]}...")
elif node == "decision_sink":
print(f"\n[→ {node}] 🎯 最终决策报告:\n")
print(msg.content)
结果输出:
+-----------+
| __start__ |
+-----------+
*
*
*
+--------+
| source |
+--------+*
** **
** **
** **
+-------------+ +----------------+
| time_branch | | weather_branch |
+-------------+ +----------------+
** **
** **
** **
+---------------+
| decision_sink |
+---------------+
*
*
*
+---------+
| __end__ |
+---------+
======================================================================
✅ 真实多分支协作测试:需要天气+时间数据共同决策
======================================================================
======================================================================
_TestCase 1: 成都中午十二点适合户外野餐吗?_
======================================================================
[source_node] 触发天气和时间两个专业化分支
[weather_branch] 专注提取天气数据(无决策能力)
[time_branch] 专注提取时间数据(无决策能力)
[→ weather_branch] 🌤️ ```json
{"temperature": 25, "condition": "sunny", "city": "成都"}
```...
[→ time_branch] ⏰ {"hour": 12, "minute": 0, "period": "afternoon", "city": "成都"}...
[decision_sink] 综合天气+时间数据进行智能决策
✓ 时间数据提取: {'hour': 12, 'minute': 0, 'period': 'afternoon', 'city': '成都'}
✓ 天气数据提取: {'temperature': 25, 'condition': 'sunny', 'city': '成都'}
[→ decision_sink] 🎯 最终决策报告:
📍 成都户外活动适宜性综合评估
────────────────────────────────────────
🌤️ 天气数据: 25°C, sunny
⏰ 时间数据: 12点 (afternoon)
────────────────────────────────────────
💡 决策依据:
• ⚪ afternoon时段(12点):时间中性
• ✅ 晴朗舒适(25°C):紫外线适中
────────────────────────────────────────
🎯 综合结论: 🟢 非常适合户外活动!
📋 建议:
• 建议进行:慢跑、骑行、公园散步
• 注意:及时补充水分
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
相关文章
最新发布
- DBLens for PostgreSQL 正式发布|把 PostgreSQL 开发与管理带进 AI + Agent 时代
- 【OpenClaw】通过 Nanobot 源码学习架构---(10)Heartbeat
- C# .NET 周刊|2026年4月1期
- Obsidian CLI 来了
- C# 视频录制监控系统
- 从 AMBA 协议看 Valid-Ready 到 Credit-based 流控机制
- Redis-Hash型与List型操作命令
- 8 年前的老代码 + 20 刀 AI token = 我的第一款独立产品
- 深度学习进阶(十二)可变形池化 deformable RS RoI Pooling
- 计算与判定:P、NP、NP-hard 和 NP-complete 问题

