You need to enable JavaScript to run this app.
优惠活动
大模型
产品
解决方案
定价
更多
文档控制台
免费开始使用

如何并行执行命令并终止GPU监控进程以返回峰值内存数据?

并行渲染与GPU内存监控实现方案

问题背景

我有两个函数,分别用于执行渲染命令并计时、监控GPU峰值内存,需要并行运行它们:

def time_render(filename: str) -> str:
    # Run the render and time the operation
    # We expect the full path for the file
    command_to_run = '{} -b {} -f 10 -- --cycles-device CUDA'.format(
        render_executable, filename)
    cmd = subprocess.Popen(command_to_run, shell=True, stdout=subprocess.PIPE)
    elapsed_time_str = '-1'  # if you see this output, something bad happened
    for line in cmd.stdout:
        decoded_line = line.decode().strip()
        if "Time: " in decoded_line and "(Saving: " in decoded_line:
            start_index = decoded_line.find(":")
            end_index = decoded_line.find("(")

            # this is the line we want with the render time
            elapsed_time_str = convert_to_seconds(
                decoded_line[start_index+1:end_index].strip())

    return elapsed_time_str

def get_peak_GPU_mem() -> dict:
    # Monitor the memory usage on the GPU and
    # record the peak
    peak_mem = {}
    command_to_run = 'nvidia-smi --query-gpu=gpu_bus_id,memory.used --format=csv -l 1'
    cmd = subprocess.Popen(command_to_run, shell=True, stdout=subprocess.PIPE)
    for line in cmd.stdout:
        decoded_line = line.decode().strip()

        # The output of this command will usually look like this:
        # 00000000:2D:00.0, 101 MiB <-- One GPU
        # 00000000:99:00.0, 11 MiB <-- Another GPU
        # Each GPU will have a unique PCI ID
        split_tokens = decoded_line.split(',')
        gpu_id = split_tokens[0].strip()
        curr_mem = int(split_tokens[1].strip().split()[0])
        if gpu_id not in peak_mem:
            peak_mem[gpu_id] = curr_mem
        else:
            # we have a previous entry, compare the entries
            if curr_mem > peak_mem[gpu_id]:
                peak_mem[gpu_id] = curr_mem

核心需求:

  • 并行运行渲染计时与GPU内存监控两个任务
  • get_peak_GPU_mem调用的nvidia-smi -l 1是持续输出的命令,不会自动终止,需在渲染任务完成后终止监控进程,并返回已统计的峰值内存字典

实现方案

使用multiprocessing模块实现并行,通过共享字典传递GPU内存数据,队列传递渲染计时结果,确保渲染完成后终止监控进程并获取数据。

1. 优化函数逻辑

修正GPU监控函数(支持共享字典)

将监控结果存储到进程间共享的字典中,同时避免shell=True的安全风险,添加异常处理确保子进程正确终止:

def get_peak_GPU_mem(peak_mem: dict):
    # 监控GPU内存峰值,结果写入共享字典
    command_to_run = [
        'nvidia-smi',
        '--query-gpu=gpu_bus_id,memory.used',
        '--format=csv',
        '-l', '1'
    ]
    cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
    try:
        for line in cmd.stdout:
            decoded_line = line.strip()
            # 跳过表头行
            if decoded_line.startswith('gpu_bus_id'):
                continue
            split_tokens = decoded_line.split(',')
            if len(split_tokens) != 2:
                continue
            gpu_id = split_tokens[0].strip()
            try:
                curr_mem = int(split_tokens[1].strip().split()[0])
            except (ValueError, IndexError):
                continue
            # 更新峰值内存
            if gpu_id not in peak_mem or curr_mem > peak_mem[gpu_id]:
                peak_mem[gpu_id] = curr_mem
    except (IOError, OSError):
        # 进程被终止时捕获IO异常,正常退出
        pass
    finally:
        # 确保nvidia-smi子进程被终止
        if cmd.poll() is None:
            cmd.terminate()
            cmd.wait()

优化渲染计时函数(支持结果队列)

改用列表传递命令参数,添加异常处理,通过队列返回计时结果:

def time_render(filename: str, result_queue):
    # 执行渲染并计时,结果写入队列
    command_to_run = [
        render_executable,
        '-b', filename,
        '-f', '10',
        '--', '--cycles-device', 'CUDA'
    ]
    elapsed_time_str = '-1'
    cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
    try:
        for line in cmd.stdout:
            decoded_line = line.strip()
            if "Time: " in decoded_line and "(Saving: " in decoded_line:
                start_index = decoded_line.find(":")
                end_index = decoded_line.find("(")
                if start_index != -1 and end_index != -1:
                    elapsed_time_str = convert_to_seconds(
                        decoded_line[start_index+1:end_index].strip())
        # 等待渲染进程完成
        cmd.wait()
    except Exception as e:
        print(f"渲染出错: {e}")
        if cmd.poll() is None:
            cmd.terminate()
            cmd.wait()
    # 将结果写入队列
    result_queue.put(elapsed_time_str)

2. 主进程控制逻辑

通过multiprocessing.Manager创建共享字典和队列,启动两个子进程,等待渲染完成后终止监控进程并获取结果:

import multiprocessing

def main():
    filename = "path/to/your/render_file.blend"  # 替换为实际文件路径
    
    with multiprocessing.Manager() as manager:
        # 创建进程间共享的字典和队列
        peak_mem_dict = manager.dict()
        render_result_queue = manager.Queue()
        
        # 创建子进程
        render_process = multiprocessing.Process(
            target=time_render,
            args=(filename, render_result_queue)
        )
        gpu_monitor_process = multiprocessing.Process(
            target=get_peak_GPU_mem,
            args=(peak_mem_dict,)
        )
        
        # 启动进程(先启动监控,确保渲染开始前已开始监控)
        gpu_monitor_process.start()
        render_process.start()
        
        # 等待渲染进程完成
        render_process.join()
        
        # 终止GPU监控进程并等待其退出
        if gpu_monitor_process.is_alive():
            gpu_monitor_process.terminate()
            gpu_monitor_process.join()
        
        # 获取结果
        render_time = render_result_queue.get()
        peak_mem = dict(peak_mem_dict)
        
        print(f"渲染耗时: {render_time} 秒")
        print(f"GPU峰值内存: {peak_mem}")

if __name__ == "__main__":
    render_executable = "blender"  # 替换为实际渲染程序路径
    main()

更优代码组织方式

将渲染与监控逻辑封装为类,提高代码可维护性和复用性:

import multiprocessing
import subprocess

class RenderGPUMonitor:
    def __init__(self, render_executable):
        self.render_executable = render_executable
        self.render_time = None
        self.peak_gpu_mem = None
    
    def _render_and_time(self, filename, result_queue):
        # 内部渲染计时逻辑
        elapsed_time_str = '-1'
        command_to_run = [
            self.render_executable,
            '-b', filename,
            '-f', '10',
            '--', '--cycles-device', 'CUDA'
        ]
        cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
        try:
            for line in cmd.stdout:
                decoded_line = line.strip()
                if "Time: " in decoded_line and "(Saving: " in decoded_line:
                    start_idx = decoded_line.find(":")
                    end_idx = decoded_line.find("(")
                    if start_idx != -1 and end_idx != -1:
                        elapsed_time_str = convert_to_seconds(
                            decoded_line[start_idx+1:end_idx].strip())
            cmd.wait()
        except Exception as e:
            print(f"渲染异常: {e}")
            if cmd.poll() is None:
                cmd.terminate()
                cmd.wait()
        result_queue.put(elapsed_time_str)
    
    def _monitor_gpu_peak(self, peak_mem_dict):
        # 内部GPU监控逻辑
        command_to_run = [
            'nvidia-smi',
            '--query-gpu=gpu_bus_id,memory.used',
            '--format=csv',
            '-l', '1'
        ]
        cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
        try:
            for line in cmd.stdout:
                decoded_line = line.strip()
                if decoded_line.startswith('gpu_bus_id'):
                    continue
                split_tokens = decoded_line.split(',')
                if len(split_tokens) != 2:
                    continue
                gpu_id = split_tokens[0].strip()
                try:
                    curr_mem = int(split_tokens[1].strip().split()[0])
                except (ValueError, IndexError):
                    continue
                if gpu_id not in peak_mem_dict or curr_mem > peak_mem_dict[gpu_id]:
                    peak_mem_dict[gpu_id] = curr_mem
        except (IOError, OSError):
            pass
        finally:
            if cmd.poll() is None:
                cmd.terminate()
                cmd.wait()
    
    def run(self, filename):
        # 对外暴露的执行方法
        with multiprocessing.Manager() as manager:
            peak_mem_dict = manager.dict()
            result_queue = manager.Queue()
            
            render_proc = multiprocessing.Process(
                target=self._render_and_time,
                args=(filename, result_queue)
            )
            gpu_proc = multiprocessing.Process(
                target=self._monitor_gpu_peak,
                args=(peak_mem_dict,)
            )
            
            gpu_proc.start()
            render_proc.start()
            
            render_proc.join()
            self.render_time = result_queue.get()
            
            if gpu_proc.is_alive():
                gpu_proc.terminate()
                gpu_proc.join()
            
            self.peak_gpu_mem = dict(peak_mem_dict)
            return self.render_time, self.peak_gpu_mem

# 使用示例
def convert_to_seconds(time_str):
    # 实现时间字符串转秒的逻辑,比如"00:01:30"转为90
    h, m, s = time_str.split(':')
    return int(h)*3600 + int(m)*60 + int(s)

if __name__ == "__main__":
    monitor = RenderGPUMonitor(render_executable="blender")
    render_time, peak_mem = monitor.run("path/to/your/file.blend")
    print(f"渲染完成,耗时: {render_time}秒")
    print("各GPU峰值内存:")
    for gpu_id, mem in peak_mem.items():
        print(f"  {gpu_id}: {mem} MiB")

内容的提问来源于stack exchange,提问作者easythrees

火山引擎 最新活动