You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

asyncio/aiohttp任务阻塞问题:无法同时运行消费者与生产者

生产者消费者同时运行问题的修复方案

我帮你排查出了代码里的两个核心错误,这才导致aiohttp服务器和worker无法同时运行,下面来一步步拆解问题并给出修复方案:

问题根源分析

1. 第一个代码的致命错误:on_startup 传入了函数执行结果而非函数引用

你在create_app()里写了:

app.on_startup.append(start_worker())

这里的start_worker()立即执行,而不是等到aiohttp启动时再触发。这不仅会让worker提前抢占事件循环,还会把start_worker()的返回值(None)塞进on_startup钩子列表里,干扰aiohttp的正常启动流程——这就是为什么你只看到running worker但服务器永远起不来的原因。

2. 第二个代码的错误:同步阻塞的事件循环调用

你的run函数里用了loop.run_forever(),这是个同步阻塞调用,一旦执行就会死死占用事件循环,导致asyncio.gather里的worker()完全没机会被调度执行,自然只会启动服务器而worker一动不动。

修复方案一:利用aiohttp启动钩子(最简单推荐)

这是最符合aiohttp设计规范的写法,通过官方的on_startup钩子正确启动worker:

import asyncio
from aiohttp import web

# 全局队列(也可以存在app对象里,避免全局变量)
queue = asyncio.Queue()

async def process_files(batch):
    # 这里替换成你的实际文件处理逻辑
    await asyncio.sleep(1)
    return [{item["fname"]: f"processed_{item['fname']}"} for item in batch]

async def worker():
    batch_size = 30
    print("running worker")
    while True:
        # 先等待队列出现第一个元素,无需手动判断qsize
        item = await queue.get()
        batch = [item]
        # 尽可能多拿元素,但不超过batch_size上限
        while len(batch) < batch_size and not queue.empty():
            batch.append(await queue.get())
        
        print("processing", batch)
        results = await process_files(batch)
        # 提前构建文件名和future的映射
        future_map = {item["fname"]: item["future"] for item in batch}
        for dic in results:
            for key, value in dic.items():
                print(f"{key}: {value}")
                future_map[key].set_result(value)
        # 标记所有队列任务完成
        for _ in batch:
            queue.task_done()

# 修改为async函数,接收app参数(aiohttp钩子要求)
async def start_worker(app):
    # 把worker任务存在app里,方便后续清理
    app["worker_task"] = asyncio.create_task(worker())

async def decode(request):
    # 示例解码接口逻辑,替换成你的实际处理
    data = await request.json()
    fname = data.get("fname")
    future = asyncio.get_event_loop().create_future()
    await queue.put({"fname": fname, "future": future})
    result = await future
    return web.json_response({"result": result})

def create_app():
    app = web.Application()
    routes = web.RouteTableDef()
    
    @routes.post("/decode")
    async def handle_post(request):
        return await decode(request)
    
    app.add_routes(routes)
    # 这里传入函数引用,而非调用函数!
    app.on_startup.append(start_worker)
    return app

if __name__ == '__main__':
    app = create_app()
    web.run_app(app)

关键修改点

  • start_worker改成async函数并接收app参数,符合aiohttp钩子的要求
  • app.on_startup.append(start_worker)传入函数本身,而非执行结果
  • 优化worker的批量获取逻辑,避免低效的qsize判断,更符合asyncio的异步设计

修复方案二:手动管理事件循环(适合精细控制场景)

如果需要更灵活地控制事件循环和任务生命周期,可以用这种写法:

import asyncio
from aiohttp import web

queue = asyncio.Queue()

async def process_files(batch):
    await asyncio.sleep(1)
    return [{item["fname"]: f"processed_{item['fname']}"} for item in batch]

async def worker():
    batch_size = 30
    print("running worker")
    while True:
        item = await queue.get()
        batch = [item]
        while len(batch) < batch_size and not queue.empty():
            batch.append(await queue.get())
        
        print("processing", batch)
        results = await process_files(batch)
        future_map = {item["fname"]: item["future"] for item in batch}
        for dic in results:
            for key, value in dic.items():
                print(f"{key}: {value}")
                future_map[key].set_result(value)
        for _ in batch:
            queue.task_done()

async def decode(request):
    data = await request.json()
    fname = data.get("fname")
    future = asyncio.get_event_loop().create_future()
    await queue.put({"fname": fname, "future": future})
    result = await future
    return web.json_response({"result": result})

def create_app():
    app = web.Application()
    routes = web.RouteTableDef()
    
    @routes.post("/decode")
    async def handle_post(request):
        return await decode(request)
    
    app.add_routes(routes)
    return app

async def main():
    app = create_app()
    # 启动worker任务
    worker_task = asyncio.create_task(worker())
    # 用异步方式启动aiohttp服务器,避免阻塞
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '0.0.0.0', 8001)
    await site.start()
    print("Server started on http://0.0.0.0:8001")
    
    # 保持程序运行,直到收到中断信号
    await asyncio.Event().wait()
    
    # 优雅清理资源
    await runner.cleanup()
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        print("Worker task cancelled")

if __name__ == '__main__':
    asyncio.run(main())

关键修改点

  • 使用asyncio.run()管理事件循环(Python3.7+推荐写法)
  • 用异步的AppRunnerTCPSite启动服务器,避免同步阻塞调用
  • main函数中同时启动worker和服务器,保证两者都能被事件循环调度

内容的提问来源于stack exchange,提问作者Not a Doctor

火山引擎 最新活动