如何并行执行命令并终止GPU监控进程以返回峰值内存数据?
并行渲染与GPU内存监控实现方案
问题背景
我有两个函数,分别用于执行渲染命令并计时、监控GPU峰值内存,需要并行运行它们:
def time_render(filename: str) -> str:
    """Run the render for ``filename`` and report how long it took.

    The full path of the file is expected. The renderer's output is scanned
    for the summary line containing both "Time: " and "(Saving: "; the text
    between the first ":" and the first "(" of that line is handed to
    ``convert_to_seconds``. If no such line is seen, '-1' is returned.
    """
    shell_command = '{} -b {} -f 10 -- --cycles-device CUDA'.format(
        render_executable, filename)
    render_proc = subprocess.Popen(
        shell_command, shell=True, stdout=subprocess.PIPE)

    # Sentinel value: if this comes back, something bad happened.
    render_seconds = '-1'
    for raw_line in render_proc.stdout:
        text = raw_line.decode().strip()
        if "Time: " not in text or "(Saving: " not in text:
            continue
        # This is the line carrying the render time.
        first_colon = text.find(":")
        first_paren = text.find("(")
        render_seconds = convert_to_seconds(
            text[first_colon + 1:first_paren].strip())
    return render_seconds


def get_peak_GPU_mem() -> dict:
    """Watch GPU memory usage via ``nvidia-smi`` and track the peak per GPU.

    Each output line is expected to look like::

        00000000:2D:00.0, 101 MiB   <-- one GPU
        00000000:99:00.0, 11 MiB    <-- another GPU

    where every GPU has a unique PCI bus id.

    NOTE: this version has no ``return`` statement, and ``-l 1`` makes
    ``nvidia-smi`` keep printing, so the loop only ends when that output
    stream ends.
    """
    peak_mem = {}
    monitor_command = 'nvidia-smi --query-gpu=gpu_bus_id,memory.used --format=csv -l 1'
    monitor_proc = subprocess.Popen(
        monitor_command, shell=True, stdout=subprocess.PIPE)
    for raw_line in monitor_proc.stdout:
        fields = raw_line.decode().strip().split(',')
        bus_id = fields[0].strip()
        used_mib = int(fields[1].strip().split()[0])
        # Skip samples that do not beat the peak already recorded for this GPU.
        if bus_id in peak_mem and used_mib <= peak_mem[bus_id]:
            continue
        peak_mem[bus_id] = used_mib
核心需求:
- 并行运行渲染计时与GPU内存监控两个任务
- `get_peak_GPU_mem` 调用的 `nvidia-smi -l 1` 是持续输出的命令,不会自动终止;因此需要在渲染任务完成后终止监控进程,并返回已统计的峰值内存字典
实现方案
使用multiprocessing模块实现并行,通过共享字典传递GPU内存数据,队列传递渲染计时结果,确保渲染完成后终止监控进程并获取数据。
1. 优化函数逻辑
修正GPU监控函数(支持共享字典)
将监控结果存储到进程间共享的字典中,同时避免shell=True的安全风险,添加异常处理确保子进程正确终止:
def get_peak_GPU_mem(peak_mem: dict) -> None:
    """Record per-GPU peak memory usage reported by ``nvidia-smi``.

    Runs ``nvidia-smi`` in looping CSV mode (one sample per second) and
    stores, for every GPU bus id seen, the largest ``memory.used`` value in
    ``peak_mem``. The function returns when the ``nvidia-smi`` output stream
    ends, or exits via ``SystemExit`` when the process receives SIGTERM.

    Args:
        peak_mem: Mutable mapping of GPU bus id -> peak memory value. It may
            be a ``multiprocessing.Manager().dict()`` proxy so a parent
            process can read the results. Existing entries are respected: a
            sample only replaces an entry when it is larger.

    Raises:
        OSError: If ``nvidia-smi`` cannot be launched.
    """
    import signal  # local import: only this function needs it

    command_to_run = [
        'nvidia-smi',
        '--query-gpu=gpu_bus_id,memory.used',
        '--format=csv',
        '-l', '1',
    ]

    def _exit_on_sigterm(signum, frame):
        # Turn SIGTERM into a normal exception so the ``finally`` blocks below
        # run and the nvidia-smi child is not left behind.
        raise SystemExit(0)

    # multiprocessing.Process.terminate() sends SIGTERM on POSIX; with the
    # default handler the process dies immediately and cleanup code is
    # skipped. On Windows terminate() does not use signals, so this cleanup
    # is best-effort there.
    handler_installed = False
    previous_handler = None
    try:
        previous_handler = signal.signal(signal.SIGTERM, _exit_on_sigterm)
        handler_installed = True
    except ValueError:
        # signal.signal() is only allowed in the main thread; keep going
        # without the handler in that case.
        pass

    try:
        # Local copy of the peaks so each sample costs at most one write to
        # the (possibly proxied) shared mapping instead of several lookups.
        known_peaks = dict(peak_mem)
        cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
        try:
            for line in cmd.stdout:
                decoded_line = line.strip()
                # Skip the CSV header row.
                if decoded_line.startswith('gpu_bus_id'):
                    continue
                split_tokens = decoded_line.split(',')
                if len(split_tokens) != 2:
                    continue
                gpu_id = split_tokens[0].strip()
                try:
                    curr_mem = int(split_tokens[1].strip().split()[0])
                except (ValueError, IndexError):
                    # Non-numeric second field (e.g. a header or "[N/A]").
                    continue
                previous_peak = known_peaks.get(gpu_id)
                if previous_peak is None or curr_mem > previous_peak:
                    known_peaks[gpu_id] = curr_mem
                    peak_mem[gpu_id] = curr_mem
        except OSError:
            # A read error on the pipe ends monitoring; the peaks recorded so
            # far remain in ``peak_mem``.
            pass
        finally:
            # Make sure the nvidia-smi child is stopped and reaped.
            if cmd.poll() is None:
                cmd.terminate()
            cmd.wait()
            cmd.stdout.close()
    finally:
        if handler_installed and previous_handler is not None:
            signal.signal(signal.SIGTERM, previous_handler)
优化渲染计时函数(支持结果队列)
改用列表传递命令参数,添加异常处理,通过队列返回计时结果:
def time_render(filename: str, result_queue) -> None:
    """Run the render for ``filename`` and put the elapsed time on a queue.

    The renderer's output is scanned for the summary line containing
    "Time: " followed by "(Saving: "; the text between those two markers is
    passed to ``convert_to_seconds``.

    Exactly one item is always put on ``result_queue``, even when the
    renderer cannot be started or parsing fails, so a consumer waiting on
    the queue never blocks forever. The item is '-1' when no render time
    could be determined.

    Args:
        filename: Path of the file to render.
        result_queue: Object with a ``put`` method (for example a
            ``multiprocessing.Manager().Queue()``) receiving the result.
    """
    elapsed_time_str = '-1'
    cmd = None
    try:
        command_to_run = [
            render_executable,
            '-b', filename,
            '-f', '10',
            '--',
            '--cycles-device', 'CUDA',
        ]
        # Launching inside the ``try`` means a missing executable is reported
        # like any other render error instead of escaping before the result
        # is queued.
        cmd = subprocess.Popen(command_to_run, stdout=subprocess.PIPE, text=True)
        for line in cmd.stdout:
            decoded_line = line.strip()
            marker_index = decoded_line.find("Time: ")
            if marker_index == -1:
                continue
            start_index = marker_index + len("Time: ")
            # Anchor on the markers themselves rather than on the first ":"
            # and "(" so unrelated punctuation earlier in the line is ignored.
            end_index = decoded_line.find("(Saving: ", start_index)
            if end_index == -1:
                continue
            elapsed_time_str = convert_to_seconds(
                decoded_line[start_index:end_index].strip())
        # Wait for the render process to finish.
        cmd.wait()
    except Exception as e:
        # Top-level boundary of the worker: report and fall through to the
        # cleanup so the caller still receives a result.
        print(f"渲染出错: {e}")
    finally:
        if cmd is not None:
            if cmd.poll() is None:
                cmd.terminate()
                cmd.wait()
            cmd.stdout.close()
        # Always hand a result back to the consumer.
        result_queue.put(elapsed_time_str)
2. 主进程控制逻辑
通过multiprocessing.Manager创建共享字典和队列,启动两个子进程,等待渲染完成后终止监控进程并获取结果:
import multiprocessing
import queue

# Defined at module level rather than under the ``__main__`` guard: worker
# processes started with the "spawn" method re-import this module with a
# different ``__name__``, so a value assigned only inside the guard would be
# undefined in the render worker. Replace with the real renderer path.
render_executable = "blender"


def main():
    """Render a file while a second process tracks peak GPU memory.

    Starts the GPU monitor first so sampling is already running when the
    render begins, waits for the render worker to finish, stops the monitor,
    then prints the render time and the per-GPU peak memory.
    """
    filename = "path/to/your/render_file.blend"  # replace with the real file path

    with multiprocessing.Manager() as manager:
        # Objects shared between the parent and the two workers.
        peak_mem_dict = manager.dict()
        render_result_queue = manager.Queue()

        render_process = multiprocessing.Process(
            target=time_render,
            args=(filename, render_result_queue),
        )
        gpu_monitor_process = multiprocessing.Process(
            target=get_peak_GPU_mem,
            args=(peak_mem_dict,),
        )

        gpu_monitor_process.start()
        try:
            render_process.start()
            # Wait for the render worker to finish.
            render_process.join()
        finally:
            # Stop the monitor even if starting or joining the render worker
            # raised, so it is never left running.
            if gpu_monitor_process.is_alive():
                gpu_monitor_process.terminate()
            gpu_monitor_process.join()

        # The render worker has exited, so its result (if any) is already in
        # the queue. A blocking get() would hang forever if the worker died
        # before enqueuing anything; fall back to the '-1' sentinel instead.
        try:
            render_time = render_result_queue.get_nowait()
        except queue.Empty:
            render_time = '-1'
        # Copy the proxy into a plain dict before the manager shuts down.
        peak_mem = dict(peak_mem_dict)

    print(f"渲染耗时: {render_time} 秒")
    print(f"GPU峰值内存: {peak_mem}")


if __name__ == "__main__":
    main()
更优代码组织方式
将渲染与监控逻辑封装为类,提高代码可维护性和复用性:
import multiprocessing
import queue
import signal
import subprocess


class RenderGPUMonitor:
    """Run a render while a parallel process records peak GPU memory.

    After ``run`` returns, ``render_time`` holds the parsed render time (or
    '-1' when none could be determined) and ``peak_gpu_mem`` maps each GPU
    bus id to the largest ``memory.used`` value sampled.
    """

    def __init__(self, render_executable):
        self.render_executable = render_executable
        self.render_time = None
        self.peak_gpu_mem = None

    def _render_and_time(self, filename, result_queue):
        """Run the render and put the elapsed time on ``result_queue``.

        Exactly one item is always queued ('-1' on failure), even when the
        renderer cannot be launched, so the consumer never waits forever.
        """
        elapsed_time_str = '-1'
        cmd = None
        try:
            command_to_run = [
                self.render_executable,
                '-b', filename,
                '-f', '10',
                '--',
                '--cycles-device', 'CUDA',
            ]
            cmd = subprocess.Popen(
                command_to_run, stdout=subprocess.PIPE, text=True)
            for line in cmd.stdout:
                decoded_line = line.strip()
                marker_idx = decoded_line.find("Time: ")
                if marker_idx == -1:
                    continue
                start_idx = marker_idx + len("Time: ")
                # Anchor on the markers rather than the first ":" and "(".
                end_idx = decoded_line.find("(Saving: ", start_idx)
                if end_idx == -1:
                    continue
                elapsed_time_str = convert_to_seconds(
                    decoded_line[start_idx:end_idx].strip())
            cmd.wait()
        except Exception as e:
            # Top-level boundary of the worker: report, then clean up below.
            print(f"渲染异常: {e}")
        finally:
            if cmd is not None:
                if cmd.poll() is None:
                    cmd.terminate()
                    cmd.wait()
                cmd.stdout.close()
            result_queue.put(elapsed_time_str)

    def _monitor_gpu_peak(self, peak_mem_dict):
        """Sample ``nvidia-smi`` once per second and record per-GPU peaks.

        Returns when the ``nvidia-smi`` stream ends, or exits via
        ``SystemExit`` on SIGTERM after stopping the ``nvidia-smi`` child.

        Raises:
            OSError: If ``nvidia-smi`` cannot be launched.
        """
        command_to_run = [
            'nvidia-smi',
            '--query-gpu=gpu_bus_id,memory.used',
            '--format=csv',
            '-l', '1',
        ]

        def _exit_on_sigterm(signum, frame):
            raise SystemExit(0)

        # Process.terminate() sends SIGTERM on POSIX; with the default
        # handler the ``finally`` below would be skipped and nvidia-smi would
        # be left behind. On Windows terminate() does not use signals, so
        # this cleanup is best-effort there.
        handler_installed = False
        previous_handler = None
        try:
            previous_handler = signal.signal(signal.SIGTERM, _exit_on_sigterm)
            handler_installed = True
        except ValueError:
            pass  # signal.signal() is only allowed in the main thread

        try:
            # Local copy so each sample needs at most one proxied write.
            known_peaks = dict(peak_mem_dict)
            cmd = subprocess.Popen(
                command_to_run, stdout=subprocess.PIPE, text=True)
            try:
                for line in cmd.stdout:
                    decoded_line = line.strip()
                    if decoded_line.startswith('gpu_bus_id'):
                        continue
                    split_tokens = decoded_line.split(',')
                    if len(split_tokens) != 2:
                        continue
                    gpu_id = split_tokens[0].strip()
                    try:
                        curr_mem = int(split_tokens[1].strip().split()[0])
                    except (ValueError, IndexError):
                        # Non-numeric second field (header, "[N/A]", ...).
                        continue
                    previous_peak = known_peaks.get(gpu_id)
                    if previous_peak is None or curr_mem > previous_peak:
                        known_peaks[gpu_id] = curr_mem
                        peak_mem_dict[gpu_id] = curr_mem
            except OSError:
                # A pipe read error ends monitoring; recorded peaks are kept.
                pass
            finally:
                if cmd.poll() is None:
                    cmd.terminate()
                cmd.wait()
                cmd.stdout.close()
        finally:
            if handler_installed and previous_handler is not None:
                signal.signal(signal.SIGTERM, previous_handler)

    def run(self, filename):
        """Render ``filename`` and return ``(render_time, peak_gpu_mem)``.

        The monitor process is started before the render process and is
        stopped once the render process has exited.
        """
        with multiprocessing.Manager() as manager:
            peak_mem_dict = manager.dict()
            result_queue = manager.Queue()

            render_proc = multiprocessing.Process(
                target=self._render_and_time,
                args=(filename, result_queue),
            )
            gpu_proc = multiprocessing.Process(
                target=self._monitor_gpu_peak,
                args=(peak_mem_dict,),
            )

            gpu_proc.start()
            try:
                render_proc.start()
                render_proc.join()
            finally:
                # Never leave the monitor running, even on an error above.
                if gpu_proc.is_alive():
                    gpu_proc.terminate()
                gpu_proc.join()

            # The render worker has exited; a blocking get() would hang if it
            # died before queuing a result, so fall back to the sentinel.
            try:
                self.render_time = result_queue.get_nowait()
            except queue.Empty:
                self.render_time = '-1'
            # Copy the proxy into a plain dict before the manager shuts down.
            self.peak_gpu_mem = dict(peak_mem_dict)

        return self.render_time, self.peak_gpu_mem


# Usage example
def convert_to_seconds(time_str):
    """Convert a colon-separated time string to seconds.

    Accepts one to three fields, most significant first, with an optional
    fractional part on the last one: "00:01:30" -> 90, "00:11.45" -> 11.45,
    "45" -> 45. An ``int`` is returned unless the seconds field contains a
    decimal point.

    Raises:
        ValueError: If there are more than three fields or a field is not
            numeric.
    """
    fields = time_str.strip().split(':')
    if len(fields) > 3:
        raise ValueError(f"unsupported time format: {time_str!r}")
    *leading_fields, seconds_field = fields
    # Fold hours/minutes into a running count of minutes.
    minutes_total = 0
    for field in leading_fields:
        minutes_total = minutes_total * 60 + int(field)
    if '.' in seconds_field:
        seconds = float(seconds_field)
    else:
        seconds = int(seconds_field)
    return minutes_total * 60 + seconds


if __name__ == "__main__":
    monitor = RenderGPUMonitor(render_executable="blender")
    render_time, peak_mem = monitor.run("path/to/your/file.blend")
    print(f"渲染完成,耗时: {render_time}秒")
    print("各GPU峰值内存:")
    for gpu_id, mem in peak_mem.items():
        print(f" {gpu_id}: {mem} MiB")
本文的问题来源于 Stack Exchange,提问者:easythrees




