You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python 3.6.4中asyncio结合ThreadPoolExecutor运行报RuntimeError求助

解决Python 3.6.4中asyncio结合ThreadPoolExecutorRuntimeError问题

嘿,我看到你在Python 3.6.4里运行这段异步批量请求代码时碰到了RuntimeError,我来帮你分析问题并给出解决方案。

首先先还原一下你的代码:

import asyncio
import concurrent.futures
import requests

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                'http://example.org/'
            )
            for i in range(20)
        ]
        for response in await asyncio.gather(*futures):
            pass

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

问题原因

在Python 3.6.x的早期版本(比如3.6.4)中,asyncio的事件循环和ThreadPoolExecutor的上下文管理器之间存在一些交互bug:当你在协程内部使用with语句管理线程池时,即使await asyncio.gather(*futures)是在with块内部,线程池的shutdown()逻辑和事件循环的任务等待逻辑可能会出现时序冲突,导致触发RuntimeError(常见的是"Event loop is closed"或者任务被中断的错误)。

另外,requests.get是阻塞式IO操作,虽然用线程池来异步执行,但在3.6.4的事件循环调度逻辑下,线程池的关闭时机可能和事件循环的任务处理不同步。

解决方案

这里有两种靠谱的解决方式:

方案1:手动管理线程池生命周期,避免使用上下文管理器

把线程池的创建和关闭逻辑分开,确保所有异步任务都完成后再关闭线程池,这样能避免时序冲突:

import asyncio
import concurrent.futures
import requests

async def main():
    # 手动创建线程池
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=20)
    loop = asyncio.get_event_loop()
    # 生成所有请求任务
    futures = [
        loop.run_in_executor(
            executor, 
            requests.get, 
            'http://example.org/'
        )
        for i in range(20)
    ]
    # 等待所有任务完成
    responses = await asyncio.gather(*futures)
    for response in responses:
        print(f"请求状态码:{response.status_code}")  # 可选:查看请求结果
    # 所有任务完成后再关闭线程池
    executor.shutdown(wait=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

方案2:升级Python版本到3.7+

Python 3.7及以后的版本对asyncio模块做了大量的优化和bug修复,包括asyncio.run()这个更简洁的启动方式,以及修复了线程池和事件循环交互的问题。如果你能升级版本,直接用下面的代码会更简洁稳定:

import asyncio
import concurrent.futures
import requests

async def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        loop = asyncio.get_running_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                'http://example.org/'
            )
            for i in range(20)
        ]
        responses = await asyncio.gather(*futures)
        for response in responses:
            print(f"请求状态码:{response.status_code}")

asyncio.run(main())

额外提示

如果你的场景是批量HTTP请求,其实更推荐使用原生异步的HTTP客户端库,比如aiohttp,它比用线程池跑阻塞的requests更高效,也能避免线程池和事件循环的兼容问题。

内容的提问来源于stack exchange,提问作者Chan

火山引擎 最新活动