首页 > 基础资料 博客日记

企业微信机器人与 DeepAgents 集成实践

2026-04-15 01:30:02基础资料围观1

这篇文章介绍了企业微信机器人与 DeepAgents 集成实践,分享给大家做个参考,收藏极客资料网收获更多编程知识

前言

企业微信机器人以前通常采用 Webhook 回调方式接收消息,但这种方式存在延迟较高、需要公网服务器等局限性。随着OpenClaw爆火,企业微信机器人也支持 WebSocket 长连接方式。本文介绍一种基于 WebSocket 长连接的企业微信机器人实现方案,并集成 DeepAgents 框架实现智能对话能力。

技术栈

  • 企业微信 WebSocket SDK: wecom-aibot-python-sdk - 官方提供的 WebSocket 连接库
  • FastAPI: 现代异步 Web 框架,用于托管服务和 MCP 服务器
  • DeepAgents: 智能体框架,用于构建具备工具调用能力的 AI 助手
  • LangChain: 提供 LLM 集成和工具加载能力
  • MCP (Model Context Protocol): 标准化的工具调用协议

项目结构

qywx-bot/
├── main.py                 # FastAPI 主入口
├── pyproject.toml         # 项目依赖配置
├── conf/
│   └── config.toml        # 应用配置
├── pkg/
│   ├── config/            # 配置管理模块
│   ├── log/               # 日志模块
│   └── qywx/              # 企业微信客户端
└── ai_agent/
    ├── ai_agent.py        # DeepAgents 集成
    └── mcp_servers/       # MCP 工具服务器

安装依赖

uv add fastapi deepagents langchain-openai langchain-mcp-adapters wecom-aibot-python-sdk uvicorn

核心实现

1. 配置管理

使用 TOML 格式管理配置,支持多环境切换:

[service]
host = "127.0.0.1"
port = 8000
env = "dev"

[qywx.v2]
bot_id = "your-bot-id"
secret = "your-bot-secret"
bot_name = "智能助手"

2. 企业微信 WebSocket 客户端

通过 WebSocket 长连接接收企业微信消息,实现低延迟实时交互:

class QywxClient:
    async def start(self):
        self.ws_client = WSClient(
            WSClientOptions(
                bot_id=cfg.qywx_bot_id,
                secret=cfg.qywx_secret,
                logger=self.logger,
            )
        )
        
        # 注册事件处理器
        self.ws_client.on("authenticated", self._on_authenticated)
        self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
        self.ws_client.on("message.text", self._on_message_text)
        
        await self.ws_client.connect()

3. DeepAgents 集成

构建具备工具调用能力的智能体,通过 MCP 协议加载工具:

class AIAgent:
    async def _create_root_agent(self, session: ClientSession):
        tools = await load_mcp_tools(session)
        return create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个企业微信机器人,名字叫{cfg.qywx_bot_name}",
        )

4. 流式输出处理

实现企业微信流式消息回复,提升用户体验:

async def _on_message_text(self, frame: WsFrameHeaders):
    stream_id = generate_req_id('stream')
    await self.ws_client.reply_stream(frame, stream_id, "思考中...", False)
    
    async for chunk in aiops.invoke(content):
        await self.ws_client.reply_stream(frame, stream_id, str(chunk), False)
    
    await self.ws_client.reply_stream(frame, stream_id, "", True)

5. FastAPI Lifespan 管理

正确管理应用生命周期,包括 WebSocket 连接、MCP 服务器和 AI 代理:

@asynccontextmanager
async def lifespan(app: FastAPI):
    await aiops.start()
    await qywx_client.start()
    
    # 挂载 MCP 服务器
    mcp_app = datetime_mcp.streamable_http_app()
    async with datetime_mcp.session_manager.run():
        app.mount("/mcp", mcp_app)
        yield
    
    await aiops.shutdown()
    await qywx_client.shutdown()

关键技术点

MCP 服务器挂载

将 FastMCP 服务器挂载到 FastAPI 时,需注意正确初始化 session manager:

# 错误方式:直接挂载会导致 task group 未初始化
app.mount("/mcp", datetime_mcp.streamable_http_app())

# 正确方式:在 lifespan 中启动 session manager
async with datetime_mcp.session_manager.run():
    app.mount("/mcp", mcp_app)
    yield

流式消息解析

DeepAgents 的 astream() 返回的 chunk 是嵌套字典结构,需正确提取内容:

async for chunk in root_agent.astream(input={"messages": [HumanMessage(content=input)]}):
    if isinstance(chunk, dict):
        messages = chunk.get("model", {}).get("messages", [])
        for msg in messages:
            if hasattr(msg, "content") and msg.content:
                yield str(msg.content)

示例代码

配置模块、日志模块等代码就略过了。MCP Server也略过,之前的文章写过很多遍了,这里就不赘述了。

QywxClient

pkg/qywx/qywx_client.py 内封装了企业微信机器人交互的一些方法。

from pkg.config import cfg
from pkg.log import get_logger
from aibot import WSClient, WSClientOptions, generate_req_id, WsFrameHeaders
from ai_agent import aiops
import logging

class QywxClient:
    """企业微信客户端, 通过websockets连接企业微信服务器, 接收消息并处理"""
    logger = get_logger("qywx_client", logging.INFO)

    def __init__(self) -> None:
        self.ws_client: WSClient = None  # type: ignore

    async def _on_authenticated(self):
        """处理认证成功事件"""
        self.logger.info("Authenticated with Qywx server")

    async def _on_event_enter_chat(self, frame: WsFrameHeaders):
        """处理用户进入聊天事件"""
        self.logger.debug("Received event: enter_chat")
        await self.ws_client.reply_welcome(frame, {
            "msgtype": "text",
            "text": {'content': f'您好!我是智能助手{cfg.qywx_bot_name},有什么可以帮您的吗?'},
        })

    async def _on_message_text(self, frame: WsFrameHeaders):
        """处理文本消息事件"""
        self.logger.debug("Received text message")
        msg_id = frame.get("body", {}).get("msgid", "")
        user_id = frame.get("body", {}).get("from", {}).get("userid", "")
        chattype = frame.get("body", {}).get("chattype", "")
        response_url = frame.get("body", {}).get("response_url", "")
        content = frame.get('body', {}).get('text', {}).get('content', '')
        self.logger.debug(f"Message content: {content}, from user: {user_id}")

        stream_id = generate_req_id('stream')

        await self.ws_client.reply_stream(frame, stream_id, "小脑瓜努力思考中...", False)
        # await asyncio.sleep(2)  # 模拟处理时间
        
        # resp = await aiops.ainvoke(input=content)
        # await self.ws_client.reply_stream(frame, stream_id, resp, True)
        final_text = ""
        async for chunk in aiops.astream(input=content):
            await self.ws_client.reply_stream(frame, stream_id, chunk, False)
            # final_text += chunk
            final_text = chunk

        await self.ws_client.reply_stream(frame, stream_id, final_text, True)

    async def start(self):
        self.logger.info("Starting QywxClient...")
        if not self.ws_client:
            self.ws_client = WSClient(
                WSClientOptions(
                    bot_id=cfg.qywx_bot_id,
                    secret=cfg.qywx_secret,
                    logger=self.logger,
                )
            )

        self.ws_client.on("authenticated", self._on_authenticated)
        self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
        self.ws_client.on("message.text", self._on_message_text)

        await self.ws_client.connect()

    async def shutdown(self):
        self.logger.info("Shutting down QywxClient...")
        if self.ws_client and self.ws_client.is_connected:
            self.ws_client.disconnect()

AIAgent

AIAgent 是一个 AI 代理类,用于处理企业微信消息。

from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from pkg.config import cfg
from pkg.log import get_logger

class AIAgent:
    logger = get_logger("ai_agent")
    def __init__(self):
        self.model: ChatOpenAI = None  # type: ignore
        self._mcp_session: ClientSession = None  # type: ignore
        self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"

    async def start(self):
        if not self.model:
            self.model = ChatOpenAI(
                base_url=cfg.agent_base_url,
                api_key=cfg.agent_api_key,  # type: ignore
                model=cfg.agent_model,
            )
    

    async def shutdown(self):
        self.model = None  # type: ignore

    async def _create_root_agent(self, session: ClientSession):
        tools = await load_mcp_tools(session)
        root_agent = create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
        )
        return root_agent

    async def astream(self, input: str, thread_id: str = ""):
        self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
        async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
            async with ClientSession(read, write) as session:
                await session.initialize()

                root_agent = await self._create_root_agent(session)

                async for chunk in root_agent.astream(
                    input={"messages": [HumanMessage(content=input)]}
                ):
                    # 从 chunk 字典中提取 AIMessage 的 content
                    if isinstance(chunk, dict):
                        messages = chunk.get("model", {}).get("messages", [])
                        if not messages:
                            continue
                        for msg in messages:
                            if hasattr(msg, "content") and msg.content:
                                yield str(msg.content)
                    elif hasattr(chunk, "content"):
                        yield str(chunk.content)  # type: ignore
                    else:
                        yield str(chunk)

    async def ainvoke(self, input: str, thread_id: str = ""):
        self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
        async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
            async with ClientSession(read, write) as session:
                await session.initialize()

                root_agent = await self._create_root_agent(session)
                resp = await root_agent.ainvoke(
                    input={"messages": [HumanMessage(content=input)]}
                )
                return resp["messages"][-1].content
                

main

# main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager
from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops

@asynccontextmanager
async def lifespan(app: FastAPI):
    await aiops.start()
    await qywx_client.start()
    
    # 先获取 MCP app(这会创建 session_manager)
    mcp_app = datetime_mcp.streamable_http_app()
    
    # 在 FastAPI lifespan 中启动 MCP session manager
    async with datetime_mcp.session_manager.run():
        # 挂载 MCP app
        app.mount("/mcp", mcp_app)
        yield
    
    await aiops.shutdown()
    await qywx_client.shutdown()


app = FastAPI(
    lifespan=lifespan
)


if __name__ == "__main__":
    uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)

文章来源:https://www.cnblogs.com/XY-Heruo/p/19868248
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!

标签:

相关文章

本站推荐

标签云