You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

如何优化停车场车位模拟程序以支持更多车辆且避免代码重复

如何优化停车场车位模拟程序以支持更多车辆且避免代码重复

嘿,我仔细看了你的停车场模拟代码,确实像你说的那样,重复的floor1_waitfloor2_wait这些函数太冗余了,要是以后加个第四、第五层,还得复制粘贴改名字,太麻烦了。咱们可以通过抽象通用逻辑+动态管理资源的方式来重构,轻松支持更多楼层和车辆,再也不用重复写代码啦!

下面是具体的优化思路和修改后的代码:

核心优化点

1. 用字典统一管理楼层相关资源

把原来单独定义的wq1wq2wq3换成一个字典floor_queues,键是楼层名称,值是对应的队列。同时把每个楼层的专属参数(比如到楼层的等待时间、驶出的延迟时间)也存在一个字典里,方便统一调用。

2. 编写通用的楼层等待处理函数

把三个重复的floorX_wait函数合并成一个通用的floor_wait函数,接收楼层名、对应队列、参数作为输入,这样不管多少楼层,只需要调用这一个函数就行。

3. 动态创建线程

原来手动创建每个楼层的线程,现在可以遍历楼层列表,动态生成线程,加新楼层的时候完全不用改线程创建的代码。

4. 简化线程停止逻辑

原来要给每个队列手动放None,现在遍历队列字典统一发送停止信号,更简洁。

修改后的完整代码

# Import library
import json
import threading
import queue
import time
import random
from tkinter import *
from tkinter import ttk

# 用字典统一管理楼层队列和参数,新增楼层只需要在这里加!
iq = queue.Queue()
floor_queues = {
    'Floor 1': queue.Queue(),
    'Floor 2': queue.Queue(),
    'Floor 3': queue.Queue()
}
# 每个楼层的专属参数:到达楼层的等待时间、驶出延迟时间
floor_params = {
    'Floor 1': {'arrive_wait': 1, 'exit_delay': 1},
    'Floor 2': {'arrive_wait': 2, 'exit_delay': 2},
    'Floor 3': {'arrive_wait': 3, 'exit_delay': 3}
}
oq = queue.Queue()
lock = threading.Lock()
TEMP_FILE = 'parking_slot.json'  # File path

# Load or initialize parking state 
try:
    with open(TEMP_FILE, 'r') as f:
        data = json.load(f)
        total_slots = data.get('total_slots', 300)
        floor_slots = data.get('floor_slots', {'Floor 1': 100, 'Floor 2': 100, 'Floor 3': 100})
except FileNotFoundError:
    total_slots = 300
    floor_slots = {'Floor 1': 100, 'Floor 2': 100, 'Floor 3': 100}

# For the windows display stuff
root = Tk()
frm = ttk.Frame(root, padding=10)
total_label = ttk.Label(frm, text=f"Total slots: {total_slots}")
floor_labels = {}

# code not used just here if i want to update it with a saving feature
def save_data():
    data = {
        'total_slots': total_slots,
        'floor_slots': floor_slots
    }
    with open(TEMP_FILE, 'w') as temp_file:
        json.dump(data, temp_file)

def stop_threads():
    iq.put(None)  # Signal to floor_input to stop
    # 遍历所有楼层队列,发送停止信号
    for q in floor_queues.values():
        q.put(None)
    oq.put(None)  # Signal to main_output to stop

def window_display():  # code to display the slots
    global total_slots, floor_slots, floor_labels, root, total_label, frm
    root.title("Car Park Display Counter Simulation")
    frm.grid()
    total_label.grid(column=0, row=0)
    for i, (floor, slots) in enumerate(floor_slots.items()):
        floor_label = ttk.Label(frm, text=f"{floor}: {slots} slots")
        floor_label.grid(column=0, row=i + 1)
        floor_labels[floor] = floor_label

    root.protocol("WM_DELETE_WINDOW", stop_threads)

def update_data():  # Code to update the window that displays the slots
    global total_slots, floor_slots, floor_labels, root, total_label
    root.after(0, lambda: total_label.config(text=f"Total slots: {total_slots}"))
    for floor in floor_slots.keys():
        root.after(0, lambda floor=floor: floor_labels[floor].config(text=f"{floor}: {floor_slots[floor]} slots"))

def main_input():
    global total_slots 
    # 这里可以直接改数字支持更多车辆,比如改成1000都没问题
    for i in range(100):  
        if total_slots > 0:
            car = f'car {random.randint(1, 1000)}'  # 增大随机数范围避免重复
            iq.put(car)
            total_slots -= 1
            update_data()
        else:
            time.sleep(1)  # Wait a bit if no slots are available

        time.sleep(random.uniform(1, 3))  # Simulates human delay

        if not iq.empty() and iq.queue[0] is None:
            break

def floor_input():  # Code that decides on what floor the car will go to
    global floor_slots
    while True:
        try:
            car = iq.get(timeout=1)
        except queue.Empty:
            continue
        
        if car is None:
            break
        floor_name = random.choice(list(floor_slots.keys()))
        with lock:  # A lock to prevent race conditions
            if floor_slots[floor_name] > 0:  # Check if there are available slots on the floor
                floor_slots[floor_name] -= 1 
                params = floor_params.get(floor_name, {'arrive_wait':1})
                time.sleep(params['arrive_wait'])
                
                # 直接从字典取对应队列,不用一堆if-elif
                floor_queues[floor_name].put(car)
            else:
                iq.put(car)  # If no slots are available, put the car back in the input queue

            if not iq.empty() and iq.queue[0] is None:
                break

        update_data()

# 通用的楼层等待处理函数,支持任意楼层
def floor_wait(floor_name, floor_queue):
    global floor_slots
    while True:
        try:
            car = floor_queue.get(timeout=1)
        except queue.Empty:
            continue  
        if car is None:
            break
        time.sleep(random.uniform(3, 8))  # Car waiting simulation
        with lock:
            floor_slots[floor_name] += 1
        oq.put(car)
        update_data()
        # 从参数字典取驶出延迟时间
        exit_delay = floor_params.get(floor_name, {'exit_delay':1})['exit_delay']
        time.sleep(exit_delay)
        if not floor_queue.empty() and floor_queue.queue[0] is None:
            break

def main_output():  # Code that sorts out cars that are going out
    global total_slots  
    while True:
        try:
            car = oq.get(timeout=1)
        except queue.Empty:
            continue

        if car is None:
            break
        with lock:
            total_slots += 1
        update_data()
        time.sleep(random.uniform(1, 5))

    while not iq.empty():
        car = iq.get()
        if car is not None:
            with lock:
                total_slots += 1
            update_data()
            time.sleep(random.uniform(1, 5))

# Display the window
window_display()
update_data()

# Create threads
main_input_thread = threading.Thread(target=main_input)
floor_input_thread = threading.Thread(target=floor_input)
# 动态创建所有楼层的线程,新增楼层自动生成线程
floor_threads = []
for floor_name, q in floor_queues.items():
    thread = threading.Thread(target=floor_wait, args=(floor_name, q))
    floor_threads.append(thread)
main_output_thread = threading.Thread(target=main_output)

# Start threads
main_input_thread.start()
floor_input_thread.start()
for thread in floor_threads:
    thread.start()
main_output_thread.start()

# Run the Tkinter main loop
root.mainloop()  # For the display window 

# After the GUI is closed, wait for threads to finish
main_input_thread.join()
floor_input_thread.join()
for thread in floor_threads:
    thread.join()
main_output_thread.join()

print("Simulation Ended.")               

扩展说明

现在如果要加新楼层,只需要做3件事:

  1. floor_queues字典里新增键值对,比如'Floor 4': queue.Queue()
  2. floor_params里新增该楼层的参数,比如'Floor 4': {'arrive_wait':4, 'exit_delay':4}
  3. floor_slots里新增该楼层的车位数量,比如'Floor 4':100

其他代码完全不用改,就能自动支持新楼层的模拟,车辆数量的话,直接改main_input里的循环次数就行,比如改成range(500)就能模拟500辆车,非常灵活!

备注:内容来源于stack exchange,提问作者Unknown

火山引擎 最新活动