如何解决aiohttp+asyncio中WebSocket停发时HTTP请求无响应的并发问题?
解决WebSocket阻塞时无法获取HTTP请求数据的最佳方案
嘿,这个问题我之前做实时数据同步的时候也踩过坑!你的代码核心问题在于顺序执行阻塞式的协程调用——await ws.receive()会一直挂起等待WebSocket消息,一旦WebSocket停止发送,这条语句就会卡着不动,导致后面的HTTP请求完全没机会执行。
问题根源拆解
看你的原代码逻辑:
while True: some_data_from_get_request = await get_some_data(session) msg_from_websocket = await ws.receive()
这里是先等HTTP请求返回,再等下一条WS消息,但await ws.receive()是阻塞的——如果WS没有新消息,整个循环就停在这一步,下一轮的HTTP请求根本跑不起来。反过来,如果WS消息一直来,HTTP请求能执行,但一旦WS停发,HTTP请求就彻底卡住了。
最佳解决方案:并行运行两个协程
我们需要把WebSocket监听和HTTP请求分成两个独立的任务,让它们并行执行,互不阻塞。用asyncio.gather可以轻松实现这个需求,它能同时运行多个协程,并且处理它们的返回结果。
修正后的完整代码
import asyncio import aiohttp WS_URL = 'wss://www.some_web_socket.io' HTTP_URL = "https://www.blabla.com" async def get_some_data(session): async with session.get(HTTP_URL) as response: return await response.text() async def listen_websocket(ws): while True: msg = await ws.receive() if msg.type == aiohttp.WSMsgType.TEXT: print("WebSocket消息:", msg.data) elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): print("WebSocket连接关闭/出错") break async def fetch_http_data(session, interval=1): # 可以设置HTTP请求的间隔时间 while True: try: data = await get_some_data(session) print("HTTP请求数据:", data) except Exception as e: print("HTTP请求出错:", str(e)) await asyncio.sleep(interval) # 避免频繁请求,根据需求调整 async def ws_handler(): async with aiohttp.ClientSession() as session: async with session.ws_connect(WS_URL) as ws: # 并行启动两个任务:WS监听和HTTP请求 await asyncio.gather( listen_websocket(ws), fetch_http_data(session), return_exceptions=True # 防止一个任务崩溃导致另一个也终止 ) def _main(): asyncio.run(ws_handler()) if __name__ == "__main__": _main()
关键改动说明
- 拆分独立协程:把WS监听和HTTP请求分别放到
listen_websocket和fetch_http_data两个协程里,各自循环执行,互不依赖 - 并行运行:用
asyncio.gather同时启动两个任务,这样即使WS停发消息,HTTP请求的循环依然会正常执行 - 优雅处理WS关闭:增加了WS关闭/错误的判断,当WS断开时,
listen_websocket会退出循环,最终两个任务都会结束 - 可控的HTTP请求间隔:给
fetch_http_data加了asyncio.sleep,避免短时间内频繁发送HTTP请求,可根据业务需求调整间隔 - 错误容错:
return_exceptions=True确保如果其中一个任务出错(比如HTTP请求失败),另一个任务不会被强制终止
额外优化建议
如果需要让两个协程之间共享数据(比如用WS消息的数据过滤HTTP请求),可以用asyncio.Queue实现协程间的安全通信,示例如下:
# 在ws_handler里创建队列 data_queue = asyncio.Queue() # 然后把队列传给两个协程,listen_websocket把消息放进队列,fetch_http_data从队列取消息做处理
内容的提问来源于stack exchange,提问作者ChzChz




