如何在MCP客户端捕获并打印流式返回的所有LLM Token
如何在MCP客户端捕获并打印流式返回的所有LLM Token
我来帮你解决这个问题——你遇到的核心问题是没有正确处理MCP流式工具返回的异步生成器,而是直接把它转成了字符串,所以只看到了生成器的地址,没拿到实际的token流。下面是具体的分析和修改方案:
问题根源
你的MCP服务端通过yield chunk返回流式token,在streamable-http传输模式下,客户端调用call_tool后,result.content[0]对应的是一个异步生成器对象,而不是普通字符串。你之前直接打印result.content[0].text,其实是把这个生成器转成了字符串表示(就是你看到的<async_generator object...>),并没有去迭代它获取实际的token内容。
客户端代码修改方案
你需要修改process_query函数中处理工具调用结果的逻辑,把直接取text的操作改成异步遍历生成器,逐个获取并打印每个token。以下是修改后的完整代码:
class MCPClient: """MCP Client for interacting with an MCP Streamable HTTP server""" def __init__(self): # Initialize session and client objects self.session: Optional[ClientSession] = None async def connect_to_streamable_http_server( self, server_url: str, headers: Optional[dict] = None ): """Connect to an MCP server running with HTTP Streamable transport""" self._streams_context = streamablehttp_client(url=server_url) read_stream, write_stream, _ = await self._streams_context.__aenter__() self._session_context = ClientSession(read_stream, write_stream) self.session: ClientSession = await self._session_context.__aenter__() await self.session.initialize() async def process_query(self, query: str) -> str: models = genai.Client(api_key=os.getenv('GEMINI_API_KEY')).models mcp_tools = await self.session.list_tools() tools = [ types.Tool( function_declarations=[ { "name": tool.name, "description": tool.description, "parameters": {k : v for k, v in tool.inputSchema.items() if k not in ["additionalProperties", "$schema"]}, } ] ) for tool in mcp_tools.tools ] response = models.generate_content( model=os.getenv("LLM_API_MODEL"), contents=query, config=types.GenerateContentConfig( tools=tools, ), ) print(response) try: function_call = response.candidates[0].content.parts[0].function_call except: function_call = None if function_call: result = await self.session.call_tool( function_call.name, arguments=dict(function_call.args) ) # 关键修改:异步遍历流式生成器,实时打印每个token try: total_response = "" async for chunk in result.content[0]: if chunk: print(f"收到token: {chunk}", end="", flush=True) total_response += chunk # 可选:拼接完整响应 print("\n生成完成") return total_response except Exception as e: print(f"处理流式响应时出错: {e}") raise else: print("No response!") return "" async def cleanup(self): """Properly clean up the session and streams""" if self._session_context: await self._session_context.__aexit__(None, None, None) if self._streams_context: await self._streams_context.__aexit__(None, None, None)
核心修改点说明
- 替换
result.content[0].text为异步迭代:result.content[0]就是服务端yield返回的异步生成器,用async for chunk in result.content[0]可以逐个获取每个token片段。 flush=True保证实时输出:流式输出默认会缓冲,加上这个参数可以让每个token一收到就立刻打印在控制台,和LLM生成的节奏完全同步。- 可选的完整响应拼接:如果需要保存完整的响应内容,可以像代码里那样用
total_response累加每个chunk。
额外注意事项
- 确保客户端连接的服务端地址正确,比如服务端运行在
http://localhost:8050,客户端connect_to_streamable_http_server要传入对应的地址。 - 客户端必须在异步上下文运行,比如启动代码:
async def main(): client = MCPClient() await client.connect_to_streamable_http_server("http://localhost:8050") await client.process_query("我的订单什么时候发货?") await client.cleanup() if __name__ == "__main__": import asyncio asyncio.run(main()) - 服务端的
FastMCP已经正确设置transport="streamable-http",这个是流式传输的关键,不要改成普通HTTP。
这样修改后,你就能在客户端实时看到LLM生成的每个token了,和服务端的生成节奏完全一致。




