LangChain中如何在自定义Agent包装类里维持MCP服务器长连接并同步使用工具
LangChain中如何在自定义Agent包装类里维持MCP服务器长连接并同步使用工具
我完全理解你想摆脱上下文管理器束缚、在自定义包装类中维持MCP长连接的需求——毕竟要让Agent随时能调用工具,保持持久连接确实更高效。下面我会一步步给你实现方案,包含完整的代码示例和关键细节说明。
核心思路
MCP的上下文管理器本质是帮你自动处理连接的初始化和关闭,现在我们要手动接管这个过程:
- 在类中保存MCP连接的核心对象(
read/write流、ClientSession),避免它们被垃圾回收导致连接关闭; - 手动执行异步初始化逻辑(比如
session.initialize()、load_mcp_tools); - 提供显式的关闭方法,在不需要连接时主动释放资源;
- 处理异步API的同步包装(如果你的业务场景需要同步调用Agent)。
完整同步包装类实现
这个版本适合你需要同步调用run方法的场景,用asyncio.run包装异步操作:
import asyncio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client from langgraph.prebuilt import create_react_agent from langchain_mcp_adapters.tools import load_mcp_tools from langchain_core.messages import AIMessage class MyAgent: def __init__(self, model="openai:gpt-4.1", mcp_url="http://localhost:3000/mcp"): self.model = model self.mcp_url = mcp_url # 保存连接核心对象,避免被GC回收导致连接关闭 self._read = None self._write = None self._session = None self.agent = None # 同步完成MCP连接初始化和Agent创建 try: self._setup_mcp_connection() self._setup_agent() except Exception as e: # 初始化失败时主动关闭已打开的资源 self.close() raise RuntimeError(f"Agent初始化失败: {str(e)}") from e def _setup_mcp_connection(self): # 手动创建HTTP流连接(替代streamablehttp_client上下文管理器) self._read, self._write, _ = asyncio.run(streamablehttp_client(self.mcp_url)) # 手动创建ClientSession self._session = ClientSession(self._read, self._write) # 初始化MCP会话 asyncio.run(self._session.initialize()) def _setup_agent(self): # 异步加载MCP工具,同步包装执行 tools = asyncio.run(load_mcp_tools(self._session)) # 创建React Agent self.agent = create_react_agent(self.model, tools) def run(self, query: str) -> AIMessage: if not self.agent: raise RuntimeError("Agent未初始化,请检查MCP连接状态") # 同步调用Agent的异步invoke方法 response = asyncio.run(self.agent.ainvoke({"messages": query})) # 适配返回格式,确保返回AIMessage if isinstance(response, dict) and "messages" in response: return response["messages"] return response def close(self): """主动关闭MCP连接,释放资源""" if self._session: try: asyncio.run(self._session.close()) except Exception: # 忽略关闭时的异常,避免影响主流程 pass # 关闭HTTP流 for stream in [self._read, self._write]: if stream: try: asyncio.run(stream.aclose()) except Exception: pass # 重置状态 self._read = self._write = self._session = None self.agent = None def __del__(self): """对象销毁时自动关闭连接,避免资源泄漏""" self.close()
使用示例
# 主文件 if __name__ == "__main__": agent = MyAgent() try: result = agent.run("what's (3 + 5) x 12?") print(f"结果1: {result.content}") result = agent.run("what is 100 plus 120?") print(f"结果2: {result.content}") finally: # 主动关闭连接(可选,__del__也会处理,但显式关闭更稳妥) agent.close()
异步友好的包装类实现
如果你的项目本身是异步架构(比如FastAPI、Asyncio服务),更推荐用异步包装类,避免asyncio.run创建额外事件循环的开销:
import asyncio from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client from langgraph.prebuilt import create_react_agent from langchain_mcp_adapters.tools import load_mcp_tools from langchain_core.messages import AIMessage class AsyncMyAgent: def __init__(self, model="openai:gpt-4.1", mcp_url="http://localhost:3000/mcp"): self.model = model self.mcp_url = mcp_url self._read = None self._write = None self._session = None self.agent = None async def setup(self): """异步初始化MCP连接和Agent""" # 创建HTTP流连接 self._read, self._write, _ = await streamablehttp_client(self.mcp_url) # 初始化ClientSession self._session = ClientSession(self._read, self._write) await self._session.initialize() # 加载工具并创建Agent tools = await load_mcp_tools(self._session) self.agent = create_react_agent(self.model, tools) async def run(self, query: str) -> AIMessage: if not self.agent: raise RuntimeError("Agent未初始化,请先调用setup()") response = await self.agent.ainvoke({"messages": query}) if isinstance(response, dict) and "messages" in response: return response["messages"] return response async def close(self): """异步关闭连接""" if self._session: await self._session.close() for stream in [self._read, self._write]: if stream: await stream.aclose() self._read = self._write = self._session = None self.agent = None async def __aenter__(self): await self.setup() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close()
异步版本使用示例
async def main(): async with AsyncMyAgent() as agent: result = await agent.run("what's (3 + 5) x 12?") print(f"结果1: {result.content}") result = await agent.run("what is 100 plus 120?") print(f"结果2: {result.content}") asyncio.run(main())
关键注意事项
- 资源泄漏防范:一定要实现
close方法并在合适的时机调用(比如Agent不再使用时、应用 shutdown 时),__del__方法是最后一道防线,但不要完全依赖它; - 异常处理:在初始化和运行过程中添加异常捕获,避免连接失败导致程序崩溃,同时确保失败时能清理已打开的资源;
- 异步环境兼容性:如果你的项目是纯异步的,优先使用
AsyncMyAgent版本,避免asyncio.run带来的事件循环嵌套问题; - 连接稳定性:如果MCP服务器可能断开连接,你可以在
run方法中添加连接检测逻辑,比如捕获工具调用时的异常,自动重连并重新初始化Agent。




