首页 > 基础资料 博客日记

【OpenClaw】通过 Nanobot 源码学习架构---(3)AgentLoop

2026-04-03 21:00:02基础资料围观1

文章【OpenClaw】通过 Nanobot 源码学习架构---(3)AgentLoop分享给大家,欢迎收藏极客资料网,专注分享技术知识

【OpenClaw】通过Nanobot源码学习架构---(3)AgentLoop

0x00 概要

OpenClaw 应该有40万行代码,阅读理解起来难度过大,因此,本系列通过Nanobot来学习 OpenClaw 的特色。

Nanobot是由香港大学数据科学实验室(HKUDS)开源的超轻量级个人 AI 助手框架,定位为"Ultra-Lightweight OpenClaw"。非常适合学习Agent架构。

Agent 是“业务执行者”,解决“消息怎样变成模型调用、工具执行和最终回复”。它们具备独立的上下文(与主对话隔离)和可使用的特定工具,并且具备定义明确的角色和方法论。每次 Agent 收到消息时运行的核心推理周期如下:

  • 从总线接收消息
  • 组装上下文
  • 推理该做什么(这是 LLM 调用)
  • 根据决定行动(调用工具、执行命令)
  • 观察结果,保存状态
  • 判断:我完成了吗?还是再循环一次?
  • 完成后回复

注:本系列借鉴的文章过多,可能在参考文献中有遗漏的文章,如果有,还请大家指出。

0x01 原理

1.1 Agent:负责“执行”

一个 Agent = 一个完整的 AI “大脑实例”,每个 Agent 都拥有独立资源。Agent 是“执行平面”,解决“消息怎样变成模型调用、工具执行和最终回复”。具体如下图(来自MiniClaw)。

Feishu Cloud
    |
    |  HTTP POST /feishu/events
    |  (im.message.receive_v1)
    v
[ESP32 Webhook Server :18790]
    |
    |  message_bus_push_inbound()
    v
[Message Bus] ──> [Agent Loop] ──> [Message Bus]
                   (Claude/GPT)         |
                                        |  outbound dispatch
                                        v
                              [feishu_send_message()]
                                        |
                                        |  POST /im/v1/messages
                                        v
                                   Feishu API

下图是Agent 的最小循环。每个 AI Agent 都需要这个循环。模型决定何时调用工具、何时停止。代码只是执行模型的要求。

An agentic loop is the full “real” run of an agent: intake → context assembly → model inference → tool execution → streaming replies → persistence. 
    
                   THE AGENT PATTERN
                    =================

    User --> messages[] --> LLM --> response
                                      |
                            stop_reason == "tool_use"?
                           /                          \
                         yes                           no
                          |                             |
                    execute tools                    return text
                    append results
                    loop back -----------------> messages[]

1.2 Pi-Agent框架

OpenClaw所使用的引擎是Pi-Agent框架,它是一个仅有四个工具、系统提示词不到1000个token,秉持“精简至上”原则的AI编程Agent。与其他编程Agent相比,Pi的工程设计和决策机制极为简洁,形成了鲜明对比。

下图是 OpenClaw 的循环概要。

runEmbeddedPiAgent()
  └── while (true) {  // 主重试循环
        ├── 检查重试次数限制 (MAX_RUN_LOOP_ITERATIONS)
        ├── 调用 runEmbeddedAttempt()  // 单次推理尝试
        ├── 处理 context overflow → 自动压缩
        ├── 处理 auth failure → profile轮换
        ├── 处理 timeout → 重试或报错
        └── 成功则返回 payloads
      }

Pi的设计理念可以总结为:不是为LLM打造一个复杂的“控制台”,而是给它一把“多功能小刀”——工具虽少但实用,提示虽简但明确,让模型的原生能力成为主导,而不是被框架的复杂性所掩盖。Pi 这种设计理念是基于一个关键事实——经过强化学习训练的前沿LLM模型,已经具备了很强的理解和执行能力。它们能明确知道“编码Agent”的主要任务是什么,根本不需要长篇大论的系统提示词和复杂的辅助模块来“指导”它们工作。

从数据层面分析:Pi的系统提示词加上工具定义,总长度还不到1000个token,仅仅是Claude Code的十分之一;内置工具也只有4个,远少于同类产品。这说明,Pi在主流Agent都在强化的方面,几乎都做了简化:

  • 系统提示词简短明了
  • 内置工具数量精简
  • 没有复杂的规划模式和多代理通信协议(Plan Mode和MCP)支持
  • 更没有难以监控的子Agent

Pi的核心策略是:去除冗余辅助模块,让LLM模型发挥核心作用,用最简洁的结构实现最核心的功能。

或许有人会问:如此简单的设计,真的能应对复杂的编码任务吗?实际上,Pi的简洁并非“简陋”,而是“精准”。接下来,我们详细解析这4个内置工具的设计思路——readwriteeditbash

工具 主要功能
read 读取文件、审查代码、获取上下文信息
write 创建文件、写入内容
edit 修改代码、进行增量更新
bash 执行命令、操作环境、通过自我调用来拆分任务

这四个工具几乎涵盖了编码Agent的所有核心需求。特别是bash工具的引入,既实现了复杂任务的拆分和执行,保证了功能的完整性,又避免了引入子Agent可能带来的不可预测性和监控难题——这就是Pi敢于放弃子Agent架构的原因。

同时,Pi使用简短的系统提示词,并非降低了对LLM的引导标准,而是充分信任前沿LLM的能力。正如Mario Zechner所倡导的:与其用大量token去“教导”LLM如何成为Agent,不如用简洁的提示词明确其核心任务,让LLM充分发挥自身的理解和执行能力。

这种设计思路带来了三大好处:

  1. 节省上下文空间——降低推理成本,提高运行效率
  2. 行为更加灵活自主——LLM能根据实际情况动态调整策略,不受冗长规则限制
  3. 更好的适应性——简洁的结构意味着更低的认知负担和更强的泛化能力

0x02 AgentLoop

AgentLoop 是nanobot Agent运行的核心。智能体循环是区分聊天机器人和智能体的关键。

2.1 架构

AgentLoop 类的架构如下:

AgentLoop-1

2.2 流程

下面是一个 AI Agent(智能体)的消息处理流程图,展示了从消息接收到响应发送的完整链路,包括 LLM 交互、工具调用循环等核心机制。

入口:消息到达(InboundMessage)
    ↓
AgentLoop.run() - 监听并接收消息
    ↓
AgentLoop._dispatch() - 分派处理
    ↓
AgentLoop._process_message() - 主要处理逻辑
    ↓
ContextBuilder.build_messages() - 构建上下文
    ↓
AgentLoop._run_agent_loop() - 核心代理循环
    ↓
Provider.chat() - LLM交互
    ↓
← 判断是否有工具调用
    ↓ 否
← 返回最终内容
    ↓ 是
← 执行工具调用
    ↓
ContextBuilder.add_tool_result() - 添加工具结果
    ↓
← 继续循环直到没有更多工具调用
    ↓
AgentLoop._save_turn() - 保存交互记录
    ↓
通过MessageBus发布OutboundMessage - 发送响应

部分环节详细拆解如下

AgentLoop-2

2.3 定义和初始化

AgentLoop 的定义和初始化代码如下

class AgentLoop:
    """
    The agent loop is the core processing engine.

    It:
    1. Receives messages from the bus
    2. Builds context with history, memory, skills
    3. Calls the LLM
    4. Executes tool calls
    5. Sends responses back
    """

    def __init__(
        self,
        bus: MessageBus,                # 消息总线,用于接收/发送消息
        provider: LLMProvider,          # LLM提供者(如OpenAI/本地模型)
        workspace: Path,                # Agent工作目录,用于隔离文件操作
        model: str | None = None,       # 使用的LLM模型名称
        max_iterations: int = 40,       # Agent最大迭代次数(防止无限循环)
        temperature: float = 0.1,       # LLM温度参数(越低越确定)
        max_tokens: int = 4096,         # LLM最大生成Token数
        memory_window: int = 100,       # 记忆窗口大小(会话历史最大条数)
        brave_api_key: str | None = None,  # Brave搜索API密钥(用于网页搜索工具)
        exec_config: ExecToolConfig | None = None,  # 命令执行工具配置
        cron_service: CronService | None = None,    # 定时任务服务(可选)
        restrict_to_workspace: bool = False,        # 是否限制Agent仅操作工作区
        session_manager: SessionManager | None = None,  # 会话管理器(可选)
        mcp_servers: dict | None = None,              # MCP服务器配置(可选)
        channels_config: ChannelsConfig | None = None,  # 通道配置(可选)
    ):
        # 解决循环导入问题:仅运行时导入ExecToolConfig
        from nanobot.config.schema import ExecToolConfig
        
        # 基础属性初始化
        self.bus = bus                          # 消息总线实例
        self.channels_config = channels_config  # 通道配置
        self.provider = provider                # LLM提供者实例
        self.workspace = workspace              # 工作目录路径
        # 模型名称:优先传入值,否则使用LLM提供者默认模型
        self.model = model or provider.get_default_model()
        self.max_iterations = max_iterations    # 最大迭代次数
        self.temperature = temperature          # LLM温度
        self.max_tokens = max_tokens            # LLM最大Token数
        self.memory_window = memory_window      # 记忆窗口大小
        self.brave_api_key = brave_api_key      # Brave API密钥
        # 执行工具配置:默认空配置
        self.exec_config = exec_config or ExecToolConfig()
        self.cron_service = cron_service        # 定时任务服务
        self.restrict_to_workspace = restrict_to_workspace  # 工作区限制开关

        # 核心组件初始化
        self.context = ContextBuilder(workspace)  # 上下文构建器:构建LLM输入上下文
        # 会话管理器:优先传入实例,否则创建新实例
        self.sessions = session_manager or SessionManager(workspace)
        self.tools = ToolRegistry()  # 工具注册表:管理所有可用工具
        
        # 子Agent管理器:用于生成子Agent处理子任务
        self.subagents = SubagentManager(
            provider=provider,
            workspace=workspace,
            bus=bus,
            model=self.model,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
            brave_api_key=brave_api_key,
            exec_config=self.exec_config,
            restrict_to_workspace=restrict_to_workspace,
        )

        # 运行状态与资源管理属性
        self._running = False                  # Agent循环是否运行
        self._mcp_servers = mcp_servers or {}   # MCP服务器配置
        self._mcp_stack: AsyncExitStack | None = None  # MCP连接上下文栈
        self._mcp_connected = False             # MCP是否已连接
        self._mcp_connecting = False            # MCP是否正在连接
        self._consolidating: set[str] = set()   # 正在进行记忆合并的会话Key集合
        self._consolidation_tasks: set[asyncio.Task] = set()  # 记忆合并任务集合
        self._consolidation_locks: dict[str, asyncio.Lock] = {}  # 会话记忆合并锁
        self._active_tasks: dict[str, list[asyncio.Task]] = {}  # 活跃任务:session_key -> 任务列表
        self._processing_lock = asyncio.Lock()  # 全局消息处理锁(防止并发冲突)
        self._register_default_tools()          # 注册默认工具

    def _register_default_tools(self) -> None:
        """Register the default set of tools. 注册默认工具集"""
        # 确定文件工具的允许目录:如果限制工作区则为工作目录,否则为None(无限制)
        allowed_dir = self.workspace if self.restrict_to_workspace else None
        # 注册文件系统工具:读/写/编辑/列目录
        for cls in (ReadFileTool, WriteFileTool, EditFileTool, ListDirTool):
            self.tools.register(cls(workspace=self.workspace, allowed_dir=allowed_dir))
        
        # 注册命令执行工具
        self.tools.register(ExecTool(
            working_dir=str(self.workspace),       # 工作目录
            timeout=self.exec_config.timeout,      # 执行超时时间
            restrict_to_workspace=self.restrict_to_workspace,  # 工作区限制
            path_append=self.exec_config.path_append,          # 环境变量PATH追加
        ))
        
        # 注册网页相关工具:搜索/爬取
        self.tools.register(WebSearchTool(api_key=self.brave_api_key))
        self.tools.register(WebFetchTool())
        
        # 注册消息发送工具:回调函数为消息总线发布出站消息
        self.tools.register(MessageTool(send_callback=self.bus.publish_outbound))
        
        # 注册子Agent生成工具
        self.tools.register(SpawnTool(manager=self.subagents))
        
        # 如果有定时任务服务,注册定时任务工具
        if self.cron_service:
            self.tools.register(CronTool(self.cron_service))

    async def _connect_mcp(self) -> None:
        """Connect to configured MCP servers (one-time, lazy). 连接MCP服务器(懒加载,仅一次)"""
        # 跳过条件:已连接/正在连接/无MCP配置
        if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
            return
        
        self._mcp_connecting = True  # 标记为正在连接
        from nanobot.agent.tools.mcp import connect_mcp_servers  # 延迟导入MCP连接函数
        try:
            # 创建异步上下文栈,用于管理MCP连接资源
            self._mcp_stack = AsyncExitStack()
            await self._mcp_stack.__aenter__()  # 进入上下文栈
            # 连接MCP服务器,将工具注册到MCP
            await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
            self._mcp_connected = True  # 标记为已连接
        except Exception as e:
            # 连接失败:记录日志,下次消息处理时重试
            logger.error("Failed to connect MCP servers (will retry next message): {}", e)
            if self._mcp_stack:
                try:
                    await self._mcp_stack.aclose()  # 关闭上下文栈
                except Exception:
                    pass
                self._mcp_stack = None
        finally:
            self._mcp_connecting = False  # 清除正在连接标记

    def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
        """Update context for all tools that need routing info. 更新需要路由信息的工具上下文"""
        # 消息工具:设置通道/聊天ID/消息ID(用于消息发送路由)
        if message_tool := self.tools.get("message"):
            if isinstance(message_tool, MessageTool):
                message_tool.set_context(channel, chat_id, message_id)

        # 子Agent生成工具:设置通道/聊天ID
        if spawn_tool := self.tools.get("spawn"):
            if isinstance(spawn_tool, SpawnTool):
                spawn_tool.set_context(channel, chat_id)

        # 定时任务工具:设置通道/聊天ID
        if cron_tool := self.tools.get("cron"):
            if isinstance(cron_tool, CronTool):
                cron_tool.set_context(channel, chat_id)

2.4 run

run 是代理的主循环入口。

  1. 核心作用:run 负责持续消费消息总线的入站消息,并异步分发处理,同时保证 /stop 指令的实时响应。
  2. 关键逻辑:
    • 1 秒超时消费消息:避免主线程阻塞,确保 /stop 能及时被处理;
    • 异步任务分发:非 /stop 消息通过 _dispatch 异步处理,不阻塞主循环;
    • 任务追踪:通过 _active_tasks 记录各会话的活跃任务,配合回调自动清理,支持 /stop 批量终止。
  3. 异常处理:超时无消息时直接跳过,不中断主循环,保证代理持续运行。
    async def run(self) -> None:
        """Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
        # 将代理运行状态标记为True,表示开始运行
        self._running = True
        # 异步连接MCP服务器(懒加载,仅首次执行,失败会在后续重试)
        await self._connect_mcp()
        # 记录日志:代理循环已启动
        logger.info("Agent loop started")

        # 核心循环:只要代理处于运行状态,就持续消费并处理消息
        while self._running:
            try:
                # 从消息总线消费入站消息,设置1秒超时(避免无限阻塞,保证/stop指令响应性)
                # asyncio.wait_for:超时会抛出TimeoutError,触发continue继续循环
                msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
            except asyncio.TimeoutError:
                # 超时无消息时,跳过本次循环,继续等待下一轮
                continue

            # 判断消息内容是否为/stop指令(忽略首尾空格、大小写)
            if msg.content.strip().lower() == "/stop":
                # 处理/stop指令:终止当前会话的所有活跃任务和子代理
                await self._handle_stop(msg)
            else:
                # 非/stop指令:创建异步任务处理消息(保证主线程不阻塞,响应后续/stop)
                task = asyncio.create_task(self._dispatch(msg))
                # 将任务添加到_active_tasks映射中(session_key为键,便于后续批量终止)
                # setdefault:如果session_key不存在则创建空列表,再追加任务
                self._active_tasks.setdefault(msg.session_key, []).append(task)
                # 为任务添加完成回调:任务结束后从_active_tasks中移除(避免内存泄漏)
                # 匿名函数参数k绑定当前msg.session_key,t为完成的任务对象
                # 逻辑:如果任务仍在对应session的任务列表中,则移除;否则无操作
                task.add_done_callback(lambda t, k=msg.session_key: self._active_tasks.get(k, []) and self._active_tasks[k].remove(t) if t in self._active_tasks.get(k, []) else None)

2.5 _dispatch

_dispatch 是消息分发的核心方法。

  • 核心作用:_dispatch 在全局锁保护下执行消息处理,保证串行化,同时统一处理异常和响应发布。
  • 关键逻辑:
    • 全局锁 _processing_lock:避免多任务并发处理消息导致的资源冲突;
    • 响应发布规则:有响应则发布响应、CLI 渠道无响应则发布空消息、异常则发布错误提示;
    • 异常处理:区分任务取消异常(重新抛出)和通用异常(记录 + 返回错误提示),保证异常链路清晰。
  • 边界处理:针对 CLI 渠道做特殊适配,发布空消息避免命令行交互阻塞。
    async def _dispatch(self, msg: InboundMessage) -> None:
        """Process a message under the global lock."""
        # 获取全局处理锁(异步上下文管理器),确保消息串行处理,避免资源竞争
        async with self._processing_lock:
            try:
                # 调用核心消息处理方法,传入入站消息,获取出站响应(可能为None)
                response = await self._process_message(msg)
                # 如果处理后有非空的出站响应
                if response is not None:
                    # 将响应发布到消息总线的出站队列
                    await self.bus.publish_outbound(response)
                # 如果无响应且消息渠道是CLI(命令行界面)
                elif msg.channel == "cli":
                    # 向CLI渠道发布空内容的出站消息(保证CLI交互的完整性,避免阻塞)
                    await self.bus.publish_outbound(OutboundMessage(
                        channel=msg.channel, chat_id=msg.chat_id,
                        content="", metadata=msg.metadata or {},
                    ))
            # 捕获任务取消异常(如/stop指令触发的任务终止)
            except asyncio.CancelledError:
                # 记录日志:会话对应的任务已被取消
                logger.info("Task cancelled for session {}", msg.session_key)
                # 重新抛出取消异常,让上层逻辑处理(如清理任务列表)
                raise
            # 捕获所有其他未预期的异常
            except Exception:
                # 记录异常日志(包含堆栈信息),便于问题排查
                logger.exception("Error processing message for session {}", msg.session_key)
                # 向消息来源渠道发布统一的错误提示消息
                await self.bus.publish_outbound(OutboundMessage(
                    channel=msg.channel, chat_id=msg.chat_id,
                    content="Sorry, I encountered an error.",
                ))

2.6 _process_message()

_process_message 是单条消息处理的核心入口。

核心作用_process_message 支持系统消息、斜杠命令、普通对话三种场景,完成「上下文构建→代理循环→结果保存→响应返回」全流程。

关键逻辑

  • 系统消息处理:解析渠道信息,独立构建会话和上下文,适用于后台任务;
  • 斜杠命令:/new 合并记忆并清空会话,/help 返回命令列表;
  • 记忆合并:未合并消息达阈值时异步执行,避免阻塞主流程;
  • 进度回调:实时推送处理进度(含工具调用提示),提升交互体验;
  • 重复回复防护:消息工具已发送过消息则返回 None,避免重复响应。

边界处理

  • 兜底默认回复:无最终内容时返回标准化提示;
  • 媒体消息支持:构建上下文时兼容图片等媒体内容;
  • 会话锁机制:通过合并锁避免并发修改会话记忆。
    async def _process_message(
        self,
        msg: InboundMessage,
        session_key: str | None = None,
        on_progress: Callable[[str], Awaitable[None]] | None = None,
    ) -> OutboundMessage | None:
        """Process a single inbound message and return the response."""
        # 处理系统消息:从chat_id中解析原始渠道和聊天ID(格式为"channel:chat_id")
        if msg.channel == "system":
            # 拆分chat_id:有分隔符则拆分为渠道+聊天ID,否则默认CLI渠道
            channel, chat_id = (msg.chat_id.split(":", 1) if ":" in msg.chat_id
                                else ("cli", msg.chat_id))
            # 记录日志:正在处理来自指定发送者的系统消息
            logger.info("Processing system message from {}", msg.sender_id)
            # 构建会话唯一标识(渠道+聊天ID)
            key = f"{channel}:{chat_id}"
            # 获取或创建该会话(不存在则新建)
            session = self.sessions.get_or_create(key)
            # 为工具设置上下文(渠道、聊天ID、消息ID,用于消息路由)
            self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
            # 从会话中获取历史消息(最多保留memory_window条,控制上下文长度)
            history = session.get_history(max_messages=self.memory_window)
            # 构建LLM所需的完整上下文消息(历史+当前消息+渠道信息)
            messages = self.context.build_messages(
                history=history,
                current_message=msg.content, channel=channel, chat_id=chat_id,
            )
            # 运行代理核心循环,获取最终回复内容、使用的工具列表、所有消息
            final_content, _, all_msgs = await self._run_agent_loop(messages)
            # 保存本轮对话到会话(跳过已存在的历史消息,仅保存新内容)
            self._save_turn(session, all_msgs, 1 + len(history))
            # 将更新后的会话持久化到本地
            self.sessions.save(session)
            # 返回系统消息处理结果:无内容则默认"Background task completed."
            return OutboundMessage(channel=channel, chat_id=chat_id,
                                  content=final_content or "Background task completed.")

        # 非系统消息:截取消息内容预览(超过80字符则截断加省略号)
        preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
        # 记录日志:正在处理来自指定渠道/发送者的消息(展示预览)
        logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)

        # 确定会话key:优先使用传入的session_key,否则使用消息自带的session_key
        key = session_key or msg.session_key
        # 获取或创建该会话
        session = self.sessions.get_or_create(key)

        # 处理斜杠命令(Slash commands)
        # 标准化命令:去除首尾空格并转为小写
        cmd = msg.content.strip().lower()
        # 处理"/new"命令:新建会话(合并当前记忆并清空)
        if cmd == "/new":
            # 获取该会话的记忆合并锁(避免并发合并)
            lock = self._get_consolidation_lock(session.key)
            # 将会话标记为"正在合并记忆"
            self._consolidating.add(session.key)
            try:
                # 加锁执行记忆合并(异步锁,防止并发操作)
                async with lock:
                    # 截取会话中未合并的消息(从上次合并位置到末尾)
                    snapshot = session.messages[session.last_consolidated:]
                    # 如果有未合并的消息
                    if snapshot:
                        # 创建临时会话对象,仅包含未合并的消息
                        temp = Session(key=session.key)
                        temp.messages = list(snapshot)
                        # 执行记忆合并(归档所有消息),失败则返回错误提示
                        if not await self._consolidate_memory(temp, archive_all=True):
                            return OutboundMessage(
                                channel=msg.channel, chat_id=msg.chat_id,
                                content="Memory archival failed, session not cleared. Please try again.",
                            )
            # 捕获合并过程中的所有异常
            except Exception:
                # 记录异常日志(含堆栈),便于排查
                logger.exception("/new archival failed for {}", session.key)
                # 返回合并失败的错误提示
                return OutboundMessage(
                    channel=msg.channel, chat_id=msg.chat_id,
                    content="Memory archival failed, session not cleared. Please try again.",
                )
            # 无论成功/失败,最终执行:
            finally:
                # 取消会话的"正在合并"标记
                self._consolidating.discard(session.key)
                # 清理该会话的合并锁(未锁定则移除)
                self._prune_consolidation_lock(session.key, lock)

            # 清空当前会话的所有消息
            session.clear()
            # 保存清空后的会话
            self.sessions.save(session)
            # 使会话缓存失效(确保下次获取最新状态)
            self.sessions.invalidate(session.key)
            # 返回新建会话成功的提示
            return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
                                  content="New session started.")
        # 处理"/help"命令:返回可用命令列表
        if cmd == "/help":
            return OutboundMessage(channel=msg.channel, chat_id=msg.chat_id,
                                  content="🐈 nanobot commands:\n/new — Start a new conversation\n/stop — Stop the current task\n/help — Show available commands")

        # 计算会话中未合并的消息数量(总消息数 - 上次合并位置)
        unconsolidated = len(session.messages) - session.last_consolidated
        # 如果未合并消息数≥内存窗口,且会话未在合并中:异步执行记忆合并
        if (unconsolidated >= self.memory_window and session.key not in self._consolidating):
            # 标记会话为"正在合并"
            self._consolidating.add(session.key)
            # 获取该会话的合并锁
            lock = self._get_consolidation_lock(session.key)

            # 定义异步函数:合并记忆并解锁
            async def _consolidate_and_unlock():
                try:
                    # 加锁执行记忆合并
                    async with lock:
                        await self._consolidate_memory(session)
                finally:
                    # 取消"正在合并"标记
                    self._consolidating.discard(session.key)
                    # 清理合并锁
                    self._prune_consolidation_lock(session.key, lock)
                    # 获取当前任务对象
                    _task = asyncio.current_task()
                    # 从合并任务集合中移除当前任务(避免内存泄漏)
                    if _task is not None:
                        self._consolidation_tasks.discard(_task)

            # 创建异步任务执行合并操作
            _task = asyncio.create_task(_consolidate_and_unlock())
            # 将任务加入合并任务集合(强引用,防止被GC回收)
            self._consolidation_tasks.add(_task)

        # 为工具设置上下文(渠道、聊天ID、消息ID)
        self._set_tool_context(msg.channel, msg.chat_id, msg.metadata.get("message_id"))
        # 获取消息工具实例(如果存在)
        if message_tool := self.tools.get("message"):
            # 验证工具类型并标记本轮对话开始
            if isinstance(message_tool, MessageTool):
                message_tool.start_turn()

        # 从会话中获取历史消息(最多memory_window条)
        history = session.get_history(max_messages=self.memory_window)
        # 构建LLM初始上下文消息(历史+当前消息+媒体+渠道信息)
        initial_messages = self.context.build_messages(
            history=history,
            current_message=msg.content,
            media=msg.media if msg.media else None,  # 处理带媒体的消息(如图片)
            channel=msg.channel, chat_id=msg.chat_id,
        )

        # 定义进度回调函数:向消息总线发布处理进度(支持工具调用提示标记)
        async def _bus_progress(content: str, *, tool_hint: bool = False) -> None:
            # 复制消息元数据(避免修改原数据)
            meta = dict(msg.metadata or {})
            # 标记为进度消息
            meta["_progress"] = True
            # 标记是否为工具调用提示
            meta["_tool_hint"] = tool_hint
            # 发布进度消息到消息总线
            await self.bus.publish_outbound(OutboundMessage(
                channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
            ))

        # 运行代理核心循环:传入初始上下文和进度回调(优先使用传入的on_progress)
        final_content, _, all_msgs = await self._run_agent_loop(
            initial_messages, on_progress=on_progress or _bus_progress,
        )

        # 兜底:如果最终内容为空,设置默认提示语
        if final_content is None:
            final_content = "I've completed processing but have no response to give."

        # 截取回复内容预览(超过120字符则截断加省略号)
        preview = final_content[:120] + "..." if len(final_content) > 120 else final_content
        # 记录日志:返回给指定渠道/发送者的回复(展示预览)
        logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)

        # 保存本轮对话到会话(跳过已存在的历史消息)
        self._save_turn(session, all_msgs, 1 + len(history))
        # 持久化更新后的会话
        self.sessions.save(session)

        # 检查消息工具:如果本轮对话中已通过消息工具发送过消息,则返回None(避免重复回复)
        if message_tool := self.tools.get("message"):
            if isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
                return None

        # 返回最终的出站消息(包含回复内容、渠道、聊天ID和元数据)
        return OutboundMessage(
            channel=msg.channel, chat_id=msg.chat_id, content=final_content,
            metadata=msg.metadata or {},
        )

2.7 _run_agent_loop()

核心逻辑:该函数是智能体的核心执行循环,通过不断调用大模型并根据响应决定是否调用工具,直到模型返回最终回答或达到最大迭代次数。

关键分支

  • 分支 1(有工具调用):记录工具调用、执行工具、将工具结果加入对话上下文,继续循环。
  • 分支 2(无工具调用):将模型回答作为最终结果,终止循环。

边界处理:当达到最大迭代次数仍未得到最终回答时,会生成提示文本并记录警告日志,保证函数有明确的返回值。

    async def _run_agent_loop(
        self,
        initial_messages: list[dict],
        on_progress: Callable[..., Awaitable[None]] | None = None,
    ) -> tuple[str | None, list[str], list[dict]]:
        """Run the agent iteration loop. Returns (final_content, tools_used, messages)."""
        # 初始化对话消息列表:将传入的初始上下文(历史+当前消息)赋值给循环变量,后续持续更新
        messages = initial_messages
        # 初始化迭代计数器:用于控制最大循环次数,避免无限迭代导致死循环
        iteration = 0
        # 初始化最终回复内容:存储LLM最终无工具调用时的直接回复内容,初始为None
        final_content = None
        # 初始化工具使用列表:记录本次循环中调用过的所有工具名称,用于后续统计/日志
        tools_used: list[str] = []

        # 核心循环:迭代执行「LLM调用→工具执行」逻辑,直到达到最大迭代次数
        while iteration < self.max_iterations:
            # 迭代次数自增(每次循环先计数,再执行核心逻辑)
            iteration += 1

            # 调用大模型提供商的聊天接口,获取模型响应
            # 参数说明:
            # - messages: 当前完整的对话上下文
            # - tools: 可供模型调用的工具定义列表
            # - model: 指定使用的大模型名称
            # - temperature: 模型生成的随机性参数
            # - max_tokens: 模型生成的最大令牌数
            response = await self.provider.chat(
                messages=messages,
                tools=self.tools.get_definitions(),
                model=self.model,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
            )

            # 判断模型响应是否包含工具调用指令
            if response.has_tool_calls:
                # 如果传入了进度回调函数,则执行进度通知
                if on_progress:
                    # 清理模型响应内容,移除思考过程等辅助文本
                    clean = self._strip_think(response.content)
                    # 如果清理后有有效内容,则调用进度回调
                    if clean:
                        await on_progress(clean)
                    # 调用进度回调,传递工具调用提示信息,并标记为tool_hint类型
                    await on_progress(self._tool_hint(response.tool_calls), tool_hint=True)

                # 将模型返回的工具调用对象转换为标准格式的字典列表
                tool_call_dicts = [
                    {
                        "id": tc.id,          # 工具调用的唯一标识ID
                        "type": "function",   # 工具调用类型(固定为function)
                        "function": {
                            "name": tc.name,  # 要调用的工具名称
                            # 将工具调用参数转换为JSON字符串(确保非ASCII字符不转义)
                            "arguments": json.dumps(tc.arguments, ensure_ascii=False)
                        }
                    }
                    for tc in response.tool_calls  # 遍历所有工具调用指令
                ]
                # 将模型的响应(包含工具调用)添加到对话消息列表中
                # 同时记录推理过程内容(reasoning_content)
                messages = self.context.add_assistant_message(
                    messages, response.content, tool_call_dicts,
                    reasoning_content=response.reasoning_content,
                )

                # 遍历每一个工具调用指令,执行具体的工具调用逻辑
                for tool_call in response.tool_calls:
                    # 记录本次调用的工具名称到工具使用列表
                    tools_used.append(tool_call.name)
                    # 将工具参数转换为JSON字符串(截取前200字符避免日志过长)
                    args_str = json.dumps(tool_call.arguments, ensure_ascii=False)
                    # 记录工具调用日志,包含工具名称和参数(截断显示)
                    logger.info("Tool call: {}({})", tool_call.name, args_str[:200])
                    # 执行工具调用,获取工具返回结果(异步执行)
                    result = await self.tools.execute(tool_call.name, tool_call.arguments)
                    # 将工具调用的结果添加到对话消息列表中,关联对应的工具调用ID
                    messages = self.context.add_tool_result(
                        messages, tool_call.id, tool_call.name, result
                    )
            # 如果模型响应不包含工具调用(直接返回最终回答)
            else:
                # 清理模型响应内容,移除思考过程等辅助文本
                clean = self._strip_think(response.content)
                # 将模型的最终回答添加到对话消息列表中
                messages = self.context.add_assistant_message(
                    messages, clean, reasoning_content=response.reasoning_content,
                )
                # 将清理后的内容赋值给最终返回内容
                final_content = clean
                # 跳出循环,结束智能体迭代
                break

        # 处理循环结束但未得到最终回答的情况(达到最大迭代次数)
        if final_content is None and iteration >= self.max_iterations:
            # 记录警告日志,提示达到最大迭代次数
            logger.warning("Max iterations ({}) reached", self.max_iterations)
            # 生成默认的提示文本,告知用户达到最大迭代次数
            final_content = (
                f"I reached the maximum number of tool call iterations ({self.max_iterations}) "
                "without completing the task. You can try breaking the task into smaller steps."
            )

        # 返回最终结果:最终回答内容、使用过的工具列表、完整的对话消息列表
        return final_content, tools_used, messages

0xFF 参考

3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析

Kimi Agent产品很厉害,然后呢?

OpenClaw真完整解说:架构与智能体内核

Agent/Skills/Teams 架构演进过程及技术选型之道

别再把多 Bot 和多 Agent 搞混了:OpenClaw 协作全景与架构避坑指南

一文讲透:OpenClaw多agent模式下Skills的分层调用机制

从底层机制一文讲透:OpenClaw🦞如何运行多Agents

别再把多 Bot 和多 Agent 搞混了:OpenClaw 协作全景与架构避坑指南

https://ppaolo.substack.com/p/openclaw-system-architecture-overview

Thinking in Context: 何时需要多智能体

万字】带你实现一个Agent(上),从Tools、MCP到Skills

3500 行代码打造轻量级AI Agent:Nanobot 架构深度解析

Kimi Agent产品很厉害,然后呢?

OpenClaw真完整解说:架构与智能体内核

https://github.com/shareAI-lab/learn-claude-code

深入理解OpenClaw技术架构与实现原理(上)

深度解析:一张图拆解OpenClaw的Agent核心设计

OpenClaw小龙虾架构全面解析

OpenClaw架构-Agent Runtime 运行时深度拆解

OpenClaw 架构详解 · 第一部分:控制平面、会话管理与事件循环

从回答问题到替你做事,AI Agent 为什么突然火了?


文章来源:https://www.cnblogs.com/rossiXYZ/p/19818826
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云