You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何将异步与并行处理结合,解决ML任务中CPU密集型代码的瓶颈问题?

如何将异步与并行处理结合,解决ML任务中CPU密集型代码的瓶颈问题?

你的观察完全正确!asyncio 本身是单线程运行的,所以哪怕你用了异步并发,CPU 密集型的代码还是会把整个事件循环卡住,只能用到一个核心——这就是你看到 htop 里只有一个 CPU 跑满的原因。

你之前尝试的 ThreadPoolExecutorasyncio.to_thread 这些工具其实方向是对的,但问题在于:它们是用来跑同步函数的,而你可能误把整个异步的 run_agent 丢进去了。正确的做法是把 run_agent 里的 CPU 密集型同步代码单独抽出来,放到线程/进程池里执行,让 asyncio 事件循环在等待 CPU 任务完成的间隙,继续处理其他异步任务(比如 OpenAI API 调用)。

具体解决方案分两种情况:

1. 如果你的CPU密集代码是调用C扩展(比如PyTorch、NumPy等)

这类库会自动释放GIL(全局解释器锁),用线程池就能充分利用多核。你可以用 asyncio.to_thread(Python 3.9+)或者 ThreadPoolExecutor 来包装同步代码:

首先把CPU密集部分抽成单独的同步函数:

def process_local_cpu_task(cfg, task_sample):
    # 这里放原来run_agent里的CPU密集同步代码
    # 比如本地模型推理、执行本地计算型代码等
    result = ...
    return result

然后修改 run_agent,把CPU部分异步化:

async def run_agent(cfg, task_sample, output_dir):
    # 保留原来的异步API调用部分(IO密集,适合asyncio)
    await call_openai_api(cfg, task_sample)
    await call_local_api(cfg, task_sample)
    
    # 把CPU密集任务丢到线程池
    cpu_result = await asyncio.to_thread(process_local_cpu_task, cfg, task_sample)
    
    # 后续处理(比如保存结果)
    save_result(cpu_result, output_dir)

2. 如果你的CPU密集代码是纯Python(无C扩展)

纯Python代码受GIL限制,线程池无法利用多核,这时候得用进程池

同样先抽出同步CPU任务,然后用 ProcessPoolExecutor 配合 asyncio.run_in_executor

def process_local_cpu_task(cfg, task_sample):
    # 纯Python CPU密集代码
    result = ...
    return result

async def run_agent(cfg, task_sample, output_dir):
    # 异步API调用部分不变
    await call_openai_api(cfg, task_sample)
    await call_local_api(cfg, task_sample)
    
    # 获取当前事件循环
    loop = asyncio.get_running_loop()
    # 用进程池执行CPU任务,max_workers可根据核心数调整
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        cpu_result = await loop.run_in_executor(pool, process_local_cpu_task, cfg, task_sample)
    
    # 保存结果
    save_result(cpu_result, output_dir)

调整并发控制

原来的 semaphore 是控制异步任务的数量,现在结合线程/进程池后,建议调整 concurrency_limit 和线程/进程池的 max_workers

  • 如果用线程池:semaphore 可以设为线程数的1.5-2倍,因为有些任务在等待IO,有些在跑CPU,避免资源闲置。
  • 如果用进程池:semaphore 最好和进程数保持一致,避免进程过载。

你的原代码修改后示例

import os
import asyncio
from tqdm import tqdm

async def run_task(...):
    ...
    # 可以根据CPU核心数动态调整并发限制
    semaphore = asyncio.Semaphore(min(cfg.concurrency_limit, os.cpu_count()))

    async def run_single_sample(task_sample: TaskSample):
        async with semaphore:
            await run_agent(cfg, task_sample, cfg.output_dir / task.value)

    samples = [run_single_sample(task_sample) for task_sample in sliced_samples]
    await tqdm.gather(*samples, desc=f"Task: {task.value}")

这样修改后,asyncio 负责处理IO密集的API调用(不浪费等待时间),线程/进程池负责处理CPU密集任务(利用多核),就能同时发挥异步和并行的优势,解决CPU瓶颈问题。

备注:内容来源于stack exchange,提问作者Leon0402

火山引擎 最新活动