
AI Agent 编排实战从工具调用到多 Agent 协作的任务拆解与执行框架一、单次对话搞不定复杂任务——Agent 编排的工程化刚需让 AI 一次性完成分析竞品定价策略、生成报告、发送邮件这种多步骤任务结果要么步骤遗漏、要么上下文丢失、要么中间步骤出错后无法恢复。更常见的情况是AI 自信地给出了一个看似完整但逻辑断裂的答案。问题根源大语言模型是单轮预测引擎不是任务执行器。它擅长生成文本不擅长规划步骤、调用工具、处理异常、维护状态。Agent 编排的本质就是把 LLM 的生成能力包装成一个可控的任务执行框架。Agent 编排的核心挑战任务拆解复杂任务如何分解为可执行的原子步骤工具调用如何让 LLM 安全、可靠地调用外部 API状态管理多步骤执行过程中如何维护上下文和中间结果异常恢复某步骤失败时如何重试、回退或降级多 Agent 协作不同角色的 Agent 如何分工与通信二、AI Agent 编排的架构与执行机制Agent 编排的核心模式ReAct 循环Reasoning Acting——思考下一步做什么执行动作观察结果再思考。graph TB subgraph Agent 执行循环 A[接收任务] -- B[推理: 分析当前状态] B -- C[决策: 选择下一步动作] C -- D{动作类型?} D --|工具调用| E[执行工具] D --|子任务委派| F[委派给子 Agent] D --|直接回答| G[生成最终结果] E -- H[观察: 获取执行结果] F -- H H -- I{任务完成?} I --|否| B I --|是| G end subgraph 工具注册表 J[搜索工具] K[数据库查询] L[文件读写] M[API 调用] end subgraph 状态管理 N[对话历史] O[中间结果] P[执行日志] end E -- J K L M B -- N O P编排模式对比模式适用场景复杂度可控性单 Agent 工具简单任务步骤 5低高多 Agent 串行有明确依赖的流水线任务中高多 Agent 并行无依赖的子任务可并行中中层级 Agent复杂任务需分解-委派-汇总高中低三、生产级 Agent 编排框架实现3.1 工具定义与注册from dataclasses import dataclass, field from typing import Callable, Any import json dataclass class ToolDefinition: 工具定义描述工具的输入输出规范供 LLM 理解和调用 name: str description: str # 工具功能描述LLM 据此判断何时调用 parameters: dict # JSON Schema 格式的参数定义 required: list[str] # 必填参数列表 executor: Callable # 实际执行函数 timeout_seconds: int 30 # 执行超时时间 max_retries: int 2 # 最大重试次数 class ToolRegistry: 工具注册表集中管理所有可用工具 def __init__(self): self._tools: dict[str, ToolDefinition] {} def register(self, tool: ToolDefinition): 注册工具 self._tools[tool.name] tool def get(self, name: str) - ToolDefinition | None: return self._tools.get(name) def get_all_schemas(self) - list[dict]: 获取所有工具的 JSON Schema 描述供 LLM 函数调用使用 return [ { type: function, function: { name: tool.name, description: tool.description, parameters: tool.parameters, required: tool.required, }, } for tool in self._tools.values() ] async def execute(self, name: str, arguments: dict) - Any: 执行工具调用带超时和重试 tool self._tools.get(name) if not tool: raise ValueError(f未知工具: {name}) # 参数校验检查必填参数是否存在 for param in tool.required: if param not in arguments: raise ValueError(f工具 {name} 缺少必填参数: {param}) last_error None for attempt in range(tool.max_retries 1): try: import asyncio result await asyncio.wait_for( tool.executor(**arguments), timeouttool.timeout_seconds, ) return result except asyncio.TimeoutError: last_error f工具 {name} 执行超时 ({tool.timeout_seconds}s) except Exception as e: last_error f工具 {name} 执行失败: {str(e)} raise RuntimeError(f工具 {name} 重试耗尽: {last_error})3.2 ReAct Agent 核心import asyncio from enum import Enum from dataclasses import dataclass, field class ActionKind(Enum): TOOL_CALL tool_call FINAL_ANSWER final_answer dataclass class AgentAction: Agent 的动作决策 kind: ActionKind tool_name: str | None None tool_args: dict | None None answer: str | None None reasoning: str # 推理过程用于可解释性 dataclass class AgentState: Agent 执行状态维护对话历史和中间结果 task: str messages: list[dict] field(default_factorylist) tool_results: list[dict] field(default_factorylist) step_count: int 0 max_steps: int 15 # 防止无限循环 class ReActAgent: ReAct 模式 Agent推理-行动-观察循环 def __init__(self, llm_client, tool_registry: ToolRegistry): self.llm llm_client self.tools tool_registry async def run(self, task: str) - str: 执行任务返回最终结果 state AgentState(tasktask) # 初始化系统提示词 state.messages.append({ role: system, content: self._build_system_prompt(), }) state.messages.append({ role: user, content: task, }) while state.step_count state.max_steps: state.step_count 1 # 推理让 LLM 决定下一步动作 action await self._decide_action(state) if action.kind ActionKind.FINAL_ANSWER: return action.answer or if action.kind ActionKind.TOOL_CALL: # 行动执行工具调用 result await self._execute_tool(action) # 观察将工具结果加入上下文 state.tool_results.append({ step: state.step_count, tool: action.tool_name, args: action.tool_args, result: result, }) state.messages.append({ role: assistant, content: f调用工具 {action.tool_name}参数: {json.dumps(action.tool_args, ensure_asciiFalse)}, }) state.messages.append({ role: user, content: f工具返回结果: {json.dumps(result, ensure_asciiFalse)}, }) return 任务执行超时达到最大步骤数限制 async def _decide_action(self, state: AgentState) - AgentAction: 让 LLM 决定下一步动作 response await self.llm.chat( messagesstate.messages, toolsself.tools.get_all_schemas(), tool_choiceauto, ) # 检查是否有工具调用 if response.tool_calls: call response.tool_calls[0] return AgentAction( kindActionKind.TOOL_CALL, tool_namecall.function.name, tool_argsjson.loads(call.function.arguments), reasoningresponse.content or , ) # 无工具调用视为最终回答 return AgentAction( kindActionKind.FINAL_ANSWER, answerresponse.content or , reasoning, ) async def _execute_tool(self, action: AgentAction) - Any: 执行工具调用捕获异常 try: return await self.tools.execute(action.tool_name, action.tool_args or {}) except Exception as e: # 工具执行失败返回错误信息而非抛异常 return {error: str(e)} def _build_system_prompt(self) - str: return 你是一个任务执行 Agent。根据用户任务逐步推理并调用工具完成。 规则 1. 每次只调用一个工具 2. 根据工具返回结果决定下一步 3. 如果工具返回错误分析原因并尝试其他方案 4. 任务完成后给出最终答案 5. 不要编造工具返回结果3.3 多 Agent 协作层级编排dataclass class SubTask: 子任务定义 name: str description: str agent_role: str # 执行该任务的 Agent 角色 dependencies: list[str] field(default_factorylist) # 依赖的子任务 input_from: dict[str, str] field(default_factorydict) # 从上游任务获取输入 class MultiAgentOrchestrator: 多 Agent 编排器支持串行、并行和层级任务编排 def __init__(self, agents: dict[str, ReActAgent]): self.agents agents async def execute_pipeline( self, tasks: list[SubTask], initial_context: dict, ) - dict: 执行任务流水线自动处理依赖关系 completed: dict[str, Any] {} results: dict[str, Any] {} # 拓扑排序按依赖关系确定执行顺序 execution_order self._topological_sort(tasks) for task_name in execution_order: task next(t for t in tasks if t.name task_name) agent self.agents.get(task.agent_role) if not agent: raise ValueError(f未注册的 Agent 角色: {task.agent_role}) # 构建子任务输入从上游结果中提取 task_input self._build_task_input(task, initial_context, results) # 执行子任务 result await agent.run(task_input) results[task.name] result completed[task.name] True return results def _topological_sort(self, tasks: list[SubTask]) - list[str]: 拓扑排序确保被依赖的任务先执行 in_degree: dict[str, int] {t.name: 0 for t in tasks} graph: dict[str, list[str]] {t.name: [] for t in tasks} for task in tasks: for dep in task.dependencies: graph[dep].append(task.name) in_degree[task.name] 1 queue [name for name, deg in in_degree.items() if deg 0] order [] while queue: current queue.pop(0) order.append(current) for neighbor in graph[current]: in_degree[neighbor] - 1 if in_degree[neighbor] 0: queue.append(neighbor) if len(order) ! len(tasks): raise ValueError(任务依赖关系存在环无法排序) return order def _build_task_input( self, task: SubTask, initial_context: dict, completed_results: dict, ) - str: 构建子任务输入从上游结果中提取所需数据 parts [f任务: {task.description}] for param_name, source_path in task.input_from.items(): # source_path 格式: task_name.field_name source_task, field source_path.split(., 1) if source_task in completed_results: parts.append(f输入参数 {param_name}: {completed_results[source_task]}) if initial_context: parts.append(f初始上下文: {json.dumps(initial_context, ensure_asciiFalse)}) return \n.join(parts)四、Agent 编排的架构权衡与边界ReAct 循环的可靠性问题无限循环Agent 可能反复调用同一工具而不收敛。必须设置max_steps硬限制幻觉工具调用LLM 可能生成不存在的工具名或参数。需要在执行前校验上下文窗口溢出多轮工具调用后对话历史超出模型上下文窗口。需要摘要压缩多 Agent 协作的通信开销串行编排延迟叠加3 个子任务各需 10 秒总耗时 30 秒并行编排需要无依赖前提有依赖关系的子任务无法并行Agent 间信息传递有损子 Agent 的输出被压缩为文本传给下游结构化信息可能丢失工具调用的安全边界LLM 生成的参数可能包含注入攻击如 SQL 注入、命令注入工具执行必须有权限隔离只允许访问必要的资源敏感操作删除、支付需要人工确认环节禁用场景确定性任务规则明确、逻辑固定的任务用传统代码实现更可靠实时性要求高的任务LLM 推理延迟 1-5 秒不适合毫秒级响应安全敏感任务金融交易、医疗诊断等场景Agent 的不确定性不可接受五、总结AI Agent 编排的核心框架ReAct 循环推理-行动-观察解决单 Agent 的任务执行问题层级编排分解-委派-汇总解决多 Agent 的协作问题。工具注册表提供标准化的工具定义和执行接口AgentState 维护多步骤执行的状态和上下文。ReAct 循环必须设置最大步骤数防止无限循环工具调用需要参数校验和权限隔离。多 Agent 编排通过拓扑排序自动处理任务依赖但串行编排的延迟叠加和 Agent 间信息传递的有损性是架构瓶颈。Agent 编排适用于步骤不确定、需要工具调用的复杂任务确定性和实时性要求高的场景应使用传统代码实现。