如何将异步与并行处理结合,解决ML任务中CPU密集型代码的瓶颈问题?
你的观察完全正确!asyncio 本身是单线程运行的,所以哪怕你用了异步并发,CPU 密集型的代码还是会把整个事件循环卡住,只能用到一个核心——这就是你看到 htop 里只有一个 CPU 跑满的原因。
你之前尝试的 ThreadPoolExecutor、asyncio.to_thread 这些工具其实方向是对的,但问题在于:它们是用来跑同步函数的,而你可能误把整个异步的 run_agent 丢进去了。正确的做法是把 run_agent 里的 CPU 密集型同步代码单独抽出来,放到线程/进程池里执行,让 asyncio 事件循环在等待 CPU 任务完成的间隙,继续处理其他异步任务(比如 OpenAI API 调用)。
具体解决方案分两种情况:
1. 如果你的CPU密集代码是调用C扩展(比如PyTorch、NumPy等)
这类库会自动释放GIL(全局解释器锁),用线程池就能充分利用多核。你可以用 asyncio.to_thread(Python 3.9+)或者 ThreadPoolExecutor 来包装同步代码:
首先把CPU密集部分抽成单独的同步函数:
def process_local_cpu_task(cfg, task_sample): # 这里放原来run_agent里的CPU密集同步代码 # 比如本地模型推理、执行本地计算型代码等 result = ... return result
然后修改 run_agent,把CPU部分异步化:
async def run_agent(cfg, task_sample, output_dir): # 保留原来的异步API调用部分(IO密集,适合asyncio) await call_openai_api(cfg, task_sample) await call_local_api(cfg, task_sample) # 把CPU密集任务丢到线程池 cpu_result = await asyncio.to_thread(process_local_cpu_task, cfg, task_sample) # 后续处理(比如保存结果) save_result(cpu_result, output_dir)
2. 如果你的CPU密集代码是纯Python(无C扩展)
纯Python代码受GIL限制,线程池无法利用多核,这时候得用进程池:
同样先抽出同步CPU任务,然后用 ProcessPoolExecutor 配合 asyncio.run_in_executor:
def process_local_cpu_task(cfg, task_sample): # 纯Python CPU密集代码 result = ... return result async def run_agent(cfg, task_sample, output_dir): # 异步API调用部分不变 await call_openai_api(cfg, task_sample) await call_local_api(cfg, task_sample) # 获取当前事件循环 loop = asyncio.get_running_loop() # 用进程池执行CPU任务,max_workers可根据核心数调整 with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool: cpu_result = await loop.run_in_executor(pool, process_local_cpu_task, cfg, task_sample) # 保存结果 save_result(cpu_result, output_dir)
调整并发控制
原来的 semaphore 是控制异步任务的数量,现在结合线程/进程池后,建议调整 concurrency_limit 和线程/进程池的 max_workers:
- 如果用线程池:semaphore 可以设为线程数的1.5-2倍,因为有些任务在等待IO,有些在跑CPU,避免资源闲置。
- 如果用进程池:semaphore 最好和进程数保持一致,避免进程过载。
你的原代码修改后示例
import os import asyncio from tqdm import tqdm async def run_task(...): ... # 可以根据CPU核心数动态调整并发限制 semaphore = asyncio.Semaphore(min(cfg.concurrency_limit, os.cpu_count())) async def run_single_sample(task_sample: TaskSample): async with semaphore: await run_agent(cfg, task_sample, cfg.output_dir / task.value) samples = [run_single_sample(task_sample) for task_sample in sliced_samples] await tqdm.gather(*samples, desc=f"Task: {task.value}")
这样修改后,asyncio 负责处理IO密集的API调用(不浪费等待时间),线程/进程池负责处理CPU密集任务(利用多核),就能同时发挥异步和并行的优势,解决CPU瓶颈问题。
备注:内容来源于stack exchange,提问作者Leon0402




