asyncio/aiohttp任务阻塞问题:无法同时运行消费者与生产者
生产者消费者同时运行问题的修复方案
我帮你排查出了代码里的两个核心错误,这才导致aiohttp服务器和worker无法同时运行,下面来一步步拆解问题并给出修复方案:
问题根源分析
1. 第一个代码的致命错误:on_startup 传入了函数执行结果而非函数引用
你在create_app()里写了:
app.on_startup.append(start_worker())
这里的start_worker()会立即执行,而不是等到aiohttp启动时再触发。这不仅会让worker提前抢占事件循环,还会把start_worker()的返回值(None)塞进on_startup钩子列表里,干扰aiohttp的正常启动流程——这就是为什么你只看到running worker但服务器永远起不来的原因。
2. 第二个代码的错误:同步阻塞的事件循环调用
你的run函数里用了loop.run_forever(),这是个同步阻塞调用,一旦执行就会死死占用事件循环,导致asyncio.gather里的worker()完全没机会被调度执行,自然只会启动服务器而worker一动不动。
修复方案一:利用aiohttp启动钩子(最简单推荐)
这是最符合aiohttp设计规范的写法,通过官方的on_startup钩子正确启动worker:
import asyncio from aiohttp import web # 全局队列(也可以存在app对象里,避免全局变量) queue = asyncio.Queue() async def process_files(batch): # 这里替换成你的实际文件处理逻辑 await asyncio.sleep(1) return [{item["fname"]: f"processed_{item['fname']}"} for item in batch] async def worker(): batch_size = 30 print("running worker") while True: # 先等待队列出现第一个元素,无需手动判断qsize item = await queue.get() batch = [item] # 尽可能多拿元素,但不超过batch_size上限 while len(batch) < batch_size and not queue.empty(): batch.append(await queue.get()) print("processing", batch) results = await process_files(batch) # 提前构建文件名和future的映射 future_map = {item["fname"]: item["future"] for item in batch} for dic in results: for key, value in dic.items(): print(f"{key}: {value}") future_map[key].set_result(value) # 标记所有队列任务完成 for _ in batch: queue.task_done() # 修改为async函数,接收app参数(aiohttp钩子要求) async def start_worker(app): # 把worker任务存在app里,方便后续清理 app["worker_task"] = asyncio.create_task(worker()) async def decode(request): # 示例解码接口逻辑,替换成你的实际处理 data = await request.json() fname = data.get("fname") future = asyncio.get_event_loop().create_future() await queue.put({"fname": fname, "future": future}) result = await future return web.json_response({"result": result}) def create_app(): app = web.Application() routes = web.RouteTableDef() @routes.post("/decode") async def handle_post(request): return await decode(request) app.add_routes(routes) # 这里传入函数引用,而非调用函数! app.on_startup.append(start_worker) return app if __name__ == '__main__': app = create_app() web.run_app(app)
关键修改点
- 把
start_worker改成async函数并接收app参数,符合aiohttp钩子的要求 app.on_startup.append(start_worker)传入函数本身,而非执行结果- 优化worker的批量获取逻辑,避免低效的qsize判断,更符合asyncio的异步设计
修复方案二:手动管理事件循环(适合精细控制场景)
如果需要更灵活地控制事件循环和任务生命周期,可以用这种写法:
import asyncio from aiohttp import web queue = asyncio.Queue() async def process_files(batch): await asyncio.sleep(1) return [{item["fname"]: f"processed_{item['fname']}"} for item in batch] async def worker(): batch_size = 30 print("running worker") while True: item = await queue.get() batch = [item] while len(batch) < batch_size and not queue.empty(): batch.append(await queue.get()) print("processing", batch) results = await process_files(batch) future_map = {item["fname"]: item["future"] for item in batch} for dic in results: for key, value in dic.items(): print(f"{key}: {value}") future_map[key].set_result(value) for _ in batch: queue.task_done() async def decode(request): data = await request.json() fname = data.get("fname") future = asyncio.get_event_loop().create_future() await queue.put({"fname": fname, "future": future}) result = await future return web.json_response({"result": result}) def create_app(): app = web.Application() routes = web.RouteTableDef() @routes.post("/decode") async def handle_post(request): return await decode(request) app.add_routes(routes) return app async def main(): app = create_app() # 启动worker任务 worker_task = asyncio.create_task(worker()) # 用异步方式启动aiohttp服务器,避免阻塞 runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, '0.0.0.0', 8001) await site.start() print("Server started on http://0.0.0.0:8001") # 保持程序运行,直到收到中断信号 await asyncio.Event().wait() # 优雅清理资源 await runner.cleanup() worker_task.cancel() try: await worker_task except asyncio.CancelledError: print("Worker task cancelled") if __name__ == '__main__': asyncio.run(main())
关键修改点
- 使用
asyncio.run()管理事件循环(Python3.7+推荐写法) - 用异步的
AppRunner和TCPSite启动服务器,避免同步阻塞调用 - 在
main函数中同时启动worker和服务器,保证两者都能被事件循环调度
内容的提问来源于stack exchange,提问作者Not a Doctor




