首页 > 基础资料 博客日记
企业微信机器人与 DeepAgents 集成实践
2026-04-15 01:30:02基础资料围观1次
这篇文章介绍了企业微信机器人与 DeepAgents 集成实践,分享给大家做个参考,收藏极客资料网收获更多编程知识
前言
企业微信机器人以前通常采用 Webhook 回调方式接收消息,但这种方式存在延迟较高、需要公网服务器等局限性。随着OpenClaw爆火,企业微信机器人也支持 WebSocket 长连接方式。本文介绍一种基于 WebSocket 长连接的企业微信机器人实现方案,并集成 DeepAgents 框架实现智能对话能力。
技术栈
- 企业微信 WebSocket SDK:
wecom-aibot-python-sdk- 官方提供的 WebSocket 连接库 - FastAPI: 现代异步 Web 框架,用于托管服务和 MCP 服务器
- DeepAgents: 智能体框架,用于构建具备工具调用能力的 AI 助手
- LangChain: 提供 LLM 集成和工具加载能力
- MCP (Model Context Protocol): 标准化的工具调用协议
项目结构
qywx-bot/
├── main.py # FastAPI 主入口
├── pyproject.toml # 项目依赖配置
├── conf/
│ └── config.toml # 应用配置
├── pkg/
│ ├── config/ # 配置管理模块
│ ├── log/ # 日志模块
│ └── qywx/ # 企业微信客户端
└── ai_agent/
├── ai_agent.py # DeepAgents 集成
└── mcp_servers/ # MCP 工具服务器
安装依赖
uv add fastapi deepagents langchain-openai langchain-mcp-adapters wecom-aibot-python-sdk uvicorn
核心实现
1. 配置管理
使用 TOML 格式管理配置,支持多环境切换:
[service]
host = "127.0.0.1"
port = 8000
env = "dev"
[qywx.v2]
bot_id = "your-bot-id"
secret = "your-bot-secret"
bot_name = "智能助手"
2. 企业微信 WebSocket 客户端
通过 WebSocket 长连接接收企业微信消息,实现低延迟实时交互:
class QywxClient:
async def start(self):
self.ws_client = WSClient(
WSClientOptions(
bot_id=cfg.qywx_bot_id,
secret=cfg.qywx_secret,
logger=self.logger,
)
)
# 注册事件处理器
self.ws_client.on("authenticated", self._on_authenticated)
self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
self.ws_client.on("message.text", self._on_message_text)
await self.ws_client.connect()
3. DeepAgents 集成
构建具备工具调用能力的智能体,通过 MCP 协议加载工具:
class AIAgent:
async def _create_root_agent(self, session: ClientSession):
tools = await load_mcp_tools(session)
return create_deep_agent(
model=self.model,
tools=tools,
system_prompt=f"你是一个企业微信机器人,名字叫{cfg.qywx_bot_name}",
)
4. 流式输出处理
实现企业微信流式消息回复,提升用户体验:
async def _on_message_text(self, frame: WsFrameHeaders):
stream_id = generate_req_id('stream')
await self.ws_client.reply_stream(frame, stream_id, "思考中...", False)
async for chunk in aiops.invoke(content):
await self.ws_client.reply_stream(frame, stream_id, str(chunk), False)
await self.ws_client.reply_stream(frame, stream_id, "", True)
5. FastAPI Lifespan 管理
正确管理应用生命周期,包括 WebSocket 连接、MCP 服务器和 AI 代理:
@asynccontextmanager
async def lifespan(app: FastAPI):
await aiops.start()
await qywx_client.start()
# 挂载 MCP 服务器
mcp_app = datetime_mcp.streamable_http_app()
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
await aiops.shutdown()
await qywx_client.shutdown()
关键技术点
MCP 服务器挂载
将 FastMCP 服务器挂载到 FastAPI 时,需注意正确初始化 session manager:
# 错误方式:直接挂载会导致 task group 未初始化
app.mount("/mcp", datetime_mcp.streamable_http_app())
# 正确方式:在 lifespan 中启动 session manager
async with datetime_mcp.session_manager.run():
app.mount("/mcp", mcp_app)
yield
流式消息解析
DeepAgents 的 astream() 返回的 chunk 是嵌套字典结构,需正确提取内容:
async for chunk in root_agent.astream(input={"messages": [HumanMessage(content=input)]}):
if isinstance(chunk, dict):
messages = chunk.get("model", {}).get("messages", [])
for msg in messages:
if hasattr(msg, "content") and msg.content:
yield str(msg.content)
示例代码
配置模块、日志模块等代码就略过了。MCP Server也略过,之前的文章写过很多遍了,这里就不赘述了。
QywxClient
pkg/qywx/qywx_client.py 内封装了企业微信机器人交互的一些方法。
from pkg.config import cfg
from pkg.log import get_logger
from aibot import WSClient, WSClientOptions, generate_req_id, WsFrameHeaders
from ai_agent import aiops
import logging
class QywxClient:
"""企业微信客户端, 通过websockets连接企业微信服务器, 接收消息并处理"""
logger = get_logger("qywx_client", logging.INFO)
def __init__(self) -> None:
self.ws_client: WSClient = None # type: ignore
async def _on_authenticated(self):
"""处理认证成功事件"""
self.logger.info("Authenticated with Qywx server")
async def _on_event_enter_chat(self, frame: WsFrameHeaders):
"""处理用户进入聊天事件"""
self.logger.debug("Received event: enter_chat")
await self.ws_client.reply_welcome(frame, {
"msgtype": "text",
"text": {'content': f'您好!我是智能助手{cfg.qywx_bot_name},有什么可以帮您的吗?'},
})
async def _on_message_text(self, frame: WsFrameHeaders):
"""处理文本消息事件"""
self.logger.debug("Received text message")
msg_id = frame.get("body", {}).get("msgid", "")
user_id = frame.get("body", {}).get("from", {}).get("userid", "")
chattype = frame.get("body", {}).get("chattype", "")
response_url = frame.get("body", {}).get("response_url", "")
content = frame.get('body', {}).get('text', {}).get('content', '')
self.logger.debug(f"Message content: {content}, from user: {user_id}")
stream_id = generate_req_id('stream')
await self.ws_client.reply_stream(frame, stream_id, "小脑瓜努力思考中...", False)
# await asyncio.sleep(2) # 模拟处理时间
# resp = await aiops.ainvoke(input=content)
# await self.ws_client.reply_stream(frame, stream_id, resp, True)
final_text = ""
async for chunk in aiops.astream(input=content):
await self.ws_client.reply_stream(frame, stream_id, chunk, False)
# final_text += chunk
final_text = chunk
await self.ws_client.reply_stream(frame, stream_id, final_text, True)
async def start(self):
self.logger.info("Starting QywxClient...")
if not self.ws_client:
self.ws_client = WSClient(
WSClientOptions(
bot_id=cfg.qywx_bot_id,
secret=cfg.qywx_secret,
logger=self.logger,
)
)
self.ws_client.on("authenticated", self._on_authenticated)
self.ws_client.on("event.enter_chat", self._on_event_enter_chat)
self.ws_client.on("message.text", self._on_message_text)
await self.ws_client.connect()
async def shutdown(self):
self.logger.info("Shutting down QywxClient...")
if self.ws_client and self.ws_client.is_connected:
self.ws_client.disconnect()
AIAgent
AIAgent 是一个 AI 代理类,用于处理企业微信消息。
from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client
from pkg.config import cfg
from pkg.log import get_logger
class AIAgent:
logger = get_logger("ai_agent")
def __init__(self):
self.model: ChatOpenAI = None # type: ignore
self._mcp_session: ClientSession = None # type: ignore
self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"
async def start(self):
if not self.model:
self.model = ChatOpenAI(
base_url=cfg.agent_base_url,
api_key=cfg.agent_api_key, # type: ignore
model=cfg.agent_model,
)
async def shutdown(self):
self.model = None # type: ignore
async def _create_root_agent(self, session: ClientSession):
tools = await load_mcp_tools(session)
root_agent = create_deep_agent(
model=self.model,
tools=tools,
system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
)
return root_agent
async def astream(self, input: str, thread_id: str = ""):
self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
async with ClientSession(read, write) as session:
await session.initialize()
root_agent = await self._create_root_agent(session)
async for chunk in root_agent.astream(
input={"messages": [HumanMessage(content=input)]}
):
# 从 chunk 字典中提取 AIMessage 的 content
if isinstance(chunk, dict):
messages = chunk.get("model", {}).get("messages", [])
if not messages:
continue
for msg in messages:
if hasattr(msg, "content") and msg.content:
yield str(msg.content)
elif hasattr(chunk, "content"):
yield str(chunk.content) # type: ignore
else:
yield str(chunk)
async def ainvoke(self, input: str, thread_id: str = ""):
self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
async with streamable_http_client(self._mcp_server_url) as (read, write, get_session_id):
async with ClientSession(read, write) as session:
await session.initialize()
root_agent = await self._create_root_agent(session)
resp = await root_agent.ainvoke(
input={"messages": [HumanMessage(content=input)]}
)
return resp["messages"][-1].content
main
# main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager
from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops
@asynccontextmanager
async def lifespan(app: FastAPI):
await aiops.start()
await qywx_client.start()
# 先获取 MCP app(这会创建 session_manager)
mcp_app = datetime_mcp.streamable_http_app()
# 在 FastAPI lifespan 中启动 MCP session manager
async with datetime_mcp.session_manager.run():
# 挂载 MCP app
app.mount("/mcp", mcp_app)
yield
await aiops.shutdown()
await qywx_client.shutdown()
app = FastAPI(
lifespan=lifespan
)
if __name__ == "__main__":
uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)
文章来源:https://www.cnblogs.com/XY-Heruo/p/19868248
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:jacktools123@163.com进行投诉反馈,一经查实,立即删除!
标签:
上一篇:.NET 官方团队发布的 .NET Agent Skills,告别 AI 编程幻觉!
下一篇:没有了

