AI开发-python-LangGraph框架(3-32-LangGraph 并行验证)
- 日期: 2026-04-24
- 标签: 程序员
LangGraph 实战:轻松实现工作流并行执行,大幅提升处理效率
一、LangGraph 并行执行核心优势
- 极简定义,无冗余代码:无需手动管理线程、协程,框架底层自动处理任务并发,专注业务逻辑即可;
- 状态自动管理,数据无缝聚合:内置状态管理机制,并行任务的执行结果会自动同步、合并,无需手动处理数据传递;
- 流程可视化,结构一目了然:支持工作流图形化展示,并行分支、聚合节点清晰可见,调试和维护更简单。
二、LangGraph 并行执行的实现逻辑
1. 定义结构化状态,奠定数据基础
2. 编写独立任务节点
3. 配置多入口,触发并行执行
4. 自动聚合结果,完成闭环
三、实战效果:并行执行的直观体现
- 两个任务同时启动,即时任务瞬间完成执行,无需等待耗时任务;
- 耗时任务执行完毕后,流程自动进入聚合阶段;
- 最终统一输出所有任务的处理结果,总耗时仅等于最长单个任务的耗时,而非所有任务耗时之和。
四、总结
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
import time
# 定义状态(必须继承TypedDict)
class ParallelState(TypedDict):
input_data: str
task_a_result: str
task_b_result: str
all_results: Annotated[list, operator.add] # 使用注解实现自动合并
# 创建图构建器
graph_builder = StateGraph(state_schema=ParallelState)
# 定义并行执行函数
def process_task_a(state: ParallelState):
print("\nA开始执行...")
time.sleep(3) # 休眠3秒
print("3秒后继续执行")
print(f"Task A processing: {state['input_data']}")
return {"task_a_result": f"A处理结果: {state['input_data']}"}
def process_task_b(state: ParallelState):
print("\nB开始执行...")
print(f"Task B processing: {state['input_data']}")
return {"task_b_result": f"B处理结果: {state['input_data']}"}
def aggregate_results(state: ParallelState):
all_results = [state['task_a_result'], state['task_b_result']]
print(f"聚合结果: {all_results}")
return {"all_results": all_results}
# 添加节点
graph_builder.add_node("task_a", process_task_a)
graph_builder.add_node("task_b", process_task_b)
graph_builder.add_node("aggregator", aggregate_results)
# 设置入口点 - 多个入口点实现并行
graph_builder.set_entry_point("task_a")
graph_builder.set_entry_point("task_b")
# 添加边连接
graph_builder.add_edge("task_a", "aggregator")
graph_builder.add_edge("task_b", "aggregator")
graph_builder.add_edge("aggregator", END)
# 编译图
graph = graph_builder.compile()
#画图
print(graph.get_graph().draw_ascii())
# 执行
initial_state = {"input_data": "测试数据"}
result = graph.invoke(initial_state)
print("最终结果:", result)
输出数据:
+-----------+
| __start__ |
+-----------+
* *
** **
* *
+--------+ +--------+
| task_a | | task_b |
+--------+ +--------+
* *
** **
* *
+------------+
| aggregator |
+------------+
*
*
*
+---------+
| __end__ |
+---------+
A开始执行...
B开始执行...
Task B processing: 测试数据
3秒后继续执行
Task A processing: 测试数据
聚合结果: ['A处理结果: 测试数据', 'B处理结果: 测试数据']
最终结果: {'input_data': '测试数据', 'task_a_result': 'A处理结果: 测试数据', 'task_b_result': 'B处理结果: 测试数据', 'all_results': ['A处理结果: 测试数据', 'B处理结果: 测试数据']}
原文地址: https://www.cveoy.top/t/topic/qGvs 著作权归作者所有。请勿转载和采集!