You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何在WebSocket收到停止请求时立即终止图像生成函数?

如何在WebSocket收到停止请求时立即终止图像生成函数?

要解决这个问题,我们需要利用Python异步编程的任务取消机制,结合事件驱动的方式监听停止请求,避免等待图像生成完全完成后才响应停止指令。以下是专业的实现方案:

核心思路

异步Python中,要中途终止一个异步任务,关键是让生成任务能响应取消信号,同时通过事件驱动的方式监听WebSocket的停止请求,替代轮询sleep的低效方案。具体步骤:

  1. asyncio.Event替代/配合原有的stop_requested布尔值,实现事件驱动的停止通知;
  2. 将图像生成操作包装为asyncio.Task
  3. asyncio.wait同时等待「生成任务完成」或「停止事件触发」,优先处理先发生的事件;
  4. 收到停止信号时,立即取消生成任务并清理资源。

具体实现步骤

1. 改造WebSocket连接对象,添加停止事件

首先,在你的WebSocket连接实例中添加一个asyncio.Event,用于触发停止信号(事件可以被异步等待,比单纯的布尔值更适合异步场景):

import asyncio

class WebSocketConnection:
    def __init__(self):
        self.stop_requested = False
        self.stop_event = asyncio.Event()  # 新增停止事件
    
    # 当WebSocket收到停止请求时调用此方法
    async def on_stop_request(self):
        self.stop_requested = True
        self.stop_event.set()  # 触发停止事件,通知所有等待该事件的任务

2. 修改图像生成的处理流程

在你的process方法中,不再直接等待生成完成,而是将生成操作包装为任务,同时监听停止事件:

class ImageGeneration(BaseAIGeneration):
    async def process(self, websocket, prompt):
        # 提前检查停止状态
        if websocket.stop_requested:
            return None
        await super().process(websocket, prompt)
        if websocket.stop_requested:
            return None

        # 1. 将图像生成包装为异步任务
        gen_task = asyncio.create_task(
            self.generate_image(prompt=prompt, model=self.model, size=size)
        )

        try:
            # 2. 等待两个事件:生成完成 或 停止信号触发,哪个先发生就处理哪个
            done, pending = await asyncio.wait(
                [gen_task, websocket.stop_event.wait()],
                return_when=asyncio.FIRST_COMPLETED
            )

            # 3. 如果是停止事件先触发,立即取消生成任务
            if websocket.stop_event.is_set():
                gen_task.cancel()
                try:
                    # 等待任务取消完成,避免残留未处理的异常
                    await gen_task
                except asyncio.CancelledError:
                    print("图像生成任务已被成功终止")
                return None

            # 4. 如果生成任务正常完成,处理结果
            response = gen_task.result()
            if not response or not isinstance(response, list):
                return None
            return response[0].url

        except Exception as e:
            print(f"处理图像生成时出错: {str(e)}")
            return None

3. 确保生成函数能响应取消

你的generate_image函数本身不需要大改,只要第三方的client.images.generate是标准的异步函数(会让出CPU执行权),当任务被取消时,它会自动抛出CancelledError并终止执行:

async def generate_image(
    prompt: str, 
    model: str, 
    size: str = "1024x1024"
):
    try:
        response = await client.images.generate(
            model=model,
            prompt=prompt,
            size=size,
            n=1
        )
        return response.data
    except asyncio.CancelledError:
        # 可选:在这里添加资源清理逻辑(比如关闭临时网络连接)
        print("生成过程被取消,执行资源清理")
        raise  # 必须重新抛出异常,让上层任务正确处理取消事件

为什么这个方案更优?

  1. 事件驱动而非轮询:用asyncio.Event替代轮询stop_requested,避免了固定间隔sleep的性能损耗,更符合异步编程的最佳实践;
  2. 即时响应停止:一旦停止事件触发,立即取消生成任务,不需要等待生成操作完成;
  3. 优雅的错误处理:通过捕获CancelledError确保资源被正确清理,避免内存泄漏或残留未处理的任务。

特殊情况处理

如果你的第三方图像生成库的generate方法是同步阻塞的(比如用run_in_executor包装的同步调用),那么任务取消可能无法立即生效,因为同步操作不会让出CPU。这种情况下,你需要:

  1. 优先更换为支持异步取消的图像生成库;
  2. 或者将同步生成操作放在线程中,使用线程安全的停止标志(比如threading.Event),但线程终止在Python中是不安全的,需谨慎使用。

备注:内容来源于stack exchange,提问作者M S

火山引擎 最新活动