You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在MCP客户端捕获并打印流式返回的所有LLM Token

如何在MCP客户端捕获并打印流式返回的所有LLM Token

我来帮你解决这个问题——你遇到的核心问题是没有正确处理MCP流式工具返回的异步生成器,而是直接把它转成了字符串,所以只看到了生成器的地址,没拿到实际的token流。下面是具体的分析和修改方案:

问题根源

你的MCP服务端通过yield chunk返回流式token,在streamable-http传输模式下,客户端调用call_tool后,result.content[0]对应的是一个异步生成器对象,而不是普通字符串。你之前直接打印result.content[0].text,其实是把这个生成器转成了字符串表示(就是你看到的<async_generator object...>),并没有去迭代它获取实际的token内容。

客户端代码修改方案

你需要修改process_query函数中处理工具调用结果的逻辑,把直接取text的操作改成异步遍历生成器,逐个获取并打印每个token。以下是修改后的完整代码:

class MCPClient:
    """MCP Client for interacting with an MCP Streamable HTTP server"""
    def __init__(self):
        # Initialize session and client objects
        self.session: Optional[ClientSession] = None

    async def connect_to_streamable_http_server(
        self, server_url: str, headers: Optional[dict] = None
    ):
        """Connect to an MCP server running with HTTP Streamable transport"""
        self._streams_context = streamablehttp_client(url=server_url)
        read_stream, write_stream, _ = await self._streams_context.__aenter__()
        self._session_context = ClientSession(read_stream, write_stream)
        self.session: ClientSession = await self._session_context.__aenter__()
        await self.session.initialize()

    async def process_query(self, query: str) -> str:
        models = genai.Client(api_key=os.getenv('GEMINI_API_KEY')).models
        mcp_tools = await self.session.list_tools()
        
        tools = [
            types.Tool(
                function_declarations=[
                    {
                        "name": tool.name,
                        "description": tool.description,
                        "parameters": {k : v for k, v in tool.inputSchema.items() if k not in ["additionalProperties", "$schema"]},
                    }
                ]
            )
            for tool in mcp_tools.tools
        ]
        
        response = models.generate_content(
            model=os.getenv("LLM_API_MODEL"),
            contents=query,
            config=types.GenerateContentConfig(
                tools=tools,
            ),
        )
        
        print(response)
        
        try:
            function_call = response.candidates[0].content.parts[0].function_call
        except:
            function_call = None
        
        if function_call:
            result = await self.session.call_tool(
                function_call.name,
                arguments=dict(function_call.args)
            )
            # 关键修改:异步遍历流式生成器,实时打印每个token
            try:
                total_response = ""
                async for chunk in result.content[0]:
                    if chunk:
                        print(f"收到token: {chunk}", end="", flush=True)
                        total_response += chunk  # 可选:拼接完整响应
                print("\n生成完成")
                return total_response
            except Exception as e:
                print(f"处理流式响应时出错: {e}")
                raise
        else:
            print("No response!")
            return ""

    async def cleanup(self):
        """Properly clean up the session and streams"""
        if self._session_context:
            await self._session_context.__aexit__(None, None, None)
        if self._streams_context:
            await self._streams_context.__aexit__(None, None, None)

核心修改点说明

  1. 替换result.content[0].text为异步迭代result.content[0]就是服务端yield返回的异步生成器,用async for chunk in result.content[0]可以逐个获取每个token片段。
  2. flush=True保证实时输出:流式输出默认会缓冲,加上这个参数可以让每个token一收到就立刻打印在控制台,和LLM生成的节奏完全同步。
  3. 可选的完整响应拼接:如果需要保存完整的响应内容,可以像代码里那样用total_response累加每个chunk。

额外注意事项

  • 确保客户端连接的服务端地址正确,比如服务端运行在http://localhost:8050,客户端connect_to_streamable_http_server要传入对应的地址。
  • 客户端必须在异步上下文运行,比如启动代码:
    async def main():
        client = MCPClient()
        await client.connect_to_streamable_http_server("http://localhost:8050")
        await client.process_query("我的订单什么时候发货?")
        await client.cleanup()
    
    if __name__ == "__main__":
        import asyncio
        asyncio.run(main())
    
  • 服务端的FastMCP已经正确设置transport="streamable-http",这个是流式传输的关键,不要改成普通HTTP。

这样修改后,你就能在客户端实时看到LLM生成的每个token了,和服务端的生成节奏完全一致。

火山引擎 最新活动