You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Python多进程间非队列/管道方式实时共享变量的可行性问询

Python多进程间非队列/管道方式实时共享变量的可行性问询

首先可以明确告诉你:这种方式完全可行!你的思路方向是对的,Python的multiprocessing.Manager确实支持这种共享字典+锁的方式实现进程间实时数据同步,只是当前代码里有些细节可以优化,让它更适配你25fps的实时场景需求。

代码核心思路没问题,可优化这些细节

  • 目前你在add_niao里提取Value对象的数值存入共享字典,get_niao又重新创建Value对象,这会带来额外的序列化/反序列化开销。其实可以直接把Manager创建的Value对象存入共享字典,进程间直接操作Value.value属性就能实时同步
  • 锁的使用可以更高效:比如niao_worker里先加锁读取,再调用move_niao(内部又加锁),可以合并成一个锁块,减少锁竞争次数
  • 对于实时性要求高的场景,尽量减少不必要的对象实例化:比如get_niao可以直接返回共享的数据字典,不用每次创建新的Niao对象,节省时间和内存

你提供的原始代码

from multiprocessing import Process, Manager, Lock, Value
import time
    
class Niao:
    def __init__(self, axle_angle, position_x, position_y, energy, speed):
        self.axle_angle = axle_angle
        self.position_x = position_x
        self.position_y = position_y
        self.energy = energy
        self.speed = speed 
        
    def move(self, new_x, new_y):
        self.position_x.value = new_x
        self.position_y.value = new_y

class SharedWorld:
    def __init__(self, manager):
        self.lock = Lock()
        self.niao_dict = manager.dict()  # Shared dictionary for Niao objects

    def add_niao(self, key, niao):
        with self.lock:
            self.niao_dict[key] = {
                'axle_angle': niao.axle_angle.value,  # Store the value directly
                'position_x': niao.position_x.value,  # Store the value directly
                'position_y': niao.position_y.value,  # Store the value directly
                'energy': niao.energy.value,          # Store the value directly
                'speed': niao.speed.value              # Store the value directly
            }

    def get_niao(self, key):
        with self.lock:
            niao_data = self.niao_dict[key]
            return Niao(
                Value('d',niao_data['axle_angle']),  # Create new Value object
                Value('d', niao_data['position_x']),  # Create new Value object
                Value('d', niao_data['position_y']),  # Create new Value object
                Value('d', niao_data['energy']),      # Create new Value object
                Value('d', niao_data['speed'])        # Create new Value object
            )
       
    def move_niao(self, key, new_x, new_y):
        with self.lock:
            self.niao_dict[key]['position_x'] = new_x  # Update the value directly
            self.niao_dict[key]['position_y'] = new_y  # Update the value directly

def niao_worker(shared_world, key):
    while True:
        with shared_world.lock:  # Lock access to shared data
            niao_data = shared_world.niao_dict[key]
            print(f"Niao Worker accessing: Niao Position ({niao_data['position_x']}, {niao_data['position_y']})")
            pos_x = niao_data['position_x']
            pos_y = niao_data['position_y']
        
        # Move Niao object
        shared_world.move_niao(key, pos_x + 5.0, pos_y + 6.0)

        with shared_world.lock:  # Lock access to shared data
            niao_data = shared_world.niao_dict[key]
            print(f"Niao Worker accessing post update: Niao Position ({niao_data['position_x']}, {niao_data['position_y']})")
        
        time.sleep(1)  # Delay for 1 second

def worker(shared_world, key):
    while True:        
        niao = shared_world.get_niao('niao_0')
        print(f"Worker accessing: Position ({niao.position_x.value}, {niao.position_y.value})")
        # Delay to reduce the loop's speed
        time.sleep(1)  # Delay for 1 second (adjust as needed)

if __name__ == "__main__":
    manager = Manager()
    shared_world = SharedWorld(manager)

    # Add Niao objects to the shared world
    shared_world.add_niao('niao_0', Niao(
        Value('d', 0.0),  # niao_axle_angle
        Value('d', 0.0),  # niao_position_x
        Value('d', 0.0),  # niao_position_y
        Value('d', 0.0),  # niao_energy
        Value('d', 0.0)   # niao_speed
    ))

    # Create and start Niao processes
    niao_processes = []
    for key in ['niao_0']:
        p = Process(target=niao_worker, args=(shared_world, key))
        niao_processes.append(p)
        p.start()

    # Create and start Food processes
    food_processes = []
    for key in ['food_1']:
        p = Process(target=worker, args=(shared_world, key))
        food_processes.append(p)
        p.start()

    # Wait for all processes to finish
    for p in niao_processes + food_processes:
        p.join()

你的问题描述

这段代码是我多次尝试在Python进程间共享变量后的成果。我了解到字典返回的是数据的影子副本,这就是数据无法更新的原因。我尝试让多个niao对象以25fps的速度实时运行,但所有对象都需要知道彼此的位置以及更多信息。

我的编程经验有限,但进程间交换真实数据这么基础的操作却这么难,这让我很困惑。我不想使用队列和管道。我也考虑过用Cython来提高访问速度,但觉得这只会增加复杂度,Python应该能做到这件事。

有人能告诉我用这种方式交换参数是否可行吗?

备注:内容来源于stack exchange,提问作者Peter Mason

火山引擎 最新活动