企业级 AI Agent 架构:任务拆解、工具调用与多轮对话的工程实现 一、引言2024年以来以 GPT-4、Claude 为代表的大语言模型LLM能力持续跃升AI Agent 已从概念验证走向生产落地。然而将 LLM 的能力真正嵌入企业业务系统远不止调用一个 Chat Completion API 那么简单。一个真正可用的企业级 Agent需要在任务拆解、工具调用、多轮对话三个核心维度上具备健壮的工程实现。本文将深入探讨这三者的设计思路并结合完整的 Python 代码构建一个可直接参考的企业级 Agent 框架。二、整体架构设计企业级 AI Agent 一般采用分层架构核心分为四层┌──────────────────────────────────────────────┐│ 接入层 (API Gateway) ││ REST / WebSocket / gRPC │├──────────────────────────────────────────────┤│ 编排层 (Orchestrator) ││ 任务拆解 → 工具路由 → 对话管理 → 结果聚合 │├──────────────────────────────────────────────┤│ 能力层 (Capabilities) ││ LLM推理 │ 工具注册 │ 记忆管理 │ 安全审计 │├──────────────────────────────────────────────┤│ 基础设施层 (Infra) ││ 向量数据库 │ 消息队列 │ 配置中心 │ 监控告警 │└──────────────────────────────────────────────┘编排层是整个 Agent 的核心大脑我们下面围绕它展开。三、任务拆解从自然语言到 DAG3.1 设计思路企业场景中用户的一个指令往往隐含多步操作。例如“帮我分析上周的销售数据生成报告并发送给张经理”——这至少涉及数据查询、分析计算、报告生成、邮件发送四个子任务。任务拆解的核心是将模糊指令转化为有向无环图DAG其中节点是原子操作边代表依赖关系。3.2 核心数据结构fromfutureimport annotationsfrom dataclasses import dataclass, fieldfrom enum import Enumfrom typing import Any, Callable, Dict, List, Optionalimport asyncioimport jsonclass TaskStatus(Enum):PENDING “pending”RUNNING “running”COMPLETED “completed”FAILED “failed”SKIPPED “skipped”dataclassclass SubTask:“”“子任务定义”“”task_id: strname: strdescription: strtool_name: str # 关联的工具名称arguments: Dict[str, Any] field(default_factorydict)depends_on: List[str] field(default_factorylist) # 前置任务 ID 列表status: TaskStatus TaskStatus.PENDINGresult: Any Noneerror: Optional[str] Noneretry_count: int 0max_retries: int 2dataclassclass TaskPlan:“”“LLM 生成的任务计划”“”original_query: strsub_tasks: List[SubTask]reasoning: str “”3.3 基于 LLM 的任务规划器我们让 LLM 输出结构化的 JSON 计划而非自由文本TASK_PLANNER_PROMPT “”你是一个任务规划专家。请将用户的请求拆解为多个可执行的子任务。要求每个子任务对应一个具体的工具调用明确子任务之间的依赖关系以严格的 JSON 格式输出可用工具列表{tools_description}用户请求{user_query}请输出如下格式的 JSON不要包含其他内容{{“reasoning”: “拆解思路…”,“sub_tasks”: [{{“task_id”: “task_1”,“name”: “子任务名称”,“description”: “详细描述”,“tool_name”: “对应工具名”,“arguments”: {{}},“depends_on”: []}}]}}“”class TaskPlanner:“”“基于 LLM 的任务规划器”“”def __init__(self, llm_client, tool_registry: ToolRegistry): self.llm llm_client self.tool_registry tool_registry def _build_tools_description(self) - str: 构建工具描述文本供 LLM 理解可用能力 descriptions [] for tool in self.tool_registry.list_tools(): descriptions.append( f- {tool.name}: {tool.description}\n f 参数: {json.dumps(tool.parameters_schema, ensure_asciiFalse)} ) return \n.join(descriptions) async def plan(self, user_query: str) - TaskPlan: 调用 LLM 生成任务计划 prompt TASK_PLANNER_PROMPT.format( tools_descriptionself._build_tools_description(), user_queryuser_query, ) response await self.llm.chat(prompt) plan_dict self._parse_response(response) sub_tasks [ SubTask( task_idt[task_id], namet[name], descriptiont[description], tool_namet[tool_name], argumentst.get(arguments, {}), depends_ont.get(depends_on, []), ) for t in plan_dict[sub_tasks] ] return TaskPlan( original_queryuser_query, sub_taskssub_tasks, reasoningplan_dict.get(reasoning, ), ) def _parse_response(self, response: str) - dict: 从 LLM 响应中提取 JSON容错处理 # 尝试直接解析 try: return json.loads(response) except json.JSONDecodeError: pass # 尝试提取 json ... 中的内容 import re match re.search(r(?:json)?\s*([\s\S]*?), response) if match: return json.loads(match.group(1)) raise ValueError(f无法解析 LLM 返回的任务计划: {response[:200]})3.4 DAG 执行引擎有了任务计划需要一个按照依赖关系调度执行的引擎class DAGExecutor:“”“DAG 执行引擎 —— 按拓扑顺序并行执行无依赖的子任务”“”def __init__(self, tool_registry: ToolRegistry): self.tool_registry tool_registry async def execute(self, plan: TaskPlan) - Dict[str, Any]: 执行整个任务计划 task_map: Dict[str, SubTask] {t.task_id: t for t in plan.sub_tasks} completed: Dict[str, Any] {} # task_id → result in_flight: Dict[str, asyncio.Task] {} while len(completed) sum( 1 for t in plan.sub_tasks if t.status TaskStatus.FAILED ) len(plan.sub_tasks): # 找出所有依赖已满足且尚未执行的任务 ready_tasks [] for task in plan.sub_tasks: if task.task_id in completed: continue if task.status in (TaskStatus.RUNNING, TaskStatus.FAILED): continue if all(dep in completed for dep in task.depends_on): ready_tasks.append(task) if not ready_tasks and not in_flight: # 没有可执行的任务且没有进行中的任务 → 死锁或全部完成 break # 并行启动所有就绪任务 for task in ready_tasks: task.status TaskStatus.RUNNING coro self._execute_single_task(task, completed) in_flight[task.task_id] asyncio.create_task(coro) # 等待至少一个任务完成 done, _ await asyncio.wait( in_flight.values(), return_whenasyncio.FIRST_COMPLETED ) for finished in done: tid, result finished.result() completed[tid] result del in_flight[tid] return completed async def _execute_single_task( self, task: SubTask, context: Dict[str, Any] ) - tuple: 执行单个子任务支持重试和上下文注入 tool self.tool_registry.get(task.tool_name) if not tool: task.status TaskStatus.FAILED task.error f工具 {task.tool_name} 未注册 return task.task_id, None # 将前置任务的结果注入参数支持模板变量如 {{task_1.result}} resolved_args self._resolve_arguments(task.arguments, context) for attempt in range(task.max_retries 1): try: result await tool.execute(**resolved_args) task.status TaskStatus.COMPLETED task.result result return task.task_id, result except Exception as e: task.retry_count attempt 1 if attempt task.max_retries: task.status TaskStatus.FAILED task.error str(e) return task.task_id, None await asyncio.sleep(2 ** attempt) # 指数退避 return task.task_id, None staticmethod def _resolve_arguments( arguments: Dict[str, Any], context: Dict[str, Any] ) - Dict[str, Any]: 解析参数中的模板变量注入前置任务结果 resolved {} for key, value in arguments.items(): if isinstance(value, str) and value.startswith({{) and value.endswith(}}): ref value.strip({} ).strip() # 支持 {{task_1.result.field}} 语法 parts ref.split(.) current context for part in parts: if isinstance(current, dict): current current.get(part) else: current getattr(current, part, None) resolved[key] current else: resolved[key] value return resolved四、工具调用Function Calling 的工程化封装4.1 工具注册中心企业场景下工具数量多、类型杂内部 API、数据库查询、第三方服务等需要一个统一的注册与发现机制dataclassclass ToolDefinition:“”“工具定义”“”name: strdescription: strparameters_schema: Dict[str, Any] # JSON Schema 格式handler: Callable # 实际执行函数require_confirmation: bool False # 是否需要用户确认敏感操作timeout_seconds: int 30category: str “general” # 分类data / communication / systemclass ToolRegistry:“”“工具注册中心”“”def __init__(self): self._tools: Dict[str, ToolDefinition] {} def register(self, tool: ToolDefinition) - None: 注册一个工具 if tool.name in self._tools: raise ValueError(f工具 {tool.name} 已存在) self._tools[tool.name] tool def register_from_function( self, func: Callable, name: str None, description: str None, parameters_schema: Dict[str, Any] None, **kwargs, ) - None: 从函数自动注册工具装饰器模式 tool ToolDefinition( namename or func.__name__, descriptiondescription or func.__doc__ or , parameters_schemaparameters_schema or {}, handlerfunc, **kwargs, ) self.register(tool) def get(self, name: str) - Optional[ToolDefinition]: return self._tools.get(name) def list_tools(self) - List[ToolDefinition]: return list(self._tools.values()) def to_openai_tools(self) - List[Dict[str, Any]]: 转换为 OpenAI Function Calling 格式 tools [] for t in self._tools.values(): tools.append({ type: function, function: { name: t.name, description: t.description, parameters: t.parameters_schema, }, }) return tools4.2 工具调用执行器在实际调用 LLM 时需要处理 Function Calling 的完整循环 — 即 LLM 返回 tool_calls → 执行 → 把结果返回给 LLM → 继续推理import jsonfrom typing import AsyncGeneratorclass ToolExecutor:“”“工具调用执行器 —— 处理 ReAct 循环”“”MAX_TOOL_CALL_ROUNDS 10 # 防止无限循环 def __init__(self, llm_client, tool_registry: ToolRegistry): self.llm llm_client self.registry tool_registry async def run_with_tools( self, messages: List[Dict[str, Any]], ) - AsyncGenerator[Dict[str, Any], None]: 执行带工具调用的对话流程使用流式输出。 每步以事件形式 yield方便上层做进度展示。 round_count 0 while round_count self.MAX_TOOL_CALL_ROUNDS: round_count 1 # 调用 LLM response await self.llm.chat( messagesmessages, toolsself.registry.to_openai_tools(), tool_choiceauto, ) choice response[choices][0] assistant_msg choice[message] # 如果 LLM 决定调用工具 if assistant_msg.get(tool_calls): tool_calls assistant_msg[tool_calls] yield { type: tool_calls_start, tool_calls: [ {name: tc[function][name], arguments: tc[function][arguments]} for tc in tool_calls ], } # 将 assistant 消息加入历史 messages.append(assistant_msg) # 并行执行所有工具调用 tool_results await asyncio.gather( *[self._execute_tool_call(tc) for tc in tool_calls], return_exceptionsTrue, ) # 将工具结果作为 tool 消息追加到对话 for tc, result in zip(tool_calls, tool_results): if isinstance(result, Exception): content json.dumps({error: str(result)}, ensure_asciiFalse) else: content json.dumps(result, ensure_asciiFalse) messages.append({ role: tool, tool_call_id: tc[id], content: content, }) yield { type: tool_result, tool_name: tc[function][name], result: content, } else: # LLM 给出最终文本回复 content assistant_msg.get(content, ) messages.append(assistant_msg) yield {type: final_answer, content: content} return yield { type: error, content: f工具调用超过最大轮次限制 ({self.MAX_TOOL_CALL_ROUNDS}), } async def _execute_tool_call(self, tool_call: dict) - Any: 执行单个工具调用带超时控制 func_name tool_call[function][name] tool self.registry.get(func_name) if not tool: return {error: f未知工具: {func_name}} try: arguments json.loads(tool_call[function][arguments]) except json.JSONDecodeError: return {error: f参数解析失败: {tool_call[function][arguments]}} try: result await asyncio.wait_for( tool.handler(**arguments) if asyncio.iscoroutinefunction(tool.handler) else asyncio.to_thread(tool.handler, **arguments), timeouttool.timeout_seconds, ) return result except asyncio.TimeoutError: return {error: f工具 {func_name} 执行超时 ({tool.timeout_seconds}s)}五、多轮对话上下文管理与状态流转5.1 对话记忆管理多轮对话的核心挑战在于有限上下文窗口与长期记忆之间的平衡。解决方案是分层记忆管理from collections import dequefrom datetime import datetimedataclassclass ConversationTurn:“”“单轮对话记录”“”role: str # “user” | “assistant” | “tool”content: strtimestamp: float field(default_factorylambda: datetime.now().timestamp())metadata: Dict[str, Any] field(default_factorydict)class ConversationMemory:“”“分层对话记忆管理”“”def __init__( self, max_recent_turns: int 20, # 近期完整对话轮数 max_summary_turns: int 100, # 摘要覆盖的历史轮数 token_budget: int 8000, # Token 预算 ): self.recent_turns: deque[ConversationTurn] deque(maxlenmax_recent_turns) self.summary: str # 早期对话的压缩摘要 self.max_recent_turns max_recent_turns self.max_summary_turns max_summary_turns self.token_budget token_budget self._total_turns 0 def add_turn(self, turn: ConversationTurn) - None: 添加一轮对话超出窗口的自动转入摘要 self._total_turns 1 if len(self.recent_turns) self.max_recent_turns: # 将最早的对话移入摘要 evicted self.recent_turns.popleft() self.summary self._update_summary(self.summary, evicted) self.recent_turns.append(turn) def to_messages(self) - List[Dict[str, str]]: 构建发给 LLM 的消息列表 messages [] # 1. 系统消息包含摘要 system_content 你是一个智能助手。 if self.summary: system_content f\n\n[历史对话摘要]\n{self.summary} messages.append({role: system, content: system_content}) # 2. 近期完整对话 for turn in self.recent_turns: messages.append({role: turn.role, content: turn.content}) return messages def _update_summary(self, current_summary: str, turn: ConversationTurn) - str: 增量更新摘要简化版实际项目中应调用 LLM 做摘要 snippet f[{turn.role}]: {turn.content[:100]}... if current_summary: return current_summary \n snippet return snippet def estimate_tokens(self) - int: 估算当前上下文的 Token 数量 total len(self.summary) // 2 # 粗略估算中文约2字符/token for turn in self.recent_turns: total len(turn.content) // 2 return total5.2 对话状态机企业级 Agent 往往需要维护会话级别的状态例如正在等待用户确认、正在执行某个长流程等。使用有限状态机进行管理class SessionState(Enum):IDLE “idle” # 空闲等待用户输入PLANNING “planning” # 正在拆解任务EXECUTING “executing” # 正在执行AWAITING_CONFIRMATION “awaiting_confirmation” # 等待用户确认COMPLETED “completed” # 当前任务完成dataclassclass Session:“”“会话上下文”“”session_id: strstate: SessionState SessionState.IDLEmemory: ConversationMemory field(default_factoryConversationMemory)current_plan: Optional[TaskPlan] Nonepending_confirmation: Optional[str] None # 待确认的操作描述created_at: float field(default_factorylambda: datetime.now().timestamp())metadata: Dict[str, Any] field(default_factorydict)5.3 上下文压缩策略当 Token 预算紧张时需要主动压缩历史消息class ContextCompressor:“”“上下文压缩器 —— 在 Token 预算不足时自动压缩早期对话”“”COMPRESSION_PROMPT 请将以下对话历史压缩为一段简洁的摘要保留关键信息决策、数据、结论丢弃冗余描述。 def __init__(self, llm_client, target_ratio: float 0.5): self.llm llm_client self.target_ratio target_ratio # 压缩到原长度的比例 async def compress(self, memory: ConversationMemory) - None: 压缩记忆中的早期对话 if memory.estimate_tokens() memory.token_budget: return # 无需压缩 # 取最早的 50% 历史进行压缩 turns_list list(memory.recent_turns) split_point len(turns_list) // 2 old_turns turns_list[:split_point] new_turns turns_list[split_point:] # 构建待压缩文本 to_compress \n.join( f[{t.role}]: {t.content} for t in old_turns ) # 调用 LLM 压缩 response await self.llm.chat( f{self.COMPRESSION_PROMPT}\n\n对话历史\n{to_compress} ) compressed_summary response.strip() # 更新记忆摘要 近期对话 memory.summary ( f{memory.summary}\n\n[压缩历史]\n{compressed_summary} if memory.summary else compressed_summary ) memory.recent_turns deque(new_turns, maxlenmemory.max_recent_turns)六、完整集成EnterpriseAgent将上述三大模块组装为一个完整的企业级 Agentclass EnterpriseAgent:“”“企业级 AI Agent —— 集成任务拆解、工具调用与多轮对话”“”def __init__( self, llm_client, tool_registry: ToolRegistry, config: Optional[Dict[str, Any]] None, ): self.llm llm_client self.tool_registry tool_registry self.config config or {} # 初始化核心组件 self.planner TaskPlanner(llm_client, tool_registry) self.executor DAGExecutor(tool_registry) self.tool_executor ToolExecutor(llm_client, tool_registry) self.compressor ContextCompressor(llm_client) # 会话管理 self._sessions: Dict[str, Session] {} async def chat( self, session_id: str, user_message: str, ) - AsyncGenerator[Dict[str, Any], None]: 主入口处理用户消息流式返回结果 # 获取或创建会话 session self._get_or_create_session(session_id) # 添加用户消息到记忆 session.memory.add_turn(ConversationTurn( roleuser, contentuser_message )) # Token 预算检查与压缩 await self.compressor.compress(session.memory) yield {type: status, content: 正在分析您的请求...} # 步骤 1判断是否需要任务拆解 if await self._requires_planning(user_message): session.state SessionState.PLANNING yield {type: status, content: 正在拆解任务...} plan await self.planner.plan(user_message) session.current_plan plan yield { type: plan, reasoning: plan.reasoning, sub_tasks: [ {id: t.task_id, name: t.name, description: t.description} for t in plan.sub_tasks ], } # 步骤 2执行任务 DAG session.state SessionState.EXECUTING yield {type: status, content: 正在执行任务...} results await self.executor.execute(plan) yield {type: task_results, results: results} # 将执行结果注入对话让 LLM 生成总结 session.memory.add_turn(ConversationTurn( roleassistant, contentf[系统] 任务执行完成结果: {json.dumps(results, ensure_asciiFalse, defaultstr)}, metadata{type: task_execution}, )) # 步骤 3标准工具调用对话 session.state SessionState.EXECUTING messages session.memory.to_messages() async for event in self.tool_executor.run_with_tools(messages): yield event # 将助手回复存入记忆 if event[type] final_answer: session.memory.add_turn(ConversationTurn( roleassistant, contentevent[content] )) session.state SessionState.IDLE async def _requires_planning(self, user_message: str) - bool: 判断用户请求是否复杂到需要任务拆解 PLAN_CHECK_PROMPT f判断以下用户请求是否需要拆解为多个子任务返回 true 或 false用户请求{user_message}判断标准涉及多个独立步骤或操作 → true需要调用多个不同工具 → true简单问答或单步操作 → false只返回 “true” 或 “false”。“”response await self.llm.chat(PLAN_CHECK_PROMPT) return true in response.strip().lower() def _get_or_create_session(self, session_id: str) - Session: 获取或创建会话 if session_id not in self._sessions: self._sessions[session_id] Session(session_idsession_id) return self._sessions[session_id] def close_session(self, session_id: str) - None: 关闭会话释放资源 self._sessions.pop(session_id, None)七、使用示例async def main():# 初始化 LLM 客户端此处以 OpenAI 兼容接口为例from openai import AsyncOpenAIllm_client AsyncOpenAI(api_keyyour-api-key, base_urlhttps://api.openai.com/v1) # 初始化工具注册中心并注册业务工具 registry ToolRegistry() # 注册数据查询工具 registry.register(ToolDefinition( namequery_sales_data, description查询销售数据支持按时间范围、地区筛选, parameters_schema{ type: object, properties: { start_date: {type: string, description: 开始日期格式 YYYY-MM-DD}, end_date: {type: string, description: 结束日期格式 YYYY-MM-DD}, region: {type: string, description: 地区可选}, }, required: [start_date, end_date], }, handlerquery_sales_data_impl, categorydata, )) # 注册邮件发送工具 registry.register(ToolDefinition( namesend_email, description发送邮件给指定收件人, parameters_schema{ type: object, properties: { to: {type: string, description: 收件人邮箱}, subject: {type: string, description: 邮件主题}, body: {type: string, description: 邮件正文}, }, required: [to, subject, body], }, handlersend_email_impl, require_confirmationTrue, # 敏感操作需用户确认 categorycommunication, )) # 创建 Agent agent EnterpriseAgent( llm_clientllm_client, tool_registryregistry, config{max_tool_rounds: 8}, ) # 用户对话 session_id user-001-session-20250101 user_query 帮我分析上周北京的销售数据生成总结报告并发送给 zhangcompany.com async for event in agent.chat(session_id, user_query): print(f[{event[type]}] {event.get(content, )}) # 清理 agent.close_session(session_id)async def query_sales_data_impl(start_date: str, end_date: str, region: str None):“”“实际的数据查询实现”“”# 这里连接真实数据源数据库 / APIreturn {“total_revenue”: 1285000,“orders”: 342,“top_product”: “智能传感器 X3”,“period”: f{start_date} ~ {end_date},}async def send_email_impl(to: str, subject: str, body: str):“”“实际的邮件发送实现”“”# 调用 SMTP 或邮件服务 APIreturn {“status”: “sent”, “to”: to, “subject”: subject}ifname “main”:asyncio.run(main())八、生产环境考量8.1 安全审计企业级 Agent 的每一次工具调用都应记录审计日志import loggingfrom datetime import datetime, timezoneclass AuditLogger:“”“审计日志记录器”“”def __init__(self, log_file: str agent_audit.log): self.logger logging.getLogger(agent_audit) handler logging.FileHandler(log_file) handler.setFormatter(logging.Formatter( %(asctime)s | %(levelname)s | %(message)s )) self.logger.addHandler(handler) self.logger.setLevel(logging.INFO) def log_tool_call(self, session_id: str, tool_name: str, arguments: dict, result: Any, duration_ms: float): 记录工具调用 self.logger.info( fsession{session_id} | tool{tool_name} | fargs{json.dumps(arguments, ensure_asciiFalse)} | fresult{str(result)[:200]} | duration{duration_ms:.0f}ms ) def log_llm_call(self, session_id: str, prompt_tokens: int, completion_tokens: int, duration_ms: float): 记录 LLM 调用 self.logger.info( fsession{session_id} | llm_call | fprompt_tokens{prompt_tokens} | completion_tokens{completion_tokens} | fduration{duration_ms:.0f}ms )8.2 速率限制与熔断class RateLimiter:“”“简单的滑动窗口速率限制器”“”def __init__(self, max_calls: int, window_seconds: float): self.max_calls max_calls self.window window_seconds self._calls: deque[float] deque() async def acquire(self) - bool: 尝试获取调用许可 now datetime.now().timestamp() # 清理过期记录 while self._calls and self._calls[0] now - self.window: self._calls.popleft() if len(self._calls) self.max_calls: return False self._calls.append(now) return True8.3 可观测性建议为 Agent 接入 OpenTelemetry对每一次 LLM 调用和工具调用进行 Tracing并通过 Prometheus Grafana 监控关键指标各环节耗时分布、工具调用成功率、Token 消耗趋势、DAG 执行时长等。九、总结构建企业级 AI Agent 绝非简单的 Prompt Engineering。本文从工程角度出发围绕任务拆解DAG 规划与并行执行、工具调用Function Calling 的 ReAct 循环、多轮对话分层记忆与状态机三个核心环节。真正优秀的企业 Agent是对这三者进行精细化工程治理后的产物。希望本文的架构设计与代码实现能为在 Agent 落地之路上提供有价值的参考。下一步可以在此基础上扩展引入 Human-in-the-Loop 审批流、向量化长期记忆、多 Agent 协作编排等高级能力。