Python异步多进程带回调编程实现技术咨询
实现带回调的异步多进程任务处理方案
我来帮你理清这个思路——其实不用把asyncio和multiprocessing搞得太复杂,multiprocessing.Pool本身就支持任务完成后的回调机制,再结合你的协程需求,我们可以轻松实现你要的效果:用5个工作进程并行处理100个任务,每个任务完成后立即触发回调,甚至能在回调里调用你的协程逻辑。
核心实现思路
- 用
multiprocessing.Pool创建固定大小(5个)的进程池,它会自动管理进程的创建、复用和销毁。 - 用
apply_async方法非阻塞提交任务,这个方法允许我们指定callback参数——当任务在子进程中完成并返回结果时,父进程会自动调用这个回调函数。 - 如果你的回调需要调用协程,只需在回调里把协程交给父进程的事件循环执行即可(因为回调运行在父进程的主线程中)。
完整代码示例
import multiprocessing import asyncio import time # 你要并行执行的任务函数,替换成你的业务逻辑 def func(num): # 模拟耗时操作(CPU/IO密集型都可以) time.sleep(1) return f"任务 {num} 完成,计算结果: {num * 2}" # 你现有的协程,用于异步处理任务结果 async def process_result(result): # 这里可以写你的异步逻辑:比如写入数据库、调用异步API、发送通知等 print(f"[协程处理] 收到结果: {result}") await asyncio.sleep(0.1) # 模拟异步操作耗时 # 进程池任务完成后触发的回调函数 def callback(result): # 确保能获取到父进程的事件循环(如果当前线程没有则创建) try: loop = asyncio.get_running_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 将协程加入事件循环异步执行 loop.create_task(process_result(result)) if __name__ == "__main__": # 创建包含5个工作进程的进程池 with multiprocessing.Pool(processes=5) as pool: # 批量提交100个任务 for num in range(1, 101): pool.apply_async(func, args=(num,), callback=callback) # 关闭进程池,不再接受新任务 pool.close() # 等待所有子进程的任务完成 pool.join() # 确保所有异步回调的协程都执行完毕(避免主线程提前退出) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop)))
关键细节说明
- 进程池管理:用
with语句创建进程池会自动处理资源释放,不用手动关闭和销毁进程。 - 非阻塞提交:
apply_async不会阻塞主线程,所以我们可以一次性提交所有100个任务,进程池会自动分配给5个工作进程并行处理。 - 回调与协程结合:回调函数运行在父进程的主线程中,所以需要正确获取事件循环来执行协程。如果你的回调不需要协程,直接写同步逻辑即可(比如打印结果、写入文件)。
- 序列化注意:任务函数的返回值必须是可被
pickle序列化的(Python大部分内置类型都支持),否则进程间无法传递结果。
额外注意事项
- 避免在回调函数里做耗时的同步操作,否则会阻塞父进程的主线程,影响其他回调的触发。如果有耗时操作,尽量放到协程或单独的线程中处理。
- 如果你的任务是IO密集型,也可以考虑用
concurrent.futures.ProcessPoolExecutor(和multiprocessing.Pool原理类似,API更现代),用法几乎一致。
内容的提问来源于stack exchange,提问作者Guldam Kwak




