You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

LangChain中如何在自定义Agent包装类里维持MCP服务器长连接并同步使用工具

LangChain中如何在自定义Agent包装类里维持MCP服务器长连接并同步使用工具

我完全理解你想摆脱上下文管理器束缚、在自定义包装类中维持MCP长连接的需求——毕竟要让Agent随时能调用工具,保持持久连接确实更高效。下面我会一步步给你实现方案,包含完整的代码示例和关键细节说明。


核心思路

MCP的上下文管理器本质是帮你自动处理连接的初始化和关闭,现在我们要手动接管这个过程:

  1. 在类中保存MCP连接的核心对象(read/write流、ClientSession),避免它们被垃圾回收导致连接关闭;
  2. 手动执行异步初始化逻辑(比如session.initialize()load_mcp_tools);
  3. 提供显式的关闭方法,在不需要连接时主动释放资源;
  4. 处理异步API的同步包装(如果你的业务场景需要同步调用Agent)。

完整同步包装类实现

这个版本适合你需要同步调用run方法的场景,用asyncio.run包装异步操作:

import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_core.messages import AIMessage

class MyAgent:
    def __init__(self, model="openai:gpt-4.1", mcp_url="http://localhost:3000/mcp"):
        self.model = model
        self.mcp_url = mcp_url
        # 保存连接核心对象,避免被GC回收导致连接关闭
        self._read = None
        self._write = None
        self._session = None
        self.agent = None

        # 同步完成MCP连接初始化和Agent创建
        try:
            self._setup_mcp_connection()
            self._setup_agent()
        except Exception as e:
            # 初始化失败时主动关闭已打开的资源
            self.close()
            raise RuntimeError(f"Agent初始化失败: {str(e)}") from e

    def _setup_mcp_connection(self):
        # 手动创建HTTP流连接(替代streamablehttp_client上下文管理器)
        self._read, self._write, _ = asyncio.run(streamablehttp_client(self.mcp_url))
        # 手动创建ClientSession
        self._session = ClientSession(self._read, self._write)
        # 初始化MCP会话
        asyncio.run(self._session.initialize())

    def _setup_agent(self):
        # 异步加载MCP工具,同步包装执行
        tools = asyncio.run(load_mcp_tools(self._session))
        # 创建React Agent
        self.agent = create_react_agent(self.model, tools)

    def run(self, query: str) -> AIMessage:
        if not self.agent:
            raise RuntimeError("Agent未初始化,请检查MCP连接状态")
        
        # 同步调用Agent的异步invoke方法
        response = asyncio.run(self.agent.ainvoke({"messages": query}))
        # 适配返回格式,确保返回AIMessage
        if isinstance(response, dict) and "messages" in response:
            return response["messages"]
        return response

    def close(self):
        """主动关闭MCP连接,释放资源"""
        if self._session:
            try:
                asyncio.run(self._session.close())
            except Exception:
                # 忽略关闭时的异常,避免影响主流程
                pass
        # 关闭HTTP流
        for stream in [self._read, self._write]:
            if stream:
                try:
                    asyncio.run(stream.aclose())
                except Exception:
                    pass
        # 重置状态
        self._read = self._write = self._session = None
        self.agent = None

    def __del__(self):
        """对象销毁时自动关闭连接,避免资源泄漏"""
        self.close()

使用示例

# 主文件
if __name__ == "__main__":
    agent = MyAgent()
    try:
        result = agent.run("what's (3 + 5) x 12?")
        print(f"结果1: {result.content}")
        
        result = agent.run("what is 100 plus 120?")
        print(f"结果2: {result.content}")
    finally:
        # 主动关闭连接(可选,__del__也会处理,但显式关闭更稳妥)
        agent.close()

异步友好的包装类实现

如果你的项目本身是异步架构(比如FastAPI、Asyncio服务),更推荐用异步包装类,避免asyncio.run创建额外事件循环的开销:

import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from langgraph.prebuilt import create_react_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_core.messages import AIMessage

class AsyncMyAgent:
    def __init__(self, model="openai:gpt-4.1", mcp_url="http://localhost:3000/mcp"):
        self.model = model
        self.mcp_url = mcp_url
        self._read = None
        self._write = None
        self._session = None
        self.agent = None

    async def setup(self):
        """异步初始化MCP连接和Agent"""
        # 创建HTTP流连接
        self._read, self._write, _ = await streamablehttp_client(self.mcp_url)
        # 初始化ClientSession
        self._session = ClientSession(self._read, self._write)
        await self._session.initialize()
        # 加载工具并创建Agent
        tools = await load_mcp_tools(self._session)
        self.agent = create_react_agent(self.model, tools)

    async def run(self, query: str) -> AIMessage:
        if not self.agent:
            raise RuntimeError("Agent未初始化,请先调用setup()")
        
        response = await self.agent.ainvoke({"messages": query})
        if isinstance(response, dict) and "messages" in response:
            return response["messages"]
        return response

    async def close(self):
        """异步关闭连接"""
        if self._session:
            await self._session.close()
        for stream in [self._read, self._write]:
            if stream:
                await stream.aclose()
        self._read = self._write = self._session = None
        self.agent = None

    async def __aenter__(self):
        await self.setup()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()

异步版本使用示例

async def main():
    async with AsyncMyAgent() as agent:
        result = await agent.run("what's (3 + 5) x 12?")
        print(f"结果1: {result.content}")
        
        result = await agent.run("what is 100 plus 120?")
        print(f"结果2: {result.content}")

asyncio.run(main())

关键注意事项

  1. 资源泄漏防范:一定要实现close方法并在合适的时机调用(比如Agent不再使用时、应用 shutdown 时),__del__方法是最后一道防线,但不要完全依赖它;
  2. 异常处理:在初始化和运行过程中添加异常捕获,避免连接失败导致程序崩溃,同时确保失败时能清理已打开的资源;
  3. 异步环境兼容性:如果你的项目是纯异步的,优先使用AsyncMyAgent版本,避免asyncio.run带来的事件循环嵌套问题;
  4. 连接稳定性:如果MCP服务器可能断开连接,你可以在run方法中添加连接检测逻辑,比如捕获工具调用时的异常,自动重连并重新初始化Agent。

火山引擎 最新活动