You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python异步多进程带回调编程实现技术咨询

实现带回调的异步多进程任务处理方案

我来帮你理清这个思路——其实不用把asynciomultiprocessing搞得太复杂,multiprocessing.Pool本身就支持任务完成后的回调机制,再结合你的协程需求,我们可以轻松实现你要的效果:用5个工作进程并行处理100个任务,每个任务完成后立即触发回调,甚至能在回调里调用你的协程逻辑。

核心实现思路

  1. multiprocessing.Pool创建固定大小(5个)的进程池,它会自动管理进程的创建、复用和销毁。
  2. apply_async方法非阻塞提交任务,这个方法允许我们指定callback参数——当任务在子进程中完成并返回结果时,父进程会自动调用这个回调函数。
  3. 如果你的回调需要调用协程,只需在回调里把协程交给父进程的事件循环执行即可(因为回调运行在父进程的主线程中)。

完整代码示例

import multiprocessing
import asyncio
import time

# 你要并行执行的任务函数,替换成你的业务逻辑
def func(num):
    # 模拟耗时操作(CPU/IO密集型都可以)
    time.sleep(1)
    return f"任务 {num} 完成,计算结果: {num * 2}"

# 你现有的协程,用于异步处理任务结果
async def process_result(result):
    # 这里可以写你的异步逻辑:比如写入数据库、调用异步API、发送通知等
    print(f"[协程处理] 收到结果: {result}")
    await asyncio.sleep(0.1)  # 模拟异步操作耗时

# 进程池任务完成后触发的回调函数
def callback(result):
    # 确保能获取到父进程的事件循环(如果当前线程没有则创建)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
    # 将协程加入事件循环异步执行
    loop.create_task(process_result(result))

if __name__ == "__main__":
    # 创建包含5个工作进程的进程池
    with multiprocessing.Pool(processes=5) as pool:
        # 批量提交100个任务
        for num in range(1, 101):
            pool.apply_async(func, args=(num,), callback=callback)
        
        # 关闭进程池,不再接受新任务
        pool.close()
        # 等待所有子进程的任务完成
        pool.join()
    
    # 确保所有异步回调的协程都执行完毕(避免主线程提前退出)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop)))

关键细节说明

  • 进程池管理:用with语句创建进程池会自动处理资源释放,不用手动关闭和销毁进程。
  • 非阻塞提交apply_async不会阻塞主线程,所以我们可以一次性提交所有100个任务,进程池会自动分配给5个工作进程并行处理。
  • 回调与协程结合:回调函数运行在父进程的主线程中,所以需要正确获取事件循环来执行协程。如果你的回调不需要协程,直接写同步逻辑即可(比如打印结果、写入文件)。
  • 序列化注意:任务函数的返回值必须是可被pickle序列化的(Python大部分内置类型都支持),否则进程间无法传递结果。

额外注意事项

  • 避免在回调函数里做耗时的同步操作,否则会阻塞父进程的主线程,影响其他回调的触发。如果有耗时操作,尽量放到协程或单独的线程中处理。
  • 如果你的任务是IO密集型,也可以考虑用concurrent.futures.ProcessPoolExecutor(和multiprocessing.Pool原理类似,API更现代),用法几乎一致。

内容的提问来源于stack exchange,提问作者Guldam Kwak

火山引擎 最新活动