You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何使用multiprocessing运行并行进程,从其他进程终止指定进程并重启——解决Pickling AuthenticationString对象的安全错误

解决Python多进程Pickling错误+实现进程终止/重启需求

嘿,我来帮你搞定这个多进程的问题!你遇到的TypeError: Pickling an AuthenticationString object is disallowed for security reasons,核心原因是你试图把multiprocessing.Process实例存入manager.list()——Manager创建的共享列表需要对存入的对象做序列化(pickle),但Process实例内部包含的和权限相关的AuthenticationString对象出于安全考虑不允许被pickle,所以直接报错了。

结合你的四个需求,我给你重构了一套代码,不用Manager存进程对象,改用事件信号来实现跨进程控制,完美满足你的目标:

import multiprocessing
import time
from multiprocessing import Process, Event

data_dict = {'y': 4, 'h': 5, 'j': 6, 'o': 7, 'p': 3, 'b': 11}

def randomx(key, stop_event):
    """randomx进程:监听终止事件,收到信号后优雅退出"""
    while not stop_event.is_set():
        print(f"randomx running for key: {key}")
        time.sleep(data_dict[key])
        # 中途检查终止信号,避免sleep期间错过指令
        if stop_event.is_set():
            break
    print(f"✅ randomx {key} terminated gracefully")

def move(stop_event):
    """move进程:同样监听终止事件"""
    print("Starting move2obj")
    for i in range(0, 5, 1):
        if stop_event.is_set():
            break
        print(i)
        time.sleep(1)
    print("✅ move2obj terminated gracefully")

def statecheck(main_stop_events, normal_stop_events, restart_trigger):
    """statecheck进程:模拟满足条件后触发终止+重启逻辑"""
    print("🔍 statecheck started, waiting for condition...")
    
    # 这里模拟等待特定条件(比如10秒后触发,你可以换成自己的判断逻辑)
    time.sleep(10)
    print("🔔 Statecheck: 满足触发条件,开始终止所有目标进程")
    
    # 触发main进程(randomx)的终止事件
    for event in main_stop_events.values():
        event.set()
    # 触发normal进程(move)的终止事件
    for event in normal_stop_events.values():
        event.set()
    
    # 等待进程退出,再发送重启信号
    time.sleep(5)
    restart_trigger.set()
    print("📶 Statecheck: 已发送重启信号给主进程")

def run():
    # 用字典存储进程和对应的终止事件,主进程维护这个字典(无需跨进程共享)
    main_processes = {}  # 存储randomx进程: {key: (Process实例, 终止Event)}
    normal_processes = {}  # 存储normal进程: {标识: (Process实例, 终止Event)}
    
    # 启动所有randomx进程
    for key in data_dict:
        stop_event = Event()
        proc = Process(target=randomx, args=(key, stop_event), name=f"randomx-{key}")
        proc.start()
        main_processes[key] = (proc, stop_event)
        print(f"🚀 Started randomx process for key {key}")
    
    # 启动move进程
    move_stop_event = Event()
    move_proc = Process(target=move, args=(move_stop_event,), name="move2obj")
    move_proc.start()
    normal_processes["move"] = (move_proc, move_stop_event)
    print("🚀 Started move2obj process")
    
    # 重启触发信号:statecheck满足条件后,通过这个信号通知主进程重启
    restart_trigger = Event()
    
    # 启动statecheck进程,传入终止事件和重启信号
    statecheck_proc = Process(
        target=statecheck,
        args=(main_processes, normal_processes, restart_trigger),
        name="statecheck"
    )
    statecheck_proc.start()
    print("🚀 Started statecheck process")
    
    # 主进程进入循环,监听重启信号
    while True:
        if restart_trigger.wait(timeout=1):
            print("🔄 Main process: 收到重启信号,开始重启randomx进程")
            
            # 先确保所有旧的randomx进程都已退出
            for key, (proc, _) in main_processes.items():
                if proc.is_alive():
                    proc.join(timeout=2)  # 等待2秒让它自行退出
                if proc.is_alive():
                    proc.terminate()  # 超时就强制终止
                    proc.join()
            
            # 重启所有randomx进程
            main_processes.clear()
            for key in data_dict:
                stop_event = Event()
                proc = Process(target=randomx, args=(key, stop_event), name=f"randomx-{key}")
                proc.start()
                main_processes[key] = (proc, stop_event)
                print(f"🔄 Restarted randomx process for key {key}")
            
            # 重置重启信号,等待下一次触发
            restart_trigger.clear()
            print("✅ Main process: randomx进程重启完成")
        
        # 这里可以添加主进程的其他监控逻辑,比如检查进程存活状态
        time.sleep(0.5)

if __name__ == "__main__":
    run()

代码满足你的所有需求:

  1. 并行进程+条件触发:所有randomx、move、statecheck进程并行运行,statecheck在等待10秒后(你可以换成自己的条件判断逻辑)触发后续操作
  2. 终止指定进程:通过设置stop_event,可以优雅终止main_processes(randomx)和normal_processes(move)中的所有进程
  3. 在statecheck内触发终止:statecheck无需直接操作进程实例,只需要设置对应的终止事件,就能让目标进程自行退出
  4. 重启randomx进程:主进程收到statecheck的重启信号后,会清理旧进程并重启所有randomx进程

关键优化点:

  • Event替代Manager存储进程,彻底避免了pickle进程对象的问题
  • 进程退出采用“优雅退出+强制兜底”的方式,比直接terminate()更安全
  • 主进程负责进程的生命周期管理,statecheck只负责发出控制信号,职责更清晰

内容的提问来源于stack exchange,提问作者Ggjustice V

火山引擎 最新活动