首页 > 基础资料 博客日记
AI开发-python-LangGraph框架(3-34-LangGraph循环实战)
2026-04-29 11:00:04基础资料围观1次
实战 LangGraph 循环执行:构建带自动重试的并行任务流
一、核心场景与需求拆解
- 并行执行两个独立任务(获取天气、获取时间),两个任务都存在随机失败概率;
- 只要任意一个任务失败,就自动触发循环重试,无需手动重启流程;
- 支持设置最大重试次数,避免无限循环,保障流程稳定性;
- 全程通过状态管理数据,重试时复用原始输入,自动更新执行结果。
二、LangGraph 循环执行的核心设计思路
1. 状态定义:循环的「数据载体」
2. 执行节点:循环的「任务核心」
3. 决策节点:循环的「开关控制器」
- ✅ 任务全部成功 → 结束流程;
- ❌ 任务失败 + 未达最大重试次数 → 回到执行节点,重新循环;
- ❌ 任务失败 + 达到最大重试次数 → 强制结束流程。
三、LangGraph 循环执行的核心优势
1. 逻辑解耦,可读性拉满
2. 状态自动流转,无需手动管理
3. 容错性强,适配不稳定场景
4. 并行 + 循环完美结合
四、落地价值与适用场景
- 第三方 API 调用容错(天气、地图、支付等接口);
- AI Agent 工具调用自动重试(LLM 接口、插件调用);
- 批量数据拉取 / 处理,保障任务完整性;
- 异步任务执行,失败自动恢复。
五、总结
代码流程图:

实现代码:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
# 1. 定义带重试状态的 State
class LoopState(TypedDict):
input_data: str # 输入数据
weather_data: str | None # 天气数据
time_data: str | None # 时间数据
retry_count: int # 重试次数
max_retries: int # 最大重试次数
is_success: bool # 是否执行成功(用于判断循环退出)
# 2. 模拟不稳定的并行任务(可能失败,触发循环)
def fetch_weather(input_data: str) -> str:
"""模拟天气数据获取(30%概率失败)"""
print(f"\n[天气任务] 处理: {input_data}")
if random.random() < 0.3: # 30%失败概率
print("[天气任务] ❌ 数据获取失败")
return ""
time.sleep(1) # 模拟耗时
result = f"{input_data} 天气:25℃ 晴天"
print(f"[天气任务] ✅ 成功: {result}")
return result
def fetch_time(input_data: str) -> str:
"""模拟时间数据获取(20%概率失败)"""
print(f"\n[时间任务] 处理: {input_data}")
if random.random() < 0.2: # 20%失败概率
print("[时间任务] ❌ 数据获取失败")
return ""
time.sleep(1) # 模拟耗时
result = f"{input_data} 时间:15:30 下午"
print(f"[时间任务] ✅ 成功: {result}")
return result
# 3. 并行执行节点(循环的核心执行节点)
def parallel_execution(state: LoopState) -> LoopState:
"""并行执行天气/时间任务,返回更新后的状态"""
input_data = state["input_data"]
retry_count = state["retry_count"] + 1
print(f"\n{'='*50}")
print(f"[循环执行] 第 {retry_count} 次重试")
print('='*50)
# 线程池并行执行
with ThreadPoolExecutor(max_workers=2) as executor:
future_weather = executor.submit(fetch_weather, input_data)
future_time = executor.submit(fetch_time, input_data)
# 收集结果
weather_data = ""
time_data = ""
for future in as_completed([future_weather, future_time]):
if future == future_weather:
weather_data = future.result()
elif future == future_time:
time_data = future.result()
# 判断本次执行是否成功
is_success = bool(weather_data and time_data)
# 返回更新后的状态
return {
"input_data": input_data,
"weather_data": weather_data,
"time_data": time_data,
"retry_count": retry_count,
"max_retries": state["max_retries"],
"is_success": is_success
}
# 4. 循环决策节点(判断是否继续重试)
def loop_decision(state: LoopState) -> str:
"""
决策节点:返回下一个节点名称(循环/结束)
- 成功 → END
- 失败且未达最大重试 → parallel_execution(继续循环)
- 失败且达最大重试 → END
"""
print(f"\n[决策节点] 重试次数: {state['retry_count']}/{state['max_retries']} | 执行成功: {state['is_success']}")
# 退出条件1:执行成功
if state["is_success"]:
print("[决策节点] ✅ 所有任务成功,结束循环")
return END
# 退出条件2:达到最大重试次数
elif state["retry_count"] >= state["max_retries"]:
print("[决策节点] ❌ 达到最大重试次数,强制结束")
return END
# 继续循环:返回并行执行节点名称
else:
print("[决策节点] 🔄 任务失败,继续重试")
return "parallel_execution"
# 5. 初始化节点(设置初始状态)
def init_state(state: LoopState) -> LoopState:
"""初始化循环状态"""
return {
"input_data": state["input_data"],
"weather_data": None,
"time_data": None,
"retry_count": 0, # 初始重试次数0
"max_retries": state["max_retries"], # 最大重试次数(外部传入)
"is_success": False # 初始未成功
}
# 6. 构建带循环的图
def create_loop_graph():
graph_builder = StateGraph(LoopState)
# 添加节点
graph_builder.add_node("init", init_state)
graph_builder.add_node("parallel_execution", parallel_execution)
# 设置入口点
graph_builder.set_entry_point("init")
# 添加边:初始化 → 并行执行
graph_builder.add_edge("init", "parallel_execution")
# 核心:添加条件边(决策节点 → 循环/结束)
graph_builder.add_conditional_edges(
source="parallel_execution", # 源节点:并行执行节点
path=loop_decision, # 决策函数(返回下一个节点名称)
path_map={ # 路径映射(决策结果 → 目标节点)
"parallel_execution": "parallel_execution", # 继续循环
END: END # 结束
}
)
return graph_builder.compile()
# 7. 执行测试
if __name__ == "__main__":
# 编译图
loop_graph = create_loop_graph()
# ========== 修复点1:跳过易报错的 ASCII 绘图 ==========
# 导出 Mermaid 语法(循环结构也能正常展示)
# 初始输入
initial_input = {
"input_data": "北京",
"max_retries": 3 # 最大重试3次
}
# 执行循环图
print("\n" + "="*60)
print("🚀 开始执行带循环的LangGraph")
print("="*60)
start_time = time.time()
# 调用图
final_result = loop_graph.invoke(initial_input)
# 输出结果
elapsed = time.time() - start_time
print(f"\n" + "="*60)
print(f"✅ 执行完成!总耗时: {elapsed:.2f} 秒")
print(f"🔍 最终状态:")
print(f" - 重试次数: {final_result['retry_count']}")
print(f" - 执行成功: {final_result['is_success']}")
print(f" - 天气数据: {final_result['weather_data'] or '获取失败'}")
print(f" - 时间数据: {final_result['time_data'] or '获取失败'}")
print("="*60)
结果输出:
============================================================
🚀 开始执行带循环的LangGraph
============================================================
==================================================
[循环执行] 第 1 次重试
==================================================
[天气任务] 处理: 北京
[天气任务] ❌ 数据获取失败
[时间任务] 处理: 北京
[时间任务] ✅ 成功: 北京 时间:15:30 下午
[决策节点] 重试次数: 1/3 | 执行成功: False
[决策节点] 🔄 任务失败,继续重试
==================================================
[循环执行] 第 2 次重试
==================================================
[天气任务] 处理: 北京
[时间任务] 处理: 北京
[天气任务] ✅ 成功: 北京 天气:25℃ 晴天
[时间任务] ✅ 成功: 北京 时间:15:30 下午
[决策节点] 重试次数: 2/3 | 执行成功: True
[决策节点] ✅ 所有任务成功,结束循环
============================================================
✅ 执行完成!总耗时: 2.02 秒
🔍 最终状态:
- 重试次数: 2
- 执行成功: True
- 天气数据: 北京 天气:25℃ 晴天
- 时间数据: 北京 时间:15:30 下午
============================================================
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
上一篇:文档格式转换器-学术加强版
下一篇:一分钟学会绘制Mermaid流程图

