You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何解决aiohttp+asyncio中WebSocket停发时HTTP请求无响应的并发问题?

解决WebSocket阻塞时无法获取HTTP请求数据的最佳方案

嘿,这个问题我之前做实时数据同步的时候也踩过坑!你的代码核心问题在于顺序执行阻塞式的协程调用——await ws.receive()会一直挂起等待WebSocket消息,一旦WebSocket停止发送,这条语句就会卡着不动,导致后面的HTTP请求完全没机会执行。

问题根源拆解

看你的原代码逻辑:

while True:
    some_data_from_get_request = await get_some_data(session)
    msg_from_websocket = await ws.receive()

这里是先等HTTP请求返回,再等下一条WS消息,但await ws.receive()是阻塞的——如果WS没有新消息,整个循环就停在这一步,下一轮的HTTP请求根本跑不起来。反过来,如果WS消息一直来,HTTP请求能执行,但一旦WS停发,HTTP请求就彻底卡住了。

最佳解决方案:并行运行两个协程

我们需要把WebSocket监听和HTTP请求分成两个独立的任务,让它们并行执行,互不阻塞。用asyncio.gather可以轻松实现这个需求,它能同时运行多个协程,并且处理它们的返回结果。

修正后的完整代码

import asyncio
import aiohttp

WS_URL = 'wss://www.some_web_socket.io'
HTTP_URL = "https://www.blabla.com"

async def get_some_data(session):
    async with session.get(HTTP_URL) as response:
        return await response.text()

async def listen_websocket(ws):
    while True:
        msg = await ws.receive()
        if msg.type == aiohttp.WSMsgType.TEXT:
            print("WebSocket消息:", msg.data)
        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
            print("WebSocket连接关闭/出错")
            break

async def fetch_http_data(session, interval=1):  # 可以设置HTTP请求的间隔时间
    while True:
        try:
            data = await get_some_data(session)
            print("HTTP请求数据:", data)
        except Exception as e:
            print("HTTP请求出错:", str(e))
        await asyncio.sleep(interval)  # 避免频繁请求,根据需求调整

async def ws_handler():
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(WS_URL) as ws:
            # 并行启动两个任务:WS监听和HTTP请求
            await asyncio.gather(
                listen_websocket(ws),
                fetch_http_data(session),
                return_exceptions=True  # 防止一个任务崩溃导致另一个也终止
            )

def _main():
    asyncio.run(ws_handler())

if __name__ == "__main__":
    _main()

关键改动说明

  • 拆分独立协程:把WS监听和HTTP请求分别放到listen_websocketfetch_http_data两个协程里,各自循环执行,互不依赖
  • 并行运行:用asyncio.gather同时启动两个任务,这样即使WS停发消息,HTTP请求的循环依然会正常执行
  • 优雅处理WS关闭:增加了WS关闭/错误的判断,当WS断开时,listen_websocket会退出循环,最终两个任务都会结束
  • 可控的HTTP请求间隔:给fetch_http_data加了asyncio.sleep,避免短时间内频繁发送HTTP请求,可根据业务需求调整间隔
  • 错误容错return_exceptions=True确保如果其中一个任务出错(比如HTTP请求失败),另一个任务不会被强制终止

额外优化建议

如果需要让两个协程之间共享数据(比如用WS消息的数据过滤HTTP请求),可以用asyncio.Queue实现协程间的安全通信,示例如下:

# 在ws_handler里创建队列
data_queue = asyncio.Queue()
# 然后把队列传给两个协程,listen_websocket把消息放进队列,fetch_http_data从队列取消息做处理

内容的提问来源于stack exchange,提问作者ChzChz

火山引擎 最新活动