Python异步函数被中断时如何实现最终资源清理?
在Python异步代码中实现类似Go
defer的资源清理 你遇到的这个场景太真实了——异步协程被中途取消时,栈外分配的资源没机会被清理,这和Go里用defer确保资源一定会释放的需求完全匹配。Python里其实有几种成熟的方式实现类似的效果,下面结合你的示例一步步讲解:
1. 最直接的方案:try...finally块
这是Python中最基础的资源保障手段,不管协程是正常执行完毕,还是被取消(会抛出asyncio.CancelledError),finally块内的代码一定会被执行。
修改你的process_in_loop方法,把清理逻辑放到finally中:
async def process_in_loop(self, arg): my_input_id = self._next_input_id self._next_input_id += 1 self._inputs[my_input_id] = arg try: await self._ready_event.wait() finally: # 无论协程是否被取消,都会执行这个清理操作 del self._inputs[my_input_id]
测试修改后的代码,memory_leak场景的输出会变成:
Memory leak: Buffer size: 0 Buffer size: 1 Buffer size: 0 Buffer size: 0
完美解决了内存泄漏问题。
2. 更优雅的复用方案:异步上下文管理器
如果你的资源清理逻辑需要在多个地方复用,可以用contextlib.asynccontextmanager封装成异步上下文管理器,让代码更简洁、易维护。
改造你的CallManager类:
from contextlib import asynccontextmanager import asyncio class CallManager: def __init__(self): self._inputs = {} self._next_input_id = 0 self._ready_event = asyncio.Event() async def run_calculation_loop(self): while True: await asyncio.sleep(1) if self._inputs: print("Doing some calculation and preparing outputs") self._ready_event.set() @asynccontextmanager async def register_input(self, arg): # 分配资源 my_input_id = self._next_input_id self._next_input_id += 1 self._inputs[my_input_id] = arg try: yield my_input_id # 可以返回资源标识给调用方 finally: # 释放资源 del self._inputs[my_input_id] async def process_in_loop(self, arg): # 使用async with自动管理资源 async with self.register_input(arg): await self._ready_event.wait()
这样process_in_loop的逻辑变得更清晰,资源的分配和释放被封装在上下文管理器中,后续其他需要注册输入的方法也可以直接复用这个逻辑。
3. 协程外部清理:任务回调(不推荐作为首选)
如果需要在协程外部处理资源清理,可以给asyncio.Task添加add_done_callback回调,当任务完成(无论成功或取消)时执行清理。不过这种方式需要额外处理资源标识的传递,逻辑相对复杂,一般只在特殊场景下使用:
async def process_in_loop(self, arg): my_input_id = self._next_input_id self._next_input_id += 1 self._inputs[my_input_id] = arg try: await self._ready_event.wait() return my_input_id # 返回资源标识给回调 except asyncio.CancelledError: raise # 重新抛出取消异常,不影响任务状态 # 定义清理回调 def cleanup_input(task, cm): try: input_id = task.result() except asyncio.CancelledError: # 如果协程被取消,这里需要额外处理资源查找(比如通过arg匹配),比较麻烦 # 所以这种方式不如try...finally直接 for idx, val in cm._inputs.items(): if val == "data": input_id = idx break if input_id in cm._inputs: del cm._inputs[input_id] # 使用时添加回调 async def memory_leak(): cm = CallManager() calculation_loop = asyncio.create_task(cm.run_calculation_loop()) print("Buffer size:", len(cm._inputs)) mytask = asyncio.create_task(cm.process_in_loop("data")) mytask.add_done_callback(lambda t: cleanup_input(t, cm)) await asyncio.sleep(0.1) print("Buffer size:", len(cm._inputs)) mytask.cancel() print("Buffer size:", len(cm._inputs)) calculation_loop.cancel() print("Buffer size:", len(cm._inputs))
总结
- 优先选择**
try...finally块**:简单直接,适合大多数场景; - 需要复用逻辑时用异步上下文管理器:代码更优雅,可维护性更高;
- 任务回调仅作为特殊场景的补充,因为它需要处理更多异常和资源匹配的问题。
内容的提问来源于stack exchange,提问作者Chr1s




