You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python异步函数被中断时如何实现最终资源清理?

在Python异步代码中实现类似Go defer的资源清理

你遇到的这个场景太真实了——异步协程被中途取消时,栈外分配的资源没机会被清理,这和Go里用defer确保资源一定会释放的需求完全匹配。Python里其实有几种成熟的方式实现类似的效果,下面结合你的示例一步步讲解:

1. 最直接的方案:try...finally

这是Python中最基础的资源保障手段,不管协程是正常执行完毕,还是被取消(会抛出asyncio.CancelledError),finally块内的代码一定会被执行

修改你的process_in_loop方法,把清理逻辑放到finally中:

async def process_in_loop(self, arg):
    my_input_id = self._next_input_id
    self._next_input_id += 1
    self._inputs[my_input_id] = arg
    try:
        await self._ready_event.wait()
    finally:
        # 无论协程是否被取消,都会执行这个清理操作
        del self._inputs[my_input_id]

测试修改后的代码,memory_leak场景的输出会变成:

Memory leak:
Buffer size: 0
Buffer size: 1
Buffer size: 0
Buffer size: 0

完美解决了内存泄漏问题。

2. 更优雅的复用方案:异步上下文管理器

如果你的资源清理逻辑需要在多个地方复用,可以用contextlib.asynccontextmanager封装成异步上下文管理器,让代码更简洁、易维护。

改造你的CallManager类:

from contextlib import asynccontextmanager
import asyncio

class CallManager:
    def __init__(self):
        self._inputs = {}
        self._next_input_id = 0
        self._ready_event = asyncio.Event()

    async def run_calculation_loop(self):
        while True:
            await asyncio.sleep(1)
            if self._inputs:
                print("Doing some calculation and preparing outputs")
                self._ready_event.set()

    @asynccontextmanager
    async def register_input(self, arg):
        # 分配资源
        my_input_id = self._next_input_id
        self._next_input_id += 1
        self._inputs[my_input_id] = arg
        try:
            yield my_input_id  # 可以返回资源标识给调用方
        finally:
            # 释放资源
            del self._inputs[my_input_id]

    async def process_in_loop(self, arg):
        # 使用async with自动管理资源
        async with self.register_input(arg):
            await self._ready_event.wait()

这样process_in_loop的逻辑变得更清晰,资源的分配和释放被封装在上下文管理器中,后续其他需要注册输入的方法也可以直接复用这个逻辑。

3. 协程外部清理:任务回调(不推荐作为首选)

如果需要在协程外部处理资源清理,可以给asyncio.Task添加add_done_callback回调,当任务完成(无论成功或取消)时执行清理。不过这种方式需要额外处理资源标识的传递,逻辑相对复杂,一般只在特殊场景下使用:

async def process_in_loop(self, arg):
    my_input_id = self._next_input_id
    self._next_input_id += 1
    self._inputs[my_input_id] = arg
    try:
        await self._ready_event.wait()
        return my_input_id  # 返回资源标识给回调
    except asyncio.CancelledError:
        raise  # 重新抛出取消异常,不影响任务状态

# 定义清理回调
def cleanup_input(task, cm):
    try:
        input_id = task.result()
    except asyncio.CancelledError:
        # 如果协程被取消,这里需要额外处理资源查找(比如通过arg匹配),比较麻烦
        # 所以这种方式不如try...finally直接
        for idx, val in cm._inputs.items():
            if val == "data":
                input_id = idx
                break
    if input_id in cm._inputs:
        del cm._inputs[input_id]

# 使用时添加回调
async def memory_leak():
    cm = CallManager()
    calculation_loop = asyncio.create_task(cm.run_calculation_loop())
    print("Buffer size:", len(cm._inputs))
    mytask = asyncio.create_task(cm.process_in_loop("data"))
    mytask.add_done_callback(lambda t: cleanup_input(t, cm))
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))
    mytask.cancel()
    print("Buffer size:", len(cm._inputs))
    calculation_loop.cancel()
    print("Buffer size:", len(cm._inputs))

总结

  • 优先选择**try...finally块**:简单直接,适合大多数场景;
  • 需要复用逻辑时用异步上下文管理器:代码更优雅,可维护性更高;
  • 任务回调仅作为特殊场景的补充,因为它需要处理更多异常和资源匹配的问题。

内容的提问来源于stack exchange,提问作者Chr1s

火山引擎 最新活动